/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

/**
 * Tests how {@link NetworkEnvironment#registerTask(Task)} sizes the buffer pools of result
 * partitions and input gates for the different {@link ResultPartitionType}s.
 *
 * <p>Every test runs twice, once with credit-based flow control enabled and once with it
 * disabled (see {@link #parameters()}).
 */
@RunWith(Parameterized.class)
public class NetworkEnvironmentTest {
    /** Total number of buffers handed to the {@link NetworkEnvironment} in the unbounded test. */
    private static final int NUM_BUFFERS = 1024;

    /** Memory segment size handed to every {@link NetworkEnvironment} created here. */
    private static final int MEMORY_SEGMENT_SIZE = 128;

    /**
     * Fifth constructor argument of {@link NetworkEnvironment}; also the segment count expected
     * in the {@code assignExclusiveSegments} verification.
     * NOTE(review): presumably "network buffers per channel" - confirm against the constructor.
     */
    private static final int BUFFERS_PER_CHANNEL = 2;

    /**
     * Sixth constructor argument of {@link NetworkEnvironment}.
     * NOTE(review): presumably "extra (floating) network buffers per gate" - confirm against the
     * constructor.
     */
    private static final int EXTRA_BUFFERS_PER_GATE = 8;

    /** Injected by the {@link Parameterized} runner; must stay public for JUnit 4. */
    @Parameterized.Parameter
    public boolean enableCreditBasedFlowControl;

    /** Used by {@link #testRegisterTaskWithInsufficientBuffers()} to expect the failure. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Supplies the values for {@link #enableCreditBasedFlowControl}.
     *
     * @return {@code true} and {@code false}, in that order
     */
    @Parameterized.Parameters(name="Credit-based = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * Registers a task with four result partitions and four input gates against a large buffer
     * pool and checks the required and maximum pool sizes of each.
     */
    @Test
    public void testRegisterTaskUsesBoundedBuffers() throws Exception {
        NetworkEnvironment network = new NetworkEnvironment(
            NUM_BUFFERS, MEMORY_SEGMENT_SIZE, 0, 0, BUFFERS_PER_CHANNEL, EXTRA_BUFFERS_PER_GATE, this.enableCreditBasedFlowControl);

        // result partitions
        ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
        ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
        ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
        ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
        ResultPartition[] resultPartitions = new ResultPartition[]{rp1, rp2, rp3, rp4};

        // input gates
        SingleInputGate ig1 = this.createSingleInputGate(ResultPartitionType.PIPELINED, 2);
        SingleInputGate ig2 = this.createSingleInputGate(ResultPartitionType.BLOCKING, 2);
        SingleInputGate ig3 = this.createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
        SingleInputGate ig4 = this.createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
        SingleInputGate[] inputGates = new SingleInputGate[]{ig1, ig2, ig3, ig4};

        // overall task to register
        Task task = Mockito.mock(Task.class);
        Mockito.when(task.getProducedPartitions()).thenReturn(resultPartitions);
        Mockito.when(task.getAllInputGates()).thenReturn(inputGates);

        network.registerTask(task);

        // every partition requires exactly one buffer per subpartition
        for (ResultPartition rp : resultPartitions) {
            Assert.assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
        }

        // only the PIPELINED_BOUNDED partitions have a capped pool
        Assert.assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(boundedPoolSize(2), rp3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(boundedPoolSize(8), rp4.getBufferPool().getMaxNumberOfMemorySegments());

        // with credit-based flow control the gates' pools require nothing
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 8, ig4.getBufferPool().getNumberOfRequiredMemorySegments());

        Assert.assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(
            this.enableCreditBasedFlowControl ? EXTRA_BUFFERS_PER_GATE : boundedPoolSize(2),
            ig3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(
            this.enableCreditBasedFlowControl ? EXTRA_BUFFERS_PER_GATE : boundedPoolSize(8),
            ig4.getBufferPool().getMaxNumberOfMemorySegments());

        this.verifyExclusiveSegmentAssignment(network, inputGates);

        // NOTE(review): this cleanup is skipped when an assertion above fails.
        releaseAll(resultPartitions, inputGates);
        network.shutdown();
    }

    /**
     * Registers a task against a buffer pool that is exactly large enough and expects
     * registration to succeed.
     */
    @Test
    public void testRegisterTaskWithLimitedBuffers() throws Exception {
        // 10 = 2 + 2 + 2 + 4, the channel counts used by the shared helper below
        int bufferCount = !this.enableCreditBasedFlowControl
            ? 20
            : 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
        this.testRegisterTaskWithLimitedBuffers(bufferCount);
    }

    /**
     * Registers a task against a buffer pool that is one buffer too small and expects an
     * {@link IOException} reporting an insufficient number of network buffers.
     */
    @Test
    public void testRegisterTaskWithInsufficientBuffers() throws Exception {
        int bufferCount = !this.enableCreditBasedFlowControl
            ? 19
            : 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Insufficient number of network buffers");
        this.testRegisterTaskWithLimitedBuffers(bufferCount);
    }

    /**
     * Shared body of the two limited-buffer tests: builds four result partitions and four input
     * gates, registers them with a {@link NetworkEnvironment} of the given size and checks the
     * resulting pool sizes.
     *
     * @param bufferPoolSize total number of buffers given to the {@link NetworkEnvironment}
     * @throws Exception if registration or cleanup fails
     */
    private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
        NetworkEnvironment network = new NetworkEnvironment(
            bufferPoolSize, MEMORY_SEGMENT_SIZE, 0, 0, BUFFERS_PER_CHANNEL, EXTRA_BUFFERS_PER_GATE, this.enableCreditBasedFlowControl);
        ConnectionManager connManager = InputChannelTestUtils.createDummyConnectionManager();

        // result partitions
        ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
        ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
        ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
        ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
        ResultPartition[] resultPartitions = new ResultPartition[]{rp1, rp2, rp3, rp4};

        // input gates
        SingleInputGate ig1 = this.createSingleInputGate(ResultPartitionType.PIPELINED, 2);
        SingleInputGate ig2 = this.createSingleInputGate(ResultPartitionType.BLOCKING, 2);
        SingleInputGate ig3 = this.createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
        SingleInputGate ig4 = this.createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
        SingleInputGate[] inputGates = new SingleInputGate[]{ig1, ig2, ig3, ig4};

        // remote input channels are only attached in credit-based mode
        if (this.enableCreditBasedFlowControl) {
            createRemoteInputChannel(ig4, 0, rp1, connManager);
            createRemoteInputChannel(ig4, 0, rp2, connManager);
            createRemoteInputChannel(ig4, 0, rp3, connManager);
            createRemoteInputChannel(ig4, 0, rp4, connManager);

            createRemoteInputChannel(ig1, 1, rp1, connManager);
            createRemoteInputChannel(ig1, 1, rp4, connManager);

            createRemoteInputChannel(ig2, 1, rp2, connManager);
            createRemoteInputChannel(ig2, 2, rp4, connManager);

            createRemoteInputChannel(ig3, 1, rp3, connManager);
            createRemoteInputChannel(ig3, 3, rp4, connManager);
        }

        // overall task to register
        Task task = Mockito.mock(Task.class);
        Mockito.when(task.getProducedPartitions()).thenReturn(resultPartitions);
        Mockito.when(task.getAllInputGates()).thenReturn(inputGates);

        network.registerTask(task);

        // only the PIPELINED_BOUNDED partitions have a capped pool
        Assert.assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(boundedPoolSize(2), rp3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(boundedPoolSize(4), rp4.getBufferPool().getMaxNumberOfMemorySegments());

        // with the pool this tight, each partition holds exactly its required buffers
        for (ResultPartition rp : resultPartitions) {
            Assert.assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
            Assert.assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
        }

        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());

        Assert.assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(
            this.enableCreditBasedFlowControl ? EXTRA_BUFFERS_PER_GATE : boundedPoolSize(2),
            ig3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(
            this.enableCreditBasedFlowControl ? EXTRA_BUFFERS_PER_GATE : boundedPoolSize(4),
            ig4.getBufferPool().getMaxNumberOfMemorySegments());

        this.verifyExclusiveSegmentAssignment(network, inputGates);

        // NOTE(review): this cleanup is skipped when registration or an assertion above fails.
        releaseAll(resultPartitions, inputGates);
        network.shutdown();
    }

    /**
     * Expected maximum pool size asserted for a {@code PIPELINED_BOUNDED} partition or gate:
     * {@code channels * BUFFERS_PER_CHANNEL + EXTRA_BUFFERS_PER_GATE}.
     * NOTE(review): formula derived from the expected values in this test (12, 16, 24);
     * presumably mirrors the sizing inside NetworkEnvironment - confirm.
     *
     * @param channels number of subpartitions or input channels
     * @return the expected upper bound of the local buffer pool
     */
    private static int boundedPoolSize(int channels) {
        return channels * BUFFERS_PER_CHANNEL + EXTRA_BUFFERS_PER_GATE;
    }

    /**
     * Verifies that {@code assignExclusiveSegments} was invoked once per gate when credit-based
     * flow control is enabled, and never otherwise.
     *
     * @param network environment whose network buffer pool is the expected argument
     * @param inputGates spied gates to verify
     * @throws IOException declared because the verified call is expressed on the spy
     */
    private void verifyExclusiveSegmentAssignment(NetworkEnvironment network, SingleInputGate[] inputGates) throws IOException {
        int invocations = this.enableCreditBasedFlowControl ? 1 : 0;
        for (SingleInputGate inputGate : inputGates) {
            Mockito.verify(inputGate, Mockito.times(invocations))
                .assignExclusiveSegments(network.getNetworkBufferPool(), BUFFERS_PER_CHANNEL);
        }
    }

    /**
     * Releases all partitions first, then all input gates, in array order.
     *
     * @param resultPartitions partitions to release
     * @param inputGates gates whose resources are released
     * @throws IOException if releasing a gate fails
     */
    private static void releaseAll(ResultPartition[] resultPartitions, SingleInputGate[] inputGates) throws IOException {
        for (ResultPartition resultPartition : resultPartitions) {
            resultPartition.release();
        }
        // The element type must be SingleInputGate: the array holds input gates, not partitions.
        for (SingleInputGate inputGate : inputGates) {
            inputGate.releaseAllResources();
        }
    }

    /**
     * Creates a {@link ResultPartition} backed by mocked collaborators.
     *
     * @param partitionType the type of the partition
     * @param channels passed for both numeric constructor arguments that follow the type
     * @return a new result partition instance
     */
    private static ResultPartition createResultPartition(ResultPartitionType partitionType, int channels) {
        return new ResultPartition(
            "TestTask-" + partitionType + ":" + channels,
            Mockito.mock(TaskActions.class),
            new JobID(),
            new ResultPartitionID(),
            partitionType,
            channels,
            channels,
            Mockito.mock(ResultPartitionManager.class),
            new NoOpResultPartitionConsumableNotifier(),
            Mockito.mock(IOManager.class),
            false);
    }

    /**
     * Creates a spied {@link SingleInputGate} so that calls on it can be verified later.
     *
     * @param partitionType the consumed partition type
     * @param channels the number of input channels
     * @return a PowerMockito spy wrapping a new input gate
     */
    private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int channels) {
        return PowerMockito.spy(new SingleInputGate(
            "Test Task Name",
            new JobID(),
            new IntermediateDataSetID(),
            partitionType,
            0,
            channels,
            Mockito.mock(TaskActions.class),
            UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
            this.enableCreditBasedFlowControl));
    }

    /**
     * Creates a {@link RemoteInputChannel} for the given partition and sets it on the gate,
     * keyed by the partition's intermediate result partition id.
     *
     * @param inputGate gate that receives the channel
     * @param channelIndex index passed to the channel constructor
     * @param resultPartition partition the channel reads from
     * @param connManager connection manager handed to the channel
     */
    private static void createRemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartition resultPartition, ConnectionManager connManager) {
        RemoteInputChannel channel = new RemoteInputChannel(
            inputGate,
            channelIndex,
            resultPartition.getPartitionId(),
            Mockito.mock(ConnectionID.class),
            connManager,
            0,
            0,
            UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel);
    }
}

