/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;

public class ScheduleOrUpdateConsumersTest
extends TestLogger {
    private static final int NUMBER_OF_TMS = 2;
    private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(ScheduleOrUpdateConsumersTest.getFlinkConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    private static Configuration getFlinkConfiguration() {
        Configuration config = new Configuration();
        config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        return config;
    }

    @Test
    public void testMixedPipelinedAndBlockingResults() throws Exception {
        JobVertex sender = new JobVertex("Sender");
        sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
        sender.getConfiguration().setInteger("number-of-times-to-send", 4);
        sender.setParallelism(4);
        JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
        pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        pipelinedReceiver.getConfiguration().setInteger("number-of-indexes-to-receive", 4);
        pipelinedReceiver.setParallelism(4);
        pipelinedReceiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
        blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
        blockingReceiver.getConfiguration().setInteger("number-of-indexes-to-receive", 4);
        blockingReceiver.setParallelism(4);
        blockingReceiver.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID()});
        sender.setSlotSharingGroup(slotSharingGroup);
        pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
        blockingReceiver.setSlotSharingGroup(slotSharingGroup);
        JobGraph jobGraph = new JobGraph("Mixed pipelined and blocking result", new JobVertex[]{sender, pipelinedReceiver, blockingReceiver});
        MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph);
    }

    public static class BinaryRoundRobinSubtaskIndexSender
    extends AbstractInvokable {
        static final String CONFIG_KEY = "number-of-times-to-send";

        public BinaryRoundRobinSubtaskIndexSender(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            ArrayList writers = Lists.newArrayListWithCapacity((int)2);
            RecordWriter pipelinedWriter = new RecordWriter(this.getEnvironment().getWriter(0));
            RecordWriter blockingWriter = new RecordWriter(this.getEnvironment().getWriter(1));
            writers.add(pipelinedWriter);
            writers.add(blockingWriter);
            int numberOfTimesToSend = this.getTaskConfiguration().getInteger(CONFIG_KEY, 0);
            IntValue subtaskIndex = new IntValue(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            for (RecordWriter writer : writers) {
                try {
                    for (int i = 0; i < numberOfTimesToSend; ++i) {
                        writer.emit((IOReadableWritable)subtaskIndex);
                    }
                    writer.flushAll();
                }
                finally {
                    writer.clearBuffers();
                }
            }
        }
    }
}

