/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

public class StreamGroupedFoldTest {
    @Test
    public void testGroupedFold() throws Exception {
        KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>(){

            public String getKey(Integer value) {
                return value.toString();
            }
        };
        StreamGroupedFold operator = new StreamGroupedFold((FoldFunction)new MyFolder(), (Object)"100");
        operator.setOutputType((TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, (TypeInformation<String>)BasicTypeInfo.STRING_TYPE_INFO);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)1, initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)1, initialTime + 2L));
        testHarness.processWatermark(new Watermark(initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)2, initialTime + 3L));
        testHarness.processElement(new StreamRecord((Object)2, initialTime + 4L));
        testHarness.processElement(new StreamRecord((Object)3, initialTime + 5L));
        expectedOutput.add(new StreamRecord((Object)"1001", initialTime + 1L));
        expectedOutput.add(new StreamRecord((Object)"10011", initialTime + 2L));
        expectedOutput.add(new Watermark(initialTime + 2L));
        expectedOutput.add(new StreamRecord((Object)"1002", initialTime + 3L));
        expectedOutput.add(new StreamRecord((Object)"10022", initialTime + 4L));
        expectedOutput.add(new StreamRecord((Object)"1003", initialTime + 5L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>(){

            public Integer getKey(Integer value) {
                return value;
            }
        };
        StreamGroupedFold operator = new StreamGroupedFold((FoldFunction)new TestOpenCloseFoldFunction(), (Object)"init");
        operator.setOutputType((TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, keySelector, (TypeInformation<Integer>)BasicTypeInfo.INT_TYPE_INFO);
        long initialTime = 0L;
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)1, initialTime));
        testHarness.processElement(new StreamRecord((Object)2, initialTime));
        testHarness.close();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)TestOpenCloseFoldFunction.closeCalled);
        Assert.assertTrue((String)"Output contains no elements.", (testHarness.getOutput().size() > 0 ? 1 : 0) != 0);
    }

    private static class TestOpenCloseFoldFunction
    extends RichFoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseFoldFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String fold(String acc, Integer in) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return acc + in;
        }
    }

    private static class MyFolder
    implements FoldFunction<Integer, String> {
        private MyFolder() {
        }

        public String fold(String accumulator, Integer value) throws Exception {
            return accumulator + value.toString();
        }
    }
}

