/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TaskExecutorTest
extends TestLogger {
    private final Time timeout = Time.milliseconds((long)10000L);
    private TestingRpcService rpc;
    @Rule
    public TestName name = new TestName();

    @Before
    public void setup() {
        this.rpc = new TestingRpcService();
    }

    @After
    public void teardown() {
        if (this.rpc != null) {
            this.rpc.stopService();
            this.rpc = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        JobID jobId = new JobID();
        Configuration configuration = new Configuration();
        TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        ResourceID tmResourceId = new ResourceID("tm");
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class)), (TimerService)Mockito.mock(TimerService.class));
        JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        long heartbeatTimeout = 10L;
        HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (HeartbeatListener)Mockito.any(HeartbeatListener.class), (ScheduledExecutor)Mockito.any(ScheduledExecutor.class), (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<Void, Void>>(){

            public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
                return new HeartbeatManagerImpl(10L, taskManagerLocation.getResourceID(), (HeartbeatListener)invocation.getArguments()[1], (Executor)invocation.getArguments()[2], (ScheduledExecutor)invocation.getArguments()[2], (Logger)invocation.getArguments()[3]);
            }
        });
        String jobMasterAddress = "jm";
        UUID jmLeaderId = UUID.randomUUID();
        ResourceID jmResourceId = new ResourceID("jm");
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        int blobPort = 42;
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when((Object)jobMasterGateway.getAddress()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"localhost");
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, tmConfig, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, heartbeatServices, (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, new JobManagerTable(), jobLeaderService, (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
            jobLeaderService.addJob(jobId, "jm");
            jmLeaderRetrievalService.notifyListener("jm", jmLeaderId);
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskManager((String)Mockito.eq((Object)taskManager.getAddress()), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (Time)Mockito.any(Time.class));
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)500L))).disconnectTaskManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (Exception)Mockito.any(TimeoutException.class));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        String rmAddress = "rm";
        String tmAddress = "tm";
        ResourceID rmResourceId = new ResourceID("rm");
        ResourceID tmResourceId = new ResourceID("tm");
        UUID rmLeaderId = UUID.randomUUID();
        ResourceManagerGateway rmGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
        this.rpc.registerGateway("rm", (RpcGateway)rmGateway);
        TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(null, null);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(testLeaderService);
        TaskManagerConfiguration taskManagerConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerConfiguration.getNumberSlots()).thenReturn((Object)1);
        final TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)tmResourceId);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        long heartbeatTimeout = 10L;
        HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (HeartbeatListener)Mockito.any(HeartbeatListener.class), (ScheduledExecutor)Mockito.any(ScheduledExecutor.class), (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<SlotReport, Void>>(){

            public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation) throws Throwable {
                return new HeartbeatManagerImpl(10L, taskManagerLocation.getResourceID(), (HeartbeatListener)invocation.getArguments()[1], (Executor)invocation.getArguments()[2], (ScheduledExecutor)invocation.getArguments()[2], (Logger)invocation.getArguments()[3]);
            }
        });
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, heartbeatServices, (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, (JobManagerTable)Mockito.mock(JobManagerTable.class), (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            testLeaderService.notifyListener("rm", rmLeaderId);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()).atLeast(1))).registerTaskExecutor((String)Mockito.eq((Object)taskManager.getAddress()), (ResourceID)Mockito.eq((Object)tmResourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)500L))).disconnectTaskManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (Exception)Mockito.any(TimeoutException.class));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatSlotReporting() throws Exception {
        long verificationTimeout = 1000L;
        String rmAddress = "rm";
        String tmAddress = "tm";
        ResourceID rmResourceId = new ResourceID("rm");
        ResourceID tmResourceId = new ResourceID("tm");
        UUID rmLeaderId = UUID.randomUUID();
        ResourceManagerGateway rmGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
        this.rpc.registerGateway("rm", (RpcGateway)rmGateway);
        TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(null, null);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(testLeaderService);
        TaskManagerConfiguration taskManagerConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerConfiguration.getNumberSlots()).thenReturn((Object)1);
        Mockito.when((Object)taskManagerConfiguration.getTimeout()).thenReturn((Object)Time.seconds((long)10L));
        final TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)tmResourceId);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotID slotId = new SlotID(tmResourceId, 0);
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotReport slotReport1 = new SlotReport(new SlotStatus(slotId, resourceProfile));
        SlotReport slotReport2 = new SlotReport(new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport1, (Object[])new SlotReport[]{slotReport2});
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        long heartbeatTimeout = 10000L;
        HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (HeartbeatListener)Mockito.any(HeartbeatListener.class), (ScheduledExecutor)Mockito.any(ScheduledExecutor.class), (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<SlotReport, Void>>(){

            public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation) throws Throwable {
                return (HeartbeatManagerImpl)Mockito.spy((Object)new HeartbeatManagerImpl(10000L, taskManagerLocation.getResourceID(), (HeartbeatListener)invocation.getArguments()[1], (Executor)invocation.getArguments()[2], (ScheduledExecutor)invocation.getArguments()[2], (Logger)invocation.getArguments()[3]));
            }
        });
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, heartbeatServices, (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, (JobManagerTable)Mockito.mock(JobManagerTable.class), (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            HeartbeatManager heartbeatManager = taskManager.getResourceManagerHeartbeatManager();
            testLeaderService.notifyListener("rm", rmLeaderId);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)1000L).atLeast(1))).registerTaskExecutor((String)Mockito.eq((Object)taskManager.getAddress()), (ResourceID)Mockito.eq((Object)tmResourceId), (SlotReport)Mockito.eq((Object)slotReport1), (Time)Mockito.any(Time.class));
            ((HeartbeatManager)Mockito.verify((Object)heartbeatManager, (VerificationMode)Mockito.timeout((long)1000L))).monitorTarget((ResourceID)Mockito.any(ResourceID.class), (HeartbeatTarget)Mockito.any(HeartbeatTarget.class));
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
            ArgumentCaptor slotReportArgumentCaptor = ArgumentCaptor.forClass(SlotReport.class);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)1000L))).heartbeatFromTaskManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (SlotReport)slotReportArgumentCaptor.capture());
            SlotReport actualSlotReport = (SlotReport)slotReportArgumentCaptor.getValue();
            Assert.assertEquals((Object)slotReport2, (Object)actualSlotReport);
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        ResourceID resourceID = ResourceID.generate();
        String resourceManagerAddress = "/resource/manager/address/one";
        ResourceID resourceManagerResourceId = new ResourceID("/resource/manager/address/one");
        String dispatcherAddress = "localhost";
        String jobManagerAddress = "localhost";
        ResourceManagerGateway rmGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceManagerResourceId, 10L)));
        TaskManagerConfiguration taskManagerServicesConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerServicesConfiguration.getNumberSlots()).thenReturn((Object)1);
        this.rpc.registerGateway("/resource/manager/address/one", (RpcGateway)rmGateway);
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)resourceID);
        StandaloneHaServices haServices = new StandaloneHaServices("/resource/manager/address/one", "localhost", "localhost");
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerServicesConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, (JobManagerTable)Mockito.mock(JobManagerTable.class), (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            String taskManagerAddress = taskManager.getAddress();
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)resourceID), (SlotReport)Mockito.eq((Object)slotReport), (Time)Mockito.any(Time.class));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        ResourceID tmResourceID = ResourceID.generate();
        String address1 = "/resource/manager/address/one";
        String address2 = "/resource/manager/address/two";
        UUID leaderId1 = UUID.randomUUID();
        UUID leaderId2 = UUID.randomUUID();
        ResourceID rmResourceId1 = new ResourceID("/resource/manager/address/one");
        ResourceID rmResourceId2 = new ResourceID("/resource/manager/address/two");
        ResourceManagerGateway rmGateway1 = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        ResourceManagerGateway rmGateway2 = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway1.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
        Mockito.when((Object)rmGateway2.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
        this.rpc.registerGateway("/resource/manager/address/one", (RpcGateway)rmGateway1);
        this.rpc.registerGateway("/resource/manager/address/two", (RpcGateway)rmGateway2);
        TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(null, null);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(testLeaderService);
        TaskManagerConfiguration taskManagerServicesConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerServicesConfiguration.getNumberSlots()).thenReturn((Object)1);
        Mockito.when((Object)taskManagerServicesConfiguration.getConfiguration()).thenReturn((Object)new Configuration());
        Mockito.when((Object)taskManagerServicesConfiguration.getTmpDirectories()).thenReturn((Object)new String[1]);
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)tmResourceID);
        Mockito.when((Object)taskManagerLocation.getHostname()).thenReturn((Object)"foobar");
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerServicesConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, (JobManagerTable)Mockito.mock(JobManagerTable.class), (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            String taskManagerAddress = taskManager.getAddress();
            Assert.assertNull((Object)taskManager.getResourceManagerConnection());
            testLeaderService.notifyListener("/resource/manager/address/one", leaderId1);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway1, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)tmResourceID), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskManager.getResourceManagerConnection());
            testLeaderService.notifyListener(null, null);
            testLeaderService.notifyListener("/resource/manager/address/two", leaderId2);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway2, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)tmResourceID), (SlotReport)Mockito.eq((Object)slotReport), (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskManager.getResourceManagerConnection());
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testTaskSubmission() throws Exception {
        Configuration configuration = new Configuration();
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        JobMasterId jobMasterId = JobMasterId.generate();
        JobVertexID jobVertexId = new JobVertexID();
        JobInformation jobInformation = new JobInformation(jobId, this.name.getMethodName(), new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(jobVertexId, "test task", 1, 1, TestInvokable.class.getName(), new Configuration());
        SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
        SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), new ExecutionAttemptID(), allocationId, 0, 0, 0, null, Collections.emptyList(), Collections.emptyList());
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libraryCacheManager.getClassLoader((JobID)Mockito.any(JobID.class))).thenReturn((Object)ClassLoader.getSystemClassLoader());
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.getFencingToken()).thenReturn((Object)jobMasterId);
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        JobManagerConnection jobManagerConnection = new JobManagerConnection(jobId, ResourceID.generate(), jobMasterGateway, (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, libraryCacheManager, (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class));
        JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(jobId, jobManagerConnection);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        Mockito.when((Object)taskSlotTable.existsActiveSlot((JobID)Mockito.eq((Object)jobId), (AllocationID)Mockito.eq((Object)allocationId))).thenReturn((Object)true);
        Mockito.when((Object)taskSlotTable.addTask((Task)Mockito.any(Task.class))).thenReturn((Object)true);
        NetworkEnvironment networkEnvironment = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)networkEnvironment.createKvStateTaskRegistry((JobID)Mockito.eq((Object)jobId), (JobVertexID)Mockito.eq((Object)jobVertexId))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        TaskManagerMetricGroup taskManagerMetricGroup = (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class);
        TaskMetricGroup taskMetricGroup = (TaskMetricGroup)Mockito.mock(TaskMetricGroup.class);
        Mockito.when((Object)taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
        Mockito.when((Object)taskManagerMetricGroup.addTaskForJob((JobID)Mockito.any(JobID.class), Mockito.anyString(), (JobVertexID)Mockito.any(JobVertexID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt())).thenReturn((Object)taskMetricGroup);
        HighAvailabilityServices haServices = (HighAvailabilityServices)Mockito.mock(HighAvailabilityServices.class);
        Mockito.when((Object)haServices.getResourceManagerLeaderRetriever()).thenReturn(Mockito.mock(LeaderRetrievalService.class));
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class), (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkEnvironment, haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), taskManagerMetricGroup, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, jobManagerTable, (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            tmGateway.submitTask(tdd, jobMasterId, this.timeout);
            CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
            completionFuture.get();
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobLeaderDetection() throws Exception {
        JobID jobId = new JobID();
        Configuration configuration = new Configuration();
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        ResourceID resourceId = new ResourceID("foobar");
        TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        TimerService timerService = (TimerService)Mockito.mock(TimerService.class);
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class)), timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
        String resourceManagerAddress = "rm";
        ResourceManagerId resourceManagerLeaderId = ResourceManagerId.generate();
        ResourceID resourceManagerResourceId = new ResourceID("rm");
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        InstanceID registrationId = new InstanceID();
        Mockito.when((Object)resourceManagerGateway.registerTaskExecutor((String)Mockito.any(String.class), (ResourceID)Mockito.eq((Object)resourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
        String jobManagerAddress = "jm";
        UUID jobManagerLeaderId = UUID.randomUUID();
        ResourceID jmResourceId = new ResourceID("jm");
        int blobPort = 42;
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Mockito.any(Iterable.class), (Time)Mockito.any(Time.class))).thenReturn(Mockito.mock(CompletableFuture.class, (Answer)Mockito.RETURNS_MOCKS));
        this.rpc.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
        AllocationID allocationId = new AllocationID();
        SlotID slotId = new SlotID(resourceId, 0);
        SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, jobManagerTable, jobLeaderService, (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            resourceManagerLeaderRetrievalService.notifyListener("rm", resourceManagerLeaderId.toUUID());
            CompletableFuture slotRequestAck = tmGateway.requestSlot(slotId, jobId, allocationId, "jm", resourceManagerLeaderId, this.timeout);
            slotRequestAck.get();
            jobManagerLeaderRetrievalService.notifyListener("jm", jobManagerLeaderId);
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Matchers.argThat((Matcher)org.hamcrest.Matchers.contains((Object[])new SlotOffer[]{slotOffer})), (Time)Mockito.any(Time.class));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlotAcceptance() throws Exception {
        JobID jobId = new JobID();
        Configuration configuration = new Configuration();
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        ResourceID resourceId = new ResourceID("foobar");
        TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        TimerService timerService = (TimerService)Mockito.mock(TimerService.class);
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class), (ResourceProfile)Mockito.mock(ResourceProfile.class)), timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        String resourceManagerAddress = "rm";
        UUID resourceManagerLeaderId = UUID.randomUUID();
        ResourceID resourceManagerResourceId = new ResourceID("rm");
        String jobManagerAddress = "jm";
        UUID jobManagerLeaderId = UUID.randomUUID();
        TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService("rm", resourceManagerLeaderId);
        TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService("jm", jobManagerLeaderId);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        InstanceID registrationId = new InstanceID();
        Mockito.when((Object)resourceManagerGateway.registerTaskExecutor((String)Mockito.any(String.class), (ResourceID)Mockito.eq((Object)resourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
        ResourceID jmResourceId = new ResourceID("jm");
        int blobPort = 42;
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Mockito.any(Iterable.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Collections.singleton(offer1)));
        this.rpc.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, jobManagerTable, jobLeaderService, (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            ((ResourceManagerGateway)Mockito.verify((Object)resourceManagerGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManager.getAddress()), (ResourceID)Mockito.eq((Object)resourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));
            taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds((long)10000L));
            taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds((long)10000L));
            jobLeaderService.addJob(jobId, "jm");
            ((ResourceManagerGateway)Mockito.verify((Object)resourceManagerGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).notifySlotAvailable((InstanceID)Mockito.eq((Object)registrationId), (SlotID)Mockito.eq((Object)new SlotID(resourceId, 1)), (AllocationID)Mockito.eq((Object)allocationId2));
            Assert.assertTrue((boolean)taskSlotTable.existsActiveSlot(jobId, allocationId1));
            Assert.assertFalse((boolean)taskSlotTable.existsActiveSlot(jobId, allocationId2));
            Assert.assertTrue((boolean)taskSlotTable.isSlotFree(1));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Ignore
    @Test
    public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
        ResourceID resourceID = ResourceID.generate();
        String address1 = "/resource/manager/address/one";
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        JobID jobId = new JobID();
        String jobManagerAddress = "foobar";
        ResourceManagerGateway rmGateway1 = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        this.rpc.registerGateway("/resource/manager/address/one", (RpcGateway)rmGateway1);
        TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService("/resource/manager/address/one", HighAvailabilityServices.DEFAULT_LEADER_ID);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(testLeaderService);
        TaskManagerConfiguration taskManagerServicesConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerServicesConfiguration.getNumberSlots()).thenReturn((Object)1);
        TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)resourceID);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)new SlotReport());
        Mockito.when((Object)taskSlotTable.getCurrentAllocation(1)).thenReturn((Object)new AllocationID());
        Mockito.when((Object)rmGateway1.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.eq((Object)resourceID), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L)));
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerServicesConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, (JobManagerTable)Mockito.mock(JobManagerTable.class), (JobLeaderService)Mockito.mock(JobLeaderService.class), (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            String taskManagerAddress = tmGateway.getAddress();
            Assert.assertNull((Object)taskManager.getResourceManagerConnection());
            testLeaderService.notifyListener("/resource/manager/address/one", resourceManagerId.toUUID());
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway1, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)resourceID), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskManager.getResourceManagerConnection());
            SlotID slotID = new SlotID(resourceID, 0);
            tmGateway.requestSlot(slotID, jobId, new AllocationID(), "foobar", resourceManagerId, this.timeout);
            SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
            CompletableFuture requestSlotFuture = tmGateway.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), "foobar", resourceManagerId, this.timeout);
            try {
                requestSlotFuture.get();
                Assert.fail((String)"The slot request should have failed.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, SlotAllocationException.class).isPresent());
            }
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway1, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)resourceID), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));
            testLeaderService.notifyListener("/resource/manager/address/one", resourceManagerId.toUUID());
            tmGateway.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), "foobar", resourceManagerId, this.timeout);
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        JobID jobId = new JobID();
        Configuration configuration = new Configuration();
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        ResourceID resourceId = new ResourceID("foobar");
        TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        TimerService timerService = (TimerService)Mockito.mock(TimerService.class);
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class), (ResourceProfile)Mockito.mock(ResourceProfile.class)), timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        String resourceManagerAddress = "rm";
        UUID resourceManagerLeaderId = UUID.randomUUID();
        ResourceID resourceManagerResourceId = new ResourceID("rm");
        String jobManagerAddress = "jm";
        JobMasterId jobMasterId = JobMasterId.generate();
        TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService("rm", resourceManagerLeaderId);
        TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService("jm", jobMasterId.toUUID());
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        InstanceID registrationId = new InstanceID();
        Mockito.when((Object)resourceManagerGateway.registerTaskExecutor((String)Mockito.any(String.class), (ResourceID)Mockito.eq((Object)resourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
        ResourceID jmResourceId = new ResourceID("jm");
        int blobPort = 42;
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.updateTaskExecutionState((TaskExecutionState)Mockito.any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        this.rpc.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libraryCacheManager.getClassLoader((JobID)Mockito.eq((Object)jobId))).thenReturn((Object)((Object)((Object)this)).getClass().getClassLoader());
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        JobManagerConnection jobManagerConnection = new JobManagerConnection(jobId, jmResourceId, jobMasterGateway, (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, libraryCacheManager, (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class));
        TaskManagerMetricGroup taskManagerMetricGroup = (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class);
        TaskMetricGroup taskMetricGroup = (TaskMetricGroup)Mockito.mock(TaskMetricGroup.class);
        Mockito.when((Object)taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
        Mockito.when((Object)taskManagerMetricGroup.addTaskForJob((JobID)Mockito.any(JobID.class), Mockito.anyString(), (JobVertexID)Mockito.any(JobVertexID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt())).thenReturn((Object)taskMetricGroup);
        NetworkEnvironment networkMock = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class, (Answer)Mockito.RETURNS_MOCKS);
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkMock, (HighAvailabilityServices)haServices, (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS), taskManagerMetricGroup, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), taskSlotTable, jobManagerTable, jobLeaderService, (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds((long)10000L));
            taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds((long)10000L));
            JobVertexID jobVertexId = new JobVertexID();
            JobInformation jobInformation = new JobInformation(jobId, this.name.getMethodName(), new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
            TaskInformation taskInformation = new TaskInformation(jobVertexId, "test task", 1, 1, TestInvokable.class.getName(), new Configuration());
            SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
            SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
            TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), new ExecutionAttemptID(), allocationId1, 0, 0, 0, null, Collections.emptyList(), Collections.emptyList());
            CompletableFuture<Set<SlotOffer>> offerResultFuture = new CompletableFuture<Set<SlotOffer>>();
            Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Mockito.any(Iterable.class), (Time)Mockito.any(Time.class))).thenReturn(offerResultFuture);
            jobLeaderService.addJob(jobId, "jm");
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Mockito.any(Iterable.class), (Time)Mockito.any(Time.class));
            tmGateway.submitTask(tdd, jobMasterId, this.timeout);
            offerResultFuture.complete(Collections.singleton(offer1));
            ((ResourceManagerGateway)Mockito.verify((Object)resourceManagerGateway, (VerificationMode)Mockito.timeout((long)this.timeout.toMilliseconds()))).notifySlotAvailable((InstanceID)Mockito.eq((Object)registrationId), (SlotID)Mockito.eq((Object)new SlotID(resourceId, 1)), (AllocationID)Mockito.any(AllocationID.class));
            Assert.assertTrue((boolean)taskSlotTable.existsActiveSlot(jobId, allocationId1));
            Assert.assertFalse((boolean)taskSlotTable.existsActiveSlot(jobId, allocationId2));
            Assert.assertTrue((boolean)taskSlotTable.isSlotFree(1));
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
        long verificationTimeout = 500L;
        Configuration configuration = new Configuration();
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        JobLeaderService jobLeaderService = (JobLeaderService)Mockito.mock(JobLeaderService.class);
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)configuration);
        TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), 1234);
        HighAvailabilityServices haServicesMock = (HighAvailabilityServices)Mockito.mock(HighAvailabilityServices.class, (Answer)Mockito.RETURNS_MOCKS);
        HeartbeatServices heartbeatServicesMock = (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS);
        JobID jobId = new JobID();
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"localhost");
        JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate(), 1);
        JobManagerTable jobManagerTableMock = (JobManagerTable)Mockito.spy((Object)new JobManagerTable());
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, taskManagerConfiguration, taskManagerLocation, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class), haServicesMock, heartbeatServicesMock, (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (FileCache)Mockito.mock(FileCache.class), (TaskSlotTable)Mockito.mock(TaskSlotTable.class), jobManagerTableMock, jobLeaderService, (FatalErrorHandler)testingFatalErrorHandler);
        try {
            taskExecutor.start();
            ArgumentCaptor jobLeaderListenerArgumentCaptor = ArgumentCaptor.forClass(JobLeaderListener.class);
            ((JobLeaderService)Mockito.verify((Object)jobLeaderService)).start(Mockito.anyString(), (RpcService)Mockito.any(RpcService.class), (HighAvailabilityServices)Mockito.any(HighAvailabilityServices.class), (JobLeaderListener)jobLeaderListenerArgumentCaptor.capture());
            JobLeaderListener taskExecutorListener = (JobLeaderListener)jobLeaderListenerArgumentCaptor.getValue();
            taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);
            taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage);
            ArgumentCaptor jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);
            ((JobManagerTable)Mockito.verify((Object)jobManagerTableMock, (VerificationMode)Mockito.timeout((long)500L).times(1))).put((JobID)Mockito.eq((Object)jobId), (JobManagerConnection)jobManagerConnectionArgumentCaptor.capture());
            JobManagerConnection jobManagerConnection = (JobManagerConnection)jobManagerConnectionArgumentCaptor.getValue();
            Assert.assertEquals((Object)jobMasterGateway, (Object)jobManagerConnection.getJobManagerGateway());
            testingFatalErrorHandler.rethrowError();
        }
        finally {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(this.timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    public static class TestInvokable
    extends AbstractInvokable {
        static final CompletableFuture<Boolean> completableFuture = new CompletableFuture();

        public void invoke() throws Exception {
            completableFuture.complete(true);
        }
    }
}

