/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class OneInputStreamTaskTest
extends TestLogger {
    private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR = new ListStateDescriptor("test", (TypeSerializer)new IntSerializer());

    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        OneInputStreamTask mapTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap((MapFunction)new TestOpenCloseMapFunction());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        long initialTime = 0L;
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)"Ciao", initialTime + 2L));
        expectedOutput.add(new StreamRecord((Object)"Hello", initialTime + 1L));
        expectedOutput.add(new StreamRecord((Object)"Ciao", initialTime + 2L));
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        OneInputStreamTask mapTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap((MapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime));
        testHarness.processElement(new StreamRecord((Object)"Ciao", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Hello", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao", initialTime));
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3L), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 2L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamStatus.IDLE, 0, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 0);
        testHarness.processElement(new Watermark(initialTime + 6L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 5L), 1, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 5L));
        expectedOutput.add(new Watermark(initialTime + 6L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamStatus.IDLE, 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.IDLE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
        testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)2L, (long)resultElements.size());
    }

    @Test
    public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
        OneInputStreamTask testTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(testTask, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator();
        WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
        TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator();
        testHarness.setupOperatorChain(new OperatorID(42L, 42L), headOperator).chain(new OperatorID(4711L, 42L), watermarkOperator, StringSerializer.INSTANCE).chain(new OperatorID(123L, 123L), tailOperator, StringSerializer.INSTANCE).finish();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processElement(new StreamRecord((Object)"EXPECT_WATERMARKS"));
        testHarness.processElement(new StreamRecord((Object)"10"), 0, 0);
        testHarness.processElement(new Watermark(15L));
        testHarness.processElement(new StreamRecord((Object)"20"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"30"), 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new StreamRecord((Object)"EXPECT_WATERMARKS"));
        expectedOutput.add(new StreamRecord((Object)"10"));
        expectedOutput.add(new Watermark(10L));
        expectedOutput.add(new StreamRecord((Object)"20"));
        expectedOutput.add(new Watermark(20L));
        expectedOutput.add(new StreamRecord((Object)"30"));
        expectedOutput.add(new Watermark(30L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamStatus.IDLE);
        testHarness.processElement(new StreamRecord((Object)"NO_WATERMARKS"));
        testHarness.processElement(new StreamRecord((Object)"40"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"50"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"60"), 0, 0);
        testHarness.processElement(new Watermark(65L));
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.IDLE);
        expectedOutput.add(new StreamRecord((Object)"NO_WATERMARKS"));
        expectedOutput.add(new StreamRecord((Object)"40"));
        expectedOutput.add(new StreamRecord((Object)"50"));
        expectedOutput.add(new StreamRecord((Object)"60"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamStatus.ACTIVE);
        testHarness.processElement(new StreamRecord((Object)"EXPECT_WATERMARKS"));
        testHarness.processElement(new StreamRecord((Object)"70"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"80"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"90"), 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.ACTIVE);
        expectedOutput.add(new StreamRecord((Object)"EXPECT_WATERMARKS"));
        expectedOutput.add(new StreamRecord((Object)"70"));
        expectedOutput.add(new Watermark(70L));
        expectedOutput.add(new StreamRecord((Object)"80"));
        expectedOutput.add(new Watermark(80L));
        expectedOutput.add(new StreamRecord((Object)"90"));
        expectedOutput.add(new Watermark(90L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)12L, (long)resultElements.size());
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        OneInputStreamTask mapTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap((MapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Ciao-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-1-1", initialTime), 1, 1);
        testHarness.processElement(new StreamRecord((Object)"Ciao-1-1", initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"Hello-1-1", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao-1-1", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()));
        expectedOutput.add(new StreamRecord((Object)"Hello-0-0", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao-0-0", initialTime));
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        OneInputStreamTask mapTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap mapOperator = new StreamMap((MapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)mapOperator);
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Ciao-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-1-1", initialTime), 1, 1);
        testHarness.processElement(new StreamRecord((Object)"Ciao-1-1", initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"Hello-1-1", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao-1-1", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpoint()), 0, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpoint()), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpoint()), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpoint()), 1, 1);
        expectedOutput.add(new CancelCheckpointMarker(0L));
        expectedOutput.add(new StreamRecord((Object)"Hello-0-0", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao-0-0", initialTime));
        expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpoint()));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpoint()), 1, 1);
        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testSnapshottingAndRestoring() throws Exception {
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        OneInputStreamTask streamTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(streamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        IdentityKeySelector keySelector = new IdentityKeySelector();
        testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
        long checkpointId = 1L;
        long checkpointTimestamp = 1L;
        int numberChainedTasks = 11;
        StreamConfig streamConfig = testHarness.getStreamConfig();
        this.configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
        AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(testHarness.jobConfig, testHarness.taskConfig, testHarness.executionConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize);
        TestingStreamOperator.numberRestoreCalls = 0;
        testHarness.invoke(env);
        testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
        while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint())) {
        }
        Assert.assertEquals((long)0L, (long)TestingStreamOperator.numberRestoreCalls);
        env.getCheckpointLatch().await();
        Assert.assertEquals((long)checkpointId, (long)env.getCheckpointId());
        testHarness.endInput();
        testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
        OneInputStreamTask restoredTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness(restoredTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
        this.configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks);
        TaskStateSnapshot stateHandles = env.getCheckpointStateHandles();
        Assert.assertEquals((long)numberChainedTasks, (long)stateHandles.getSubtaskStateMappings().size());
        restoredTask.setInitialState(stateHandles);
        TestingStreamOperator.numberRestoreCalls = 0;
        restoredTaskHarness.invoke();
        restoredTaskHarness.endInput();
        restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
        Assert.assertEquals((long)numberChainedTasks, (long)TestingStreamOperator.numberRestoreCalls);
        TestingStreamOperator.numberRestoreCalls = 0;
    }

    @Test
    public void testQuiesceTimerServiceAfterOpClose() throws Exception {
        OneInputStreamTask task = new OneInputStreamTask();
        SystemProcessingTimeService timeService = new SystemProcessingTimeService((AsyncExceptionHandler)task, task.getCheckpointLock());
        task.setProcessingTimeService((ProcessingTimeService)timeService);
        Assert.assertTrue((boolean)((SystemProcessingTimeService)task.getProcessingTimeService()).isAlive());
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(task, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator((StreamOperator)new TestOperator());
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        timeService.shutdownService();
    }

    private void configureChainedTestingStreamOperator(StreamConfig streamConfig, int numberChainedTasks) {
        Preconditions.checkArgument((numberChainedTasks >= 1 ? 1 : 0) != 0, (Object)"The operator chain must at least contain one operator.");
        TestingStreamOperator previousOperator = new TestingStreamOperator();
        streamConfig.setStreamOperator(previousOperator);
        streamConfig.setOperatorID(new OperatorID(0L, 0L));
        HashMap<Integer, StreamConfig> chainedTaskConfigs = new HashMap<Integer, StreamConfig>(numberChainedTasks - 1);
        ArrayList<StreamEdge> outputEdges = new ArrayList<StreamEdge>(numberChainedTasks - 1);
        for (int chainedIndex = 1; chainedIndex < numberChainedTasks; ++chainedIndex) {
            TestingStreamOperator chainedOperator = new TestingStreamOperator();
            StreamConfig chainedConfig = new StreamConfig(new Configuration());
            chainedConfig.setStreamOperator(chainedOperator);
            chainedConfig.setOperatorID(new OperatorID(0L, (long)chainedIndex));
            chainedTaskConfigs.put(chainedIndex, chainedConfig);
            StreamEdge outputEdge = new StreamEdge(new StreamNode(null, Integer.valueOf(chainedIndex - 1), null, null, null, null, null), new StreamNode(null, Integer.valueOf(chainedIndex), null, null, null, null, null), 0, Collections.emptyList(), null, null);
            outputEdges.add(outputEdge);
        }
        streamConfig.setChainedOutputs(outputEdges);
        streamConfig.setTransitiveChainedTaskConfigs(chainedTaskConfigs);
    }

    private static class TriggerableFailOnWatermarkTestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813243L;
        public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
        public static final String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
        protected boolean expectForwardedWatermarks;

        private TriggerableFailOnWatermarkTestOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
            if (((String)element.getValue()).equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = true;
            } else if (((String)element.getValue()).equals(NO_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = false;
            } else {
                this.handleElement(element);
            }
        }

        public void processWatermark(Watermark mark) throws Exception {
            if (!this.expectForwardedWatermarks) {
                throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks.");
            }
            this.handleWatermark(mark);
        }

        protected void handleElement(StreamRecord<String> element) {
        }

        protected void handleWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }
    }

    private static class WatermarkGeneratingTestOperator
    extends TriggerableFailOnWatermarkTestOperator {
        private static final long serialVersionUID = -5064871833244157221L;
        private long lastWatermark;

        private WatermarkGeneratingTestOperator() {
        }

        @Override
        protected void handleElement(StreamRecord<String> element) {
            long timestamp = Long.valueOf((String)element.getValue());
            if (timestamp > this.lastWatermark) {
                this.output.emitWatermark(new Watermark(timestamp));
                this.lastWatermark = timestamp;
            }
        }

        @Override
        protected void handleWatermark(Watermark mark) {
            if (mark.equals((Object)Watermark.MAX_WATERMARK)) {
                this.output.emitWatermark(mark);
                this.lastWatermark = Long.MAX_VALUE;
            }
        }
    }

    private static class IdentityMap
    implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        public String map(String value) throws Exception {
            return value;
        }
    }

    private static class TestOpenCloseMapFunction
    extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map(String value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }
    }

    private static class TestingStreamOperator<IN, OUT>
    extends AbstractStreamOperator<OUT>
    implements OneInputStreamOperator<IN, OUT> {
        private static final long serialVersionUID = 774614855940397174L;
        public static int numberRestoreCalls = 0;
        public static int numberSnapshotCalls = 0;

        private TestingStreamOperator() {
        }

        public void open() throws Exception {
            super.open();
            ListState partitionableState = this.getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
            if (numberSnapshotCalls == 0) {
                for (Integer v : (Iterable)partitionableState.get()) {
                    Assert.fail();
                }
            } else {
                HashSet<Integer> result = new HashSet<Integer>();
                for (Integer v : (Iterable)partitionableState.get()) {
                    result.add(v);
                }
                Assert.assertEquals((long)2L, (long)result.size());
                Assert.assertTrue((boolean)result.contains(42));
                Assert.assertTrue((boolean)result.contains(4711));
            }
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            ListState partitionableState = this.getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
            partitionableState.clear();
            partitionableState.add((Object)42);
            partitionableState.add((Object)4711);
            ++numberSnapshotCalls;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            if (context.isRestored()) {
                ++numberRestoreCalls;
            }
        }

        public void processElement(StreamRecord<IN> element) throws Exception {
        }
    }

    private static class IdentityKeySelector<IN>
    implements KeySelector<IN, IN> {
        private static final long serialVersionUID = -3555913664416688425L;

        private IdentityKeySelector() {
        }

        public IN getKey(IN value) throws Exception {
            return value;
        }
    }

    private static class TestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        private TestOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
        }

        public void close() throws Exception {
            Assert.assertTrue((boolean)((SystemProcessingTimeService)this.getContainingTask().getProcessingTimeService()).isAlive());
            super.close();
        }
    }
}

