/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link CreditBasedPartitionRequestClientHandler}.
 *
 * <p>Each test registers one or more {@link RemoteInputChannel}s with a handler, feeds
 * {@link NettyMessage}s into {@code channelRead} and then checks either the calls made on the
 * input channel, the state of the input channel, or the messages written to the outbound side
 * of an {@link EmbeddedChannel}.
 */
public class CreditBasedPartitionRequestClientHandlerTest {

    /**
     * Delivers a buffer response for a channel whose buffer provider hands out no buffer,
     * reports itself as destroyed and refuses to register buffer listeners.
     *
     * <p>There is no explicit assertion: the test passes when {@code channelRead} returns
     * without throwing before the 60 second timeout expires.
     */
    @Test(timeout = 60000L)
    public void testReleaseInputChannelDuringDecode() throws Exception {
        // Provider that behaves as if it had already been torn down.
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(null);
        Mockito.when(bufferProvider.isDestroyed()).thenReturn(true);
        Mockito.when(bufferProvider.addBufferListener(org.mockito.Matchers.any(BufferListener.class)))
                .thenReturn(false);

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        NettyMessage.BufferResponse receivedBuffer = PartitionRequestClientHandlerTest.createBufferResponse(
                TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2);

        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);

        client.channelRead(Mockito.mock(ChannelHandlerContext.class), receivedBuffer);
    }

    /**
     * Delivers a buffer response that wraps a zero-sized buffer and verifies that the input
     * channel is notified through {@code onEmptyBuffer(0, backlog)} exactly once and that
     * {@code onError} is never invoked.
     */
    @Test
    public void testReceiveEmptyBuffer() throws Exception {
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        Buffer emptyBuffer = TestBufferFactory.createBuffer(0);
        // The same value is placed in the response and expected in the callback below.
        final int backlog = 2;
        NettyMessage.BufferResponse receivedBuffer = PartitionRequestClientHandlerTest.createBufferResponse(
                emptyBuffer, 0, inputChannel.getInputChannelId(), backlog);

        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);

        client.channelRead(Mockito.mock(ChannelHandlerContext.class), receivedBuffer);

        Mockito.verify(inputChannel, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class));
        Mockito.verify(inputChannel, Mockito.times(1)).onEmptyBuffer(0, backlog);
    }

    /**
     * Delivers a 32-byte buffer response to a real input channel that owns exclusive segments
     * and verifies that the channel ends up with one queued buffer and a sender backlog equal
     * to the value carried by the response.
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible
     * behaviour change" for this method; confirm the try/finally shape against the original
     * source.
     */
    @Test
    public void testReceiveBuffer() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel inputChannel = PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
            inputGate.setBufferPool(bufferPool);
            final int numExclusiveBuffers = 2;
            inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);

            CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);

            final int backlog = 2;
            NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), backlog);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(1, inputChannel.getNumberOfQueuedBuffers());
            Assert.assertEquals(backlog, inputChannel.getSenderBacklog());
        } finally {
            // Always give back the buffers, even when an assertion above fails.
            inputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /**
     * Delivers a buffer response to an input channel that reports zero available buffers and
     * verifies that the channel's {@code onError} is invoked exactly once with an
     * {@link IllegalStateException} matcher.
     */
    @Test
    public void testThrowExceptionForNoAvailableBuffer() throws Exception {
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        // Spy on a real channel so that the onError call can be verified afterwards.
        RemoteInputChannel inputChannel =
                Mockito.spy(PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate));

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        Assert.assertEquals(
                "There should be no buffers available in the channel.",
                0,
                inputChannel.getNumberOfAvailableBuffers());

        NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2);
        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

        Mockito.verify(inputChannel, Mockito.times(1))
                .onError(org.mockito.Matchers.any(IllegalStateException.class));
    }

    /**
     * Delivers an {@link NettyMessage.ErrorResponse} that wraps a
     * {@link PartitionNotFoundException} addressed to a registered input channel and verifies
     * that the channel's {@code onFailedPartitionRequest} is invoked exactly once.
     */
    @Test
    public void testReceivePartitionNotFoundException() throws Exception {
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        NettyMessage.ErrorResponse partitionNotFound = new NettyMessage.ErrorResponse(
                new PartitionNotFoundException(new ResultPartitionID()), inputChannel.getInputChannelId());

        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);

        // The handler is activated with a context whose channel() returns a mock channel.
        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(ctx.channel()).thenReturn(Mockito.mock(Channel.class));
        client.channelActive(ctx);

        client.channelRead(ctx, partitionNotFound);

        Mockito.verify(inputChannel, Mockito.times(1)).onFailedPartitionRequest();
    }

    /**
     * Calls {@code cancelRequestFor} on a handler that was never activated, first with
     * {@code null} and then with the id of a registered channel.
     *
     * <p>There is no explicit assertion: the test passes when neither call throws.
     */
    @Test
    public void testCancelBeforeActive() throws Exception {
        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());

        CreditBasedPartitionRequestClientHandler client = new CreditBasedPartitionRequestClientHandler();
        client.addInputChannel(inputChannel);

        client.cancelRequestFor(null);
        client.cancelRequestFor(inputChannel.getInputChannelId());
    }

    /**
     * Drives two input channels over one {@link EmbeddedChannel} and checks the outbound
     * messages produced by the handler:
     *
     * <ol>
     *   <li>each channel's subpartition request shows up as a
     *       {@link NettyMessage.PartitionRequest} with credit 2;
     *   <li>after one buffer response per channel and running the pending tasks, one
     *       {@link NettyMessage.AddCredit} with credit 2 is written per channel;
     *   <li>while the channel is blocked, a further buffer response produces no outbound
     *       message; after {@code flush()} the blocking buffer and then an
     *       {@link NettyMessage.AddCredit} with credit 1 are read.
     * </ol>
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible
     * behaviour change" for this method; confirm the try/finally shape against the original
     * source.
     */
    @Test
    public void testNotifyCreditAvailable() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);
        PartitionRequestClient client = new PartitionRequestClient(
                channel,
                handler,
                Mockito.mock(ConnectionID.class),
                Mockito.mock(PartitionRequestClientFactory.class));

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel inputChannel1 =
                PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate, client);
        RemoteInputChannel inputChannel2 =
                PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate, client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            final int numExclusiveBuffers = 2;
            inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);

            inputChannel1.requestSubpartition(0);
            inputChannel2.requestSubpartition(0);

            // Both partition requests must have been written, in request order.
            Assert.assertTrue(channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(
                    inputChannel1.getInputChannelId(),
                    ((NettyMessage.PartitionRequest) readFromOutbound).receiverId);
            Assert.assertEquals(2, ((NettyMessage.PartitionRequest) readFromOutbound).credit);

            readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(
                    inputChannel2.getInputChannelId(),
                    ((NettyMessage.PartitionRequest) readFromOutbound).receiverId);
            Assert.assertEquals(2, ((NettyMessage.PartitionRequest) readFromOutbound).credit);

            // One buffer per channel: each channel now holds two unannounced credits.
            NettyMessage.BufferResponse bufferResponse1 = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel1.getInputChannelId(), 1);
            NettyMessage.BufferResponse bufferResponse2 = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel2.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse2);

            Assert.assertEquals(2, inputChannel1.getUnannouncedCredit());
            Assert.assertEquals(2, inputChannel2.getUnannouncedCredit());

            // Running the event loop tasks writes one AddCredit per channel and nothing else.
            channel.runPendingTasks();

            readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(
                    inputChannel1.getInputChannelId(),
                    ((NettyMessage.AddCredit) readFromOutbound).receiverId);
            Assert.assertEquals(2, ((NettyMessage.AddCredit) readFromOutbound).credit);

            readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(
                    inputChannel2.getInputChannelId(),
                    ((NettyMessage.AddCredit) readFromOutbound).receiverId);
            Assert.assertEquals(2, ((NettyMessage.AddCredit) readFromOutbound).credit);
            Assert.assertNull(channel.readOutbound());

            // Block the channel, then deliver another buffer for the first input channel.
            ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);

            NettyMessage.BufferResponse bufferResponse3 = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 1, inputChannel1.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse3);

            Assert.assertEquals(1, inputChannel1.getUnannouncedCredit());
            Assert.assertEquals(0, inputChannel2.getUnannouncedCredit());

            // While the channel is not writable no credit message reaches the outbound side.
            channel.runPendingTasks();
            Assert.assertFalse(channel.isWritable());
            Assert.assertNull(channel.readOutbound());

            // Flushing releases the blocking buffer first, followed by the pending AddCredit.
            channel.flush();
            Assert.assertSame(channelBlockingBuffer, channel.readOutbound());

            Assert.assertTrue(channel.isWritable());
            readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(1, ((NettyMessage.AddCredit) readFromOutbound).credit);
            Assert.assertEquals(0, inputChannel1.getUnannouncedCredit());
            Assert.assertEquals(0, inputChannel2.getUnannouncedCredit());
            Assert.assertNull(channel.readOutbound());
        } finally {
            // Always give back the buffers, even when an assertion above fails.
            inputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /**
     * Releases the input gate while its channel still holds unannounced credit and verifies
     * that the next outbound message is a {@link NettyMessage.CloseRequest} and that running
     * the pending tasks afterwards writes nothing further.
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible
     * behaviour change" for this method; confirm the try/finally shape against the original
     * source.
     */
    @Test
    public void testNotifyCreditAvailableAfterReleased() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);
        PartitionRequestClient client = new PartitionRequestClient(
                channel,
                handler,
                Mockito.mock(ConnectionID.class),
                Mockito.mock(PartitionRequestClientFactory.class));

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel inputChannel =
                PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate, client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            final int numExclusiveBuffers = 2;
            inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);

            inputChannel.requestSubpartition(0);

            // The partition request must have been written with credit 2.
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(2, ((NettyMessage.PartitionRequest) readFromOutbound).credit);

            // One buffer response leaves the channel with two unannounced credits.
            NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(2, inputChannel.getUnannouncedCredit());

            // Release before the pending credit notification is processed.
            inputGate.releaseAllResources();

            readFromOutbound = channel.readOutbound();
            Assert.assertThat(readFromOutbound, Matchers.instanceOf(NettyMessage.CloseRequest.class));

            // No credit message may follow for the released channel.
            channel.runPendingTasks();
            Assert.assertNull(channel.readOutbound());
        } finally {
            // releaseAllResources() is intentionally called again here so that cleanup also
            // happens when an assertion fails before the release inside the try block.
            inputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }
}

