/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class IndividualRestartsConcurrencyTest
extends TestLogger {
    @Test
    public void testCancelWhileInLocalFailover() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new IndividualFailoverWithCustomExecutor((Executor)executor), slotProvider, 2);
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        vertex1.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        vertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)graph.getState());
        Assert.assertTrue((boolean)vertex1.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue((boolean)vertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals((long)2L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new IndividualFailoverWithCustomExecutor((Executor)executor), slotProvider, 2);
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        vertex1.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.failGlobal((Throwable)new Exception("test exception"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        vertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)graph.getState());
        Assert.assertTrue((boolean)vertex1.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue((boolean)vertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals((long)2L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new IndividualFailoverWithCustomExecutor((Executor)executor), (RestartStrategy)new FixedDelayRestartStrategy(1, 0L), slotProvider, 2);
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        vertex2.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.failGlobal((Throwable)new Exception("test exception"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        vertex1.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilJobStatus(graph, JobStatus.RUNNING, 1000L);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        ExecutionGraphTestUtils.waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        vertex1.getCurrentExecutionAttempt().switchToRunning();
        vertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((long)1L, (long)vertex1.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)1L, (long)vertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)1L, (long)vertex1.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)1L, (long)vertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)0L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testLocalFailureFailsPendingCheckpoints() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        long verifyTimeout = 5000L;
        TaskManagerGateway taskManagerGateway = (TaskManagerGateway)Mockito.mock(TaskManagerGateway.class);
        Mockito.when((Object)taskManagerGateway.submitTask((TaskDeploymentDescriptor)Matchers.any(TaskDeploymentDescriptor.class), (Time)Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        Mockito.when((Object)taskManagerGateway.cancelTask((ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), (Time)Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2, taskManagerGateway);
        ScheduledExecutorService executor = TestingUtils.defaultExecutor();
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(10L, 100000L, 1L, 3, ExternalizedCheckpointSettings.none(), true);
        ExecutionGraph graph = this.createSampleGraph(jid, new IndividualFailoverWithCustomExecutor(executor), slotProvider, 2);
        ArrayList allVertices = new ArrayList(graph.getAllVertices().values());
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        graph.enableCheckpointing(checkpointCoordinatorConfiguration.getCheckpointInterval(), checkpointCoordinatorConfiguration.getCheckpointTimeout(), checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(), allVertices, allVertices, allVertices, Collections.emptyList(), (CheckpointIDCounter)standaloneCheckpointIDCounter, (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1), "", (StateBackend)new MemoryStateBackend(), new CheckpointStatsTracker(1, allVertices, checkpointCoordinatorConfiguration, (MetricGroup)new UnregisteredTaskMetricsGroup()));
        CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        ((TaskManagerGateway)Mockito.verify((Object)taskManagerGateway, (VerificationMode)Mockito.timeout((long)5000L).times(2))).submitTask((TaskDeploymentDescriptor)Matchers.any(TaskDeploymentDescriptor.class), (Time)Matchers.any(Time.class));
        for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        }
        ((TaskManagerGateway)Mockito.verify((Object)taskManagerGateway, (VerificationMode)Mockito.timeout((long)5000L).times(3))).triggerCheckpoint((ExecutionAttemptID)Matchers.eq((Object)vertex1.getCurrentExecutionAttempt().getAttemptId()), (JobID)Matchers.any(JobID.class), Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((TaskManagerGateway)Mockito.verify((Object)taskManagerGateway, (VerificationMode)Mockito.timeout((long)5000L).times(3))).triggerCheckpoint((ExecutionAttemptID)Matchers.eq((Object)vertex2.getCurrentExecutionAttempt().getAttemptId()), (JobID)Matchers.any(JobID.class), Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        Assert.assertEquals((long)3L, (long)checkpointCoordinator.getNumberOfPendingCheckpoints());
        long checkpointToAcknowledge = standaloneCheckpointIDCounter.getLast();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), vertex1.getCurrentExecutionAttempt().getAttemptId(), checkpointToAcknowledge));
        HashMap<Long, PendingCheckpoint> oldPendingCheckpoints = new HashMap<Long, PendingCheckpoint>(3);
        for (PendingCheckpoint pendingCheckpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
            Assert.assertFalse((boolean)pendingCheckpoint.isDiscarded());
            oldPendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), pendingCheckpoint);
        }
        vertex1.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        for (PendingCheckpoint pendingCheckpoint : oldPendingCheckpoints.values()) {
            if (pendingCheckpoint.getCheckpointId() == checkpointToAcknowledge) {
                Assert.assertFalse((boolean)pendingCheckpoint.isDiscarded());
                continue;
            }
            Assert.assertTrue((boolean)pendingCheckpoint.isDiscarded());
        }
    }

    private ExecutionGraph createSampleGraph(JobID jid, FailoverStrategy.Factory failoverStrategy, SlotProvider slotProvider, int parallelism) throws Exception {
        return this.createSampleGraph(jid, failoverStrategy, (RestartStrategy)new NoRestartStrategy(), slotProvider, parallelism);
    }

    private ExecutionGraph createSampleGraph(JobID jid, FailoverStrategy.Factory failoverStrategy, RestartStrategy restartStrategy, SlotProvider slotProvider, int parallelism) throws Exception {
        ExecutionGraph graph = new ExecutionGraph((JobInformation)new DummyJobInformation(jid, "test job"), TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), Time.seconds((long)10L), restartStrategy, failoverStrategy, slotProvider);
        JobVertex jv = new JobVertex("test vertex");
        jv.setInvokableClass(NoOpInvokable.class);
        jv.setParallelism(parallelism);
        JobGraph jg = new JobGraph(jid, "testjob", new JobVertex[]{jv});
        graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
        return graph;
    }

    private static class IndividualFailoverWithCustomExecutor
    implements FailoverStrategy.Factory {
        private final Executor executor;

        IndividualFailoverWithCustomExecutor(Executor executor) {
            this.executor = executor;
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartIndividualStrategy(executionGraph, this.executor);
        }
    }
}

