/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Migration;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * A {@link OneInputStreamOperatorTestHarness} variant for operators that use keyed state.
 *
 * <p>On construction the harness registers the given key selector and key serializer in the
 * stream config and stubs the mock task's {@code createKeyedStateBackend} call so that it
 * builds a backend from the harness's {@code stateBackend} and restores any keyed state
 * handles that were selected in {@link #initializeState(OperatorStateHandles)}.
 *
 * @param <K> type of the key extracted by the key selector
 * @param <IN> input element type of the operator
 * @param <OUT> output element type of the operator
 */
public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
        extends OneInputStreamOperatorTestHarness<IN, OUT> {

    /** Backend produced by the most recent stubbed {@code createKeyedStateBackend} call, if any. */
    private AbstractKeyedStateBackend<?> keyedStateBackend = null;

    /** Keyed state handles handed to {@code restore} when the backend is created; may be null. */
    private List<KeyedStateHandle> restoredKeyedState = null;

    /**
     * Creates a harness with explicit parallelism settings.
     *
     * @param operator operator under test
     * @param keySelector selector registered as state partitioner for input 0
     * @param keyType type information used to create the state key serializer
     * @param maxParallelism forwarded to the superclass constructor
     * @param numSubtasks forwarded to the superclass constructor
     * @param subtaskIndex forwarded to the superclass constructor
     * @throws Exception if the superclass constructor fails
     */
    public KeyedOneInputStreamOperatorTestHarness(
            OneInputStreamOperator<IN, OUT> operator,
            KeySelector<IN, K> keySelector,
            TypeInformation<K> keyType,
            int maxParallelism,
            int numSubtasks,
            int subtaskIndex) throws Exception {
        super(operator, maxParallelism, numSubtasks, subtaskIndex);
        configureKeyedState(keySelector, keyType);
    }

    /**
     * Creates a harness with max parallelism 1, one subtask and subtask index 0.
     *
     * @param operator operator under test
     * @param keySelector selector registered as state partitioner for input 0
     * @param keyType type information used to create the state key serializer
     * @throws Exception if the delegated constructor fails
     */
    public KeyedOneInputStreamOperatorTestHarness(
            OneInputStreamOperator<IN, OUT> operator,
            KeySelector<IN, K> keySelector,
            TypeInformation<K> keyType) throws Exception {
        this(operator, keySelector, keyType, 1, 1, 0);
    }

    /**
     * Creates a harness that runs against the supplied environment.
     *
     * @param operator operator under test
     * @param keySelector selector registered as state partitioner for input 0
     * @param keyType type information used to create the state key serializer
     * @param environment forwarded to the superclass constructor
     * @throws Exception if the superclass constructor fails
     */
    public KeyedOneInputStreamOperatorTestHarness(
            OneInputStreamOperator<IN, OUT> operator,
            KeySelector<IN, K> keySelector,
            TypeInformation<K> keyType,
            Environment environment) throws Exception {
        super(operator, environment);
        configureKeyedState(keySelector, keyType);
    }

    /** Shared constructor tail: registers key selector and serializer, then stubs the mock task. */
    private void configureKeyedState(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
        ClosureCleaner.clean(keySelector, false);
        this.config.setStatePartitioner(0, keySelector);
        this.config.setStateKeySerializer(keyType.createSerializer(this.executionConfig));
        setupMockTaskCreateKeyedBackend();
    }

    /**
     * Stubs {@code createKeyedStateBackend} on the mock task. Each invocation disposes the
     * previously created backend (if there is one), creates a new backend from the harness's
     * {@code stateBackend}, calls {@code restore} with {@link #restoredKeyedState} and returns it.
     */
    private void setupMockTaskCreateKeyedBackend() {
        Answer<KeyedStateBackend<?>> createAndRestoreBackend = new Answer<KeyedStateBackend<?>>() {

            @Override
            public KeyedStateBackend<?> answer(InvocationOnMock invocationOnMock) throws Throwable {
                Object[] args = invocationOnMock.getArguments();
                TypeSerializer<?> keySerializer = (TypeSerializer<?>) args[0];
                int numberOfKeyGroups = (Integer) args[1];
                KeyGroupRange keyGroupRange = (KeyGroupRange) args[2];

                // Only one backend is kept at a time; release the previous one first.
                if (keyedStateBackend != null) {
                    keyedStateBackend.dispose();
                }

                keyedStateBackend = stateBackend.createKeyedStateBackend(
                        mockTask.getEnvironment(),
                        new JobID(),
                        "test_op",
                        keySerializer,
                        numberOfKeyGroups,
                        keyGroupRange,
                        mockTask.getEnvironment().getTaskKvStateRegistry());
                keyedStateBackend.restore(restoredKeyedState);
                return keyedStateBackend;
            }
        };

        try {
            Mockito.doAnswer(createAndRestoreBackend)
                    .when(this.mockTask)
                    .createKeyedStateBackend(
                            Matchers.any(TypeSerializer.class),
                            Mockito.anyInt(),
                            Matchers.any(KeyGroupRange.class));
        }
        catch (Exception e) {
            // The stubbed method declares a checked exception; rethrow unchecked with the cause.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Returns true if at least one of the given handles is an instance of {@link Migration}. */
    private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
        for (KeyedStateHandle handle : allKeyGroupsHandles) {
            if (handle instanceof Migration) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the current backend as a heap backend.
     *
     * @throws UnsupportedOperationException if no backend exists yet or it is not a
     *         {@link HeapKeyedStateBackend}
     */
    private HeapKeyedStateBackend<?> requireHeapBackend() {
        if (this.keyedStateBackend instanceof HeapKeyedStateBackend) {
            return (HeapKeyedStateBackend<?>) this.keyedStateBackend;
        }
        String actual = this.keyedStateBackend == null
                ? "none"
                : this.keyedStateBackend.getClass().getName();
        throw new UnsupportedOperationException(
                "Counting keyed state entries requires a HeapKeyedStateBackend, but the current backend is: "
                        + actual);
    }

    /**
     * Returns {@code numStateEntries()} of the current heap keyed state backend.
     *
     * @throws UnsupportedOperationException if the current backend is not a
     *         {@link HeapKeyedStateBackend}
     */
    public int numKeyedStateEntries() {
        return requireHeapBackend().numStateEntries();
    }

    /**
     * Returns {@code numStateEntries(namespace)} of the current heap keyed state backend.
     *
     * @param namespace namespace passed through to the backend
     * @param <N> namespace type
     * @throws UnsupportedOperationException if the current backend is not a
     *         {@link HeapKeyedStateBackend}
     */
    public <N> int numKeyedStateEntries(N namespace) {
        return requireHeapBackend().numStateEntries(namespace);
    }

    /**
     * Selects the keyed state handles that the stubbed backend creation will restore, then
     * delegates to the superclass.
     *
     * <p>If the managed keyed state contains a {@link Migration} handle, all handles are kept
     * as they are. Otherwise the handles are narrowed to this subtask's key-group range via
     * {@link StateAssignmentOperation#getKeyedStateHandles}. When {@code operatorStateHandles}
     * is null the previously selected handles are left untouched.
     *
     * @param operatorStateHandles state to initialize from; may be null
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
        if (operatorStateHandles != null) {
            int numKeyGroups = this.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
            int numSubtasks = this.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
            int subtaskIndex = this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();

            List<KeyGroupRange> keyGroupPartitions =
                    StateAssignmentOperation.createKeyGroupPartitions(numKeyGroups, numSubtasks);
            KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex);

            this.restoredKeyedState = null;
            Collection<KeyedStateHandle> managedKeyedState = operatorStateHandles.getManagedKeyedState();
            if (managedKeyedState != null) {
                if (hasMigrationHandles(managedKeyedState)) {
                    // Migration handles are passed on unfiltered, as a private copy.
                    this.restoredKeyedState = new ArrayList<KeyedStateHandle>(managedKeyedState);
                } else {
                    this.restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles(
                            managedKeyedState, localKeyGroupRange);
                }
            }
        }
        super.initializeState(operatorStateHandles);
    }
}

