/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class StreamTestSingleInputGate<T>
extends TestSingleInputGate {
    private final int numInputChannels;
    private final TestInputChannel[] inputChannels;
    private final int bufferSize;
    private TypeSerializer<T> serializer;
    private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;

    public StreamTestSingleInputGate(int numInputChannels, int bufferSize, TypeSerializer<T> serializer) throws IOException, InterruptedException {
        super(numInputChannels, false);
        this.bufferSize = bufferSize;
        this.serializer = serializer;
        this.numInputChannels = numInputChannels;
        this.inputChannels = new TestInputChannel[numInputChannels];
        this.inputQueues = new ConcurrentLinkedQueue[numInputChannels];
        this.setupInputChannels();
        ((SingleInputGate)Mockito.doReturn((Object)bufferSize).when((Object)this.inputGate)).getPageSize();
    }

    private void setupInputChannels() throws IOException, InterruptedException {
        for (int i = 0; i < this.numInputChannels; ++i) {
            final int channelIndex = i;
            SpanningRecordSerializer recordSerializer = new SpanningRecordSerializer();
            SerializationDelegate delegate = new SerializationDelegate((TypeSerializer)new StreamElementSerializer(this.serializer));
            this.inputQueues[channelIndex] = new ConcurrentLinkedQueue();
            this.inputChannels[channelIndex] = new TestInputChannel(this.inputGate, i);
            Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>((RecordSerializer)recordSerializer, delegate){
                final /* synthetic */ RecordSerializer val$recordSerializer;
                final /* synthetic */ SerializationDelegate val$delegate;
                {
                    this.val$recordSerializer = recordSerializer;
                    this.val$delegate = serializationDelegate;
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
                    InputValue input = (InputValue)StreamTestSingleInputGate.this.inputQueues[channelIndex].poll();
                    if (input != null && input.isStreamEnd()) {
                        Mockito.when((Object)StreamTestSingleInputGate.this.inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn((Object)true);
                        return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer((AbstractEvent)EndOfPartitionEvent.INSTANCE), false);
                    }
                    if (input != null && input.isStreamRecord()) {
                        Object inputElement = input.getStreamRecord();
                        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)StreamTestSingleInputGate.this.bufferSize), (BufferRecycler)Mockito.mock(BufferRecycler.class));
                        this.val$recordSerializer.setNextBuffer(buffer);
                        this.val$delegate.setInstance(inputElement);
                        this.val$recordSerializer.addRecord((IOReadableWritable)this.val$delegate);
                        return new InputChannel.BufferAndAvailability(this.val$recordSerializer.getCurrentBuffer(), false);
                    }
                    if (input != null && input.isEvent()) {
                        AbstractEvent event = input.getEvent();
                        return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer((AbstractEvent)event), false);
                    }
                    ConcurrentLinkedQueue concurrentLinkedQueue = StreamTestSingleInputGate.this.inputQueues[channelIndex];
                    synchronized (concurrentLinkedQueue) {
                        StreamTestSingleInputGate.this.inputQueues[channelIndex].wait();
                        return this.answer(invocationOnMock);
                    }
                }
            };
            Mockito.when((Object)this.inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer((Answer)answer);
            this.inputGate.setInputChannel(new IntermediateResultPartitionID(), this.inputChannels[channelIndex].getInputChannel());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendElement(Object element, int channel) {
        ConcurrentLinkedQueue<InputValue<Object>> concurrentLinkedQueue = this.inputQueues[channel];
        synchronized (concurrentLinkedQueue) {
            this.inputQueues[channel].add(InputValue.element(element));
            this.inputQueues[channel].notifyAll();
        }
        this.inputGate.notifyChannelNonEmpty(this.inputChannels[channel].getInputChannel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(AbstractEvent event, int channel) {
        ConcurrentLinkedQueue<InputValue<Object>> concurrentLinkedQueue = this.inputQueues[channel];
        synchronized (concurrentLinkedQueue) {
            this.inputQueues[channel].add(InputValue.event(event));
            this.inputQueues[channel].notifyAll();
        }
        this.inputGate.notifyChannelNonEmpty(this.inputChannels[channel].getInputChannel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void endInput() {
        for (int i = 0; i < this.numInputChannels; ++i) {
            ConcurrentLinkedQueue<InputValue<Object>> concurrentLinkedQueue = this.inputQueues[i];
            synchronized (concurrentLinkedQueue) {
                this.inputQueues[i].add(InputValue.streamEnd());
                this.inputQueues[i].notifyAll();
            }
            this.inputGate.notifyChannelNonEmpty(this.inputChannels[i].getInputChannel());
        }
    }

    public boolean allQueuesEmpty() {
        for (int i = 0; i < this.numInputChannels; ++i) {
            if (this.inputQueues[i].size() <= 0) continue;
            return false;
        }
        return true;
    }

    private static class InputValue<T> {
        private Object elementOrEvent;
        private boolean isStreamEnd;
        private boolean isStreamRecord;
        private boolean isEvent;

        private InputValue(Object elementOrEvent, boolean isStreamEnd, boolean isEvent, boolean isStreamRecord) {
            this.elementOrEvent = elementOrEvent;
            this.isStreamEnd = isStreamEnd;
            this.isStreamRecord = isStreamRecord;
            this.isEvent = isEvent;
        }

        public static <X> InputValue<X> element(Object element) {
            return new InputValue(element, false, false, true);
        }

        public static <X> InputValue<X> streamEnd() {
            return new InputValue(null, true, false, false);
        }

        public static <X> InputValue<X> event(AbstractEvent event) {
            return new InputValue(event, false, true, false);
        }

        public Object getStreamRecord() {
            return this.elementOrEvent;
        }

        public AbstractEvent getEvent() {
            return (AbstractEvent)this.elementOrEvent;
        }

        public boolean isStreamEnd() {
            return this.isStreamEnd;
        }

        public boolean isStreamRecord() {
            return this.isStreamRecord;
        }

        public boolean isEvent() {
            return this.isEvent;
        }
    }
}

