/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class ExecutionGraphRestartTest
extends TestLogger {
    // Task count constant. The visible tests pass the literal 31 (the same value) as
    // parallelism / slot count instead of referencing this name — presumably the
    // constant was inlined at compile time; confirm against the original source.
    private static final int NUM_TASKS = 31;
    // Scheduled thread pool with 4 threads. It backs the scheduler built by
    // createSchedulerWithInstances(), is handed to the slot-sharing tests' execution
    // graphs, and is shut down after every test in shutdown().
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);

    /** Shuts the test executor down immediately once a test has finished. */
    @After
    public void shutdown() {
        executor.shutdownNow();
    }

    /**
     * With a {@link NoRestartStrategy}, a failed job is expected to end up in {@code FAILED},
     * and an explicit {@code restart(...)} call must not move it out of that state.
     */
    @Test
    public void testNoManualRestart() throws Exception {
        final RestartStrategy noRestarts = new NoRestartStrategy();
        final ExecutionGraph eg = ExecutionGraphRestartTest.createExecutionGraph(noRestarts).f0;

        // Fail one vertex, then acknowledge cancellation for every execution attempt.
        final ExecutionVertex firstVertex = eg.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new Exception("Test Exception"));
        this.completeCanceling(eg);
        Assert.assertEquals(JobStatus.FAILED, eg.getState());

        // A manual restart request must leave the job in FAILED.
        eg.restart(eg.getGlobalModVersion());
        Assert.assertEquals(JobStatus.FAILED, eg.getState());
    }

    /** Calls {@code cancelingComplete()} on the current execution attempt of every vertex. */
    private void completeCanceling(ExecutionGraph eg) {
        this.executeOperationForAllExecutions(eg, execution -> execution.cancelingComplete());
    }

    /**
     * Applies {@code operation} to the current execution attempt of each vertex of the graph,
     * in the order in which {@code getAllExecutionVertices()} yields them.
     *
     * @param eg graph whose execution vertices are visited
     * @param operation action invoked once per current execution attempt
     */
    private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer<Execution> operation) {
        final Iterator<ExecutionVertex> vertices = eg.getAllExecutionVertices().iterator();
        while (vertices.hasNext()) {
            final Execution currentAttempt = vertices.next().getCurrentExecutionAttempt();
            operation.accept(currentAttempt);
        }
    }

    /**
     * Builds a running graph with {@code FixedDelayRestartStrategy(1, 0L)} and delegates the
     * fail-and-restart scenario to {@code restartAfterFailure} with a two minute timeout.
     */
    @Test
    public void testRestartAutomatically() throws Exception {
        final RestartStrategy fixedDelay = new FixedDelayRestartStrategy(1, 0L);
        final ExecutionGraph eg = ExecutionGraphRestartTest.createExecutionGraph(fixedDelay).f0;

        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        ExecutionGraphRestartTest.restartAfterFailure(eg, timeout, true);
    }

    /**
     * Cancelling a job that sits in {@code RESTARTING} is expected to move it to
     * {@code CANCELED}; a later {@code restart(...)} call must not change that.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        final Tuple2<ExecutionGraph, Instance> graphAndInstance =
            ExecutionGraphRestartTest.createExecutionGraph(new InfiniteDelayRestartStrategy());
        final ExecutionGraph executionGraph = graphAndInstance.f0;
        final Instance instance = graphAndInstance.f1;

        // Mark the instance dead; the test expects the job to reach RESTARTING afterwards.
        instance.markDead();

        // Poll in 100 ms steps until RESTARTING is observed or the testing duration elapses.
        for (Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
                deadline.hasTimeLeft() && executionGraph.getState() != JobStatus.RESTARTING; ) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());

        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());

        // Restarting a cancelled job must be a no-op with respect to the job status.
        executionGraph.restart(executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
    }

    /**
     * Exercises global failures while the job is in {@code RESTARTING}: an ordinary failure is
     * expected to bump the global mod version and keep the job restarting, whereas a
     * {@link SuppressRestartsException} is expected to drive it to {@code FAILED} for good.
     */
    @Test
    public void testFailWhileRestarting() throws Exception {
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        final Instance instance = ExecutionGraphTestUtils.getInstance(
            new ActorTaskManagerGateway(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
            31);
        scheduler.newInstanceAvailable(instance);

        final ExecutionGraph executionGraph = new ExecutionGraph(
            TestingUtils.defaultExecutor(),
            TestingUtils.defaultExecutor(),
            new JobID(),
            "TestJob",
            new Configuration(),
            new SerializedValue<>(new ExecutionConfig()),
            AkkaUtils.getDefaultTimeout(),
            new InfiniteDelayRestartStrategy(),
            scheduler);

        final JobVertex jobVertex = new JobVertex("NoOpInvokable");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(31);
        final JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
        executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, executionGraph.getState());

        executionGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());

        // Mark the instance dead and wait for the job to enter RESTARTING.
        instance.markDead();
        ExecutionGraphTestUtils.waitUntilJobStatus(
            executionGraph, JobStatus.RESTARTING, TestingUtils.TESTING_DURATION().toMillis());
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());

        // A regular global failure: new mod version, still RESTARTING, failure cause recorded.
        final long modVersionBeforeFailure = executionGraph.getGlobalModVersion();
        final Exception testException = new Exception("Test exception");
        executionGraph.failGlobal(testException);
        Assert.assertNotEquals(modVersionBeforeFailure, executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
        Assert.assertEquals(testException, executionGraph.getFailureCause());

        // A restart-suppressing failure must end the job in FAILED.
        executionGraph.failGlobal(
            new SuppressRestartsException(new Exception("Suppress restart exception")));
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());

        // A manual restart must not revive the failed job.
        executionGraph.restart(executionGraph.getGlobalModVersion());
        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    /**
     * Cancelling a job that is currently {@code FAILING} is expected to switch it to
     * {@code CANCELLING} and, once all executions acknowledge cancellation, to {@code CANCELED}.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        final ExecutionGraph graph =
            ExecutionGraphRestartTest.createExecutionGraph(new InfiniteDelayRestartStrategy()).f0;
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        // Switch every subtask of the first job vertex to running.
        final ExecutionJobVertex firstJobVertex = graph.getVerticesTopologically().iterator().next();
        final ExecutionVertex[] taskVertices = firstJobVertex.getTaskVertices();
        for (int i = 0; i < taskVertices.length; i++) {
            taskVertices[i].getCurrentExecutionAttempt().switchToRunning();
        }

        graph.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        this.completeCanceling(graph);
        Assert.assertEquals(JobStatus.CANCELED, graph.getState());
    }

    /**
     * A global failure raised while the job is {@code CANCELLING} is expected to switch it to
     * {@code FAILING} and, once all executions acknowledge cancellation, to {@code FAILED}.
     */
    @Test
    public void testFailWhileCanceling() throws Exception {
        final ExecutionGraph graph =
            ExecutionGraphRestartTest.createExecutionGraph(new NoRestartStrategy()).f0;
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        this.switchAllTasksToRunning(graph);

        graph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, graph.getState());

        graph.failGlobal(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILING, graph.getState());

        this.completeCanceling(graph);
        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /** Calls {@code switchToRunning()} on the current execution attempt of every vertex. */
    private void switchAllTasksToRunning(ExecutionGraph graph) {
        this.executeOperationForAllExecutions(graph, execution -> execution.switchToRunning());
    }

    /**
     * A failure wrapped in {@link SuppressRestartsException} is expected to fail the job
     * without consuming a restart attempt, even though the configured
     * {@link FixedDelayRestartStrategy} is constructed with {@code Integer.MAX_VALUE}.
     */
    @Test
    public void testNoRestartOnSuppressException() throws Exception {
        final ExecutionGraph eg = ExecutionGraphRestartTest.createExecutionGraph(
            new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L)).f0;

        final ExecutionVertex firstVertex = eg.getAllExecutionVertices().iterator().next();
        firstVertex.fail(new SuppressRestartsException(new Exception("Test Exception")));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        this.completeCanceling(eg);
        eg.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FAILED, eg.getState());

        // The strategy must still be the fixed-delay one and must not have counted a restart.
        final RestartStrategy strategyInUse = eg.getRestartStrategy();
        Assert.assertTrue(strategyInUse instanceof FixedDelayRestartStrategy);
        Assert.assertEquals(0L, ((FixedDelayRestartStrategy) strategyInUse).getCurrentRestartAttempt());
    }

    /**
     * Verifies that failing an execution attempt from before a restart does not affect the
     * restarted job: one of two tasks finishes, the other fails, the graph restarts, and a
     * late {@code fail(...)} on the stale, already finished attempt must leave both that
     * attempt ({@code FINISHED}) and the job (eventually {@code FINISHED}) untouched.
     */
    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
            new ActorTaskManagerGateway(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
            2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
        final JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
        final JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);

        final ExecutionGraph eg = ExecutionGraphRestartTest.newExecutionGraph(
            new FixedDelayRestartStrategy(1, 0L), scheduler);
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Typed iterator: no raw type and no unchecked casts needed.
        final Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
        final Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
        final Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();

        // First attempt round: one task finishes, the other fails and acknowledges cancellation.
        finishedExecution.markFinished();
        failedExecution.fail(new Exception("Test Exception"));
        failedExecution.cancelingComplete();

        final FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        ExecutionGraphRestartTest.waitForAsyncRestart(eg, timeout);
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Resources are assigned asynchronously after the restart; wait for all of them.
        ExecutionGraphRestartTest.waitForAllResourcesToBeAssignedAfterAsyncRestart(eg, timeout.fromNow());
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            Assert.assertNotNull(
                "No assigned resource (test instability).", vertex.getCurrentAssignedResource());
            vertex.getCurrentExecutionAttempt().switchToRunning();
        }

        // Failing the stale attempt from before the restart must not influence the new run.
        finishedExecution.fail(new Exception("This should have no effect"));

        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }

        Assert.assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
    }

    /**
     * Failing the executions of a job that has already been cancelled is expected to leave
     * the job in {@code CANCELED} instead of triggering a restart.
     *
     * <p>NOTE(review): the job graph's {@link ExecutionConfig} carries a fixed-delay restart
     * configuration, while the execution graph itself is created with an
     * {@link InfiniteDelayRestartStrategy}; which of the two is consulted is not visible here.
     */
    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
            new ActorTaskManagerGateway(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
            2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Integer.MAX_VALUE));

        final JobGraph jobGraph = new JobGraph("Test Job", vertex);
        jobGraph.setExecutionConfig(executionConfig);

        final ExecutionGraph eg =
            ExecutionGraphRestartTest.newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Cancel first, then fail every current execution attempt.
        eg.cancel();
        for (ExecutionVertex executionVertex : eg.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
        }
        Assert.assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());

        // A late cancellation acknowledgement must not change the terminal state.
        final Execution execution =
            eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        execution.cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, eg.getState());
    }

    /**
     * A global failure raised after {@code cancel()} is expected to take precedence: the job
     * goes {@code CANCELLING} → {@code FAILING} and, once the execution acknowledges
     * cancellation, ends up in {@code RESTARTING}.
     */
    @Test
    public void testFailExecutionGraphAfterCancel() throws Exception {
        final Instance instance = ExecutionGraphTestUtils.getInstance(
            new ActorTaskManagerGateway(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
            2);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Integer.MAX_VALUE));

        final JobGraph jobGraph = new JobGraph("Test Job", vertex);
        jobGraph.setExecutionConfig(executionConfig);

        final ExecutionGraph eg =
            ExecutionGraphRestartTest.newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        eg.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, eg.getState());

        eg.failGlobal(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        // Acknowledge the cancellation of the single execution attempt.
        final Execution execution =
            eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
        execution.cancelingComplete();
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());
    }

    /**
     * Suspending a job that is in {@code RESTARTING} is expected to move it to
     * {@code SUSPENDED}, and a restart that is released afterwards must not change that.
     *
     * <p>The restart is gated through a {@code ControllableRestartStrategy}, whose definition
     * is outside this view; its latch semantics are presumed from the method names.
     */
    @Test
    public void testSuspendWhileRestarting() throws Exception {
        final Time timeout = Time.of(1L, TimeUnit.MINUTES);

        final Instance instance = ExecutionGraphTestUtils.getInstance(
            new ActorTaskManagerGateway(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
            31);
        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final JobVertex sender = new JobVertex("Task");
        sender.setInvokableClass(NoOpInvokable.class);
        sender.setParallelism(31);
        final JobGraph jobGraph = new JobGraph("Pointwise job", sender);

        final ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
        final ExecutionGraph eg = new ExecutionGraph(
            TestingUtils.defaultExecutor(),
            TestingUtils.defaultExecutor(),
            new JobID(),
            "Test job",
            new Configuration(),
            new SerializedValue<>(new ExecutionConfig()),
            AkkaUtils.getDefaultTimeout(),
            controllableRestartStrategy,
            scheduler);
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, eg.getState());

        eg.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Mark the instance dead and wait until the restart strategy has been consulted.
        instance.markDead();
        controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());

        eg.suspend(new Exception("Test exception"));
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());

        // Release the pending restart; the suspended job must stay suspended.
        controllableRestartStrategy.unlockRestart();
        controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
    }

    /**
     * Fails two executions of the same job vertex from two threads at (nearly) the same time
     * and checks that the job goes through a restart and can then run to {@code FINISHED}.
     *
     * <p>The restart itself is gated by a latch handed to {@code TriggeredRestartStrategy}
     * (defined outside this view), so it only proceeds once the test triggers it.
     */
    @Test
    public void testConcurrentLocalFailAndRestart() throws Exception {
        // Single source of truth for the vertex parallelism and all task-count latches.
        final int parallelism = 10;

        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        final OneShotLatch restartLatch = new OneShotLatch();
        final TriggeredRestartStrategy triggeredRestartStrategy = new TriggeredRestartStrategy(restartLatch);

        final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(
            new JobID(),
            taskManagerGateway,
            triggeredRestartStrategy,
            ExecutionGraphTestUtils.createNoOpVertex(parallelism));

        final WaitForTasks waitForTasks = new WaitForTasks(parallelism);
        final WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasks);
        taskManagerGateway.setCancelConsumer(waitForTasksCancelled);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.scheduleForExecution();

        // Wait until all tasks were submitted before switching them to running.
        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);

        final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
        final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();

        // Both failure threads park on failTrigger; readyLatch tells the test they are started.
        final OneShotLatch failTrigger = new OneShotLatch();
        final CountDownLatch readyLatch = new CountDownLatch(2);

        final Thread failure1 = new Thread(() -> {
            readyLatch.countDown();
            try {
                failTrigger.await();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the execution is failed regardless of an interrupt.
            }
            first.fail(new Exception("intended test failure 1"));
        });
        final Thread failure2 = new Thread(() -> {
            readyLatch.countDown();
            try {
                failTrigger.await();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the execution is failed regardless of an interrupt.
            }
            last.fail(new Exception("intended test failure 2"));
        });

        failure1.start();
        failure2.start();

        // Release both threads together once they are up.
        readyLatch.await();
        failTrigger.trigger();

        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.FAILING, 1000L);

        // Install the latch for the post-restart submissions before the restart can happen.
        final WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);

        waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);

        // Now let the restart strategy proceed.
        restartLatch.trigger();

        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 1000L);
        waitForTasksAfterRestart.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);
        ExecutionGraphTestUtils.finishAllVertices(eg);

        eg.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
    }

    /**
     * Raises a second global failure while the job is already {@code RESTARTING} after a
     * first one and checks that the job still restarts, finishes, and does not record more
     * than two full restarts.
     *
     * <p>The restart is gated by a latch handed to {@code TriggeredRestartStrategy}
     * (defined outside this view).
     */
    @Test
    public void testConcurrentGlobalFailAndRestarts() throws Exception {
        final OneShotLatch restartTrigger = new OneShotLatch();

        // Single source of truth for vertex parallelism, slot count and task-count latches.
        final int parallelism = 10;
        final JobID jid = new JobID();
        final JobVertex vertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway();
        final SimpleSlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway);
        final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger);

        final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(jid, slots, restartStrategy, vertex);

        final WaitForTasks waitForTasks = new WaitForTasks(parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasks);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.scheduleForExecution();

        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);

        // First global failure.
        eg.failGlobal(new Exception("intended test failure 1"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        // Install the latch for the post-restart submissions before cancellation completes.
        final WaitForTasks waitForTasksRestart = new WaitForTasks(parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasksRestart);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000L);

        // Second global failure while restarting: the job is expected to stay RESTARTING.
        eg.failGlobal(new Exception("intended test failure 2"));
        Assert.assertEquals(JobStatus.RESTARTING, eg.getState());

        // Release the gated restart.
        restartTrigger.trigger();

        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 1000L);
        waitForTasksRestart.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);
        ExecutionGraphTestUtils.finishAllVertices(eg);

        eg.waitUntilTerminal();
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());

        if (eg.getNumberOfFullRestarts() > 2L) {
            Assert.fail("Too many restarts: " + eg.getNumberOfFullRestarts());
        }
    }

    /**
     * Restarts an eagerly scheduled source → sink job whose vertices share one slot sharing
     * group, with exactly as many single-slot instances as the parallelism, and checks that
     * the job comes back to {@code RUNNING} and can finish.
     */
    @Test
    public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
        // The scenario requires an executor with more than one core thread.
        Assert.assertTrue(
            "test assumptions violated", ((ThreadPoolExecutor) this.executor).getCorePoolSize() > 1);

        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();

        // Single source of truth for instance count, vertex parallelism and task counts.
        final int parallelism = 20;
        final Scheduler scheduler = this.createSchedulerWithInstances(parallelism, taskManagerGateway);

        final SlotSharingGroup sharingGroup = new SlotSharingGroup();

        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);
        source.setSlotSharingGroup(sharingGroup);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(sharingGroup);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);

        final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
            new JobID(),
            scheduler,
            new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L),
            this.executor,
            source,
            sink);

        // Two vertices, each with `parallelism` subtasks.
        final WaitForTasks waitForTasks = new WaitForTasks(2 * parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasks);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.scheduleForExecution();

        waitForTasks.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);

        // Fail a single execution to trigger the restart.
        eg.getAllExecutionVertices().iterator().next()
            .getCurrentExecutionAttempt()
            .fail(new Exception("intended test failure"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());

        // Install the latch for the post-restart submissions before cancellation completes.
        final WaitForTasks waitForTasksAfterRestart = new WaitForTasks(2 * parallelism);
        taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 1000L);

        waitForTasksAfterRestart.getFuture().get(1000L, TimeUnit.MILLISECONDS);
        ExecutionGraphTestUtils.switchToRunning(eg);
        ExecutionGraphTestUtils.finishAllVertices(eg);
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.FINISHED, 1000L);
    }

    /**
     * Schedules a slot-sharing source → sink job on one instance fewer than its parallelism
     * and checks that, after the configured number of restarts, the job fails with a
     * {@link NoResourceAvailableException} as failure cause.
     *
     * <p>The wait for the restart counter is bounded so that a regression fails the test
     * instead of hanging the build.
     */
    @Test
    public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
        // The scenario requires an executor with more than one core thread.
        Assert.assertTrue(
            "test assumptions violated", ((ThreadPoolExecutor) this.executor).getCorePoolSize() > 1);

        final int numRestarts = 10;
        final int parallelism = 20;

        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        // One instance too few for the requested parallelism.
        final Scheduler scheduler = this.createSchedulerWithInstances(parallelism - 1, taskManagerGateway);

        final SlotSharingGroup sharingGroup = new SlotSharingGroup();

        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);
        source.setSlotSharingGroup(sharingGroup);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(sharingGroup);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);

        final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
            new JobID(),
            scheduler,
            new FixedDelayRestartStrategy(numRestarts, 0L),
            this.executor,
            source,
            sink);

        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.scheduleForExecution();

        // Poll the restart counter, but give up after two minutes rather than spin forever.
        final long waitDeadlineNanos = System.nanoTime() + TimeUnit.MINUTES.toNanos(2L);
        while (eg.getNumberOfFullRestarts() < numRestarts) {
            if (System.nanoTime() - waitDeadlineNanos >= 0L) {
                Assert.fail("Timed out waiting for " + numRestarts + " full restarts; observed "
                    + eg.getNumberOfFullRestarts());
            }
            Thread.sleep(1L);
        }

        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.FAILED, 1000L);

        final Throwable t = eg.getFailureCause();
        Assert.assertNotNull("A FAILED job must expose a failure cause.", t);
        if (!(t instanceof NoResourceAvailableException)) {
            // Surface the unexpected cause as the test failure.
            ExceptionUtils.rethrowException(t, t.getMessage());
        }
    }

    /**
     * Raises a second global failure from another thread while a restart is in progress and
     * checks that the job still gets back to {@code RUNNING} within the timeout.
     *
     * <p>The two-party latch is shared with {@code CountDownLatchRestartStrategy} (defined
     * outside this view); presumably the strategy counts it down during the restart so that
     * the concurrent failure is injected at that point — confirm against its definition.
     */
    @Test
    public void testConcurrentFailureWhileRestarting() throws Exception {
        final long timeout = 5000L;

        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final CountDownLatchRestartStrategy restartStrategy = new CountDownLatchRestartStrategy(countDownLatch);

        // The slot provider answers every request with a future that is never completed here.
        final ExecutionGraph executionGraph = ExecutionGraphRestartTest.createSimpleExecutionGraph(
            restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()));

        executionGraph.setQueuedSchedulingAllowed(true);
        executionGraph.scheduleForExecution();
        Assert.assertThat(executionGraph.getState(), Matchers.is(JobStatus.RUNNING));

        executionGraph.failGlobal(new FlinkException("Test exception"));

        this.executor.execute(() -> {
            countDownLatch.countDown();
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                ExceptionUtils.rethrow(e);
            }
            executionGraph.failGlobal(new FlinkException("Concurrent exception"));
        });

        ExecutionGraphTestUtils.waitUntilJobStatus(executionGraph, JobStatus.RUNNING, timeout);
    }

    /**
     * Creates a {@link Scheduler} running on the test executor and registers {@code num}
     * freshly created instances with it. The instances get consecutive ports starting at
     * 55443.
     *
     * @param num number of instances to register; no instance is registered when it is not positive
     * @param taskManagerGateway gateway shared by all created instances
     * @return the scheduler with all instances registered
     */
    private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) {
        final int basePort = 55443;
        final Scheduler scheduler = new Scheduler(this.executor);
        for (int i = 0; i < num; ++i) {
            final Instance instance = ExecutionGraphRestartTest.createInstance(taskManagerGateway, basePort + i);
            scheduler.newInstanceAvailable(instance);
        }
        return scheduler;
    }

    /**
     * Builds an {@link Instance} on the loopback address with a generated resource id, a new
     * instance id, a fixed hardware description and {@code 1} as last constructor argument
     * (presumably the slot count — confirm against {@code Instance}).
     *
     * @param taskManagerGateway gateway the instance talks to
     * @param port port used in the instance's {@link TaskManagerLocation}
     * @return the newly created instance
     */
    private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) {
        final HardwareDescription hardware =
            new HardwareDescription(4, 1000000000L, 500000000L, 400000000L);
        final TaskManagerLocation taskManagerLocation =
            new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
        final InstanceID instanceId = new InstanceID();
        return new Instance(taskManagerGateway, taskManagerLocation, instanceId, hardware, 1);
    }

    /**
     * Creates a simple execution graph backed by a single test instance, schedules it, and
     * asserts that it moves from {@link JobStatus#CREATED} to {@link JobStatus#RUNNING}.
     *
     * @param restartStrategy restart strategy the graph is created with
     * @return the scheduled graph together with the instance registered at its scheduler
     * @throws Exception if graph creation or scheduling fails
     */
    private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
        final ActorGateway actorGateway =
            new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
        final TaskManagerGateway taskManagerGateway = new ActorTaskManagerGateway(actorGateway);
        final Instance instance = ExecutionGraphTestUtils.getInstance(taskManagerGateway, 31);

        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        final ExecutionGraph graph = createSimpleExecutionGraph(restartStrategy, scheduler);
        Assert.assertEquals(JobStatus.CREATED, graph.getState());

        graph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());

        return new Tuple2<>(graph, instance);
    }

    /**
     * Creates an execution graph with the given restart strategy and slot provider and attaches
     * the topologically sorted vertices of a job graph built with parallelism 31.
     *
     * @param restartStrategy restart strategy for the new graph
     * @param slotProvider slot provider for the new graph
     * @return the execution graph with the job's vertices attached (not yet scheduled)
     */
    private static ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException {
        final int parallelism = 31;
        final JobGraph job = createJobGraph(parallelism);

        final ExecutionGraph graph = newExecutionGraph(restartStrategy, slotProvider);
        graph.attachJobGraph(job.getVerticesSortedTopologicallyFromSources());
        return graph;
    }

    /**
     * Builds a job graph named "Pointwise job" that contains a single vertex named "Task"
     * running {@link NoOpInvokable} with the given parallelism.
     *
     * @param parallelism parallelism passed to the vertex factory
     * @return the single-vertex job graph
     */
    @Nonnull
    private static JobGraph createJobGraph(int parallelism) {
        final JobVertex[] vertices = {
            ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class)
        };
        return new JobGraph("Pointwise job", vertices);
    }

    /**
     * Instantiates an {@link ExecutionGraph} named "Test job" with a new {@link JobID}, an empty
     * {@link Configuration}, a serialized default {@link ExecutionConfig}, the default Akka
     * timeout, and the testing default executor for both executor arguments.
     *
     * @param restartStrategy restart strategy for the graph
     * @param slotProvider slot provider for the graph
     * @return the new execution graph
     * @throws IOException declared by the original signature, presumably from serializing the execution config
     */
    private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
        return new ExecutionGraph(
            TestingUtils.defaultExecutor(),
            TestingUtils.defaultExecutor(),
            new JobID(),
            "Test job",
            new Configuration(),
            new SerializedValue<>(new ExecutionConfig()),
            AkkaUtils.getDefaultTimeout(),
            restartStrategy,
            slotProvider);
    }

    /**
     * Fails the execution graph, waits for it to be restarted, and waits until all executions
     * are in state DEPLOYING; optionally finishes the job afterwards.
     *
     * @param eg execution graph to fail and restart
     * @param timeout time budget, applied separately to the restart wait and the deploying wait
     * @param haltAfterRestart whether to finish all vertices once the executions are deploying;
     *     if set and the deploying deadline has no time left, the test is failed instead
     * @throws InterruptedException if interrupted while waiting for the restart
     * @throws TimeoutException if the executions do not reach DEPLOYING in time
     */
    private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException, TimeoutException {
        ExecutionGraphTestUtils.failExecutionGraph(eg, new Exception("Test Exception"));

        // The restart happens asynchronously; poll until RUNNING or the timeout elapses.
        waitForAsyncRestart(eg, timeout);
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // A fresh deadline is started for the deployment phase.
        final Deadline deployDeadline = timeout.fromNow();
        waitUntilAllExecutionsReachDeploying(eg, deployDeadline);

        if (!haltAfterRestart) {
            return;
        }
        if (!deployDeadline.hasTimeLeft()) {
            Assert.fail("Failed to wait until all execution attempts left the state DEPLOYING.");
        } else {
            haltExecution(eg);
        }
    }

    /**
     * Waits until every execution of the graph is in state DEPLOYING, for at most the time left
     * on the given deadline.
     *
     * @param eg execution graph whose executions are checked
     * @param deadline deadline whose remaining time (in milliseconds) bounds the wait
     * @throws TimeoutException declared by the original signature, presumably when the wait times out
     */
    private static void waitUntilAllExecutionsReachDeploying(ExecutionGraph eg, Deadline deadline) throws TimeoutException {
        final long remainingMillis = deadline.timeLeft().toMillis();
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
            eg,
            ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING),
            remainingMillis);
    }

    /**
     * Waits until every execution of the graph satisfies
     * {@code ExecutionGraphTestUtils.hasResourceAssigned}, for at most the time left on the
     * given deadline.
     *
     * @param eg execution graph whose executions are checked
     * @param deadline deadline whose remaining time (in milliseconds) bounds the wait
     * @throws TimeoutException declared by the original signature, presumably when the wait times out
     */
    private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph eg, Deadline deadline) throws TimeoutException {
        final long remainingMillis = deadline.timeLeft().toMillis();
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
            eg,
            ExecutionGraphTestUtils.hasResourceAssigned,
            remainingMillis);
    }

    /**
     * Polls the execution graph until it reports {@link JobStatus#RUNNING} or the timeout
     * elapses, whichever comes first.
     *
     * <p>The sleep between polls starts at 10 ms and doubles after every poll, capped at 100 ms.
     * The method returns normally in both cases; callers must check the state themselves.
     *
     * @param eg execution graph to poll
     * @param timeout maximum time to keep polling
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForAsyncRestart(ExecutionGraph eg, FiniteDuration timeout) throws InterruptedException {
        final long maxSleepMillis = 100L;
        final Deadline deadline = timeout.fromNow();

        for (long sleepMillis = 10L;
                deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING;
                sleepMillis = Math.min(sleepMillis * 2L, maxSleepMillis)) {
            Thread.sleep(sleepMillis);
        }
    }

    /**
     * Finishes all vertices of the execution graph and asserts that the job state is
     * {@link JobStatus#FINISHED} afterwards.
     *
     * @param eg execution graph to finish
     */
    private static void haltExecution(ExecutionGraph eg) {
        ExecutionGraphTestUtils.finishAllVertices(eg);

        final JobStatus stateAfterFinish = eg.getState();
        Assert.assertEquals(JobStatus.FINISHED, stateAfterFinish);
    }

    /**
     * Consumer that counts the execution attempt ids it receives and completes a future with
     * {@code true} once the configured number of calls has been reached.
     *
     * <p>The count is kept in an {@link AtomicInteger}. Calls beyond the threshold invoke
     * {@link CompletableFuture#complete} again, which has no effect on an already completed
     * future.
     */
    public static class WaitForTasks
    implements Consumer<ExecutionAttemptID> {
        private final int tasksToWaitFor;
        private final CompletableFuture<Boolean> allTasksReceived = new CompletableFuture<>();
        private final AtomicInteger counter = new AtomicInteger();

        /**
         * @param tasksToWaitFor number of {@link #accept} calls after which the future completes
         */
        public WaitForTasks(int tasksToWaitFor) {
            this.tasksToWaitFor = tasksToWaitFor;
        }

        /**
         * @return the future that is completed with {@code true} once enough calls were received
         */
        public CompletableFuture<Boolean> getFuture() {
            return this.allTasksReceived;
        }

        /**
         * Records one received task; the id itself is not inspected.
         */
        @Override
        public void accept(ExecutionAttemptID executionAttemptID) {
            final int received = this.counter.incrementAndGet();
            if (received < this.tasksToWaitFor) {
                return;
            }
            this.allTasksReceived.complete(true);
        }
    }

    /**
     * Restart strategy that always allows a restart but delays the full recovery until the
     * supplied {@link OneShotLatch} has been awaited on the given executor.
     */
    private static final class TriggeredRestartStrategy
    implements RestartStrategy {
        private final OneShotLatch latch;

        /**
         * @param latch latch awaited before the recovery is triggered
         */
        TriggeredRestartStrategy(OneShotLatch latch) {
            this.latch = latch;
        }

        /** Always permits a restart. */
        public boolean canRestart() {
            return true;
        }

        /**
         * Submits a task to the executor that awaits the latch and then triggers full recovery.
         */
        public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
            executor.execute(() -> {
                this.awaitLatch();
                restarter.triggerFullRecovery();
            });
        }

        /**
         * Awaits the latch. If interrupted, the interrupt flag is restored and the method
         * returns, so the caller still goes on to trigger the recovery.
         */
        private void awaitLatch() {
            try {
                this.latch.await();
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Restart strategy whose progress a test can observe and steer through three latches:
     * one triggered when {@link #canRestart()} is called, one the restart task waits on before
     * recovering, and one triggered when the restart task is done.
     */
    private static class ControllableRestartStrategy
    implements RestartStrategy {
        private final OneShotLatch reachedCanRestart = new OneShotLatch();
        private final OneShotLatch doRestart = new OneShotLatch();
        private final OneShotLatch restartDone = new OneShotLatch();
        private final Time timeout;
        private volatile Exception exception;

        /**
         * @param timeout size and unit passed to the wait on the restart latch
         */
        public ControllableRestartStrategy(Time timeout) {
            this.timeout = timeout;
        }

        /** Triggers the latch the restart task is waiting on. */
        public void unlockRestart() {
            this.doRestart.trigger();
        }

        /**
         * @return the exception caught by the restart task, or {@code null} if none was stored
         */
        public Exception getException() {
            return this.exception;
        }

        /**
         * @return the latch triggered by {@link #canRestart()}
         */
        public OneShotLatch getReachedCanRestart() {
            return this.reachedCanRestart;
        }

        /**
         * @return the latch triggered at the end of the restart task
         */
        public OneShotLatch getRestartDone() {
            return this.restartDone;
        }

        /** Signals that the restart decision point was reached and always permits the restart. */
        public boolean canRestart() {
            this.reachedCanRestart.trigger();
            return true;
        }

        /** Submits the restart task to the given executor. */
        public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
            executor.execute(() -> this.awaitUnlockThenRecover(restarter));
        }

        /**
         * Waits on the restart latch using the configured timeout, then triggers full recovery.
         * Any {@link Exception} from either step is stored for {@link #getException()}; the
         * done-latch is triggered afterwards in both cases.
         */
        private void awaitUnlockThenRecover(RestartCallback restarter) {
            try {
                this.doRestart.await(this.timeout.getSize(), this.timeout.getUnit());
                restarter.triggerFullRecovery();
            }
            catch (Exception failure) {
                this.exception = failure;
            }
            this.restartDone.trigger();
        }
    }

    /**
     * Restart strategy that always allows a restart and, before triggering the full recovery,
     * counts down a shared {@link CountDownLatch} once and waits for it to reach zero.
     */
    private static final class CountDownLatchRestartStrategy
    implements RestartStrategy {
        private final CountDownLatch countDownLatch;

        /**
         * @param countDownLatch latch counted down and awaited by the restart task
         */
        private CountDownLatchRestartStrategy(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        /** Always permits a restart. */
        public boolean canRestart() {
            return true;
        }

        /**
         * Submits a task to the executor that passes the latch and then triggers full recovery.
         */
        public void restart(RestartCallback restarter, ScheduledExecutor executor) {
            executor.execute(() -> {
                this.arriveAndAwait();
                restarter.triggerFullRecovery();
            });
        }

        /**
         * Counts the latch down once and waits for it to reach zero; an interruption while
         * waiting is handed to {@code ExceptionUtils.rethrow}.
         */
        private void arriveAndAwait() {
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException interrupted) {
                ExceptionUtils.rethrow(interrupted);
            }
        }
    }
}

