/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test harness for a {@link TwoInputStreamOperator} that works on keyed state.
 *
 * <p>On construction the harness registers one {@link KeySelector} per input and the key
 * serializer on the operator configuration, and stubs the mocked task so that every request
 * for a keyed state backend is served by a backend created from this harness's state backend.
 * Keyed state handed to {@link #initializeState(OperatorStateHandles)} is restored into each
 * backend created afterwards.
 *
 * @param <K> type of the key produced by both key selectors
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type emitted by the operator
 */
public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
    /** Backend most recently created by the stubbed task; {@code null} until the first request. */
    private AbstractKeyedStateBackend<?> keyedStateBackend;

    /** Keyed state to restore into newly created backends; {@code null} when nothing is restored. */
    private Collection<KeyedStateHandle> restoredKeyedState;

    /**
     * Creates a harness with explicit parallelism settings.
     *
     * @param operator operator under test
     * @param keySelector1 key selector registered as state partitioner for input index 0
     * @param keySelector2 key selector registered as state partitioner for input index 1
     * @param keyType type information used to create the state key serializer
     * @param maxParallelism forwarded unchanged to the parent harness
     * @param numSubtasks forwarded unchanged to the parent harness
     * @param subtaskIndex forwarded unchanged to the parent harness
     * @throws Exception if the parent harness cannot be constructed
     */
    public KeyedTwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        super(operator, maxParallelism, numSubtasks, subtaskIndex);
        ClosureCleaner.clean(keySelector1, false);
        ClosureCleaner.clean(keySelector2, false);
        this.config.setStatePartitioner(0, keySelector1);
        this.config.setStatePartitioner(1, keySelector2);
        this.config.setStateKeySerializer(keyType.createSerializer(this.executionConfig));
        this.setupMockTaskCreateKeyedBackend();
    }

    /**
     * Creates a harness with {@code maxParallelism = 1}, {@code numSubtasks = 1} and
     * {@code subtaskIndex = 0}.
     *
     * @param operator operator under test
     * @param keySelector1 key selector registered as state partitioner for input index 0
     * @param keySelector2 key selector registered as state partitioner for input index 1
     * @param keyType type information used to create the state key serializer
     * @throws Exception if the parent harness cannot be constructed
     */
    public KeyedTwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {
        this(operator, keySelector1, keySelector2, keyType, 1, 1, 0);
    }

    /**
     * Stubs {@code createKeyedStateBackend} on the mocked task.
     *
     * <p>Each invocation closes the previously created backend (if any), creates a fresh one
     * from {@code stateBackend}, restores {@link #restoredKeyedState} into it when that is
     * set, remembers the new backend in {@link #keyedStateBackend} and returns it.
     */
    private void setupMockTaskCreateKeyedBackend() {
        try {
            ((StreamTask)Mockito.doAnswer((Answer)new Answer<KeyedStateBackend>(){

                @Override
                public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    // Argument order mirrors the stubbed call below:
                    // (keySerializer, numberOfKeyGroups, keyGroupRange).
                    TypeSerializer keySerializer = (TypeSerializer)invocationOnMock.getArguments()[0];
                    int numberOfKeyGroups = (Integer)invocationOnMock.getArguments()[1];
                    KeyGroupRange keyGroupRange = (KeyGroupRange)invocationOnMock.getArguments()[2];
                    // Release the previous backend before replacing it.
                    if (KeyedTwoInputStreamOperatorTestHarness.this.keyedStateBackend != null) {
                        KeyedTwoInputStreamOperatorTestHarness.this.keyedStateBackend.close();
                    }
                    KeyedTwoInputStreamOperatorTestHarness.this.keyedStateBackend = KeyedTwoInputStreamOperatorTestHarness.this.stateBackend.createKeyedStateBackend(KeyedTwoInputStreamOperatorTestHarness.this.mockTask.getEnvironment(), new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, KeyedTwoInputStreamOperatorTestHarness.this.mockTask.getEnvironment().getTaskKvStateRegistry());
                    // Only populated once initializeState(...) has been given state handles.
                    if (KeyedTwoInputStreamOperatorTestHarness.this.restoredKeyedState != null) {
                        KeyedTwoInputStreamOperatorTestHarness.this.keyedStateBackend.restore(KeyedTwoInputStreamOperatorTestHarness.this.restoredKeyedState);
                    }
                    return KeyedTwoInputStreamOperatorTestHarness.this.keyedStateBackend;
                }
            }).when((Object)this.mockTask)).createKeyedStateBackend((TypeSerializer)Matchers.any(TypeSerializer.class), Mockito.anyInt(), (KeyGroupRange)Matchers.any(KeyGroupRange.class));
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Records the managed keyed state of the given handles so that backends created afterwards
     * restore it, then delegates to the parent harness.
     *
     * @param operatorStateHandles state to initialize from; when {@code null}, no keyed state
     *     is recorded and the value is passed to the parent unchanged
     * @throws Exception if the parent harness fails to initialize its state
     */
    @Override
    public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
        // Guard on the argument, not on the field: the field starts out null and is assigned
        // only here, so checking the field would never let restored state through.
        if (operatorStateHandles != null) {
            this.restoredKeyedState = operatorStateHandles.getManagedKeyedState();
        }
        super.initializeState(operatorStateHandles);
    }

    /**
     * Returns the number of state entries held by the current keyed state backend.
     *
     * @return the value reported by {@link HeapKeyedStateBackend#numStateEntries()}
     * @throws UnsupportedOperationException if the current backend is not a
     *     {@link HeapKeyedStateBackend}, including when no backend has been created yet
     */
    public int numKeyedStateEntries() {
        if (this.keyedStateBackend instanceof HeapKeyedStateBackend) {
            return ((HeapKeyedStateBackend)this.keyedStateBackend).numStateEntries();
        }
        throw new UnsupportedOperationException();
    }
}

