/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Base test harness that drives a single {@link StreamOperator} outside of a running job.
 *
 * <p>The harness wires the operator to a Mockito-mocked {@link StreamTask}, a
 * {@link TestProcessingTimeService} whose clock is set by the test, and an {@link Output} that
 * records everything the operator emits into in-memory queues.
 *
 * <p>Lifecycle: {@link #setup()}, {@link #initializeState(OperatorStateHandles)} and
 * {@link #open()} are chained lazily, i.e. calling {@code open()} on a fresh harness performs the
 * two earlier steps automatically. {@link #close()} closes and disposes the operator.
 *
 * @param <OUT> type of the elements emitted by the operator under test
 */
public class AbstractStreamOperatorTestHarness<OUT>
implements AutoCloseable {
    /** The operator under test. */
    protected final StreamOperator<OUT> operator;
    /** Everything emitted on the main output (records, watermarks, latency markers), in emission order. */
    protected final ConcurrentLinkedQueue<Object> outputList;
    /** Records emitted to side outputs, keyed by their tag; a queue is created on first emission. */
    protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
    /** Stream configuration backed by the environment's task configuration. */
    protected final StreamConfig config;
    /** Execution configuration taken from the environment. */
    protected final ExecutionConfig executionConfig;
    /** Manually driven processing-time service handed to the operator through the mocked task. */
    protected final TestProcessingTimeService processingTimeService;
    /** Mockito mock standing in for the task that would contain the operator. */
    protected final StreamTask<?, ?> mockTask;
    final Environment environment;
    CloseableRegistry closableRegistry;
    /** State backend used for checkpoint streams and operator state; memory-backed by default. */
    protected StateBackend stateBackend = new MemoryStateBackend();
    private final Object checkpointLock;
    private final OperatorStateRepartitioner operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
    /** Whether {@code setup()} was called on the operator; reset by {@link #close()}. */
    private boolean setupCalled = false;
    /** Whether {@code initializeState()} was called on the operator. */
    private boolean initializeCalled = false;
    /** Set once the operator reports an asynchronous exception to the mocked task. */
    private volatile boolean wasFailedExternally = false;

    /**
     * Creates a harness that runs the operator in a fresh {@link MockEnvironment}.
     *
     * @param operator the operator under test
     * @param maxParallelism maximum parallelism passed to the mock environment
     * @param parallelism parallelism passed to the mock environment
     * @param subtaskIndex index of the simulated subtask
     * @throws Exception declared for subclasses and callers; see the environment-based constructor
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
        this(operator, new MockEnvironment("MockTask", 3L * 1024 * 1024, new MockInputSplitProvider(), 1024, new Configuration(), new ExecutionConfig(), maxParallelism, parallelism, subtaskIndex));
    }

    /**
     * Creates a harness that runs the operator in the given environment and stubs the mocked task
     * so that it serves the harness' configuration, lock, time service and state backends.
     *
     * @param operator the operator under test
     * @param environment the environment exposed to the operator; must not be {@code null}
     * @throws NullPointerException if {@code environment} is {@code null}
     * @throws Exception declared for subclasses and callers
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator, final Environment environment) throws Exception {
        // Validate before the first dereference so a null environment fails at the check itself.
        this.environment = Preconditions.checkNotNull(environment);
        this.operator = operator;
        this.outputList = new ConcurrentLinkedQueue<>();
        this.sideOutputLists = new HashMap<>();
        Configuration underlyingConfig = environment.getTaskConfiguration();
        this.config = new StreamConfig(underlyingConfig);
        this.config.setCheckpointingEnabled(true);
        this.config.setOperatorID(new OperatorID());
        this.executionConfig = environment.getExecutionConfig();
        this.closableRegistry = new CloseableRegistry();
        this.checkpointLock = new Object();
        this.mockTask = Mockito.mock(StreamTask.class);
        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);

        // Minimal maintainer that just remembers the last status it was toggled to.
        StreamStatusMaintainer mockStreamStatusMaintainer = new StreamStatusMaintainer() {
            StreamStatus currentStreamStatus = StreamStatus.ACTIVE;

            @Override
            public void toggleStreamStatus(StreamStatus streamStatus) {
                if (!this.currentStreamStatus.equals(streamStatus)) {
                    this.currentStreamStatus = streamStatus;
                }
            }

            @Override
            public StreamStatus getStreamStatus() {
                return this.currentStreamStatus;
            }
        };

        Mockito.when(this.mockTask.getName()).thenReturn("Mock Task");
        Mockito.when(this.mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
        Mockito.when(this.mockTask.getConfiguration()).thenReturn(this.config);
        Mockito.when(this.mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
        Mockito.when(this.mockTask.getEnvironment()).thenReturn(environment);
        Mockito.when(this.mockTask.getExecutionConfig()).thenReturn(this.executionConfig);
        ClassLoader cl = environment.getUserClassLoader();
        Mockito.when(this.mockTask.getUserCodeClassLoader()).thenReturn(cl);
        Mockito.when(this.mockTask.getCancelables()).thenReturn(this.closableRegistry);
        Mockito.when(this.mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);

        // Record asynchronous failures instead of failing the (non-existent) task.
        Mockito.doAnswer(new Answer<Void>() {

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                AbstractStreamOperatorTestHarness.this.wasFailedExternally = true;
                return null;
            }
        }).when(this.mockTask).handleAsyncException(Matchers.any(String.class), Matchers.any(Throwable.class));

        // Checkpoint stream factories come from whatever state backend is set at call time.
        try {
            Mockito.doAnswer(new Answer<CheckpointStreamFactory>() {

                @Override
                public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
                    StreamOperator<?> requestingOperator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
                    return AbstractStreamOperatorTestHarness.this.stateBackend.createStreamFactory(new JobID(), requestingOperator.getClass().getSimpleName());
                }
            }).when(this.mockTask).createCheckpointStreamFactory(Matchers.any(StreamOperator.class));
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }

        // Operator state backends are created on demand, registered for cancellation and,
        // when state handles are supplied, restored from them.
        try {
            Mockito.doAnswer(new Answer<OperatorStateBackend>() {

                @Override
                public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    StreamOperator<?> requestingOperator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
                    @SuppressWarnings("unchecked")
                    Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
                    OperatorStateBackend osb = AbstractStreamOperatorTestHarness.this.stateBackend.createOperatorStateBackend(environment, requestingOperator.getClass().getSimpleName());
                    AbstractStreamOperatorTestHarness.this.mockTask.getCancelables().registerCloseable(osb);
                    if (null != stateHandles) {
                        osb.restore(stateHandles);
                    }
                    return osb;
                }
            }).when(this.mockTask).createOperatorStateBackend(Matchers.any(StreamOperator.class), Matchers.any(Collection.class));
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }

        Mockito.doAnswer(new Answer<ProcessingTimeService>() {

            @Override
            public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
                return AbstractStreamOperatorTestHarness.this.processingTimeService;
            }
        }).when(this.mockTask).getProcessingTimeService();
    }

    /**
     * Replaces the state backend used for checkpoint streams and operator state.
     *
     * @param stateBackend the backend to use from now on
     */
    public void setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
    }

    /** Returns the checkpoint lock served by the mocked task. */
    public Object getCheckpointLock() {
        return this.mockTask.getCheckpointLock();
    }

    /** Returns the environment served by the mocked task. */
    public Environment getEnvironment() {
        return this.mockTask.getEnvironment();
    }

    /** Returns the execution configuration taken from the environment at construction time. */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /**
     * Returns the live queue of everything emitted on the main output: stream records as well as
     * watermarks and latency markers.
     */
    public ConcurrentLinkedQueue<Object> getOutput() {
        return this.outputList;
    }

    /**
     * Returns the live queue of records emitted to the given side output.
     *
     * @param tag the side-output tag
     * @param <X> element type of the side output
     * @return the queue for {@code tag}, or {@code null} if nothing was emitted for it yet
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> tag) {
        // The map stores untyped queues; only StreamRecords are ever added for a tag.
        return (ConcurrentLinkedQueue) this.sideOutputLists.get(tag);
    }

    /**
     * Returns a snapshot list of the stream records emitted on the main output so far, skipping
     * every other kind of element in the output queue.
     */
    public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
        List<StreamRecord<? extends OUT>> resultElements = new ArrayList<>();
        for (Object e : this.getOutput()) {
            if (e instanceof StreamRecord) {
                @SuppressWarnings("unchecked")
                StreamRecord<OUT> record = (StreamRecord<OUT>) e;
                resultElements.add(record);
            }
        }
        return resultElements;
    }

    /**
     * Calls {@code setup()} on the operator without an explicit output serializer; the serializer
     * is then derived from the first emitted record.
     */
    public void setup() {
        this.setup(null);
    }

    /**
     * Calls {@code setup()} on the operator with the mocked task, the harness configuration and a
     * recording output.
     *
     * @param outputSerializer serializer used to copy emitted records, or {@code null} to derive
     *     one from the first emitted record
     */
    public void setup(TypeSerializer<OUT> outputSerializer) {
        this.operator.setup(this.mockTask, this.config, new MockOutput(outputSerializer));
        this.setupCalled = true;
    }

    /**
     * Calls {@code initializeState()} on the operator, running {@link #setup()} first if needed.
     *
     * <p>When handles are given, they are narrowed to this subtask: keyed state is restricted to
     * the subtask's key-group range and operator state is repartitioned with the round-robin
     * repartitioner, using the parallelism settings of the environment.
     *
     * @param operatorStateHandles state to restore from, or {@code null} to start empty
     * @throws Exception if the operator fails to initialize its state
     */
    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
        if (!this.setupCalled) {
            this.setup();
        }
        if (operatorStateHandles != null) {
            int numKeyGroups = this.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
            int numSubtasks = this.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
            int subtaskIndex = this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();

            List<KeyGroupRange> keyGroupPartitions = StateAssignmentOperation.createKeyGroupPartitions(numKeyGroups, numSubtasks);
            KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex);

            List<KeyedStateHandle> localManagedKeyGroupState = null;
            if (operatorStateHandles.getManagedKeyedState() != null) {
                localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(operatorStateHandles.getManagedKeyedState(), localKeyGroupRange);
            }
            List<KeyedStateHandle> localRawKeyGroupState = null;
            if (operatorStateHandles.getRawKeyedState() != null) {
                localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles(operatorStateHandles.getRawKeyedState(), localKeyGroupRange);
            }

            Collection<OperatorStateHandle> localManagedOperatorState = this.repartitionForSubtask(operatorStateHandles.getManagedOperatorState(), numSubtasks, subtaskIndex);
            Collection<OperatorStateHandle> localRawOperatorState = this.repartitionForSubtask(operatorStateHandles.getRawOperatorState(), numSubtasks, subtaskIndex);

            OperatorSubtaskState massagedOperatorStateHandles = new OperatorSubtaskState(
                AbstractStreamOperatorTestHarness.nullToEmptyCollection(localManagedOperatorState),
                AbstractStreamOperatorTestHarness.nullToEmptyCollection(localRawOperatorState),
                AbstractStreamOperatorTestHarness.nullToEmptyCollection(localManagedKeyGroupState),
                AbstractStreamOperatorTestHarness.nullToEmptyCollection(localRawKeyGroupState));
            this.operator.initializeState(massagedOperatorStateHandles);
        } else {
            this.operator.initializeState(null);
        }
        this.initializeCalled = true;
    }

    /**
     * Repartitions the given operator state handles over {@code numSubtasks} subtasks and returns
     * the share of subtask {@code subtaskIndex}. A {@code null} input is treated as empty.
     */
    private Collection<OperatorStateHandle> repartitionForSubtask(Collection<OperatorStateHandle> handles, int numSubtasks, int subtaskIndex) {
        List<OperatorStateHandle> previousState = new ArrayList<>();
        if (handles != null) {
            previousState.addAll(handles);
        }
        return this.operatorStateRepartitioner.repartitionState(previousState, numSubtasks).get(subtaskIndex);
    }

    /** Returns the given collection, or an empty list if it is {@code null}. */
    private static <T> Collection<T> nullToEmptyCollection(Collection<T> collection) {
        return collection != null ? collection : Collections.<T>emptyList();
    }

    /**
     * Merges several snapshots into one, e.g. to restore with a different parallelism.
     *
     * @param handles the snapshots to merge
     * @return {@code null} if no handles are given, the single handle itself if exactly one is
     *     given, otherwise a new handle containing the concatenated state of all inputs
     * @throws Exception declared for callers; not thrown by the merging itself
     */
    public static OperatorStateHandles repackageState(OperatorStateHandles ... handles) throws Exception {
        if (handles.length < 1) {
            return null;
        }
        if (handles.length == 1) {
            return handles[0];
        }
        List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
        List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
        List<KeyedStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
        List<KeyedStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
        for (OperatorStateHandles handle : handles) {
            Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
            Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
            Collection<KeyedStateHandle> managedKeyedState = handle.getManagedKeyedState();
            Collection<KeyedStateHandle> rawKeyedState = handle.getRawKeyedState();
            if (managedOperatorState != null) {
                mergedManagedOperatorState.addAll(managedOperatorState);
            }
            if (rawOperatorState != null) {
                mergedRawOperatorState.addAll(rawOperatorState);
            }
            if (managedKeyedState != null) {
                mergedManagedKeyedState.addAll(managedKeyedState);
            }
            if (rawKeyedState != null) {
                mergedRawKeyedState.addAll(rawKeyedState);
            }
        }
        return new OperatorStateHandles(0, mergedManagedKeyedState, mergedRawKeyedState, mergedManagedOperatorState, mergedRawOperatorState);
    }

    /**
     * Calls {@code open()} on the operator, running {@link #initializeState(OperatorStateHandles)}
     * with {@code null} (and thereby {@link #setup()}) first if that has not happened yet.
     *
     * @throws Exception if initialization or opening fails
     */
    public void open() throws Exception {
        if (!this.initializeCalled) {
            this.initializeState(null);
        }
        this.operator.open();
    }

    /**
     * Snapshots the operator and runs the four resulting state futures to completion.
     *
     * @param checkpointId id of the simulated checkpoint
     * @param timestamp timestamp of the simulated checkpoint
     * @return the collected handles; each state kind is a singleton list, or {@code null} if the
     *     corresponding future produced no handle
     * @throws Exception if the snapshot or one of its futures fails
     */
    public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
        OperatorSnapshotResult operatorStateResult = this.operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
        KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
        KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
        OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
        OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
        return new OperatorStateHandles(
            0,
            keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
            keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
            opManaged != null ? Collections.singletonList(opManaged) : null,
            opRaw != null ? Collections.singletonList(opRaw) : null);
    }

    /**
     * Forwards a checkpoint-complete notification to the operator.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the operator fails to handle the notification
     */
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        this.operator.notifyOfCompletedCheckpoint(checkpointId);
    }

    /**
     * Closes and disposes the operator, shuts down the processing-time service and marks the
     * harness as no longer set up.
     *
     * <p>All teardown steps run even if an earlier one throws an {@link Exception}; the first
     * failure is rethrown at the end with any later failure attached as suppressed.
     *
     * @throws Exception the first exception raised by the operator's {@code close()} or
     *     {@code dispose()}
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            this.operator.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            // Always dispose so resources are released even when close() failed.
            this.operator.dispose();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        } finally {
            if (this.processingTimeService != null) {
                this.processingTimeService.shutdownService();
            }
            this.setupCalled = false;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Sets the current time of the test processing-time service.
     *
     * @param time the new processing time
     * @throws Exception if the time service fails while advancing
     */
    public void setProcessingTime(long time) throws Exception {
        this.processingTimeService.setCurrentTime(time);
    }

    /** Returns the current time of the test processing-time service. */
    public long getProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    /** Stores the given time characteristic in the stream configuration. */
    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.config.setTimeCharacteristic(timeCharacteristic);
    }

    /** Returns the time characteristic stored in the stream configuration. */
    public TimeCharacteristic getTimeCharacteristic() {
        return this.config.getTimeCharacteristic();
    }

    /** Returns whether the operator reported an asynchronous exception to the mocked task. */
    public boolean wasFailedExternally() {
        return this.wasFailedExternally;
    }

    /**
     * Returns the operator's number of processing-time timers.
     *
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            return ((AbstractStreamOperator<?>) this.operator).numProcessingTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the operator's number of event-time timers.
     *
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numEventTimeTimers() {
        if (this.operator instanceof AbstractStreamOperator) {
            return ((AbstractStreamOperator<?>) this.operator).numEventTimeTimers();
        }
        throw new UnsupportedOperationException();
    }

    /**
     * Output handed to the operator. It appends everything to the harness queues; stream records
     * are copied with a serializer first so later mutation by the operator does not affect what
     * was recorded.
     */
    private class MockOutput
    implements Output<StreamRecord<OUT>> {
        /** Serializer for main-output values; derived from the first record when not supplied. */
        private TypeSerializer<OUT> outputSerializer;

        MockOutput() {
            this(null);
        }

        MockOutput(TypeSerializer<OUT> outputSerializer) {
            this.outputSerializer = outputSerializer;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            AbstractStreamOperatorTestHarness.this.outputList.add(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            AbstractStreamOperatorTestHarness.this.outputList.add(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> element) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            }
            OUT copiedValue = this.outputSerializer.copy(element.getValue());
            if (element.hasTimestamp()) {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord<>(copiedValue, element.getTimestamp()));
            } else {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord<>(copiedValue));
            }
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            // Side outputs may carry a different type per tag, so the serializer is derived per record.
            TypeSerializer<X> sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            ConcurrentLinkedQueue<Object> sideOutputList = AbstractStreamOperatorTestHarness.this.sideOutputLists.get(outputTag);
            if (sideOutputList == null) {
                sideOutputList = new ConcurrentLinkedQueue<>();
                AbstractStreamOperatorTestHarness.this.sideOutputLists.put(outputTag, sideOutputList);
            }
            X copiedValue = sideOutputSerializer.copy(record.getValue());
            if (record.hasTimestamp()) {
                sideOutputList.add(new StreamRecord<>(copiedValue, record.getTimestamp()));
            } else {
                sideOutputList.add(new StreamRecord<>(copiedValue));
            }
        }

        @Override
        public void close() {
        }
    }
}

