/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.types.IntValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ScheduleOrUpdateConsumersTest {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    private static TestingCluster flink;

    @BeforeClass
    public static void setUp() throws Exception {
        flink = TestingUtils.startTestingCluster(2, 2, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        flink.stop();
    }

    @Test
    public void testMixedPipelinedAndBlockingResults() throws Exception {
        JobVertex sender = new JobVertex("Sender");
        sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
        sender.getConfiguration().setInteger("number-of-times-to-send", 4);
        sender.setParallelism(4);
        JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
        pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        pipelinedReceiver.getConfiguration().setInteger("number-of-indexes-to-receive", 4);
        pipelinedReceiver.setParallelism(4);
        pipelinedReceiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
        blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        blockingReceiver.getConfiguration().setInteger("number-of-indexes-to-receive", 4);
        blockingReceiver.setParallelism(4);
        blockingReceiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID()});
        sender.setSlotSharingGroup(slotSharingGroup);
        pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
        blockingReceiver.setSlotSharingGroup(slotSharingGroup);
        JobGraph jobGraph = new JobGraph("Mixed pipelined and blocking result", new JobVertex[]{sender, pipelinedReceiver, blockingReceiver});
        flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
    }

    public static class BinaryRoundRobinSubtaskIndexSender
    extends AbstractInvokable {
        public static final String CONFIG_KEY = "number-of-times-to-send";

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            ArrayList writers = Lists.newArrayListWithCapacity((int)2);
            RecordWriter pipelinedWriter = new RecordWriter(this.getEnvironment().getWriter(0));
            RecordWriter blockingWriter = new RecordWriter(this.getEnvironment().getWriter(1));
            writers.add(pipelinedWriter);
            writers.add(blockingWriter);
            int numberOfTimesToSend = this.getTaskConfiguration().getInteger(CONFIG_KEY, 0);
            IntValue subtaskIndex = new IntValue(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            for (RecordWriter writer : writers) {
                try {
                    for (int i = 0; i < numberOfTimesToSend; ++i) {
                        writer.emit((IOReadableWritable)subtaskIndex);
                    }
                    writer.flush();
                }
                finally {
                    writer.clearBuffers();
                }
            }
        }
    }
}

