/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.LinkedList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;

public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT>
extends StreamTaskTestHarness<OUT> {
    private TypeInformation<IN1> inputType1;
    private TypeSerializer<IN1> inputSerializer1;
    private TypeInformation<IN2> inputType2;
    private TypeSerializer<IN2> inputSerializer2;
    private int[] inputGateAssignment;

    public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task, int numInputGates, int numInputChannelsPerGate, int[] inputGateAssignment, TypeInformation<IN1> inputType1, TypeInformation<IN2> inputType2, TypeInformation<OUT> outputType) {
        super((AbstractInvokable)task, outputType);
        this.inputType1 = inputType1;
        this.inputSerializer1 = inputType1.createSerializer(this.executionConfig);
        this.inputType2 = inputType2;
        this.inputSerializer2 = inputType2.createSerializer(this.executionConfig);
        this.numInputGates = numInputGates;
        this.numInputChannelsPerGate = numInputChannelsPerGate;
        this.inputGateAssignment = inputGateAssignment;
    }

    public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task, TypeInformation<IN1> inputType1, TypeInformation<IN2> inputType2, TypeInformation<OUT> outputType) {
        this(task, 2, 1, new int[]{1, 2}, inputType1, inputType2, outputType);
    }

    @Override
    protected void initializeInputs() throws IOException, InterruptedException {
        this.inputGates = new StreamTestSingleInputGate[this.numInputGates];
        LinkedList<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
        AbstractStreamOperator dummyOperator = new AbstractStreamOperator<IN1>(){
            private static final long serialVersionUID = 1L;
        };
        StreamNode sourceVertexDummy = new StreamNode(null, Integer.valueOf(0), "default group", (StreamOperator)dummyOperator, "source dummy", new LinkedList(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(null, Integer.valueOf(1), "default group", (StreamOperator)dummyOperator, "target dummy", new LinkedList(), SourceStreamTask.class);
        for (int i = 0; i < this.numInputGates; ++i) {
            switch (this.inputGateAssignment[i]) {
                case 1: {
                    this.inputGates[i] = new StreamTestSingleInputGate<IN1>(this.numInputChannelsPerGate, this.bufferSize, this.inputSerializer1);
                    StreamEdge streamEdge = new StreamEdge(sourceVertexDummy, targetVertexDummy, 1, new LinkedList(), (StreamPartitioner)new BroadcastPartitioner(), null);
                    inPhysicalEdges.add(streamEdge);
                    break;
                }
                case 2: {
                    this.inputGates[i] = new StreamTestSingleInputGate<IN2>(this.numInputChannelsPerGate, this.bufferSize, this.inputSerializer2);
                    StreamEdge streamEdge = new StreamEdge(sourceVertexDummy, targetVertexDummy, 2, new LinkedList(), (StreamPartitioner)new BroadcastPartitioner(), null);
                    inPhysicalEdges.add(streamEdge);
                    break;
                }
                default: {
                    throw new IllegalStateException("Wrong input gate assignment.");
                }
            }
            this.mockEnv.addInputGate((InputGate)this.inputGates[i].getInputGate());
        }
        this.streamConfig.setInPhysicalEdges(inPhysicalEdges);
        this.streamConfig.setNumberOfInputs(this.numInputGates);
        this.streamConfig.setTypeSerializerIn1(this.inputSerializer1);
        this.streamConfig.setTypeSerializerIn2(this.inputSerializer2);
    }
}

