/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests how {@link RestartPipelinedRegionStrategy} groups the execution vertices of an
 * {@link ExecutionGraph} into {@link FailoverRegion}s for several job topologies.
 *
 * <p>Each test builds a small job graph, attaches it to an execution graph that is configured
 * with {@link RestartPipelinedRegionStrategy.Factory}, and then compares the regions that the
 * strategy reports for selected subtasks.
 */
public class RestartPipelinedRegionStrategyTest {
    /** Job name passed to the {@link DummyJobInformation} of every execution graph built here. */
    private static final String JOB_NAME = "Test Job Sample Name";

    /**
     * Topology in which every edge is ALL_TO_ALL and PIPELINED:
     * v1 -> v2 -> v4 -> v5, with v3 feeding both v4 and v5.
     *
     * <p>The sampled subtasks of all five vertices are expected to report the same region.
     */
    @Test
    public void testSimpleFailoverRegion() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion region1 = regionOf(eg, v1, 2);
        FailoverRegion region2 = regionOf(eg, v2, 3);
        FailoverRegion region3 = regionOf(eg, v3, 0);
        FailoverRegion region4 = regionOf(eg, v4, 4);
        FailoverRegion region5 = regionOf(eg, v5, 1);

        Assert.assertEquals(region1, region2);
        Assert.assertEquals(region3, region2);
        Assert.assertEquals(region4, region2);
        Assert.assertEquals(region5, region2);
    }

    /**
     * Topology in which v1 and v2 feed v4 through PIPELINED edges, while v4 and v3 feed v5
     * through BLOCKING edges (all ALL_TO_ALL).
     *
     * <p>The sampled subtasks of v1, v2 and v4 are expected to share one region, whereas the
     * two subtasks of v3 must be in different regions, and so must the two subtasks of v5.
     */
    @Test
    public void testMultipleFailoverRegions() throws Exception {
        JobVertex v1 = createVertex("vertex1", 3);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 5);
        JobVertex v5 = createVertex("vertex5", 2);

        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion region1 = regionOf(eg, v1, 1);
        FailoverRegion region2 = regionOf(eg, v2, 0);
        FailoverRegion region4 = regionOf(eg, v4, 3);
        FailoverRegion region31 = regionOf(eg, v3, 0);
        FailoverRegion region32 = regionOf(eg, v3, 1);
        FailoverRegion region51 = regionOf(eg, v5, 0);
        FailoverRegion region52 = regionOf(eg, v5, 1);

        Assert.assertEquals(region1, region2);
        Assert.assertEquals(region2, region4);
        Assert.assertFalse(region31.equals(region32));
        Assert.assertFalse(region51.equals(region52));
    }

    /**
     * Topology in which v5 has one PIPELINED input (from v3) and one BLOCKING input (from v4);
     * all remaining edges (v1 -> v2, v2 -> v4, v3 -> v4) are PIPELINED and every edge is
     * ALL_TO_ALL.
     *
     * <p>The sampled subtasks of all five vertices are expected to report the same region.
     */
    @Test
    public void testSingleRegionWithMixedInput() throws Exception {
        JobVertex v1 = createVertex("vertex1", 3);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 5);
        JobVertex v5 = createVertex("vertex5", 2);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion region1 = regionOf(eg, v1, 1);
        FailoverRegion region2 = regionOf(eg, v2, 0);
        FailoverRegion region4 = regionOf(eg, v4, 3);
        FailoverRegion region3 = regionOf(eg, v3, 0);
        FailoverRegion region5 = regionOf(eg, v5, 1);

        Assert.assertEquals(region1, region2);
        Assert.assertEquals(region2, region4);
        Assert.assertEquals(region3, region2);
        Assert.assertEquals(region1, region5);
    }

    /**
     * Topology in which v1 feeds v2 through a POINTWISE, PIPELINED edge (both with parallelism
     * 2), and v2 and v3 feed v4 through ALL_TO_ALL, BLOCKING edges.
     *
     * <p>Subtask i of v1 is expected to share a region with subtask i of v2, the two v1
     * subtasks must be in different regions, and the sampled subtasks of v3 and v4 must be in
     * different regions.
     */
    @Test
    public void testMultiRegionNotAllToAll() throws Exception {
        JobVertex v1 = createVertex("vertex1", 2);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 5);
        JobVertex v4 = createVertex("vertex4", 5);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4);

        FailoverRegion region11 = regionOf(eg, v1, 0);
        FailoverRegion region12 = regionOf(eg, v1, 1);
        FailoverRegion region21 = regionOf(eg, v2, 0);
        FailoverRegion region22 = regionOf(eg, v2, 1);
        FailoverRegion region3 = regionOf(eg, v3, 0);
        FailoverRegion region4 = regionOf(eg, v4, 3);

        Assert.assertEquals(region11, region21);
        Assert.assertEquals(region12, region22);
        Assert.assertFalse(region11.equals(region12));
        Assert.assertFalse(region3.equals(region4));
    }

    /**
     * Creates a job vertex with the given name and parallelism that uses
     * {@link AbstractInvokable} as its invokable class.
     *
     * @param name        name of the vertex
     * @param parallelism parallelism to set on the vertex
     * @return the configured, still unconnected vertex
     */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /**
     * Builds an execution graph that uses {@link NoRestartStrategy} together with
     * {@link RestartPipelinedRegionStrategy.Factory} and attaches the given vertices to it.
     *
     * <p>Any exception raised while constructing the graph or attaching the vertices is
     * propagated unchanged, so the failing test reports the original cause and stack trace
     * instead of only an extracted message.
     *
     * @param orderedVertices job vertices in the order in which they are attached
     * @return the execution graph with the vertices attached
     * @throws Exception if the graph cannot be created or the vertices cannot be attached
     */
    private static ExecutionGraph createExecutionGraph(JobVertex... orderedVertices) throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
        JobInformation jobInformation = new DummyJobInformation(new JobID(), JOB_NAME);

        ExecutionGraph eg = new ExecutionGraph(
                jobInformation,
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy(),
                new RestartPipelinedRegionStrategy.Factory(),
                scheduler,
                ExecutionGraph.class.getClassLoader(),
                VoidBlobWriter.getInstance(),
                AkkaUtils.getDefaultTimeout());

        eg.attachJobGraph(new ArrayList<JobVertex>(Arrays.asList(orderedVertices)));
        return eg;
    }

    /**
     * Looks up the failover region that the graph's failover strategy reports for one subtask.
     *
     * @param eg           execution graph whose failover strategy is a
     *                     {@link RestartPipelinedRegionStrategy}
     * @param vertex       job vertex that was attached to {@code eg}
     * @param subtaskIndex index into the task vertices of {@code vertex}
     * @return the region returned by the strategy for that subtask
     */
    private static FailoverRegion regionOf(ExecutionGraph eg, JobVertex vertex, int subtaskIndex) {
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
        ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
        return strategy.getFailoverRegion(ejv.getTaskVertices()[subtaskIndex]);
    }
}

