/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ExecutionGraphSuspendTest
extends TestLogger {
    @Test
    public void testSuspendedOutOfCreated() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.CANCELED);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 0);
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(eg, gateway);
    }

    @Test
    public void testSuspendedOutOfDeploying() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.CANCELING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(eg, gateway);
    }

    @Test
    public void testSuspendedOutOfRunning() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.RUNNING);
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateAllVerticesInState(eg, ExecutionState.CANCELING);
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(eg, gateway);
    }

    @Test
    public void testSuspendedOutOfFailing() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        eg.failGlobal((Throwable)new Exception("fail global"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendingState(eg, gateway);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(eg, gateway);
    }

    @Test
    public void testSuspendedOutOfFailed() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        eg.failGlobal((Throwable)new Exception("fail global"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
    }

    @Test
    public void testSuspendedOutOfCanceling() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        eg.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendingState(eg, gateway);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        ExecutionGraphSuspendTest.ensureCannotLeaveSuspendedState(eg, gateway);
    }

    @Test
    public void testSuspendedOutOfCanceled() throws Exception {
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        int parallelism = 10;
        ExecutionGraph eg = ExecutionGraphSuspendTest.createExecutionGraph(gateway, 10);
        eg.scheduleForExecution();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        eg.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.CANCELED, eg.getTerminationFuture().get());
        eg.suspend((Throwable)new Exception("suspend"));
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)eg.getState());
        ExecutionGraphSuspendTest.validateCancelRpcCalls(gateway, 10);
    }

    @Test
    public void testSuspendWhileRestarting() throws Exception {
        ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
        eg.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
        eg.failGlobal((Throwable)new Exception("test"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
        Assert.assertEquals((Object)JobStatus.RESTARTING, (Object)eg.getState());
        Exception exception = new Exception("Suspended");
        eg.suspend((Throwable)exception);
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        Assert.assertEquals((Object)exception, (Object)eg.getFailureCause());
    }

    private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManagerGateway gateway) {
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        Mockito.reset((Object[])new TaskManagerGateway[]{gateway});
        eg.failGlobal((Throwable)new Exception("fail"));
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        eg.cancel();
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        eg.suspend((Throwable)new Exception("suspend again"));
        Assert.assertEquals((Object)JobStatus.SUSPENDED, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assert.assertEquals((long)0L, (long)ev.getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    private static void ensureCannotLeaveSuspendingState(ExecutionGraph eg, TaskManagerGateway gateway) {
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        Mockito.reset((Object[])new TaskManagerGateway[]{gateway});
        eg.failGlobal((Throwable)new Exception("fail"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        eg.cancel();
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        eg.suspend((Throwable)new Exception("suspend again"));
        Assert.assertEquals((Object)JobStatus.SUSPENDING, (Object)eg.getState());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{gateway});
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assert.assertEquals((long)0L, (long)ev.getCurrentExecutionAttempt().getAttemptNumber());
        }
    }

    private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
        for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
            Assert.assertEquals((Object)expected, (Object)ev.getCurrentExecutionAttempt().getState());
        }
    }

    private static void validateCancelRpcCalls(TaskManagerGateway gateway, int num) {
        ((TaskManagerGateway)Mockito.verify((Object)gateway, (VerificationMode)Mockito.times((int)num))).cancelTask((ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), (Time)Matchers.any(Time.class));
    }

    private static ExecutionGraph createExecutionGraph(TaskManagerGateway gateway, int parallelism) throws Exception {
        JobID jobId = new JobID();
        JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        SimpleSlotProvider slotProvider = new SimpleSlotProvider(jobId, parallelism, gateway);
        return ExecutionGraphTestUtils.createSimpleTestGraph(jobId, slotProvider, (RestartStrategy)new FixedDelayRestartStrategy(0, 0L), vertex);
    }
}

