/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ExecutionGraphStopTest
extends TestLogger {
    @Test
    public void testStopIfSourcesNotStoppable() throws Exception {
        ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
        try {
            graph.stop();
            Assert.fail((String)"exception expected");
        }
        catch (StoppingException stoppingException) {
            // empty catch block
        }
    }

    @Test
    public void testStop() throws Exception {
        SimpleSlot slot;
        int sourcePar1 = 11;
        int sourcePar2 = 7;
        JobVertex source1 = new JobVertex("source 1");
        source1.setInvokableClass(StoppableInvokable.class);
        source1.setParallelism(11);
        JobVertex source2 = new JobVertex("source 2");
        source2.setInvokableClass(StoppableInvokable.class);
        source2.setParallelism(7);
        JobVertex nonSource1 = new JobVertex("non-source-1");
        nonSource1.setInvokableClass(NoOpInvokable.class);
        nonSource1.setParallelism(10);
        JobVertex nonSource2 = new JobVertex("non-source-2");
        nonSource2.setInvokableClass(NoOpInvokable.class);
        nonSource2.setParallelism(10);
        nonSource1.connectNewDataSetAsInput(source1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        nonSource1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        nonSource2.connectNewDataSetAsInput(nonSource1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobID jid = new JobID();
        ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(jid, source1, source2, nonSource1, nonSource2);
        TaskManagerGateway sourceGateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        TaskManagerGateway nonSourceGateway = (TaskManagerGateway)Mockito.spy((Object)new SimpleAckingTaskManagerGateway());
        for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
            slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
            ev.getCurrentExecutionAttempt().tryAssignResource(slot);
            ev.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
            slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
            ev.getCurrentExecutionAttempt().tryAssignResource(slot);
            ev.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
            slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
            ev.getCurrentExecutionAttempt().tryAssignResource(slot);
            ev.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
            slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
            ev.getCurrentExecutionAttempt().tryAssignResource(slot);
            ev.getCurrentExecutionAttempt().deploy();
        }
        eg.stop();
        ((TaskManagerGateway)Mockito.verify((Object)sourceGateway, (VerificationMode)Mockito.timeout((long)1000L).times(18))).stopTask((ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), (Time)Mockito.any(Time.class));
        ((TaskManagerGateway)Mockito.verify((Object)nonSourceGateway, (VerificationMode)Mockito.times((int)0))).stopTask((ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), (Time)Mockito.any(Time.class));
        ExecutionGraphTestUtils.finishAllVertices(eg);
    }

    @Test
    public void testStopRpc() throws Exception {
        JobID jid = new JobID();
        JobVertex vertex = new JobVertex("vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(5);
        ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(jid, vertex);
        Execution exec = graph.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
        TaskManagerGateway gateway = (TaskManagerGateway)Mockito.mock(TaskManagerGateway.class);
        Mockito.when((Object)gateway.submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        Mockito.when((Object)gateway.stopTask((ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
        exec.tryAssignResource(slot);
        exec.deploy();
        exec.switchToRunning();
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)exec.getState());
        exec.stop();
        Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)exec.getState());
        ((TaskManagerGateway)Mockito.verify((Object)gateway, (VerificationMode)Mockito.times((int)1))).stopTask((ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), (Time)Mockito.any(Time.class));
        exec.markFinished();
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)exec.getState());
    }
}

