/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Failure;

public class TaskManagerTest
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
    private static final FiniteDuration timeout = new FiniteDuration(1L, TimeUnit.MINUTES);
    private static final FiniteDuration d = new FiniteDuration(60L, TimeUnit.SECONDS);
    private static final Time timeD = Time.seconds((long)60L);
    private static ActorSystem system;
    static final UUID LEADER_SESSION_ID;
    private TestingHighAvailabilityServices highAvailabilityServices;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
    }

    @Before
    public void setupTest() {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
    }

    @After
    public void tearDownTest() throws Exception {
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
            this.highAvailabilityServices = null;
        }
    }

    @Test
    public void testSubmitAndExecuteTask() throws IOException {
        new JavaTestKit(system){
            {
                ActorGateway taskManager = null;
                final ActorGateway jobManager = TestingUtils.createForwardingActor(system, this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID, (Option<String>)Option.empty());
                TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                try {
                    final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, false);
                    new JavaTestKit.Within(d){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            InstanceID iid = new InstanceID();
                            Assert.assertEquals((Object)tm.actor(), (Object)this.getLastSender());
                            tm.tell((Object)new RegistrationMessages.AcknowledgeRegistration(iid, 12345), jobManager);
                        }
                    };
                    final JobID jid = new JobID();
                    JobVertexID vid = new JobVertexID();
                    final ExecutionAttemptID eid = new ExecutionAttemptID();
                    SerializedValue executionConfig = new SerializedValue((Object)new ExecutionConfig());
                    final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, (SerializedValue<ExecutionConfig>)executionConfig, "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                    new JavaTestKit.Within(d){

                        protected void run() {
                            Object message;
                            Object message2;
                            tm.tell((Object)new TaskMessages.SubmitTask(tdd), jobManager);
                            long deadline = System.currentTimeMillis() + 10000L;
                            while (!(message2 = this.receiveOne((Duration)d)).equals(Acknowledge.get()) && System.currentTimeMillis() < deadline) {
                            }
                            TaskMessages.UpdateTaskExecutionState toRunning = new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(jid, eid, ExecutionState.RUNNING));
                            TaskMessages.UpdateTaskExecutionState toFinished = new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(jid, eid, ExecutionState.FINISHED));
                            deadline = System.currentTimeMillis() + 10000L;
                            while (!(message = this.receiveOne((Duration)d)).equals(toRunning)) {
                                if (!(message instanceof TaskManagerMessages.Heartbeat)) {
                                    Assert.fail((String)("Unexpected message: " + message));
                                }
                                if (System.currentTimeMillis() < deadline) continue;
                            }
                            deadline = System.currentTimeMillis() + 10000L;
                            while (!(message = this.receiveOne((Duration)d)).equals(toFinished)) {
                                if (!(message instanceof TaskManagerMessages.Heartbeat)) {
                                    Assert.fail((String)("Unexpected message: " + message));
                                }
                                if (System.currentTimeMillis() < deadline) continue;
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                    throw throwable;
                }
                TestingUtils.stopActor(taskManager);
                TestingUtils.stopActor(jobManager);
            }
        };
    }

    @Test
    public void testJobSubmissionAndCanceling() {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, true);
                        JobID jid1 = new JobID();
                        JobID jid2 = new JobID();
                        JobVertexID vid1 = new JobVertexID();
                        JobVertexID vid2 = new JobVertexID();
                        final ExecutionAttemptID eid1 = new ExecutionAttemptID();
                        final ExecutionAttemptID eid2 = new ExecutionAttemptID();
                        final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(jid1, "TestJob1", vid1, eid1, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(jid2, "TestJob2", vid2, eid2, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        final ActorGateway tm = taskManager;
                        new JavaTestKit.Within(d, (ActorGateway)testActorGateway, tdd2){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            final /* synthetic */ TaskDeploymentDescriptor val$tdd2;
                            {
                                this.val$testActorGateway = actorGateway2;
                                this.val$tdd2 = taskDeploymentDescriptor2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                try {
                                    Future t1Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1), timeout);
                                    Future t2Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2), timeout);
                                    tm.tell((Object)new TaskMessages.SubmitTask(tdd1), this.val$testActorGateway);
                                    tm.tell((Object)new TaskMessages.SubmitTask(this.val$tdd2), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    this.expectMsgEquals(Acknowledge.get());
                                    Await.ready((Awaitable)t1Running, (Duration)d);
                                    Await.ready((Awaitable)t2Running, (Duration)d);
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    Map<ExecutionAttemptID, Task> runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)2L, (long)runningTasks.size());
                                    Task t1 = runningTasks.get(eid1);
                                    Task t2 = runningTasks.get(eid2);
                                    Assert.assertNotNull((Object)t1);
                                    Assert.assertNotNull((Object)t2);
                                    Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)t1.getExecutionState());
                                    Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)t2.getExecutionState());
                                    tm.tell((Object)new TaskMessages.CancelTask(eid1), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    Future response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), timeout);
                                    Await.ready((Awaitable)response, (Duration)d);
                                    Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)t1.getExecutionState());
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)1L, (long)runningTasks.size());
                                    tm.tell((Object)new TaskMessages.CancelTask(eid1), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    tm.tell((Object)new TaskMessages.CancelTask(eid2), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), timeout);
                                    Await.ready((Awaitable)response, (Duration)d);
                                    Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)t2.getExecutionState());
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)0L, (long)runningTasks.size());
                                }
                                catch (Exception e) {
                                    e.printStackTrace();
                                    Assert.fail((String)e.getMessage());
                                }
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testJobSubmissionAndStop() throws Exception {
        new JavaTestKit(system){
            {
                AkkaActorGateway jobManager = null;
                ActorGateway taskManager = null;
                AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                try {
                    ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
                    jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                    taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, true);
                    JobID jid1 = new JobID();
                    JobID jid2 = new JobID();
                    JobVertexID vid1 = new JobVertexID();
                    JobVertexID vid2 = new JobVertexID();
                    final ExecutionAttemptID eid1 = new ExecutionAttemptID();
                    final ExecutionAttemptID eid2 = new ExecutionAttemptID();
                    SerializedValue executionConfig = new SerializedValue((Object)new ExecutionConfig());
                    final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, (SerializedValue<ExecutionConfig>)executionConfig, "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                    TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, (SerializedValue<ExecutionConfig>)executionConfig, "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                    final ActorGateway tm = taskManager;
                    new JavaTestKit.Within(d, (ActorGateway)testActorGateway, tdd2){
                        final /* synthetic */ ActorGateway val$testActorGateway;
                        final /* synthetic */ TaskDeploymentDescriptor val$tdd2;
                        {
                            this.val$testActorGateway = actorGateway2;
                            this.val$tdd2 = taskDeploymentDescriptor2;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            try {
                                Future t1Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1), timeout);
                                Future t2Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2), timeout);
                                tm.tell((Object)new TaskMessages.SubmitTask(tdd1), this.val$testActorGateway);
                                tm.tell((Object)new TaskMessages.SubmitTask(this.val$tdd2), this.val$testActorGateway);
                                this.expectMsgEquals(Acknowledge.get());
                                this.expectMsgEquals(Acknowledge.get());
                                Await.ready((Awaitable)t1Running, (Duration)d);
                                Await.ready((Awaitable)t2Running, (Duration)d);
                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                Map<ExecutionAttemptID, Task> runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals((long)2L, (long)runningTasks.size());
                                Task t1 = runningTasks.get(eid1);
                                Task t2 = runningTasks.get(eid2);
                                Assert.assertNotNull((Object)t1);
                                Assert.assertNotNull((Object)t2);
                                Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)t1.getExecutionState());
                                Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)t2.getExecutionState());
                                tm.tell((Object)new TaskMessages.StopTask(eid1), this.val$testActorGateway);
                                this.expectMsgEquals(Acknowledge.get());
                                Future response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), timeout);
                                Await.ready((Awaitable)response, (Duration)d);
                                Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)t1.getExecutionState());
                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals((long)1L, (long)runningTasks.size());
                                tm.tell((Object)new TaskMessages.StopTask(eid1), this.val$testActorGateway);
                                this.expectMsgEquals(Acknowledge.get());
                                tm.tell((Object)new TaskMessages.StopTask(eid2), this.val$testActorGateway);
                                this.expectMsgClass(Status.Failure.class);
                                Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)t2.getExecutionState());
                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals((long)1L, (long)runningTasks.size());
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                    throw throwable;
                }
                TestingUtils.stopActor(taskManager);
                TestingUtils.stopActor((ActorGateway)jobManager);
            }
        };
    }

    @Test
    public void testGateChannelEdgeMismatch() {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, true);
                        JobID jid = new JobID();
                        JobVertexID vid1 = new JobVertexID();
                        JobVertexID vid2 = new JobVertexID();
                        ExecutionAttemptID eid1 = new ExecutionAttemptID();
                        ExecutionAttemptID eid2 = new ExecutionAttemptID();
                        final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        new JavaTestKit.Within(d, (ActorGateway)testActorGateway, tdd2, eid1, eid2){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            final /* synthetic */ TaskDeploymentDescriptor val$tdd2;
                            final /* synthetic */ ExecutionAttemptID val$eid1;
                            final /* synthetic */ ExecutionAttemptID val$eid2;
                            {
                                this.val$testActorGateway = actorGateway2;
                                this.val$tdd2 = taskDeploymentDescriptor2;
                                this.val$eid1 = executionAttemptID;
                                this.val$eid2 = executionAttemptID2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                try {
                                    tm.tell((Object)new TaskMessages.SubmitTask(tdd1), this.val$testActorGateway);
                                    tm.tell((Object)new TaskMessages.SubmitTask(this.val$tdd2), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    this.expectMsgEquals(Acknowledge.get());
                                    tm.tell((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(this.val$eid1), this.val$testActorGateway);
                                    tm.tell((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(this.val$eid2), this.val$testActorGateway);
                                    this.expectMsgEquals(true);
                                    this.expectMsgEquals(true);
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    Map<ExecutionAttemptID, Task> tasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)0L, (long)tasks.size());
                                }
                                catch (Exception e) {
                                    e.printStackTrace();
                                    Assert.fail((String)e.getMessage());
                                }
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testRunJobWithForwardChannel() {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        JobID jid = new JobID();
                        JobVertexID vid1 = new JobVertexID();
                        JobVertexID vid2 = new JobVertexID();
                        final ExecutionAttemptID eid1 = new ExecutionAttemptID();
                        final ExecutionAttemptID eid2 = new ExecutionAttemptID();
                        ActorRef jm = system.actorOf(Props.create((Creator)new SimpleLookupJobManagerCreator(LEADER_SESSION_ID)));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, true);
                        IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                        ArrayList<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
                        irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
                        InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())});
                        final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), new ArrayList(), Collections.emptyList(), 0);
                        new JavaTestKit.Within(d, (ActorGateway)testActorGateway, tdd2){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            final /* synthetic */ TaskDeploymentDescriptor val$tdd2;
                            {
                                this.val$testActorGateway = actorGateway2;
                                this.val$tdd2 = taskDeploymentDescriptor2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                try {
                                    Future response;
                                    Future t1Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1), timeout);
                                    Future t2Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2), timeout);
                                    tm.tell((Object)new TaskMessages.SubmitTask(tdd1), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    Await.ready((Awaitable)t1Running, (Duration)d);
                                    tm.tell((Object)new TaskMessages.SubmitTask(this.val$tdd2), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    Await.ready((Awaitable)t2Running, (Duration)d);
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    Map<ExecutionAttemptID, Task> tasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Task t1 = tasks.get(eid1);
                                    Task t2 = tasks.get(eid2);
                                    if (t1 != null) {
                                        response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), timeout);
                                        Await.ready((Awaitable)response, (Duration)d);
                                    }
                                    if (t2 != null) {
                                        response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), timeout);
                                        Await.ready((Awaitable)response, (Duration)d);
                                        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)t2.getExecutionState());
                                    }
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    tasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)0L, (long)tasks.size());
                                }
                                catch (Exception e) {
                                    e.printStackTrace();
                                    Assert.fail((String)e.getMessage());
                                }
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testCancellingDependentAndStateUpdateFails() {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        JobID jid = new JobID();
                        JobVertexID vid1 = new JobVertexID();
                        JobVertexID vid2 = new JobVertexID();
                        final ExecutionAttemptID eid1 = new ExecutionAttemptID();
                        final ExecutionAttemptID eid2 = new ExecutionAttemptID();
                        ActorRef jm = system.actorOf(Props.create((Creator)new SimpleLookupFailingUpdateJobManagerCreator(LEADER_SESSION_ID, eid2)));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, true);
                        IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                        ArrayList<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
                        irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
                        InputGateDeploymentDescriptor ircdd = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())});
                        TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                        final TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), new ArrayList(), Collections.emptyList(), 0);
                        new JavaTestKit.Within(d, (ActorGateway)testActorGateway, tdd1){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            final /* synthetic */ TaskDeploymentDescriptor val$tdd1;
                            {
                                this.val$testActorGateway = actorGateway2;
                                this.val$tdd1 = taskDeploymentDescriptor2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                try {
                                    Future response;
                                    Future t1Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1), timeout);
                                    Future t2Running = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2), timeout);
                                    tm.tell((Object)new TaskMessages.SubmitTask(tdd2), this.val$testActorGateway);
                                    tm.tell((Object)new TaskMessages.SubmitTask(this.val$tdd1), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    this.expectMsgEquals(Acknowledge.get());
                                    Await.ready((Awaitable)t1Running, (Duration)d);
                                    Await.ready((Awaitable)t2Running, (Duration)d);
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    Map<ExecutionAttemptID, Task> tasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Task t1 = tasks.get(eid1);
                                    Task t2 = tasks.get(eid2);
                                    tm.tell((Object)new TaskMessages.CancelTask(eid2), this.val$testActorGateway);
                                    this.expectMsgEquals(Acknowledge.get());
                                    if (t2 != null) {
                                        response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2), timeout);
                                        Await.ready((Awaitable)response, (Duration)d);
                                    }
                                    if (t1 != null) {
                                        if (t1.getExecutionState() == ExecutionState.RUNNING) {
                                            tm.tell((Object)new TaskMessages.CancelTask(eid1), this.val$testActorGateway);
                                            this.expectMsgEquals(Acknowledge.get());
                                        }
                                        response = tm.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), timeout);
                                        Await.ready((Awaitable)response, (Duration)d);
                                    }
                                    tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), this.val$testActorGateway);
                                    tasks = ((TestingTaskManagerMessages.ResponseRunningTasks)this.expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                    Assert.assertEquals((long)0L, (long)tasks.size());
                                }
                                catch (Exception e) {
                                    e.printStackTrace();
                                    Assert.fail((String)e.getMessage());
                                }
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testRemotePartitionNotFound() throws Exception {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        IntermediateDataSetID resultId = new IntermediateDataSetID();
                        ActorRef jm = system.actorOf(Props.create((Creator)new SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, this.getTestActor())));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        int dataPort = NetUtils.getAvailablePort();
                        Configuration config = new Configuration();
                        config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
                        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
                        final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, config, false, true);
                        JobID jid = new JobID();
                        JobVertexID vid = new JobVertexID();
                        ExecutionAttemptID eid = new ExecutionAttemptID();
                        ResultPartitionID partitionId = new ResultPartitionID();
                        ResultPartitionLocation loc = ResultPartitionLocation.createRemote((ConnectionID)new ConnectionID(new InetSocketAddress("localhost", dataPort), 0));
                        InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(partitionId, loc)};
                        InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
                        final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(igdd), Collections.emptyList(), Collections.emptyList(), 0);
                        new JavaTestKit.Within(d, (ActorGateway)testActorGateway){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            {
                                this.val$testActorGateway = actorGateway2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                tm.tell((Object)new TaskMessages.SubmitTask(tdd), this.val$testActorGateway);
                                this.expectMsgClass(Acknowledge.get().getClass());
                                TaskExecutionState msg = (TaskExecutionState)this.expectMsgClass(TaskExecutionState.class);
                                Assert.assertEquals((Object)ExecutionState.FAILED, (Object)msg.getExecutionState());
                                Throwable t = msg.getError(ClassLoader.getSystemClassLoader());
                                Assert.assertEquals((String)("Thrown exception was not a PartitionNotFoundException: " + t.getMessage()), PartitionNotFoundException.class, t.getClass());
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testTaskManagerServicesConfiguration() throws Exception {
        Configuration config = new Configuration();
        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
        config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
        config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
        TaskManagerServicesConfiguration tmConfig = TaskManagerServicesConfiguration.fromConfiguration((Configuration)config, (InetAddress)InetAddress.getLoopbackAddress(), (boolean)true);
        Assert.assertEquals((long)tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), (long)100L);
        Assert.assertEquals((long)tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), (long)200L);
        Assert.assertEquals((long)tmConfig.getNetworkConfig().networkBuffersPerChannel(), (long)10L);
        Assert.assertEquals((long)tmConfig.getNetworkConfig().floatingNetworkBuffersPerGate(), (long)100L);
    }

    @Test
    public void testLocalPartitionNotFound() throws Exception {
        new JavaTestKit(system){
            {
                block5: {
                    super(x0);
                    AkkaActorGateway jobManager = null;
                    ActorGateway taskManager = null;
                    AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), LEADER_SESSION_ID);
                    try {
                        IntermediateDataSetID resultId = new IntermediateDataSetID();
                        ActorRef jm = system.actorOf(Props.create((Creator)new SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, this.getTestActor())));
                        jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                        TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                        Configuration config = new Configuration();
                        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
                        final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, config, true, true);
                        JobID jid = new JobID();
                        JobVertexID vid = new JobVertexID();
                        ExecutionAttemptID eid = new ExecutionAttemptID();
                        ResultPartitionID partitionId = new ResultPartitionID();
                        ResultPartitionLocation loc = ResultPartitionLocation.createLocal();
                        InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(partitionId, loc)};
                        InputGateDeploymentDescriptor igdd = new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
                        final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(igdd), Collections.emptyList(), Collections.emptyList(), 0);
                        new JavaTestKit.Within(new FiniteDuration(120L, TimeUnit.SECONDS), (ActorGateway)testActorGateway){
                            final /* synthetic */ ActorGateway val$testActorGateway;
                            {
                                this.val$testActorGateway = actorGateway2;
                                super((JavaTestKit)this, x0);
                            }

                            protected void run() {
                                tm.tell((Object)new TaskMessages.SubmitTask(tdd), this.val$testActorGateway);
                                this.expectMsgClass(Acknowledge.get().getClass());
                                TaskExecutionState msg = (TaskExecutionState)this.expectMsgClass(TaskExecutionState.class);
                                Assert.assertEquals((Object)msg.getExecutionState(), (Object)ExecutionState.FAILED);
                                Throwable error = msg.getError(((Object)((Object)this)).getClass().getClassLoader());
                                if (error.getClass() != PartitionNotFoundException.class) {
                                    error.printStackTrace();
                                    Assert.fail((String)("Wrong exception: " + error.getMessage()));
                                }
                            }
                        };
                        TestingUtils.stopActor(taskManager);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    @Test
    public void testLogNotFoundHandling() throws Exception {
        new JavaTestKit(system){
            {
                AkkaActorGateway jobManager = null;
                ActorGateway taskManager = null;
                try {
                    ActorRef jm = system.actorOf(Props.create((Creator)new SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, this.getTestActor())));
                    jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
                    int dataPort = NetUtils.getAvailablePort();
                    Configuration config = new Configuration();
                    config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
                    config.setString("taskmanager.log.path", "/i/dont/exist");
                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                    final ActorGateway tm = taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, config, false, true);
                    new JavaTestKit.Within(d){

                        protected void run() {
                            Future logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
                            try {
                                Await.result((Awaitable)logFuture, (Duration)timeout);
                                Assert.fail();
                            }
                            catch (Exception e) {
                                Assert.assertTrue((boolean)e.getMessage().startsWith("TaskManager log files are unavailable. Log file could not be found at"));
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                    throw throwable;
                }
                TestingUtils.stopActor(taskManager);
                TestingUtils.stopActor((ActorGateway)jobManager);
            }
        };
    }

    @Test
    public void testTriggerStackTraceSampleMessage() throws Exception {
        new JavaTestKit(system){
            {
                ActorGateway taskManagerActorGateway = null;
                ActorRef jm = system.actorOf(Props.create((Creator)new SimpleLookupJobManagerCreator(HighAvailabilityServices.DEFAULT_LEADER_ID)));
                AkkaActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID);
                AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                try {
                    AkkaActorGateway jobManager = jobManagerActorGateway;
                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                    final ActorGateway taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, false);
                    JobID jobId = new JobID();
                    final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(jobId, "Job", new JobVertexID(), new ExecutionAttemptID(), (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "Task", 1, 0, 1, 0, new Configuration(), new Configuration(), BlockingNoOpInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
                    new JavaTestKit.Within(d, (ActorGateway)jobManager, tdd){
                        final /* synthetic */ ActorGateway val$jobManager;
                        final /* synthetic */ TaskDeploymentDescriptor val$tdd;
                        {
                            this.val$jobManager = actorGateway2;
                            this.val$tdd = taskDeploymentDescriptor;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            try {
                                Future connectFuture = taskManager.ask((Object)new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(this.val$jobManager.actor()), this.remaining());
                                Await.ready((Awaitable)connectFuture, (Duration)this.remaining());
                                Future taskRunningFuture = taskManager.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(this.val$tdd.getExecutionAttemptId()), timeout);
                                taskManager.tell((Object)new TaskMessages.SubmitTask(this.val$tdd));
                                Await.ready((Awaitable)taskRunningFuture, (Duration)d);
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                    new JavaTestKit.Within(d, (ActorGateway)testActorGateway){
                        final /* synthetic */ ActorGateway val$testActorGateway;
                        {
                            this.val$testActorGateway = actorGateway2;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            try {
                                ExecutionAttemptID taskId = new ExecutionAttemptID();
                                taskManager.tell((Object)new StackTraceSampleMessages.TriggerStackTraceSample(112223, taskId, 100, timeD, 0), this.val$testActorGateway);
                                Object[] msg = this.receiveN(1);
                                while (!(msg[0] instanceof Status.Failure)) {
                                    msg = this.receiveN(1);
                                }
                                Status.Failure response = (Status.Failure)msg[0];
                                Assert.assertEquals(IllegalStateException.class, response.cause().getClass());
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                    new JavaTestKit.Within(d, (ActorGateway)testActorGateway){
                        final /* synthetic */ ActorGateway val$testActorGateway;
                        {
                            this.val$testActorGateway = actorGateway2;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            boolean success = false;
                            Throwable lastError = null;
                            for (int i = 0; i < 100 && !success; ++i) {
                                try {
                                    int numSamples = 5;
                                    taskManager.tell((Object)new StackTraceSampleMessages.TriggerStackTraceSample(19230, tdd.getExecutionAttemptId(), numSamples, Time.milliseconds((long)100L), 0), this.val$testActorGateway);
                                    Object[] msg = this.receiveN(1);
                                    while (!(msg[0] instanceof StackTraceSampleResponse)) {
                                        msg = this.receiveN(1);
                                    }
                                    StackTraceSampleResponse response = (StackTraceSampleResponse)msg[0];
                                    Assert.assertEquals((long)19230L, (long)response.getSampleId());
                                    Assert.assertEquals((Object)tdd.getExecutionAttemptId(), (Object)response.getExecutionAttemptID());
                                    List traces = response.getSamples();
                                    Assert.assertEquals((String)"Number of samples", (long)numSamples, (long)traces.size());
                                    Iterator iterator = traces.iterator();
                                    while (iterator.hasNext()) {
                                        Object[] trace;
                                        for (StackTraceElement stackTraceElement : trace = (StackTraceElement[])iterator.next()) {
                                            if (!stackTraceElement.getClassName().equals(BlockingNoOpInvokable.class.getName())) continue;
                                            Assert.assertEquals((Object)"invoke", (Object)stackTraceElement.getMethodName());
                                            success = true;
                                            break;
                                        }
                                        Assert.assertTrue((String)("Unexpected stack trace: " + Arrays.toString(trace)), (boolean)success);
                                    }
                                }
                                catch (Throwable t) {
                                    lastError = t;
                                    LOG.warn("Failed to find invokable.", t);
                                }
                                try {
                                    Thread.sleep(100L);
                                    continue;
                                }
                                catch (InterruptedException e) {
                                    LOG.error("Interrupted while sleeping before retry.", (Throwable)e);
                                    break;
                                }
                            }
                            if (!success) {
                                if (lastError == null) {
                                    Assert.fail((String)"Failed to find invokable");
                                } else {
                                    Assert.fail((String)lastError.getMessage());
                                }
                            }
                        }
                    };
                    new JavaTestKit.Within(d, (ActorGateway)testActorGateway){
                        final /* synthetic */ ActorGateway val$testActorGateway;
                        {
                            this.val$testActorGateway = actorGateway2;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            try {
                                int numSamples = 5;
                                int maxDepth = 2;
                                taskManager.tell((Object)new StackTraceSampleMessages.TriggerStackTraceSample(1337, tdd.getExecutionAttemptId(), numSamples, Time.milliseconds((long)100L), maxDepth), this.val$testActorGateway);
                                Object[] msg = this.receiveN(1);
                                while (!(msg[0] instanceof StackTraceSampleResponse)) {
                                    msg = this.receiveN(1);
                                }
                                StackTraceSampleResponse response = (StackTraceSampleResponse)msg[0];
                                Assert.assertEquals((long)1337L, (long)response.getSampleId());
                                Assert.assertEquals((Object)tdd.getExecutionAttemptId(), (Object)response.getExecutionAttemptID());
                                List traces = response.getSamples();
                                Assert.assertEquals((String)"Number of samples", (long)numSamples, (long)traces.size());
                                for (StackTraceElement[] trace : traces) {
                                    Assert.assertEquals((String)"Max depth", (long)maxDepth, (long)trace.length);
                                }
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                    new JavaTestKit.Within(d, (ActorGateway)testActorGateway, jobId){
                        final /* synthetic */ ActorGateway val$testActorGateway;
                        final /* synthetic */ JobID val$jobId;
                        {
                            this.val$testActorGateway = actorGateway2;
                            this.val$jobId = jobID;
                            super((JavaTestKit)this, x0);
                        }

                        protected void run() {
                            try {
                                int maxAttempts = 10;
                                int sleepTime = 100;
                                int i = 0;
                                while (i < maxAttempts) {
                                    Object[] msg;
                                    taskManager.tell((Object)new StackTraceSampleMessages.TriggerStackTraceSample(44, tdd.getExecutionAttemptId(), Integer.MAX_VALUE, Time.milliseconds((long)10L), 0), this.val$testActorGateway);
                                    Thread.sleep(sleepTime);
                                    Future removeFuture = taskManager.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(this.val$jobId), this.remaining());
                                    taskManager.tell((Object)new TaskMessages.CancelTask(tdd.getExecutionAttemptId()));
                                    do {
                                        if (!((msg = this.receiveN(1))[0] instanceof StackTraceSampleResponse)) continue;
                                        StackTraceSampleResponse response = (StackTraceSampleResponse)msg[0];
                                        Assert.assertEquals((Object)tdd.getExecutionAttemptId(), (Object)response.getExecutionAttemptID());
                                        Assert.assertEquals((long)44L, (long)response.getSampleId());
                                        return;
                                    } while (!(msg[0] instanceof Failure));
                                    Await.ready((Awaitable)removeFuture, (Duration)this.remaining());
                                    Future taskRunningFuture = taskManager.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(tdd.getExecutionAttemptId()), timeout);
                                    taskManager.tell((Object)new TaskMessages.SubmitTask(tdd));
                                    Await.ready((Awaitable)taskRunningFuture, (Duration)this.remaining());
                                    ++i;
                                    sleepTime *= 2;
                                }
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                }
                finally {
                    TestingUtils.stopActor(taskManagerActorGateway);
                    TestingUtils.stopActor((ActorGateway)jobManagerActorGateway);
                }
            }
        };
    }

    @Test
    public void testTerminationOnFatalError() {
        this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new SettableLeaderRetrievalService());
        new JavaTestKit(system){
            {
                ActorGateway taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, false);
                try {
                    this.watch(taskManager.actor());
                    taskManager.tell((Object)new TaskManagerMessages.FatalError("test fatal error", (Throwable)new Exception("something super bad")));
                    this.expectTerminated((Duration)d, taskManager.actor());
                }
                finally {
                    taskManager.tell((Object)Kill.getInstance());
                }
            }
        };
    }

    @Test(timeout=10000L)
    public void testFailingScheduleOrUpdateConsumersMessage() throws Exception {
        new JavaTestKit(system){
            {
                Configuration configuration = new Configuration();
                configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
                JobID jid = new JobID();
                JobVertexID vid = new JobVertexID();
                ExecutionAttemptID eid = new ExecutionAttemptID();
                SerializedValue executionConfig = new SerializedValue((Object)new ExecutionConfig());
                ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), new IntermediateResultPartitionID(), ResultPartitionType.PIPELINED, 1, 1, true);
                TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, (SerializedValue<ExecutionConfig>)executionConfig, "TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(), TestInvokableRecordCancel.class.getName(), Collections.singletonList(resultPartitionDeploymentDescriptor), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0);
                ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}), "jobmanager");
                AkkaActorGateway jobManager = new AkkaActorGateway(jmActorRef, LEADER_SESSION_ID);
                TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
                ActorGateway taskManager = TestingUtils.createTaskManager(system, TaskManagerTest.this.highAvailabilityServices, configuration, true, true);
                try {
                    TestInvokableRecordCancel.resetGotCanceledFuture();
                    Future result = taskManager.ask((Object)new TaskMessages.SubmitTask(tdd), timeout);
                    Await.result((Awaitable)result, (Duration)timeout);
                    CompletableFuture<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
                    Assert.assertEquals((Object)true, (Object)cancelFuture.get());
                }
                finally {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor((ActorGateway)jobManager);
                }
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubmitTaskFailure() throws Exception {
        AkkaActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
            jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
            this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);
            TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(new JobID(), "test job", new JobVertexID(), new ExecutionAttemptID(), (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "test task", 0, 0, 1, 0, new Configuration(), new Configuration(), "Foobar", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
            Future submitResponse = taskManager.ask((Object)new TaskMessages.SubmitTask(tdd), timeout);
            try {
                Await.result((Awaitable)submitResponse, (Duration)timeout);
                Assert.fail((String)"The submit task message should have failed.");
            }
            catch (IllegalArgumentException illegalArgumentException) {
                // empty catch block
            }
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
            throw throwable;
        }
        TestingUtils.stopActor((ActorGateway)jobManager);
        TestingUtils.stopActor(taskManager);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStopTaskFailure() throws Exception {
        AkkaActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
            ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
            jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
            this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);
            TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(new JobID(), "test job", new JobVertexID(), executionAttemptId, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "test task", 1, 0, 1, 0, new Configuration(), new Configuration(), BlockingNoOpInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
            Future submitResponse = taskManager.ask((Object)new TaskMessages.SubmitTask(tdd), timeout);
            Await.result((Awaitable)submitResponse, (Duration)timeout);
            Future taskRunning = taskManager.ask((Object)new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(executionAttemptId), timeout);
            Await.result((Awaitable)taskRunning, (Duration)timeout);
            Future stopResponse = taskManager.ask((Object)new TaskMessages.StopTask(executionAttemptId), timeout);
            try {
                Await.result((Awaitable)stopResponse, (Duration)timeout);
                Assert.fail((String)"The stop task message should have failed.");
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
            throw throwable;
        }
        TestingUtils.stopActor((ActorGateway)jobManager);
        TestingUtils.stopActor(taskManager);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStackTraceSampleFailure() throws Exception {
        AkkaActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
            jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
            this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);
            Future stackTraceResponse = taskManager.ask((Object)new StackTraceSampleMessages.TriggerStackTraceSample(0, new ExecutionAttemptID(), 0, Time.milliseconds((long)1L), 0), timeout);
            try {
                Await.result((Awaitable)stackTraceResponse, (Duration)timeout);
                Assert.fail((String)"The trigger stack trace message should have failed.");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
            throw throwable;
        }
        TestingUtils.stopActor((ActorGateway)jobManager);
        TestingUtils.stopActor(taskManager);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUpdateTaskInputPartitionsFailure() throws Exception {
        AkkaActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
            ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, (Object[])new Object[]{LEADER_SESSION_ID}));
            jobManager = new AkkaActorGateway(jm, LEADER_SESSION_ID);
            this.highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, (LeaderRetrievalService)new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));
            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);
            TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(new JobID(), "test job", new JobVertexID(), executionAttemptId, (SerializedValue<ExecutionConfig>)new SerializedValue((Object)new ExecutionConfig()), "test task", 1, 0, 1, 0, new Configuration(), new Configuration(), BlockingNoOpInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
            Future submitResponse = taskManager.ask((Object)new TaskMessages.SubmitTask(tdd), timeout);
            Await.result((Awaitable)submitResponse, (Duration)timeout);
            Future partitionUpdateResponse = taskManager.ask((Object)new TaskMessages.UpdateTaskSinglePartitionInfo(executionAttemptId, new IntermediateDataSetID(), new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())), timeout);
            try {
                Await.result((Awaitable)partitionUpdateResponse, (Duration)timeout);
                Assert.fail((String)"The update task input partitions message should have failed.");
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
            throw throwable;
        }
        TestingUtils.stopActor((ActorGateway)jobManager);
        TestingUtils.stopActor(taskManager);
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobId, String jobName, JobVertexID jobVertexId, ExecutionAttemptID executionAttemptId, SerializedValue<ExecutionConfig> serializedExecutionConfig, String taskName, int numberOfKeyGroups, int subtaskIndex, int parallelism, int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, Collection<ResultPartitionDeploymentDescriptor> producedPartitions, Collection<InputGateDeploymentDescriptor> inputGates, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths, int targetSlotNumber) throws IOException {
        JobInformation jobInformation = new JobInformation(jobId, jobName, serializedExecutionConfig, jobConfiguration, requiredJarFiles, requiredClasspaths);
        TaskInformation taskInformation = new TaskInformation(jobVertexId, taskName, parallelism, numberOfKeyGroups, invokableClassName, taskConfiguration);
        SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
        SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
        return new TaskDeploymentDescriptor(jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), executionAttemptId, new AllocationID(), subtaskIndex, attemptNumber, targetSlotNumber, null, producedPartitions, inputGates);
    }

    static {
        LEADER_SESSION_ID = UUID.randomUUID();
    }

    public static final class TestInvokableRecordCancel
    extends AbstractInvokable {
        private static final Object lock = new Object();
        private static CompletableFuture<Boolean> gotCanceledFuture = new CompletableFuture();

        public TestInvokableRecordCancel(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            Object o = new Object();
            RecordWriter recordWriter = new RecordWriter(this.getEnvironment().getWriter(0));
            for (int i = 0; i < 1024; ++i) {
                recordWriter.emit((IOReadableWritable)new IntValue(42));
            }
            Object object = o;
            synchronized (object) {
                while (true) {
                    o.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Object object = lock;
            synchronized (object) {
                gotCanceledFuture.complete(true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void resetGotCanceledFuture() {
            Object object = lock;
            synchronized (object) {
                gotCanceledFuture = new CompletableFuture();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static CompletableFuture<Boolean> gotCanceled() {
            Object object = lock;
            synchronized (object) {
                return gotCanceledFuture;
            }
        }
    }

    public static class TestInvokableBlockingCancelable
    extends AbstractInvokable {
        public TestInvokableBlockingCancelable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            Object o;
            Object object = o = new Object();
            synchronized (object) {
                while (true) {
                    o.wait();
                }
            }
        }
    }

    public static final class TestInvokableCorrect
    extends AbstractInvokable {
        public TestInvokableCorrect(Environment environment) {
            super(environment);
        }

        public void invoke() {
        }
    }

    public static class SimplePartitionStateLookupJobManagerCreator
    implements Creator<SimplePartitionStateLookupJobManager> {
        private final UUID leaderSessionID;
        private final ActorRef testActor;

        public SimplePartitionStateLookupJobManagerCreator(UUID leaderSessionID, ActorRef testActor) {
            this.leaderSessionID = leaderSessionID;
            this.testActor = testActor;
        }

        public SimplePartitionStateLookupJobManager create() throws Exception {
            return new SimplePartitionStateLookupJobManager(this.leaderSessionID, this.testActor);
        }
    }

    public static class SimpleLookupFailingUpdateJobManagerCreator
    implements Creator<SimpleLookupFailingUpdateJobManager> {
        private final UUID leaderSessionID;
        private final Set<ExecutionAttemptID> validIDs;

        public SimpleLookupFailingUpdateJobManagerCreator(UUID leaderSessionID, ExecutionAttemptID ... ids) {
            this.leaderSessionID = leaderSessionID;
            this.validIDs = new HashSet<ExecutionAttemptID>();
            for (ExecutionAttemptID id : ids) {
                this.validIDs.add(id);
            }
        }

        public SimpleLookupFailingUpdateJobManager create() throws Exception {
            return new SimpleLookupFailingUpdateJobManager(this.leaderSessionID, this.validIDs);
        }
    }

    public static class SimpleLookupJobManagerCreator
    implements Creator<SimpleLookupJobManager> {
        private final UUID leaderSessionID;

        public SimpleLookupJobManagerCreator(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        public SimpleLookupJobManager create() throws Exception {
            return new SimpleLookupJobManager(this.leaderSessionID);
        }
    }

    public static class SimplePartitionStateLookupJobManager
    extends SimpleJobManager {
        private final ActorRef testActor;

        public SimplePartitionStateLookupJobManager(UUID leaderSessionID, ActorRef testActor) {
            super(leaderSessionID);
            this.testActor = testActor;
        }

        @Override
        public void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.RequestPartitionProducerState) {
                this.getSender().tell(this.decorateMessage(ExecutionState.RUNNING), this.getSelf());
            } else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                TaskExecutionState msg = ((TaskMessages.UpdateTaskExecutionState)message).taskExecutionState();
                if (msg.getExecutionState().isTerminal()) {
                    this.testActor.tell((Object)msg, this.self());
                }
            } else {
                super.handleMessage(message);
            }
        }
    }

    public static class SimpleLookupFailingUpdateJobManager
    extends SimpleLookupJobManager {
        private final Set<ExecutionAttemptID> validIDs;

        public SimpleLookupFailingUpdateJobManager(UUID leaderSessionID, Set<ExecutionAttemptID> ids) {
            super(leaderSessionID);
            this.validIDs = new HashSet<ExecutionAttemptID>(ids);
        }

        @Override
        public void handleMessage(Object message) throws Exception {
            if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                TaskMessages.UpdateTaskExecutionState updateMsg = (TaskMessages.UpdateTaskExecutionState)message;
                if (this.validIDs.contains(updateMsg.taskExecutionState().getID())) {
                    this.getSender().tell((Object)true, this.getSelf());
                } else {
                    this.getSender().tell((Object)false, this.getSelf());
                }
            } else {
                super.handleMessage(message);
            }
        }
    }

    public static class SimpleLookupJobManager
    extends SimpleJobManager {
        public SimpleLookupJobManager(UUID leaderSessionID) {
            super(leaderSessionID);
        }

        @Override
        public void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.ScheduleOrUpdateConsumers) {
                this.getSender().tell(this.decorateMessage(Acknowledge.get()), this.getSelf());
            } else {
                super.handleMessage(message);
            }
        }
    }

    public static class FailingScheduleOrUpdateConsumersJobManager
    extends SimpleJobManager {
        public FailingScheduleOrUpdateConsumersJobManager(UUID leaderSessionId) {
            super(leaderSessionId);
        }

        @Override
        public void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.ScheduleOrUpdateConsumers) {
                this.getSender().tell(this.decorateMessage(new Status.Failure((Throwable)new Exception("Could not schedule or update consumers."))), this.getSelf());
            } else {
                super.handleMessage(message);
            }
        }
    }

    public static class SimpleJobManager
    extends FlinkUntypedActor {
        private final UUID leaderSessionID;

        public SimpleJobManager(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        public void handleMessage(Object message) throws Exception {
            if (message instanceof RegistrationMessages.RegisterTaskManager) {
                InstanceID iid = new InstanceID();
                ActorRef self = this.getSelf();
                this.getSender().tell(this.decorateMessage(new RegistrationMessages.AcknowledgeRegistration(iid, 12345)), self);
            } else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                this.getSender().tell((Object)true, this.getSelf());
            }
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }
    }
}

