/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RestoreStreamTaskTest
extends TestLogger {
    private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet();

    @Before
    public void setup() {
        RESTORED_OPERATORS.clear();
    }

    @Test
    public void testRestore() throws Exception {
        final OperatorID headOperatorID = new OperatorID(42L, 42L);
        final OperatorID tailOperatorID = new OperatorID(44L, 44L);
        AcknowledgeStreamMockEnvironment environment1 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.empty());
        Assert.assertEquals((long)2L, (long)environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
        AcknowledgeStreamMockEnvironment environment2 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.of(stateHandles));
        Assert.assertEquals((Object)new HashSet<OperatorID>(){
            {
                this.add(headOperatorID);
                this.add(tailOperatorID);
            }
        }, RESTORED_OPERATORS);
    }

    @Test
    public void testRestoreHeadWithNewId() throws Exception {
        final OperatorID tailOperatorID = new OperatorID(44L, 44L);
        AcknowledgeStreamMockEnvironment environment1 = this.createRunAndCheckpointOperatorChain(new OperatorID(42L, 42L), new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.empty());
        Assert.assertEquals((long)2L, (long)environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
        AcknowledgeStreamMockEnvironment environment2 = this.createRunAndCheckpointOperatorChain(new OperatorID(4242L, 4242L), new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.of(stateHandles));
        Assert.assertEquals((Object)new HashSet<OperatorID>(){
            {
                this.add(tailOperatorID);
            }
        }, RESTORED_OPERATORS);
    }

    @Test
    public void testRestoreTailWithNewId() throws Exception {
        final OperatorID headOperatorID = new OperatorID(42L, 42L);
        AcknowledgeStreamMockEnvironment environment1 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), new OperatorID(44L, 44L), new CounterOperator(), Optional.empty());
        Assert.assertEquals((long)2L, (long)environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
        AcknowledgeStreamMockEnvironment environment2 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), new OperatorID(4444L, 4444L), new CounterOperator(), Optional.of(stateHandles));
        Assert.assertEquals((Object)new HashSet<OperatorID>(){
            {
                this.add(headOperatorID);
            }
        }, RESTORED_OPERATORS);
    }

    @Test
    public void testRestoreAfterScaleUp() throws Exception {
        final OperatorID headOperatorID = new OperatorID(42L, 42L);
        final OperatorID tailOperatorID = new OperatorID(44L, 44L);
        AcknowledgeStreamMockEnvironment environment1 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.empty());
        Assert.assertEquals((long)2L, (long)environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
        OperatorSubtaskState emptyHeadOperatorState = StateAssignmentOperation.operatorSubtaskStateFrom((OperatorInstanceID)new OperatorInstanceID(0, headOperatorID), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
        stateHandles.putSubtaskStateByOperatorID(headOperatorID, emptyHeadOperatorState);
        AcknowledgeStreamMockEnvironment environment2 = this.createRunAndCheckpointOperatorChain(headOperatorID, new CounterOperator(), tailOperatorID, new CounterOperator(), Optional.of(stateHandles));
        Assert.assertEquals((Object)new HashSet<OperatorID>(){
            {
                this.add(headOperatorID);
                this.add(tailOperatorID);
            }
        }, RESTORED_OPERATORS);
    }

    @Test
    public void testRestoreWithoutState() throws Exception {
        final OperatorID headOperatorID = new OperatorID(42L, 42L);
        final OperatorID tailOperatorID = new OperatorID(44L, 44L);
        AcknowledgeStreamMockEnvironment environment1 = this.createRunAndCheckpointOperatorChain(headOperatorID, new StatelessOperator(), tailOperatorID, new CounterOperator(), Optional.empty());
        Assert.assertEquals((long)2L, (long)environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
        AcknowledgeStreamMockEnvironment environment2 = this.createRunAndCheckpointOperatorChain(headOperatorID, new StatelessOperator(), tailOperatorID, new CounterOperator(), Optional.of(stateHandles));
        Assert.assertEquals((Object)new HashSet<OperatorID>(){
            {
                this.add(headOperatorID);
                this.add(tailOperatorID);
            }
        }, RESTORED_OPERATORS);
    }

    private AcknowledgeStreamMockEnvironment createRunAndCheckpointOperatorChain(OperatorID headId, OneInputStreamOperator<String, String> headOperator, OperatorID tailId, OneInputStreamOperator<String, String> tailOperator, Optional<TaskStateSnapshot> stateHandles) throws Exception {
        OneInputStreamTask streamTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>((OneInputStreamTask<String, String>)streamTask, 1, 1, (TypeInformation<String>)BasicTypeInfo.STRING_TYPE_INFO, (TypeInformation<String>)BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOperatorChain(headId, headOperator).chain(tailId, tailOperator, StringSerializer.INSTANCE).finish();
        AcknowledgeStreamMockEnvironment environment = new AcknowledgeStreamMockEnvironment(testHarness.jobConfig, testHarness.taskConfig, testHarness.executionConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize);
        if (stateHandles.isPresent()) {
            streamTask.setInitialState(stateHandles.get());
        }
        testHarness.invoke(environment);
        testHarness.waitForTaskRunning();
        this.processRecords(testHarness);
        this.triggerCheckpoint(testHarness, environment, (OneInputStreamTask<String, String>)streamTask);
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        return environment;
    }

    private void triggerCheckpoint(OneInputStreamTaskTestHarness<String, String> testHarness, AcknowledgeStreamMockEnvironment environment, OneInputStreamTask<String, String> streamTask) throws Exception {
        long checkpointId = 1L;
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L);
        while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint())) {
        }
        environment.getCheckpointLatch().await();
        Assert.assertEquals((long)checkpointId, (long)environment.getCheckpointId());
    }

    private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        testHarness.processElement(new StreamRecord((Object)"10"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"20"), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"30"), 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new StreamRecord((Object)"10"));
        expectedOutput.add(new StreamRecord((Object)"20"));
        expectedOutput.add(new StreamRecord((Object)"30"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    private static class StatelessOperator
    extends RestoreWatchOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813244L;

        private StatelessOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
        }
    }

    private static class CounterOperator
    extends RestoreWatchOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813243L;
        private ListState<Long> counterState;
        private long counter = 0L;

        private CounterOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            ++this.counter;
            this.output.collect(element);
        }

        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.counterState = context.getOperatorStateStore().getListState(new ListStateDescriptor("counter-state", (TypeSerializer)LongSerializer.INSTANCE));
            if (context.isRestored()) {
                for (Long value : (Iterable)this.counterState.get()) {
                    this.counter += value.longValue();
                }
                this.counterState.clear();
            }
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            this.counterState.add((Object)this.counter);
        }
    }

    private static abstract class RestoreWatchOperator<IN, OUT>
    extends AbstractStreamOperator<OUT>
    implements OneInputStreamOperator<IN, OUT> {
        private RestoreWatchOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            if (context.isRestored()) {
                RESTORED_OPERATORS.add(this.getOperatorID());
            }
        }
    }
}

