/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class PipelinedRegionFailoverConcurrencyTest
extends TestLogger {
    @Test
    public void testCancelWhileInLocalFailover() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new FailoverPipelinedRegionWithCustomExecutor((Executor)executor), (RestartStrategy)new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L), slotProvider, 2);
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        vertex1.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        vertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)graph.getState());
        Assert.assertTrue((boolean)vertex1.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue((boolean)vertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals((long)2L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new FailoverPipelinedRegionWithCustomExecutor((Executor)executor), (RestartStrategy)new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L), slotProvider, 2);
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        vertex1.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.failGlobal((Throwable)new SuppressRestartsException((Throwable)new Exception("test exception")));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        vertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)graph.getState());
        Assert.assertTrue((boolean)vertex1.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue((boolean)vertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals((long)2L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
        JobID jid = new JobID();
        int parallelism = 2;
        ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, 2);
        ExecutionGraph graph = this.createSampleGraph(jid, new FailoverPipelinedRegionWithCustomExecutor((Executor)executor), (RestartStrategy)new FixedDelayRestartStrategy(2, 0L), slotProvider, 2);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)graph.getFailoverStrategy();
        ExecutionJobVertex ejv = (ExecutionJobVertex)graph.getVerticesTopologically().iterator().next();
        ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
        ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
        graph.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(vertex1).getState());
        vertex2.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)strategy.getFailoverRegion(vertex2).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        graph.failGlobal((Throwable)new Exception("test exception"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)graph.getState());
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        vertex1.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilJobStatus(graph, JobStatus.RUNNING, 1000L);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        ExecutionGraphTestUtils.waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        vertex1.getCurrentExecutionAttempt().switchToRunning();
        vertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        executor.trigger();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(vertex1).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(vertex2).getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((long)1L, (long)vertex1.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)1L, (long)vertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)1L, (long)vertex1.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)1L, (long)vertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)0L, (long)slotProvider.getNumberOfAvailableSlots());
        vertex2.getCurrentExecutionAttempt().fail((Throwable)new Exception("test failure"));
        Assert.assertEquals((long)1L, (long)executor.numQueuedRunnables());
        executor.trigger();
        ExecutionGraphTestUtils.waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        vertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)graph.getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(vertex1).getState());
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)strategy.getFailoverRegion(vertex2).getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex1.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)vertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals((long)1L, (long)vertex1.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)2L, (long)vertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals((long)1L, (long)vertex1.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)2L, (long)vertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals((long)0L, (long)slotProvider.getNumberOfAvailableSlots());
    }

    private ExecutionGraph createSampleGraph(JobID jid, FailoverStrategy.Factory failoverStrategy, RestartStrategy restartStrategy, SlotProvider slotProvider, int parallelism) throws Exception {
        DummyJobInformation jobInformation = new DummyJobInformation(jid, "test job");
        ExecutionGraph graph = new ExecutionGraph((JobInformation)jobInformation, TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), Time.seconds((long)10L), restartStrategy, failoverStrategy, slotProvider, ((Object)((Object)this)).getClass().getClassLoader(), (BlobWriter)VoidBlobWriter.getInstance());
        JobVertex jv = new JobVertex("test vertex");
        jv.setInvokableClass(NoOpInvokable.class);
        jv.setParallelism(parallelism);
        JobGraph jg = new JobGraph(jid, "testjob", new JobVertex[]{jv});
        graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
        return graph;
    }

    private static class FailoverPipelinedRegionWithCustomExecutor
    implements FailoverStrategy.Factory {
        private final Executor executor;

        FailoverPipelinedRegionWithCustomExecutor(Executor executor) {
            this.executor = executor;
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph, this.executor);
        }
    }
}

