/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TaskExecutorTest
extends TestLogger {
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    private static final Time timeout = Time.milliseconds((long)10000L);
    private TestingRpcService rpc;
    private BlobCacheService dummyBlobCacheService;
    private TimerService<AllocationID> timerService;
    private Configuration configuration;
    private TaskManagerConfiguration taskManagerConfiguration;
    private TaskManagerLocation taskManagerLocation;
    private JobID jobId;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService resourceManagerLeaderRetriever;
    private SettableLeaderRetrievalService jobManagerLeaderRetriever;
    @Rule
    public TestName name = new TestName();

    @Before
    public void setup() throws IOException {
        this.rpc = new TestingRpcService();
        this.timerService = new TimerService(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
        this.dummyBlobCacheService = new BlobCacheService(new Configuration(), (BlobView)new VoidBlobStore(), null);
        this.configuration = new Configuration();
        this.taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration((Configuration)this.configuration);
        this.taskManagerLocation = new LocalTaskManagerLocation();
        this.jobId = new JobID();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.haServices.setResourceManagerLeaderRetriever((LeaderRetrievalService)this.resourceManagerLeaderRetriever);
        this.haServices.setJobMasterLeaderRetriever(this.jobId, (LeaderRetrievalService)this.jobManagerLeaderRetriever);
    }

    @After
    public void teardown() throws Exception {
        if (this.rpc != null) {
            RpcUtils.terminateRpcService((RpcService)this.rpc, (Time)timeout);
            this.rpc = null;
        }
        if (this.timerService != null) {
            this.timerService.stop();
            this.timerService = null;
        }
        if (this.dummyBlobCacheService != null) {
            this.dummyBlobCacheService.close();
            this.dummyBlobCacheService = null;
        }
        this.testingFatalErrorHandler.rethrowError();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN), this.timerService);
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        long heartbeatInterval = 1L;
        long heartbeatTimeout = 3L;
        HeartbeatServices heartbeatServices = new HeartbeatServices(1L, 3L);
        String jobMasterAddress = "jm";
        UUID jmLeaderId = UUID.randomUUID();
        ResourceID jmResourceId = ResourceID.generate();
        CompletableFuture taskManagerLocationFuture = new CompletableFuture();
        CompletableFuture disconnectTaskManagerFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((s, taskManagerLocation) -> {
            taskManagerLocationFuture.complete(taskManagerLocation);
            return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId));
        }).setDisconnectTaskManagerFunction(resourceID -> {
            disconnectTaskManagerFuture.complete(resourceID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
            jobLeaderService.addJob(this.jobId, "jm");
            this.jobManagerLeaderRetriever.notifyListener("jm", jmLeaderId);
            TaskManagerLocation taskManagerLocation1 = (TaskManagerLocation)taskManagerLocationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertThat((Object)taskManagerLocation1, (Matcher)Matchers.equalTo((Object)this.taskManagerLocation));
            ResourceID resourceID2 = (ResourceID)disconnectTaskManagerFuture.get(150L, TimeUnit.MILLISECONDS);
            Assert.assertThat((Object)resourceID2, (Matcher)Matchers.equalTo((Object)this.taskManagerLocation.getResourceID()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskManager, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        String rmAddress = "rm";
        ResourceID rmResourceId = new ResourceID("rm");
        long heartbeatInterval = 1L;
        long heartbeatTimeout = 3L;
        ResourceManagerId rmLeaderId = ResourceManagerId.generate();
        TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(rmLeaderId, rmResourceId, "rm", "rm");
        TaskExecutorRegistrationSuccess registrationResponse = new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, new ClusterInformation("localhost", 1234));
        CompletableFuture taskExecutorRegistrationFuture = new CompletableFuture();
        CountDownLatch registrationAttempts = new CountDownLatch(2);
        rmGateway.setRegisterTaskExecutorFunction(registration -> {
            taskExecutorRegistrationFuture.complete(registration.f1);
            registrationAttempts.countDown();
            return CompletableFuture.completedFuture(registrationResponse);
        });
        CompletableFuture taskExecutorDisconnectFuture = new CompletableFuture();
        rmGateway.setDisconnectTaskExecutorConsumer(disconnectInfo -> taskExecutorDisconnectFuture.complete(disconnectInfo.f0));
        this.rpc.registerGateway("rm", (RpcGateway)rmGateway);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        HeartbeatServices heartbeatServices = new HeartbeatServices(1L, 3L);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            this.resourceManagerLeaderRetriever.notifyListener("rm", rmLeaderId.toUUID());
            Assert.assertThat(taskExecutorRegistrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), (Matcher)Matchers.equalTo((Object)this.taskManagerLocation.getResourceID()));
            Assert.assertThat(taskExecutorDisconnectFuture.get(150L, TimeUnit.MILLISECONDS), (Matcher)Matchers.equalTo((Object)this.taskManagerLocation.getResourceID()));
            registrationAttempts.await();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskManager, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHeartbeatSlotReporting() throws Exception {
        long verificationTimeout = 1000L;
        long heartbeatTimeout = 10000L;
        String rmAddress = "rm";
        UUID rmLeaderId = UUID.randomUUID();
        TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();
        CompletableFuture taskExecutorRegistrationFuture = new CompletableFuture();
        ResourceID rmResourceId = rmGateway.getOwnResourceId();
        CompletableFuture<TaskExecutorRegistrationSuccess> registrationResponse = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, new ClusterInformation("localhost", 1234)));
        rmGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4 -> {
            taskExecutorRegistrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
            return registrationResponse;
        });
        CompletableFuture initialSlotReportFuture = new CompletableFuture();
        rmGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        CompletableFuture heartbeatSlotReportFuture = new CompletableFuture();
        rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> heartbeatSlotReportFuture.complete(slotReport));
        this.rpc.registerGateway("rm", (RpcGateway)rmGateway);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotID slotId = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotReport slotReport1 = new SlotReport(new SlotStatus(slotId, resourceProfile));
        SlotReport slotReport2 = new SlotReport(new SlotStatus(slotId, resourceProfile, new JobID(), new AllocationID()));
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport1, (Object[])new SlotReport[]{slotReport2});
        HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager((ResourceID)Mockito.eq((Object)this.taskManagerLocation.getResourceID()), (HeartbeatListener)Mockito.any(HeartbeatListener.class), (ScheduledExecutor)Mockito.any(ScheduledExecutor.class), (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<SlotReport, Void>>(){

            public HeartbeatManagerImpl<SlotReport, Void> answer(InvocationOnMock invocation) throws Throwable {
                return (HeartbeatManagerImpl)Mockito.spy((Object)new HeartbeatManagerImpl(10000L, TaskExecutorTest.this.taskManagerLocation.getResourceID(), (HeartbeatListener)invocation.getArguments()[1], (Executor)invocation.getArguments()[2], (ScheduledExecutor)invocation.getArguments()[2], (Logger)invocation.getArguments()[3]));
            }
        });
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            HeartbeatManager heartbeatManager = taskManager.getResourceManagerHeartbeatManager();
            this.resourceManagerLeaderRetriever.notifyListener("rm", rmLeaderId);
            Assert.assertThat(taskExecutorRegistrationFuture.get(), (Matcher)Matchers.equalTo((Object)this.taskManagerLocation.getResourceID()));
            Assert.assertThat(initialSlotReportFuture.get(), (Matcher)Matchers.equalTo((Object)slotReport1));
            ((HeartbeatManager)Mockito.verify((Object)heartbeatManager, (VerificationMode)Mockito.timeout((long)1000L))).monitorTarget((ResourceID)Mockito.any(ResourceID.class), (HeartbeatTarget)Mockito.any(HeartbeatTarget.class));
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            taskExecutorGateway.heartbeatFromResourceManager(rmResourceId);
            SlotReport actualSlotReport = (SlotReport)heartbeatSlotReportFuture.get();
            Assert.assertEquals((Object)slotReport2, (Object)actualSlotReport);
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskManager, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        String resourceManagerAddress = "/resource/manager/address/one";
        ResourceID resourceManagerResourceId = new ResourceID("/resource/manager/address/one");
        String dispatcherAddress = "localhost";
        String jobManagerAddress = "localhost";
        String webMonitorAddress = "localhost";
        ResourceManagerGateway rmGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceManagerResourceId, new ClusterInformation("localhost", 1234))));
        this.rpc.registerGateway("/resource/manager/address/one", (RpcGateway)rmGateway);
        StandaloneHaServices haServices = new StandaloneHaServices("/resource/manager/address/one", "localhost", "localhost", "localhost");
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            String taskManagerAddress = taskManager.getAddress();
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class));
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        String address1 = "/resource/manager/address/one";
        String address2 = "/resource/manager/address/two";
        UUID leaderId1 = UUID.randomUUID();
        UUID leaderId2 = UUID.randomUUID();
        ResourceID rmResourceId1 = new ResourceID("/resource/manager/address/one");
        ResourceID rmResourceId2 = new ResourceID("/resource/manager/address/two");
        ResourceManagerGateway rmGateway1 = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        ResourceManagerGateway rmGateway2 = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway1.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, new ClusterInformation("localhost", 1234))));
        Mockito.when((Object)rmGateway2.registerTaskExecutor(Mockito.anyString(), (ResourceID)Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, new ClusterInformation("localhost", 1234))));
        this.rpc.registerGateway("/resource/manager/address/one", (RpcGateway)rmGateway1);
        this.rpc.registerGateway("/resource/manager/address/two", (RpcGateway)rmGateway2);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            String taskManagerAddress = taskManager.getAddress();
            Assert.assertNull((Object)taskManager.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/one", leaderId1);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway1, (VerificationMode)Mockito.timeout((long)timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskManager.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener(null, null);
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/two", leaderId2);
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway2, (VerificationMode)Mockito.timeout((long)timeout.toMilliseconds()))).registerTaskExecutor((String)Mockito.eq((Object)taskManagerAddress), (ResourceID)Mockito.eq((Object)this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription)Mockito.any(HardwareDescription.class), (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskManager.getResourceManagerConnection());
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testTaskSubmission() throws Exception {
        AllocationID allocationId = new AllocationID();
        JobMasterId jobMasterId = JobMasterId.generate();
        JobVertexID jobVertexId = new JobVertexID();
        JobInformation jobInformation = new JobInformation(this.jobId, this.name.getMethodName(), new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(jobVertexId, "test task", 1, 1, TestInvokable.class.getName(), new Configuration());
        SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
        SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(this.jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), new ExecutionAttemptID(), allocationId, 0, 0, 0, null, Collections.emptyList(), Collections.emptyList());
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libraryCacheManager.getClassLoader((JobID)Mockito.any(JobID.class))).thenReturn((Object)ClassLoader.getSystemClassLoader());
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.getFencingToken()).thenReturn((Object)jobMasterId);
        JobManagerConnection jobManagerConnection = new JobManagerConnection(this.jobId, ResourceID.generate(), jobMasterGateway, (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), libraryCacheManager, (ResultPartitionConsumableNotifier)new NoOpResultPartitionConsumableNotifier(), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class));
        JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(this.jobId, jobManagerConnection);
        TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        Mockito.when((Object)taskSlotTable.tryMarkSlotActive((JobID)Mockito.eq((Object)this.jobId), (AllocationID)Mockito.eq((Object)allocationId))).thenReturn((Object)true);
        Mockito.when((Object)taskSlotTable.addTask((Task)Mockito.any(Task.class))).thenReturn((Object)true);
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        NetworkEnvironment networkEnvironment = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)networkEnvironment.createKvStateTaskRegistry((JobID)Mockito.eq((Object)this.jobId), (JobVertexID)Mockito.eq((Object)jobVertexId))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when((Object)networkEnvironment.getTaskEventDispatcher()).thenReturn((Object)taskEventDispatcher);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setNetworkEnvironment(networkEnvironment).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            tmGateway.submitTask(tdd, jobMasterId, timeout);
            CompletableFuture<Boolean> completionFuture = TestInvokable.COMPLETABLE_FUTURE;
            completionFuture.get();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobLeaderDetection() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        String resourceManagerAddress = "rm";
        ResourceManagerId resourceManagerLeaderId = ResourceManagerId.generate();
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture initialSlotReportFuture = new CompletableFuture();
        resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            initialSlotReportFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        String jobManagerAddress = "jm";
        UUID jobManagerLeaderId = UUID.randomUUID();
        ResourceID jmResourceId = new ResourceID("jm");
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)this.taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Collection)Mockito.any(Collection.class), (Time)Mockito.any(Time.class))).thenReturn(Mockito.mock(CompletableFuture.class, (Answer)Mockito.RETURNS_MOCKS));
        this.rpc.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
        AllocationID allocationId = new AllocationID();
        SlotID slotId = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            this.resourceManagerLeaderRetriever.notifyListener("rm", resourceManagerLeaderId.toUUID());
            initialSlotReportFuture.get();
            CompletableFuture slotRequestAck = tmGateway.requestSlot(slotId, this.jobId, allocationId, "jm", resourceManagerLeaderId, timeout);
            slotRequestAck.get();
            this.jobManagerLeaderRetriever.notifyListener("jm", jobManagerLeaderId);
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)timeout.toMilliseconds()))).offerSlots((ResourceID)Mockito.any(ResourceID.class), (Collection)MockitoHamcrest.argThat((Matcher)Matchers.contains((Object[])new SlotOffer[]{slotOffer})), (Time)Mockito.any(Time.class));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskManager, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlotAcceptance() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class), (ResourceProfile)Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        String resourceManagerAddress = "rm";
        UUID resourceManagerLeaderId = UUID.randomUUID();
        String jobManagerAddress = "jm";
        UUID jobManagerLeaderId = UUID.randomUUID();
        this.resourceManagerLeaderRetriever.notifyListener("rm", resourceManagerLeaderId);
        this.jobManagerLeaderRetriever.notifyListener("jm", jobManagerLeaderId);
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        ResourceID resourceManagerResourceId = resourceManagerGateway.getOwnResourceId();
        InstanceID registrationId = new InstanceID();
        CompletableFuture registrationFuture = new CompletableFuture();
        resourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDIntegerHardwareDescriptionTuple4 -> {
            registrationFuture.complete(stringResourceIDIntegerHardwareDescriptionTuple4.f1);
            return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, new ClusterInformation("localhost", 1234)));
        });
        CompletableFuture availableSlotFuture = new CompletableFuture();
        resourceManagerGateway.setNotifySlotAvailableConsumer(availableSlotFuture::complete);
        ResourceID jmResourceId = new ResourceID("jm");
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager((String)Mockito.any(String.class), (TaskManagerLocation)Mockito.eq((Object)this.taskManagerLocation), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"jm");
        Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Collection)Mockito.any(Collection.class), (Time)Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Collections.singleton(offer1)));
        this.rpc.registerGateway("rm", (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway("jm", (RpcGateway)jobMasterGateway);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            Assert.assertThat(registrationFuture.get(), (Matcher)Matchers.equalTo((Object)this.taskManagerLocation.getResourceID()));
            taskSlotTable.allocateSlot(0, this.jobId, allocationId1, Time.milliseconds((long)10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationId2, Time.milliseconds((long)10000L));
            jobLeaderService.addJob(this.jobId, "jm");
            Tuple3 instanceIDSlotIDAllocationIDTuple3 = (Tuple3)availableSlotFuture.get();
            Tuple3 expectedResult = Tuple3.of((Object)registrationId, (Object)new SlotID(this.taskManagerLocation.getResourceID(), 1), (Object)allocationId2);
            Assert.assertThat((Object)instanceIDSlotIDAllocationIDTuple3, (Matcher)Matchers.equalTo((Object)expectedResult));
            Assert.assertTrue((boolean)taskSlotTable.tryMarkSlotActive(this.jobId, allocationId1));
            Assert.assertFalse((boolean)taskSlotTable.tryMarkSlotActive(this.jobId, allocationId2));
            Assert.assertTrue((boolean)taskSlotTable.isSlotFree(1));
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class), (ResourceProfile)Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        this.resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        CompletableFuture availableSlotFuture = new CompletableFuture();
        resourceManagerGateway.setNotifySlotAvailableConsumer(availableSlotFuture::complete);
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
        OneShotLatch offerSlotsLatch = new OneShotLatch();
        OneShotLatch taskInTerminalState = new OneShotLatch();
        CompletableFuture<Set<SlotOffer>> offerResultFuture = new CompletableFuture<Set<SlotOffer>>();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, slotOffers) -> {
            offerSlotsLatch.trigger();
            return offerResultFuture;
        }).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                taskInTerminalState.trigger();
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        this.jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.rpc.registerGateway(resourceManagerGateway.getAddress(), (RpcGateway)resourceManagerGateway);
        this.rpc.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        NetworkEnvironment networkMock = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class, (Answer)Mockito.RETURNS_MOCKS);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setNetworkEnvironment(networkMock).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setJobManagerTable(jobManagerTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskManager = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskManager.start();
            TaskExecutorGateway tmGateway = (TaskExecutorGateway)taskManager.getSelfGateway(TaskExecutorGateway.class);
            taskSlotTable.allocateSlot(0, this.jobId, allocationId1, Time.milliseconds((long)10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationId2, Time.milliseconds((long)10000L));
            JobVertexID jobVertexId = new JobVertexID();
            JobInformation jobInformation = new JobInformation(this.jobId, this.name.getMethodName(), new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
            TaskInformation taskInformation = new TaskInformation(jobVertexId, "test task", 1, 1, NoOpInvokable.class.getName(), new Configuration());
            SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
            SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
            TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(this.jobId, (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobInformation), (TaskDeploymentDescriptor.MaybeOffloaded)new TaskDeploymentDescriptor.NonOffloaded(serializedJobVertexInformation), new ExecutionAttemptID(), allocationId1, 0, 0, 0, null, Collections.emptyList(), Collections.emptyList());
            jobLeaderService.addJob(this.jobId, jobMasterGateway.getAddress());
            offerSlotsLatch.await();
            tmGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), timeout).get();
            offerResultFuture.complete(Collections.singleton(offer1));
            Tuple3 instanceIDSlotIDAllocationIDTuple3 = (Tuple3)availableSlotFuture.get();
            Assert.assertThat((Object)instanceIDSlotIDAllocationIDTuple3.f1, (Matcher)Matchers.equalTo((Object)new SlotID(this.taskManagerLocation.getResourceID(), 1)));
            Assert.assertTrue((boolean)taskSlotTable.tryMarkSlotActive(this.jobId, allocationId1));
            Assert.assertFalse((boolean)taskSlotTable.tryMarkSlotActive(this.jobId, allocationId2));
            Assert.assertTrue((boolean)taskSlotTable.isSlotFree(1));
            taskInTerminalState.await();
        }
        finally {
            taskManager.shutDown();
            taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
        long verificationTimeout = 500L;
        JobLeaderService jobLeaderService = (JobLeaderService)Mockito.mock(JobLeaderService.class);
        HeartbeatServices heartbeatServicesMock = (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS);
        JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"localhost");
        JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate());
        JobManagerTable jobManagerTableMock = (JobManagerTable)Mockito.spy((Object)new JobManagerTable());
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setJobManagerTable(jobManagerTableMock).setJobLeaderService(jobLeaderService).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, heartbeatServicesMock, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            ArgumentCaptor jobLeaderListenerArgumentCaptor = ArgumentCaptor.forClass(JobLeaderListener.class);
            ((JobLeaderService)Mockito.verify((Object)jobLeaderService)).start(Mockito.anyString(), (RpcService)Mockito.any(RpcService.class), (HighAvailabilityServices)Mockito.any(HighAvailabilityServices.class), (JobLeaderListener)jobLeaderListenerArgumentCaptor.capture());
            JobLeaderListener taskExecutorListener = (JobLeaderListener)jobLeaderListenerArgumentCaptor.getValue();
            taskExecutorListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, registrationMessage);
            taskExecutorListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, registrationMessage);
            ArgumentCaptor jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);
            ((JobManagerTable)Mockito.verify((Object)jobManagerTableMock, (VerificationMode)Mockito.timeout((long)500L).times(1))).put((JobID)Mockito.eq((Object)this.jobId), (JobManagerConnection)jobManagerConnectionArgumentCaptor.capture());
            JobManagerConnection jobManagerConnection = (JobManagerConnection)jobManagerConnectionArgumentCaptor.getValue();
            Assert.assertEquals((Object)jobMasterGateway, (Object)jobManagerConnection.getJobManagerGateway());
        }
        finally {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
        long heartbeatInterval = 1L;
        long heartbeatTimeout = 10000L;
        long pollTimeout = 1000L;
        RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(1L, 10000L);
        ResourceID rmResourceID = ResourceID.generate();
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        String rmAddress = "rm";
        TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(ResourceManagerId.generate(), rmResourceID, "rm", "rm");
        this.rpc.registerGateway("rm", (RpcGateway)rmGateway);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, (HeartbeatServices)heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
            BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();
            this.resourceManagerLeaderRetriever.notifyListener("rm", rmGateway.getFencingToken().toUUID());
            Assert.assertThat((Object)monitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), (Matcher)Matchers.equalTo((Object)rmResourceID));
            this.resourceManagerLeaderRetriever.notifyListener(null, null);
            Assert.assertThat((Object)unmonitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), (Matcher)Matchers.equalTo((Object)rmResourceID));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRemoveJobFromJobLeaderService() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(localStateStoresManager).build();
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, this.taskManagerConfiguration, (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        try {
            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture initialSlotReport = new CompletableFuture();
            resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
                initialSlotReport.complete(null);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();
            this.rpc.registerGateway(resourceManagerGateway.getAddress(), (RpcGateway)resourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerId.toUUID());
            CompletableFuture startFuture = new CompletableFuture();
            CompletableFuture stopFuture = new CompletableFuture();
            StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService(startFuture, stopFuture);
            this.haServices.setJobMasterLeaderRetriever(this.jobId, jobMasterLeaderRetriever);
            taskExecutor.start();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            SlotID slotId = new SlotID(this.taskManagerLocation.getResourceID(), 0);
            AllocationID allocationId = new AllocationID();
            Assert.assertThat((Object)startFuture.isDone(), (Matcher)Matchers.is((Object)false));
            JobLeaderService jobLeaderService = taskManagerServices.getJobLeaderService();
            Assert.assertThat((Object)jobLeaderService.containsJob(this.jobId), (Matcher)Matchers.is((Object)false));
            initialSlotReport.get();
            taskExecutorGateway.requestSlot(slotId, this.jobId, allocationId, "foobar", resourceManagerId, timeout).get();
            startFuture.get();
            Assert.assertThat((Object)jobLeaderService.containsJob(this.jobId), (Matcher)Matchers.is((Object)true));
            taskExecutorGateway.freeSlot(allocationId, (Throwable)new FlinkException("Test exception"), timeout).get();
            stopFuture.get();
            Assert.assertThat((Object)jobLeaderService.containsJob(this.jobId), (Matcher)Matchers.is((Object)false));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    @Test
    public void testMaximumRegistrationDuration() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
        TaskExecutor taskExecutor = this.createTaskExecutor(new TaskManagerServicesBuilder().build());
        taskExecutor.start();
        try {
            Throwable error = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat((Object)error, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
            Assert.assertThat((Object)ExceptionUtils.stripExecutionException((Throwable)error), (Matcher)Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        long heartbeatInterval = 10L;
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, TaskManagerConfiguration.fromConfiguration((Configuration)this.configuration), (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(10L, 10L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        taskExecutor.start();
        CompletableFuture registrationFuture = new CompletableFuture();
        OneShotLatch secondRegistration = new OneShotLatch();
        try {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple -> {
                if (registrationFuture.complete(tuple.f1)) {
                    return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("localhost", 1234)));
                }
                secondRegistration.trigger();
                return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID());
            ResourceID registrationResourceId = (ResourceID)registrationFuture.get();
            Assert.assertThat((Object)registrationResourceId, (Matcher)Matchers.equalTo((Object)taskManagerServices.getTaskManagerLocation().getResourceID()));
            secondRegistration.await();
            Throwable error = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat((Object)error, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
            Assert.assertThat((Object)ExceptionUtils.stripExecutionException((Throwable)error), (Matcher)Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices);
        taskExecutor.start();
        try {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture registrationFuture = new CompletableFuture();
            CompletableFuture taskExecutorResourceIdFuture = new CompletableFuture();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
                taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
                return registrationFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            ResourceID resourceId = (ResourceID)taskExecutorResourceIdFuture.get();
            SlotID slotId = new SlotID(resourceId, 0);
            CompletableFuture slotRequestResponse = taskExecutorGateway.requestSlot(slotId, this.jobId, new AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), timeout);
            try {
                slotRequestResponse.get();
                Assert.fail((String)"We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
            }
            catch (ExecutionException ee) {
                Assert.assertThat((Object)ExceptionUtils.stripExecutionException((Throwable)ee), (Matcher)Matchers.instanceOf(TaskManagerException.class));
            }
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
        long heartbeatInterval = 1000L;
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TaskExecutor taskExecutor = new TaskExecutor((RpcService)this.rpc, TaskManagerConfiguration.fromConfiguration((Configuration)this.configuration), (HighAvailabilityServices)this.haServices, new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(taskManagerLocation).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
        taskExecutor.start();
        try {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ClusterInformation clusterInformation = new ClusterInformation("foobar", 1234);
            CompletableFuture<TaskExecutorRegistrationSuccess> registrationResponseFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), clusterInformation));
            ArrayBlockingQueue registrationQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
                registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
                return registrationResponseFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
            ResourceID firstRegistrationAttempt = (ResourceID)registrationQueue.take();
            Assert.assertThat((Object)firstRegistrationAttempt, (Matcher)Matchers.equalTo((Object)taskManagerLocation.getResourceID()));
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            Assert.assertThat(registrationQueue, (Matcher)Matchers.is((Matcher)Matchers.empty()));
            taskExecutorGateway.disconnectResourceManager((Exception)new FlinkException("Test exception"));
            ResourceID secondRegistrationAttempt = (ResourceID)registrationQueue.take();
            Assert.assertThat((Object)secondRegistrationAttempt, (Matcher)Matchers.equalTo((Object)taskManagerLocation.getResourceID()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInitialSlotReport() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(taskManagerLocation).build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices);
        taskExecutor.start();
        try {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture initialSlotReportFuture = new CompletableFuture();
            testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
                initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f0);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
            Assert.assertThat(initialSlotReportFuture.get(), (Matcher)Matchers.equalTo((Object)taskManagerLocation.getResourceID()));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInitialSlotReportFailure() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(taskManagerLocation).build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices);
        taskExecutor.start();
        try {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<CompletableFuture<Acknowledge>>(2);
            testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally((Throwable)e);
                }
            });
            final CompletableFuture<TaskExecutorRegistrationSuccess> registrationResponse = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("foobar", 1234)));
            final CountDownLatch numberRegistrations = new CountDownLatch(2);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(new Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>>(){

                @Override
                public CompletableFuture<RegistrationResponse> apply(Tuple4<String, ResourceID, Integer, HardwareDescription> stringResourceIDIntegerHardwareDescriptionTuple4) {
                    numberRegistrations.countDown();
                    return registrationResponse;
                }
            });
            responseQueue.offer(FutureUtils.completedExceptionally((Throwable)new FlinkException("Test exception")));
            responseQueue.offer(CompletableFuture.completedFuture(Acknowledge.get()));
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
            numberRegistrations.await();
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN), this.timerService);
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
        TaskExecutor taskExecutor = this.createTaskExecutor(taskManagerServices);
        AllocationID allocationId = new AllocationID();
        CompletableFuture initialSlotReportFuture = new CompletableFuture();
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            initialSlotReportFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), (RpcGateway)testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
        CountDownLatch slotOfferings = new CountDownLatch(3);
        CompletableFuture offeredSlotFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, slotOffers) -> {
            Assert.assertThat((Object)slotOffers.size(), (Matcher)Matchers.is((Object)1));
            slotOfferings.countDown();
            if (slotOfferings.getCount() == 0L) {
                offeredSlotFuture.complete(((SlotOffer)slotOffers.iterator().next()).getAllocationId());
                return CompletableFuture.completedFuture(slotOffers);
            }
            return FutureUtils.completedExceptionally((Throwable)new TimeoutException());
        }).build();
        String jobManagerAddress = jobMasterGateway.getAddress();
        this.rpc.registerGateway(jobManagerAddress, (RpcGateway)jobMasterGateway);
        this.jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobMasterGateway.getFencingToken().toUUID());
        try {
            taskExecutor.start();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            initialSlotReportFuture.get();
            taskExecutorGateway.requestSlot(new SlotID(taskExecutor.getResourceID(), 0), this.jobId, allocationId, jobManagerAddress, testingResourceManagerGateway.getFencingToken(), timeout).get();
            slotOfferings.await();
            Assert.assertThat(offeredSlotFuture.get(), (Matcher)Matchers.is((Object)allocationId));
            Assert.assertTrue((boolean)taskSlotTable.isSlotFree(1));
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)taskExecutor, (Time)timeout);
        }
    }

    @Nonnull
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) {
        return new TaskExecutor((RpcService)this.rpc, TaskManagerConfiguration.fromConfiguration((Configuration)this.configuration), (HighAvailabilityServices)this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, (FatalErrorHandler)this.testingFatalErrorHandler);
    }

    private static final class RecordingHeartbeatManagerImpl<I, O>
    extends HeartbeatManagerImpl<I, O> {
        private final BlockingQueue<ResourceID> unmonitoredTargets;
        private final BlockingQueue<ResourceID> monitoredTargets;

        public RecordingHeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger log, BlockingQueue<ResourceID> unmonitoredTargets, BlockingQueue<ResourceID> monitoredTargets) {
            super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
            this.unmonitoredTargets = unmonitoredTargets;
            this.monitoredTargets = monitoredTargets;
        }

        public void unmonitorTarget(ResourceID resourceID) {
            super.unmonitorTarget(resourceID);
            this.unmonitoredTargets.offer(resourceID);
        }

        public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
            super.monitorTarget(resourceID, heartbeatTarget);
            this.monitoredTargets.offer(resourceID);
        }
    }

    private static final class RecordingHeartbeatServices
    extends HeartbeatServices {
        private final BlockingQueue<ResourceID> unmonitoredTargets = new ArrayBlockingQueue<ResourceID>(1);
        private final BlockingQueue<ResourceID> monitoredTargets = new ArrayBlockingQueue<ResourceID>(1);

        public RecordingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
            super(heartbeatInterval, heartbeatTimeout);
        }

        public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger log) {
            return new RecordingHeartbeatManagerImpl<I, O>(this.heartbeatTimeout, resourceId, heartbeatListener, (Executor)scheduledExecutor, scheduledExecutor, log, this.unmonitoredTargets, this.monitoredTargets);
        }

        public BlockingQueue<ResourceID> getUnmonitoredTargets() {
            return this.unmonitoredTargets;
        }

        public BlockingQueue<ResourceID> getMonitoredTargets() {
            return this.monitoredTargets;
        }
    }

    private static final class StartStopNotifyingLeaderRetrievalService
    implements LeaderRetrievalService {
        private final CompletableFuture<LeaderRetrievalListener> startFuture;
        private final CompletableFuture<Void> stopFuture;

        private StartStopNotifyingLeaderRetrievalService(CompletableFuture<LeaderRetrievalListener> startFuture, CompletableFuture<Void> stopFuture) {
            this.startFuture = startFuture;
            this.stopFuture = stopFuture;
        }

        public void start(LeaderRetrievalListener listener) throws Exception {
            this.startFuture.complete(listener);
        }

        public void stop() throws Exception {
            this.stopFuture.complete(null);
        }
    }

    public static class TestInvokable
    extends AbstractInvokable {
        static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture();

        public TestInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            COMPLETABLE_FUTURE.complete(true);
        }
    }
}

