/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class PartitionRequestClientTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetriggerPartitionRequest() throws Exception {
        long deadline = System.currentTimeMillis() + 30000L;
        PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        PartitionRequestClient client = new PartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel inputChannel = PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate, client, 1, 2);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            int numExclusiveBuffers = 2;
            inputGate.assignExclusiveSegments(networkBufferPool, 2);
            inputChannel.requestSubpartition(0);
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDoublePartitionRequest() throws Exception {
        PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        PartitionRequestClient client = new PartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = PartitionRequestClientHandlerTest.createSingleInputGate();
        RemoteInputChannel inputChannel = PartitionRequestClientHandlerTest.createRemoteInputChannel(inputGate, client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            int numExclusiveBuffers = 2;
            inputGate.assignExclusiveSegments(networkBufferPool, 2);
            inputChannel.requestSubpartition(0);
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.releaseAllResources();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) throws InterruptedException {
        while (channel.runScheduledPendingTasks() != -1L && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
    }
}

