/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.ExecutionContext;

public class FailoverRegionTest
extends TestLogger {
    @Test
    public void testSingleRegionFailover() throws Exception {
        InfiniteDelayRestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(10);
        ExecutionGraph eg = FailoverRegionTest.createSingleRegionExecutionGraph(restartStrategy);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        ExecutionVertex ev = (ExecutionVertex)eg.getAllExecutionVertices().iterator().next();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev).getState());
        ev.getCurrentExecutionAttempt().fail((Throwable)new Exception("Test Exception"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev).getState());
        for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
            evs.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev).getState());
    }

    @Test
    public void testMultiRegionsFailover() throws Exception {
        JobID jobId = new JobID();
        String jobName = "Test Job Sample Name";
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        v1.setParallelism(2);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v4.setParallelism(1);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v4.setInvokableClass(AbstractInvokable.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
        ExecutionGraph eg = new ExecutionGraph((JobInformation)new DummyJobInformation(jobId, "Test Job Sample Name"), TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new InfiniteDelayRestartStrategy(10), (FailoverStrategy.Factory)new FailoverPipelinedRegionWithDirectExecutor(), (SlotProvider)slotProvider);
        eg.attachJobGraph(ordered);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
        ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
        ExecutionVertex ev4 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
        eg.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        ev21.scheduleForExecution((SlotProvider)slotProvider, true, LocationPreferenceConstraint.ALL);
        ev21.getCurrentExecutionAttempt().fail((Throwable)new Exception("New fail"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev31).getState());
        ev11.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev31).getState());
        ev11.getCurrentExecutionAttempt().markFinished();
        ev21.getCurrentExecutionAttempt().markFinished();
        ev22.scheduleForExecution((SlotProvider)slotProvider, true, LocationPreferenceConstraint.ALL);
        ev22.getCurrentExecutionAttempt().markFinished();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev31).getState());
        ExecutionGraphTestUtils.waitUntilExecutionState(ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000L);
        ev31.getCurrentExecutionAttempt().fail((Throwable)new Exception("New fail"));
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev31).getState());
        ev32.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev22).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev31).getState());
    }

    @Test
    public void testNoManualRestart() throws Exception {
        NoRestartStrategy restartStrategy = new NoRestartStrategy();
        ExecutionGraph eg = FailoverRegionTest.createSingleRegionExecutionGraph((RestartStrategy)restartStrategy);
        ExecutionVertex ev = (ExecutionVertex)eg.getAllExecutionVertices().iterator().next();
        ev.fail((Throwable)new Exception("Test Exception"));
        for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
            evs.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
    }

    @Test
    public void testMultiRegionFailoverAtSameTime() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext())), 16);
        Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobID jobId = new JobID();
        String jobName = "Test Job Sample Name";
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        v1.setParallelism(2);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v4.setParallelism(2);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v4.setInvokableClass(AbstractInvokable.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
        ExecutionGraph eg = new ExecutionGraph((JobInformation)new DummyJobInformation(jobId, "Test Job Sample Name"), TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new InfiniteDelayRestartStrategy(10), (FailoverStrategy.Factory)new RestartPipelinedRegionStrategy.Factory(), (SlotProvider)scheduler);
        try {
            eg.attachJobGraph(ordered);
        }
        catch (JobException e) {
            e.printStackTrace();
            Assert.fail((String)("Job failed with exception: " + e.getMessage()));
        }
        eg.scheduleForExecution();
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
        ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
        ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
        ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev31).getState());
        ev11.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        ev31.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev11).getState());
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev31).getState());
        ev32.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilFailoverRegionState(strategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000L);
        ev12.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilFailoverRegionState(strategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000L);
    }

    @Ignore
    @Test
    public void testSucceedingNoticePreceding() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext())), 14);
        Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobID jobId = new JobID();
        String jobName = "Test Job Sample Name";
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        v1.setParallelism(1);
        v2.setParallelism(1);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
        ExecutionGraph eg = new ExecutionGraph((JobInformation)new DummyJobInformation(jobId, "Test Job Sample Name"), TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new InfiniteDelayRestartStrategy(10), (FailoverStrategy.Factory)new FailoverPipelinedRegionWithDirectExecutor(), (SlotProvider)scheduler);
        try {
            eg.attachJobGraph(ordered);
        }
        catch (JobException e) {
            e.printStackTrace();
            Assert.fail((String)("Job failed with exception: " + e.getMessage()));
        }
        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.scheduleForExecution();
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        ExecutionVertex ev11 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
        ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
        ev21.getCurrentExecutionAttempt().fail((Throwable)new Exception("Fail with v1"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev21).getState());
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev11).getState());
    }

    @Test
    public void testFailWhileCancelling() throws Exception {
        InfiniteDelayRestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
        ExecutionGraph eg = FailoverRegionTest.createSingleRegionExecutionGraph(restartStrategy);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        Iterator iter = eg.getAllExecutionVertices().iterator();
        ExecutionVertex ev1 = (ExecutionVertex)iter.next();
        ev1.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev1).getState());
        ev1.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev1).getState());
        ExecutionVertex ev2 = (ExecutionVertex)iter.next();
        ev2.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev1).getState());
    }

    @Test
    public void testFailWhileRestarting() throws Exception {
        InfiniteDelayRestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
        ExecutionGraph eg = FailoverRegionTest.createSingleRegionExecutionGraph(restartStrategy);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
        Iterator iter = eg.getAllExecutionVertices().iterator();
        ExecutionVertex ev1 = (ExecutionVertex)iter.next();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev1).getState());
        ev1.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev1).getState());
        for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
            evs.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(ev1).getState());
        ev1.getCurrentExecutionAttempt().fail((Throwable)new Exception("new fail"));
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(ev1).getState());
    }

    private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext())), 14);
        Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobID jobId = new JobID();
        String jobName = "Test Job Sample Name";
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        v1.setParallelism(3);
        v2.setParallelism(2);
        v3.setParallelism(2);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v3.setInvokableClass(AbstractInvokable.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
        ExecutionGraph eg = new ExecutionGraph((JobInformation)new DummyJobInformation(jobId, "Test Job Sample Name"), TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), AkkaUtils.getDefaultTimeout(), restartStrategy, (FailoverStrategy.Factory)new FailoverPipelinedRegionWithDirectExecutor(), (SlotProvider)scheduler);
        try {
            eg.attachJobGraph(ordered);
        }
        catch (JobException e) {
            e.printStackTrace();
            Assert.fail((String)("Job failed with exception: " + e.getMessage()));
        }
        eg.scheduleForExecution();
        return eg;
    }

    private static class FailoverPipelinedRegionWithDirectExecutor
    implements FailoverStrategy.Factory {
        private FailoverPipelinedRegionWithDirectExecutor() {
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor());
        }
    }
}

