/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ProgrammedSlotProvider;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.Timeout;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class ExecutionGraphSchedulingTest
extends TestLogger {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    @After
    public void shutdown() {
        this.executor.shutdownNow();
    }

    @Test
    public void testScheduleSourceBeforeTarget() throws Exception {
        boolean parallelism = true;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(1);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex targetVertex = new JobVertex("target");
        targetVertex.setParallelism(1);
        targetVertex.setInvokableClass(NoOpInvokable.class);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});
        CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<SimpleSlot>();
        CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<SimpleSlot>();
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
        slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
        slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
        ExecutionGraph eg = this.createExecutionGraph(jobGraph, slotProvider);
        TaskManagerGateway gatewaySource = ExecutionGraphSchedulingTest.createTaskManager();
        TaskManagerGateway gatewayTarget = ExecutionGraphSchedulingTest.createTaskManager();
        SimpleSlot sourceSlot = this.createSlot(gatewaySource, jobId);
        SimpleSlot targetSlot = this.createSlot(gatewayTarget, jobId);
        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        targetFuture.complete(targetSlot);
        ((TaskManagerGateway)Mockito.verify((Object)gatewayTarget, (VerificationMode)new Timeout(50L, Mockito.times((int)0)))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        sourceFuture.complete(sourceSlot);
        ((TaskManagerGateway)Mockito.verify((Object)gatewaySource, (VerificationMode)Mockito.timeout((long)1000L))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        ((TaskManagerGateway)Mockito.verify((Object)gatewayTarget, (VerificationMode)Mockito.timeout((long)1000L))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
    }

    @Test
    public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
        int i;
        int parallelism = 8;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(8);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex targetVertex = new JobVertex("target");
        targetVertex.setParallelism(8);
        targetVertex.setInvokableClass(NoOpInvokable.class);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});
        CompletableFuture[] sourceFutures = new CompletableFuture[8];
        CompletableFuture[] targetFutures = new CompletableFuture[8];
        TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[8];
        TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[8];
        SimpleSlot[] sourceSlots = new SimpleSlot[8];
        SimpleSlot[] targetSlots = new SimpleSlot[8];
        for (int i2 = 0; i2 < 8; ++i2) {
            sourceTaskManagers[i2] = ExecutionGraphSchedulingTest.createTaskManager();
            targetTaskManagers[i2] = ExecutionGraphSchedulingTest.createTaskManager();
            sourceSlots[i2] = this.createSlot(sourceTaskManagers[i2], jobId);
            targetSlots[i2] = this.createSlot(targetTaskManagers[i2], jobId);
            sourceFutures[i2] = new CompletableFuture();
            targetFutures[i2] = new CompletableFuture();
        }
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(8);
        slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
        slotProvider.addSlots(targetVertex.getID(), targetFutures);
        ExecutionGraph eg = this.createExecutionGraph(jobGraph, slotProvider);
        for (i = 0; i < 8; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
        }
        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        ExecutionGraphSchedulingTest.verifyNothingDeployed(eg, sourceTaskManagers);
        for (i = 1; i < 8; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
        }
        ExecutionGraphSchedulingTest.verifyNothingDeployed(eg, sourceTaskManagers);
        for (i = 1; i < 8; ++i) {
            targetFutures[i].complete(targetSlots[i]);
        }
        ExecutionGraphSchedulingTest.verifyNothingDeployed(eg, targetTaskManagers);
        targetFutures[0].complete(targetSlots[0]);
        for (TaskManagerGateway gateway : sourceTaskManagers) {
            ((TaskManagerGateway)Mockito.verify((Object)gateway, (VerificationMode)Mockito.timeout((long)500L))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        }
        for (TaskManagerGateway gateway : targetTaskManagers) {
            ((TaskManagerGateway)Mockito.verify((Object)gateway, (VerificationMode)Mockito.timeout((long)500L))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        }
    }

    @Test
    public void testOneSlotFailureAbortsDeploy() throws Exception {
        int i;
        int parallelism = 6;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(6);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex targetVertex = new JobVertex("target");
        targetVertex.setParallelism(6);
        targetVertex.setInvokableClass(NoOpInvokable.class);
        targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{sourceVertex, targetVertex});
        TaskManagerGateway taskManager = (TaskManagerGateway)Mockito.mock(TaskManagerGateway.class);
        SlotOwner slotOwner = (SlotOwner)Mockito.mock(SlotOwner.class);
        SimpleSlot[] sourceSlots = new SimpleSlot[6];
        SimpleSlot[] targetSlots = new SimpleSlot[6];
        CompletableFuture[] sourceFutures = new CompletableFuture[6];
        CompletableFuture[] targetFutures = new CompletableFuture[6];
        for (int i2 = 0; i2 < 6; ++i2) {
            sourceSlots[i2] = this.createSlot(taskManager, jobId, slotOwner);
            targetSlots[i2] = this.createSlot(taskManager, jobId, slotOwner);
            sourceFutures[i2] = new CompletableFuture();
            targetFutures[i2] = new CompletableFuture();
        }
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(6);
        slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
        slotProvider.addSlots(targetVertex.getID(), targetFutures);
        ExecutionGraph eg = this.createExecutionGraph(jobGraph, slotProvider);
        for (i = 0; i < 6; i += 2) {
            sourceFutures[i].complete(sourceSlots[i]);
            targetFutures[i].complete(targetSlots[i]);
        }
        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        sourceFutures[1].completeExceptionally(new TestRuntimeException());
        eg.getTerminationFuture().get(2000L, TimeUnit.MILLISECONDS);
        ((SlotOwner)Mockito.verify((Object)slotOwner, (VerificationMode)new Timeout(2000L, Mockito.times((int)6)))).returnAllocatedSlot((Slot)Mockito.any(Slot.class));
        ((TaskManagerGateway)Mockito.verify((Object)taskManager, (VerificationMode)Mockito.times((int)0))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        for (i = 0; i < 6; i += 2) {
            Assert.assertTrue((boolean)sourceSlots[i].isCanceled());
            Assert.assertTrue((boolean)targetSlots[i].isCanceled());
        }
    }

    @Test
    public void testTimeoutForSlotAllocation() throws Exception {
        int parallelism = 3;
        JobVertex vertex = new JobVertex("task");
        vertex.setParallelism(3);
        vertex.setInvokableClass(NoOpInvokable.class);
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test", new JobVertex[]{vertex});
        SlotOwner slotOwner = (SlotOwner)Mockito.mock(SlotOwner.class);
        TaskManagerGateway taskManager = (TaskManagerGateway)Mockito.mock(TaskManagerGateway.class);
        SimpleSlot[] slots = new SimpleSlot[3];
        CompletableFuture[] slotFutures = new CompletableFuture[3];
        for (int i = 0; i < 3; ++i) {
            slots[i] = this.createSlot(taskManager, jobId, slotOwner);
            slotFutures[i] = new CompletableFuture();
        }
        ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(3);
        slotProvider.addSlots(vertex.getID(), slotFutures);
        ExecutionGraph eg = this.createExecutionGraph(jobGraph, slotProvider, Time.milliseconds((long)20L));
        slotFutures[1].complete(slots[1]);
        eg.setScheduleMode(ScheduleMode.EAGER);
        eg.setQueuedSchedulingAllowed(true);
        eg.scheduleForExecution();
        slotFutures[2].complete(slots[2]);
        eg.getTerminationFuture().get(2000L, TimeUnit.MILLISECONDS);
        ((SlotOwner)Mockito.verify((Object)slotOwner, (VerificationMode)new Timeout(2000L, Mockito.times((int)2)))).returnAllocatedSlot((Slot)Mockito.any(Slot.class));
        ((TaskManagerGateway)Mockito.verify((Object)taskManager, (VerificationMode)Mockito.times((int)0))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        for (CompletableFuture future : slotFutures) {
            if (!future.isDone()) continue;
            Assert.assertTrue((boolean)((SimpleSlot)future.get()).isCanceled());
        }
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider) throws Exception {
        return this.createExecutionGraph(jobGraph, slotProvider, Time.minutes((long)10L));
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider, Time timeout) throws Exception {
        return ExecutionGraphBuilder.buildGraph(null, (JobGraph)jobGraph, (Configuration)new Configuration(), (ScheduledExecutorService)this.executor, (Executor)this.executor, (SlotProvider)slotProvider, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory(), (Time)timeout, (RestartStrategy)new NoRestartStrategy(), (MetricGroup)new UnregisteredMetricsGroup(), (int)1, (BlobWriter)VoidBlobWriter.getInstance(), (Logger)this.log);
    }

    private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
        return this.createSlot(taskManager, jobId, (SlotOwner)Mockito.mock(SlotOwner.class));
    }

    private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
        TaskManagerLocation location = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
        AllocatedSlot slot = new AllocatedSlot(new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
        return new SimpleSlot(slot, slotOwner, 0);
    }

    private static TaskManagerGateway createTaskManager() {
        TaskManagerGateway tm = (TaskManagerGateway)Mockito.mock(TaskManagerGateway.class);
        Mockito.when((Object)tm.submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        return tm;
    }

    private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) {
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        for (TaskManagerGateway gateway : taskManagers) {
            ((TaskManagerGateway)Mockito.verify((Object)gateway, (VerificationMode)new Timeout(50L, Mockito.times((int)0)))).submitTask((TaskDeploymentDescriptor)Mockito.any(TaskDeploymentDescriptor.class), (Time)Mockito.any(Time.class));
        }
    }

    private static class TestRuntimeException
    extends RuntimeException {
        private static final long serialVersionUID = 1L;

        private TestRuntimeException() {
        }
    }
}

