/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class SingleInputGateTest {
    @Parameterized.Parameter
    public boolean enableCreditBasedFlowControl;

    @Parameterized.Parameters(name="Credit-based = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    @Test(timeout=120000L)
    public void testBasicGetNextLogic() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        TestInputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannel(new IntermediateResultPartitionID(), (InputChannel)inputChannels[0]);
        inputGate.setInputChannel(new IntermediateResultPartitionID(), (InputChannel)inputChannels[1]);
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer();
        inputChannels[1].readBuffer();
        inputChannels[1].readEndOfPartitionEvent();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty((InputChannel)inputChannels[0]);
        inputGate.notifyChannelNonEmpty((InputChannel)inputChannels[1]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 1, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0, false);
        Assert.assertTrue((boolean)inputGate.isFinished());
    }

    @Test(timeout=120000L)
    public void testIsMoreAvailableReadingFromSingleInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate();
        TestInputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannel(new IntermediateResultPartitionID(), (InputChannel)inputChannels[0]);
        inputGate.setInputChannel(new IntermediateResultPartitionID(), (InputChannel)inputChannels[1]);
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer(false);
        inputGate.notifyChannelNonEmpty((InputChannel)inputChannels[0]);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, true);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0, false);
    }

    @Test
    public void testBackwardsEventWithUninitializedChannel() throws Exception {
        TaskEventDispatcher taskEventDispatcher = (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class);
        Mockito.when((Object)taskEventDispatcher.publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class))).thenReturn((Object)true);
        ResultSubpartitionView iterator = (ResultSubpartitionView)Mockito.mock(ResultSubpartitionView.class);
        Mockito.when((Object)iterator.getNextBuffer()).thenReturn((Object)new ResultSubpartition.BufferAndBacklog((Buffer)new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)1024), FreeingBufferRecycler.INSTANCE), false, 0, false));
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class))).thenReturn((Object)iterator);
        SingleInputGate inputGate = this.createInputGate();
        BufferPool bufferPool = (BufferPool)Mockito.mock(BufferPool.class);
        Mockito.when((Object)bufferPool.getNumberOfRequiredMemorySegments()).thenReturn((Object)2);
        inputGate.setBufferPool(bufferPool);
        ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
        LocalInputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, (ConnectionManager)Mockito.mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        inputGate.setInputChannel(localPartitionId.getPartitionId(), (InputChannel)local);
        inputGate.setInputChannel(unknownPartitionId.getPartitionId(), (InputChannel)unknown);
        inputGate.requestPartitions();
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.times((int)1))).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
        TestTaskEvent event = new TestTaskEvent();
        inputGate.sendTaskEvent((TaskEvent)event);
        ((TaskEventDispatcher)Mockito.verify((Object)taskEventDispatcher, (VerificationMode)Mockito.times((int)1))).publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class));
        inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.times((int)2))).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
        ((TaskEventDispatcher)Mockito.verify((Object)taskEventDispatcher, (VerificationMode)Mockito.times((int)2))).publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class));
    }

    @Test
    public void testUpdateChannelBeforeRequest() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1);
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 0, new ResultPartitionID(), partitionManager, new TaskEventDispatcher(), (ConnectionManager)new LocalConnectionManager(), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        inputGate.setInputChannel(unknown.partitionId.getPartitionId(), (InputChannel)unknown);
        inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(unknown.partitionId, ResultPartitionLocation.createLocal()));
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.never())).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
    }

    @Test
    public void testReleaseWhilePollingChannel() throws Exception {
        final AtomicReference asyncException = new AtomicReference();
        final SingleInputGate inputGate = this.createInputGate(1);
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 0, new ResultPartitionID(), new ResultPartitionManager(), new TaskEventDispatcher(), (ConnectionManager)new LocalConnectionManager(), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        inputGate.setInputChannel(unknown.partitionId.getPartitionId(), (InputChannel)unknown);
        Thread asyncConsumer = new Thread(){

            @Override
            public void run() {
                try {
                    inputGate.getNextBufferOrEvent();
                }
                catch (Exception e) {
                    asyncException.set(e);
                }
            }
        };
        asyncConsumer.start();
        boolean success = false;
        for (int i = 0; i < 50; ++i) {
            if (asyncConsumer.isAlive()) {
                boolean bl = success = asyncConsumer.getState() == Thread.State.WAITING;
            }
            if (success) break;
            Thread.sleep(100L);
        }
        Assert.assertTrue((String)"Did not trigger blocking buffer request.", (boolean)success);
        inputGate.releaseAllResources();
        asyncConsumer.join();
        Assert.assertNotNull(asyncException.get());
        Assert.assertEquals(IllegalStateException.class, ((Exception)asyncException.get()).getClass());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBackoffConfiguration() throws Exception {
        ResultPartitionID[] partitionIds = new ResultPartitionID[]{new ResultPartitionID(), new ResultPartitionID(), new ResultPartitionID()};
        InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(partitionIds[0], ResultPartitionLocation.createLocal()), new InputChannelDeploymentDescriptor(partitionIds[1], ResultPartitionLocation.createRemote((ConnectionID)new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), new InputChannelDeploymentDescriptor(partitionIds[2], ResultPartitionLocation.createUnknown())};
        InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, channelDescs);
        int initialBackoff = 137;
        int maxBackoff = 1001;
        NetworkEnvironment netEnv = new NetworkEnvironment(100, 32, initialBackoff, maxBackoff, 2, 8, this.enableCreditBasedFlowControl);
        SingleInputGate gate = SingleInputGate.create((String)"TestTask", (JobID)new JobID(), (ExecutionAttemptID)new ExecutionAttemptID(), (InputGateDeploymentDescriptor)gateDesc, (NetworkEnvironment)netEnv, (TaskActions)((TaskActions)Mockito.mock(TaskActions.class)), (TaskIOMetricGroup)UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
        try {
            InputChannel[] channels;
            Assert.assertEquals((Object)gateDesc.getConsumedPartitionType(), (Object)gate.getConsumedPartitionType());
            Map channelMap = gate.getInputChannels();
            Assert.assertEquals((long)3L, (long)channelMap.size());
            InputChannel localChannel = (InputChannel)channelMap.get(partitionIds[0].getPartitionId());
            Assert.assertEquals(LocalInputChannel.class, localChannel.getClass());
            InputChannel remoteChannel = (InputChannel)channelMap.get(partitionIds[1].getPartitionId());
            Assert.assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
            InputChannel unknownChannel = (InputChannel)channelMap.get(partitionIds[2].getPartitionId());
            Assert.assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
            for (InputChannel ch : channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}) {
                Assert.assertEquals((long)0L, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)initialBackoff, (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)(initialBackoff * 2 * 2), (long)ch.getCurrentBackoff());
                Assert.assertTrue((boolean)ch.increaseBackoff());
                Assert.assertEquals((long)maxBackoff, (long)ch.getCurrentBackoff());
                Assert.assertFalse((boolean)ch.increaseBackoff());
            }
        }
        finally {
            gate.releaseAllResources();
            netEnv.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBuffersWithRemoteInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        NetworkEnvironment network = new NetworkEnvironment(100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, this.enableCreditBasedFlowControl);
        try {
            ResultPartitionID resultPartitionId = new ResultPartitionID();
            ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
            this.addRemoteInputChannel(network, inputGate, connectionId, resultPartitionId, 0);
            network.setupInputGate(inputGate);
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            if (this.enableCreditBasedFlowControl) {
                RemoteInputChannel remote = (RemoteInputChannel)inputGate.getInputChannels().get(resultPartitionId.getPartitionId());
                Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
                Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel), (long)bufferPool.getNumberOfAvailableMemorySegments());
                Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
            } else {
                Assert.assertEquals((long)(buffersPerChannel + extraNetworkBuffersPerGate), (long)bufferPool.countBuffers());
            }
        }
        finally {
            inputGate.releaseAllResources();
            network.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestBuffersWithUnknownInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        NetworkEnvironment network = new NetworkEnvironment(100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, this.enableCreditBasedFlowControl);
        try {
            ResultPartitionID resultPartitionId = new ResultPartitionID();
            this.addUnknownInputChannel(network, inputGate, resultPartitionId, 0);
            network.setupInputGate(inputGate);
            NetworkBufferPool bufferPool = network.getNetworkBufferPool();
            if (this.enableCreditBasedFlowControl) {
                Assert.assertEquals((long)bufferPool.getTotalNumberOfMemorySegments(), (long)bufferPool.getNumberOfAvailableMemorySegments());
                Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
            } else {
                Assert.assertEquals((long)(buffersPerChannel + extraNetworkBuffersPerGate), (long)bufferPool.countBuffers());
            }
            ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
            inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(resultPartitionId, ResultPartitionLocation.createRemote((ConnectionID)connectionId)));
            if (this.enableCreditBasedFlowControl) {
                RemoteInputChannel remote = (RemoteInputChannel)inputGate.getInputChannels().get(resultPartitionId.getPartitionId());
                Assert.assertEquals((long)buffersPerChannel, (long)remote.getNumberOfAvailableBuffers());
                Assert.assertEquals((long)(bufferPool.getTotalNumberOfMemorySegments() - buffersPerChannel), (long)bufferPool.getNumberOfAvailableMemorySegments());
                Assert.assertEquals((long)extraNetworkBuffersPerGate, (long)bufferPool.countBuffers());
            } else {
                Assert.assertEquals((long)(buffersPerChannel + extraNetworkBuffersPerGate), (long)bufferPool.countBuffers());
            }
        }
        finally {
            inputGate.releaseAllResources();
            network.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpdateUnknownInputChannel() throws Exception {
        SingleInputGate inputGate = this.createInputGate(2);
        int buffersPerChannel = 2;
        NetworkEnvironment network = new NetworkEnvironment(100, 32, 0, 0, buffersPerChannel, 8, this.enableCreditBasedFlowControl);
        try {
            ResultPartitionID localResultPartitionId = new ResultPartitionID();
            this.addUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
            ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
            this.addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
            network.setupInputGate(inputGate);
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(UnknownInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(UnknownInputChannel.class)));
            ConnectionID remoteConnectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
            inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(remoteResultPartitionId, ResultPartitionLocation.createRemote((ConnectionID)remoteConnectionId)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(UnknownInputChannel.class)));
            inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(localResultPartitionId, ResultPartitionLocation.createLocal()));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(RemoteInputChannel.class)));
            MatcherAssert.assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(LocalInputChannel.class)));
        }
        finally {
            inputGate.releaseAllResources();
            network.shutdown();
        }
    }

    private SingleInputGate createInputGate() {
        return this.createInputGate(2);
    }

    private SingleInputGate createInputGate(int numberOfInputChannels) {
        return this.createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
    }

    private SingleInputGate createInputGate(int numberOfInputChannels, ResultPartitionType partitionType) {
        SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), partitionType, 0, numberOfInputChannels, (TaskActions)Mockito.mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), this.enableCreditBasedFlowControl);
        Assert.assertEquals((Object)partitionType, (Object)inputGate.getConsumedPartitionType());
        return inputGate;
    }

    private void addUnknownInputChannel(NetworkEnvironment network, SingleInputGate inputGate, ResultPartitionID partitionId, int channelIndex) {
        UnknownInputChannel unknown = this.createUnknownInputChannel(network, inputGate, partitionId, channelIndex);
        inputGate.setInputChannel(partitionId.getPartitionId(), (InputChannel)unknown);
    }

    private UnknownInputChannel createUnknownInputChannel(NetworkEnvironment network, SingleInputGate inputGate, ResultPartitionID partitionId, int channelIndex) {
        return new UnknownInputChannel(inputGate, channelIndex, partitionId, network.getResultPartitionManager(), network.getTaskEventDispatcher(), network.getConnectionManager(), network.getPartitionRequestInitialBackoff(), network.getPartitionRequestMaxBackoff(), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
    }

    private void addRemoteInputChannel(NetworkEnvironment network, SingleInputGate inputGate, ConnectionID connectionId, ResultPartitionID partitionId, int channelIndex) {
        RemoteInputChannel remote = this.createUnknownInputChannel(network, inputGate, partitionId, channelIndex).toRemoteInputChannel(connectionId);
        inputGate.setInputChannel(partitionId.getPartitionId(), (InputChannel)remote);
    }

    static void verifyBufferOrEvent(InputGate inputGate, boolean expectedIsBuffer, int expectedChannelIndex, boolean expectedMoreAvailable) throws IOException, InterruptedException {
        block3: {
            Optional bufferOrEvent = inputGate.getNextBufferOrEvent();
            Assert.assertTrue((boolean)bufferOrEvent.isPresent());
            Assert.assertEquals((Object)expectedIsBuffer, (Object)((BufferOrEvent)bufferOrEvent.get()).isBuffer());
            Assert.assertEquals((long)expectedChannelIndex, (long)((BufferOrEvent)bufferOrEvent.get()).getChannelIndex());
            Assert.assertEquals((Object)expectedMoreAvailable, (Object)((BufferOrEvent)bufferOrEvent.get()).moreAvailable());
            if (!expectedMoreAvailable) {
                try {
                    Assert.assertFalse((boolean)inputGate.pollNextBufferOrEvent().isPresent());
                }
                catch (UnsupportedOperationException ex) {
                    if (inputGate instanceof UnionInputGate) break block3;
                    throw ex;
                }
            }
        }
    }
}

