/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexCancelTest;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;

/**
 * Tests the state transitions an {@link ExecutionVertex} goes through when it is deployed to a
 * slot, including repeated deployment attempts, failing deployments and external failures that
 * arrive while a deployment is in flight.
 *
 * <p>Every test lets unexpected exceptions propagate to JUnit, so that a failing run reports the
 * original exception together with its stack trace and cause.
 */
public class ExecutionVertexDeploymentTest
extends TestLogger {
    /** Message fragment the tests expect in the failure cause of a failed deployment. */
    private static final String EXPECTED_FAILURE_MESSAGE = "test_failure_error_message";

    /**
     * Deploying a freshly created vertex moves it to DEPLOYING, and a second deployment attempt
     * is rejected.
     */
    @Test
    public void testDeployCall() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()));
        ExecutionVertex vertex = newVertex(ejv);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRedeploymentRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
    }

    /**
     * Same scenario as {@link #testDeployCall()}, but the job vertex is backed by a
     * {@link DirectScheduledExecutorService}; additionally checks that RUNNING was never entered.
     */
    @Test
    public void testDeployWithSynchronousAnswer() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv =
                ExecutionGraphTestUtils.getExecutionVertex(jid, new DirectScheduledExecutorService());
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()));
        ExecutionVertex vertex = newVertex(ejv);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRedeploymentRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateNeverEntered(vertex, ExecutionState.RUNNING);
    }

    /**
     * Deploys through a gateway built on {@code TestingUtils.defaultExecutionContext()} and checks
     * that repeated deployment is rejected both before and after the DEPLOYING state was observed.
     */
    @Test
    public void testDeployWithAsynchronousAnswer() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = newVertex(ejv);
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()));

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        // no repeated scheduling, checked immediately after the first deployment call ...
        assertRedeploymentRejected(vertex, slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
        // ... and once more after the state has been verified
        assertRedeploymentRejected(vertex, slot);

        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateNeverEntered(vertex, ExecutionState.RUNNING);
    }

    /**
     * Deploys through a {@code SimpleFailingActorGateway} with a direct executor on the job
     * vertex and expects the vertex to be FAILED as soon as {@code deployToSlot} returns.
     */
    @Test
    public void testDeployFailedSynchronous() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv =
                ExecutionGraphTestUtils.getExecutionVertex(jid, new DirectScheduledExecutorService());
        ExecutionVertex vertex = newVertex(ejv);
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleFailingActorGateway(TestingUtils.directExecutionContext()));

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertNotNull(vertex.getFailureCause());
        Assert.assertTrue(vertex.getFailureCause().getMessage().contains(EXPECTED_FAILURE_MESSAGE));

        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateEntered(vertex, ExecutionState.FAILED);
    }

    /**
     * Deploys through a {@code SimpleFailingActorGateway} and polls until the vertex reports
     * FAILED with a failure cause, then verifies the failure details.
     */
    @Test
    public void testDeployFailedAsynchronously() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = newVertex(ejv);
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleFailingActorGateway(TestingUtils.directExecutionContext()));

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        // wait (bounded: at most 100 polls of 10 ms) until the failure has been recorded
        for (int attempt = 0; attempt < 100; ++attempt) {
            boolean failureRecorded =
                    vertex.getExecutionState() == ExecutionState.FAILED && vertex.getFailureCause() != null;
            if (failureRecorded) {
                break;
            }
            Thread.sleep(10L);
        }

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertNotNull(vertex.getFailureCause());
        Assert.assertTrue(vertex.getFailureCause().getMessage().contains(EXPECTED_FAILURE_MESSAGE));

        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateEntered(vertex, ExecutionState.FAILED);
    }

    /**
     * Fails the vertex externally while it is DEPLOYING and checks that it ends up FAILED with
     * exactly the supplied error, also after the queued actions have been triggered.
     */
    @Test
    public void testFailExternallyDuringDeploy() throws Exception {
        JobVertexID jid = new JobVertexID();
        TestingUtils.QueuedActionExecutionContext ec = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue queue = ec.actionQueue();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, ec);
        ExecutionVertex vertex = newVertex(ejv);
        SimpleSlot slot = allocateSlot(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()));

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception testError = new Exception("test error");
        vertex.fail(testError);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertEquals(testError, vertex.getFailureCause());

        // run the two actions that were queued on the job vertex' execution context
        queue.triggerNextAction();
        queue.triggerNextAction();

        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateEntered(vertex, ExecutionState.FAILED);
    }

    /**
     * Fails the vertex while it is DEPLOYING and then runs the queued actions out of order (the
     * second queued action before the first) to check that the vertex stays FAILED with the
     * original error and that no queued action is left over.
     */
    @Test
    public void testFailCallOvertakesDeploymentAnswer() throws Exception {
        TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue queue = context.actionQueue();
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, context);
        ExecutionVertex vertex = newVertex(ejv);
        SimpleSlot slot = allocateSlot(new ExecutionVertexCancelTest.CancelSequenceActorGateway(context, 2));

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception testError = new Exception("test error");
        vertex.fail(testError);
        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());

        // presumably the first queued action is the deploy call and the second the cancel call
        // issued by fail(); the cancel is executed first so that it overtakes the deployment
        Runnable deploy = queue.popNextAction();
        Runnable cancel1 = queue.popNextAction();
        cancel1.run();
        queue.triggerNextAction();
        queue.triggerNextAction();
        deploy.run();
        queue.triggerNextAction();

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertEquals(testError, vertex.getFailureCause());

        assertStateEntered(vertex, ExecutionState.CREATED);
        assertStateEntered(vertex, ExecutionState.DEPLOYING);
        assertStateEntered(vertex, ExecutionState.FAILED);

        Assert.assertTrue(queue.isEmpty());
    }

    /**
     * For every {@link ScheduleMode}, the single produced partition of the deployment descriptor
     * must report {@code sendScheduleOrUpdateConsumersMessage()} equal to the mode's
     * {@code allowLazyDeployment()}.
     */
    @Test
    public void testTddProducedPartitionsLazyScheduling() throws Exception {
        TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), context);
        IntermediateResult result =
                new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
        ExecutionVertex vertex =
                new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1L));

        // register one (mocked) consumer on the only partition of the result
        ExecutionEdge mockEdge = this.createMockExecutionEdge(1);
        result.getPartitions()[0].addConsumerGroup();
        result.getPartitions()[0].addConsumer(mockEdge, 0);

        LogicalSlot slot = Mockito.mock(LogicalSlot.class);
        Mockito.when(slot.getAllocationId()).thenReturn(new AllocationID());

        for (ScheduleMode mode : ScheduleMode.values()) {
            vertex.getExecutionGraph().setScheduleMode(mode);

            TaskDeploymentDescriptor tdd =
                    vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, 1);
            Collection<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions();

            Assert.assertEquals(1, producedPartitions.size());
            ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next();
            Assert.assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
        }
    }

    /**
     * Creates a mocked {@link ExecutionEdge} whose target vertex belongs to a job vertex that
     * reports the given maximum parallelism.
     *
     * @param maxParallelism value returned by the mocked job vertex' {@code getMaxParallelism()}
     * @return the mocked edge
     */
    private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
        ExecutionJobVertex targetJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism);

        ExecutionVertex targetVertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(targetVertex.getJobVertex()).thenReturn(targetJobVertex);

        ExecutionEdge edge = Mockito.mock(ExecutionEdge.class);
        Mockito.when(edge.getTarget()).thenReturn(targetVertex);
        return edge;
    }

    /**
     * Creates subtask 0 of the given job vertex without any produced results, using the default
     * Akka timeout.
     */
    private static ExecutionVertex newVertex(ExecutionJobVertex jobVertex) {
        return new ExecutionVertex(jobVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
    }

    /**
     * Wraps the actor gateway into an {@link ActorTaskManagerGateway}, obtains a test instance
     * for it and allocates one simple slot on that instance.
     */
    private static SimpleSlot allocateSlot(ActorGateway gateway) throws Exception {
        TaskManagerGateway taskManagerGateway = new ActorTaskManagerGateway(gateway);
        Instance instance = ExecutionGraphTestUtils.getInstance(taskManagerGateway);
        return instance.allocateSimpleSlot();
    }

    /**
     * Asserts that deploying the vertex again is refused with an {@link IllegalStateException}.
     */
    private static void assertRedeploymentRejected(ExecutionVertex vertex, SimpleSlot slot) throws Exception {
        try {
            vertex.deployToSlot(slot);
            Assert.fail("Scheduled from wrong state");
        }
        catch (IllegalStateException expected) {
            // expected: the vertex has already been deployed once and must not accept another deployment
        }
    }

    /** Asserts that a (positive) timestamp has been recorded for the given state. */
    private static void assertStateEntered(ExecutionVertex vertex, ExecutionState state) {
        Assert.assertTrue(vertex.getStateTimestamp(state) > 0L);
    }

    /** Asserts that the timestamp of the given state is still zero. */
    private static void assertStateNeverEntered(ExecutionVertex vertex, ExecutionState state) {
        Assert.assertTrue(vertex.getStateTimestamp(state) == 0L);
    }
}

