/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class PartitionRequestQueueTest {
    /**
     * Drives two credit-based readers through one {@link PartitionRequestQueue} while the
     * channel's user-defined writability flag is switched off and back on, and verifies the
     * number of outbound messages once the channel is writable again.
     */
    @Test
    public void testNotifyReaderNonEmptyOnEmptyReaders() throws Exception {
        // Single source of truth for the backlog size of reader2's view and the expected
        // number of outbound messages.
        final int buffersToWrite = 5;

        PartitionRequestQueue queue = new PartitionRequestQueue();
        EmbeddedChannel channel = new EmbeddedChannel(queue);

        CreditBasedSequenceNumberingViewReader reader1 =
                new CreditBasedSequenceNumberingViewReader(new InputChannelID(0L, 0L), 10, queue);
        CreditBasedSequenceNumberingViewReader reader2 =
                new CreditBasedSequenceNumberingViewReader(new InputChannelID(1L, 1L), 10, queue);

        // reader1 is backed by a view that reports itself available but never yields a buffer.
        reader1.requestSubpartitionView(
                (partitionId, index, availabilityListener) -> new EmptyAlwaysAvailableResultSubpartitionView(),
                new ResultPartitionID(),
                0);
        reader1.notifyDataAvailable();
        Assert.assertTrue(reader1.isAvailable());
        Assert.assertFalse(reader1.isRegisteredAsAvailable());

        // Turn the channel non-writable through the user-defined writability flag.
        channel.unsafe().outboundBuffer().setUserDefinedWritability(1, false);
        Assert.assertFalse(channel.isWritable());

        reader1.notifyDataAvailable();
        channel.runPendingTasks();

        // reader2 is notified once before and once after its view (holding buffersToWrite
        // buffers) has been requested.
        reader2.notifyDataAvailable();
        reader2.requestSubpartitionView(
                (partitionId, index, availabilityListener) -> new DefaultBufferResultSubpartitionView(buffersToWrite),
                new ResultPartitionID(),
                0);
        Assert.assertTrue(reader2.isAvailable());
        Assert.assertFalse(reader2.isRegisteredAsAvailable());

        reader2.notifyDataAvailable();

        // Restore writability and let the pending tasks run; all of reader2's buffers are
        // expected on the outbound side afterwards.
        channel.unsafe().outboundBuffer().setUserDefinedWritability(1, true);
        channel.runPendingTasks();
        Assert.assertEquals(buffersToWrite, channel.outboundMessages().size());
    }

    /**
     * Verifies that a reader whose view reports itself as released, with a failure cause,
     * produces a {@link NettyMessage.ErrorResponse} whose cause is a {@link CancelTaskException}.
     */
    @Test
    public void testProducerFailedException() throws Exception {
        PartitionRequestQueue queue = new PartitionRequestQueue();

        ReleasedResultSubpartitionView view = new ReleasedResultSubpartitionView();
        ResultPartitionProvider partitionProvider = (partitionId, index, availabilityListener) -> view;

        EmbeddedChannel ch = new EmbeddedChannel(queue);

        CreditBasedSequenceNumberingViewReader seqView =
                new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
        seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);

        // Notify the queue about the reader and let the channel process the notification.
        seqView.notifyDataAvailable();
        ch.runPendingTasks();

        Object msg = ch.readOutbound();

        // Fail with a clear assertion instead of a NullPointerException when nothing was written.
        Assert.assertNotNull(msg);
        // JUnit's contract is (expected, actual); keeping that order yields a correct failure message.
        Assert.assertEquals(NettyMessage.ErrorResponse.class, msg.getClass());

        NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
        Assert.assertTrue(err.cause instanceof CancelTaskException);
    }

    /** Runs the shared buffer-writing scenario with a view that holds a single regular test buffer. */
    @Test
    public void testDefaultBufferWriting() throws Exception {
        ResultSubpartitionView singleBufferView = new DefaultBufferResultSubpartitionView(1);
        testBufferWriting(singleBufferView);
    }

    /** Runs the shared buffer-writing scenario with a view that holds a single read-only buffer slice. */
    @Test
    public void testReadOnlyBufferWriting() throws Exception {
        ResultSubpartitionView singleReadOnlyBufferView = new ReadOnlyBufferResultSubpartitionView(1);
        testBufferWriting(singleReadOnlyBufferView);
    }

    /**
     * Shared scenario: registers a {@link SequenceNumberingViewReader} for the given view,
     * signals data availability, and expects exactly one {@link NettyMessage.BufferResponse}
     * on the channel's outbound side.
     *
     * @param view the subpartition view served to the reader
     * @throws IOException declared for the reader's view request
     */
    private void testBufferWriting(ResultSubpartitionView view) throws IOException {
        // Set up: a provider that always hands out the supplied view.
        ResultPartitionProvider partitionProvider = (partitionId, index, availabilityListener) -> view;

        final InputChannelID receiverId = new InputChannelID();
        final PartitionRequestQueue queue = new PartitionRequestQueue();
        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, queue);
        final EmbeddedChannel channel = new EmbeddedChannel(queue);

        reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);

        // Notify about buffer availability and let the channel process the notification.
        reader.notifyDataAvailable();
        channel.runPendingTasks();

        Object read = channel.readOutbound();
        Assert.assertNotNull(read);
        if (read instanceof NettyMessage.ErrorResponse) {
            // Attach the transported cause to the failure itself rather than printing it to
            // stderr, so the root cause shows up in the test report.
            throw new AssertionError(
                    "Expected a BufferResponse but received an ErrorResponse",
                    ((NettyMessage.ErrorResponse) read).cause);
        }
        Assert.assertThat(read, Matchers.instanceOf(NettyMessage.BufferResponse.class));

        // Nothing else may have been written.
        read = channel.readOutbound();
        Assert.assertNull(read);
    }

    /**
     * Checks the queue's handling of a zero-credit reader whose view reports that the next
     * buffer is an event: the reader is expected in the available-readers queue while the
     * channel is blocked, and gone from it after the channel has been flushed.
     */
    @Test
    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
        // Set up: a view that is always available and always announces an event as next buffer.
        final ResultSubpartitionView eventView = new NextIsEventResultSubpartitionView();
        final ResultPartitionProvider provider = (partitionId, index, availabilityListener) -> eventView;

        final InputChannelID channelId = new InputChannelID();
        final PartitionRequestQueue queue = new PartitionRequestQueue();
        // The reader starts without any credit.
        final CreditBasedSequenceNumberingViewReader reader =
                new CreditBasedSequenceNumberingViewReader(channelId, 0, queue);
        final EmbeddedChannel channel = new EmbeddedChannel(queue);

        reader.requestSubpartitionView(provider, new ResultPartitionID(), 0);

        // Block the channel so the intermediate state can be observed.
        final ByteBuf blocker = blockChannel(channel);
        Assert.assertNull(channel.readOutbound());

        // Signal availability and let the channel process the notification.
        reader.notifyDataAvailable();
        channel.runPendingTasks();

        // Intermediate state: exactly this reader is queued, still with zero credits.
        Assert.assertThat(
                queue.getAvailableReaders(),
                Matchers.<NetworkSequenceViewReader>contains(reader));
        Assert.assertEquals(0, reader.getNumCreditsAvailable());

        // Flush so that the blocking buffer leaves the channel, then check the final state.
        channel.flush();
        Assert.assertSame(blocker, channel.readOutbound());

        Assert.assertEquals(0, queue.getAvailableReaders().size());
        Assert.assertEquals(0, reader.getNumCreditsAvailable());
        Assert.assertNull(channel.readOutbound());
    }

    /**
     * Checks that a credit-based reader created with zero credits is not registered as
     * available after data notifications alone, becomes registered once credits are added,
     * and that after flushing the blocked channel one {@link NettyMessage.BufferResponse}
     * per added credit can be read from the outbound side.
     */
    @Test
    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
        // Number of data-availability notifications sent to the reader.
        final int notifyNumBuffers = 5;
        // Number of single credits granted to the reader; also the number of expected responses.
        final int notifyNumCredits = 3;

        // Set up: a view with a backlog of 10 buffers and a reader without initial credit.
        DefaultBufferResultSubpartitionView view = new DefaultBufferResultSubpartitionView(10);
        ResultPartitionProvider partitionProvider = (partitionId, index, availabilityListener) -> view;

        InputChannelID receiverId = new InputChannelID();
        PartitionRequestQueue queue = new PartitionRequestQueue();
        CreditBasedSequenceNumberingViewReader reader =
                new CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
        EmbeddedChannel channel = new EmbeddedChannel(queue);

        reader.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
        queue.notifyReaderCreated(reader);

        // Block the channel so that the intermediate state can be observed.
        ByteBuf channelBlockingBuffer = blockChannel(channel);
        Assert.assertNull(channel.readOutbound());

        // Notify about available buffers.
        for (int i = 0; i < notifyNumBuffers; i++) {
            reader.notifyDataAvailable();
        }

        channel.runPendingTasks();

        // Without credits the reader must not be queued, although buffers are available.
        Assert.assertEquals(0, queue.getAvailableReaders().size());
        Assert.assertTrue(reader.hasBuffersAvailable());
        Assert.assertFalse(reader.isRegisteredAsAvailable());
        Assert.assertEquals(0, reader.getNumCreditsAvailable());

        // Grant credits one by one; the reader is expected to be queued exactly once while
        // its credit count grows with every call.
        for (int i = 1; i <= notifyNumCredits; i++) {
            queue.addCredit(receiverId, 1);

            Assert.assertTrue(reader.isRegisteredAsAvailable());
            Assert.assertThat(
                    queue.getAvailableReaders(),
                    Matchers.<NetworkSequenceViewReader>contains(reader));
            Assert.assertEquals(i, reader.getNumCreditsAvailable());
            Assert.assertTrue(reader.hasBuffersAvailable());
        }

        // Flush so that the blocking buffer leaves the channel, then check the final state.
        channel.flush();
        Assert.assertSame(channelBlockingBuffer, channel.readOutbound());

        Assert.assertEquals(0, queue.getAvailableReaders().size());
        Assert.assertEquals(0, reader.getNumCreditsAvailable());
        Assert.assertTrue(reader.hasBuffersAvailable());
        Assert.assertFalse(reader.isRegisteredAsAvailable());

        // One buffer response per granted credit, and nothing beyond that.
        for (int i = 1; i <= notifyNumCredits; i++) {
            Object outbound = channel.readOutbound();
            Assert.assertThat(outbound, Matchers.instanceOf(NettyMessage.BufferResponse.class));
        }
        Assert.assertNull(channel.readOutbound());
    }

    /**
     * Writes (without flushing) a buffer whose writer index equals the channel's write-buffer
     * high water mark and asserts that the channel is no longer writable afterwards.
     *
     * @param channel the embedded channel to block
     * @return the buffer that was written to the channel
     */
    static ByteBuf blockChannel(EmbeddedChannel channel) {
        final int mark = channel.config().getWriteBufferHighWaterMark();

        // Advance the writer index to the high water mark so that the whole capacity counts
        // as readable bytes.
        final ByteBuf blocker = Unpooled.buffer(mark).writerIndex(mark);

        channel.write(blocker);
        Assert.assertFalse(channel.isWritable());

        return blocker;
    }

    /**
     * Minimal {@link ResultSubpartitionView} stub: never yields a buffer, ignores all
     * notifications, reports itself as released and unavailable, and has no failure cause.
     * The more specific test views in this file override individual answers.
     */
    private static class NoOpResultSubpartitionView implements ResultSubpartitionView {

        // --- data access -------------------------------------------------------------------

        /** Always returns {@code null}, i.e. there is never a buffer to hand out. */
        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() {
            return null;
        }

        /** Always {@code false}. */
        public boolean nextBufferIsEvent() {
            return false;
        }

        /** Always {@code false}. */
        public boolean isAvailable() {
            return false;
        }

        // --- lifecycle ---------------------------------------------------------------------

        /** Always {@code true}. */
        public boolean isReleased() {
            return true;
        }

        /** Always {@code null}. */
        public Throwable getFailureCause() {
            return null;
        }

        // --- callbacks, intentionally empty ------------------------------------------------

        public void notifyDataAvailable() {
            // no-op
        }

        public void releaseAllResources() {
            // no-op
        }

        public void notifySubpartitionConsumed() {
            // no-op
        }
    }

    /**
     * View stub that always reports itself as available and always announces that the next
     * buffer is an event; everything else is inherited from {@link NoOpResultSubpartitionView}.
     */
    private static class NextIsEventResultSubpartitionView extends NoOpResultSubpartitionView {

        /** Always {@code true}. */
        @Override
        public boolean isAvailable() {
            return true;
        }

        /** Always {@code true}. */
        @Override
        public boolean nextBufferIsEvent() {
            return true;
        }
    }

    /**
     * View stub that reports itself as released and supplies a fresh {@link RuntimeException}
     * as failure cause on every call; availability is inherited from
     * {@link EmptyAlwaysAvailableResultSubpartitionView}.
     */
    private static class ReleasedResultSubpartitionView
            extends EmptyAlwaysAvailableResultSubpartitionView {

        /** Creates a new exception instance per invocation. */
        @Override
        public Throwable getFailureCause() {
            return new RuntimeException("Expected test exception");
        }

        /** Always {@code true}. */
        @Override
        public boolean isReleased() {
            return true;
        }
    }

    /**
     * View stub that is never released and always claims to be available, while still
     * inheriting the {@code null} buffer from {@link NoOpResultSubpartitionView}.
     */
    private static class EmptyAlwaysAvailableResultSubpartitionView
            extends NoOpResultSubpartitionView {

        /** Always {@code true}. */
        @Override
        public boolean isAvailable() {
            return true;
        }

        /** Always {@code false}. */
        @Override
        public boolean isReleased() {
            return false;
        }
    }

    /**
     * Variant of {@link DefaultBufferResultSubpartitionView} that hands out each buffer as a
     * read-only slice.
     */
    private static class ReadOnlyBufferResultSubpartitionView
            extends DefaultBufferResultSubpartitionView {

        /**
         * @param buffersInBacklog initial backlog size, passed through to the parent view
         */
        private ReadOnlyBufferResultSubpartitionView(int buffersInBacklog) {
            super(buffersInBacklog);
        }

        /**
         * Fetches the parent's next buffer and re-wraps it: the buffer is replaced by its
         * read-only slice, all other fields are copied over unchanged.
         */
        @Override
        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() {
            final ResultSubpartition.BufferAndBacklog writable = super.getNextBuffer();
            return new ResultSubpartition.BufferAndBacklog(
                    writable.buffer().readOnlySlice(),
                    writable.isMoreAvailable(),
                    writable.buffersInBacklog(),
                    writable.nextBufferIsEvent());
        }
    }

    /**
     * View stub backed by a counter: every {@link #getNextBuffer()} call decrements the counter
     * and returns a freshly created test buffer; the view is available while the counter is
     * positive.
     */
    private static class DefaultBufferResultSubpartitionView extends NoOpResultSubpartitionView {

        /** Remaining backlog; decremented on every buffer request. */
        private final AtomicInteger buffersInBacklog;

        /**
         * @param initialBacklog starting value of the backlog counter
         */
        private DefaultBufferResultSubpartitionView(int initialBacklog) {
            buffersInBacklog = new AtomicInteger(initialBacklog);
        }

        /**
         * Decrements the backlog and returns a new 10-byte test buffer together with the
         * remaining backlog; the result is never flagged as being followed by an event.
         */
        @Override
        @Nullable
        public ResultSubpartition.BufferAndBacklog getNextBuffer() {
            final int remaining = buffersInBacklog.decrementAndGet();
            final boolean moreAvailable = remaining > 0;
            return new ResultSubpartition.BufferAndBacklog(
                    TestBufferFactory.createBuffer(10), moreAvailable, remaining, false);
        }

        /** Returns {@code true} while the backlog counter is greater than zero. */
        @Override
        public boolean isAvailable() {
            return buffersInBacklog.get() > 0;
        }
    }
}

