/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class PartialConsumePipelinedResultTest {
    private static final int NUMBER_OF_TMS = 1;
    private static final int NUMBER_OF_SLOTS_PER_TM = 1;
    private static final int PARALLELISM = 1;
    private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
    private static TestingCluster flink;

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 1);
        config.setInteger("taskmanager.numberOfTaskSlots", 1);
        config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 128);
        flink = new TestingCluster(config, true);
        flink.start();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        flink.stop();
    }

    @Test
    public void testPartialConsumePipelinedResultReceiver() throws Exception {
        JobVertex sender = new JobVertex("Sender");
        sender.setInvokableClass(SlowBufferSender.class);
        sender.setParallelism(1);
        JobVertex receiver = new JobVertex("Receiver");
        receiver.setInvokableClass(SingleBufferReceiver.class);
        receiver.setParallelism(1);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", new JobVertex[]{sender, receiver});
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{sender.getID(), receiver.getID()});
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);
        flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    }

    public static class SingleBufferReceiver
    extends AbstractInvokable {
        public void invoke() throws Exception {
            InputGate gate = this.getEnvironment().getInputGate(0);
            Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
            if (buffer != null) {
                buffer.recycle();
            }
        }
    }

    public static class SlowBufferSender
    extends AbstractInvokable {
        public void invoke() throws Exception {
            ResultPartitionWriter writer = this.getEnvironment().getWriter(0);
            for (int i = 0; i < 8; ++i) {
                Buffer buffer = writer.getBufferProvider().requestBufferBlocking();
                writer.writeBuffer(buffer, 0);
                Thread.sleep(50L);
            }
        }
    }
}

