/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SlotManagerTest
extends TestLogger {
    /**
     * Registers a task manager that reports two slots and checks that the slot manager
     * afterwards counts two registered slots and can look up each of them by its slot id.
     */
    @Test
    public void testTaskManagerRegistration() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);

        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        // Both slots are created for the same resource id and share one resource profile.
        final ResourceID taskExecutorId = ResourceID.generate();
        final SlotID firstSlotId = new SlotID(taskExecutorId, 0);
        final SlotID secondSlotId = new SlotID(taskExecutorId, 1);
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);
        final SlotReport report = new SlotReport(Arrays.asList(
            new SlotStatus(firstSlotId, profile),
            new SlotStatus(secondSlotId, profile)));

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);

            Assert.assertTrue(
                "The number registered slots does not equal the expected number.",
                2 == slotManager.getNumberRegisteredSlots());
            Assert.assertNotNull(slotManager.getSlot(firstSlotId));
            Assert.assertNotNull(slotManager.getSlot(secondSlotId));
        }
    }

    /**
     * Registers a task manager with one allocated and one free slot, assigns a slot request
     * to the free slot and then unregisters the task manager. Afterwards no slots may remain
     * registered and the pending request must no longer be assigned.
     */
    @Test
    public void testTaskManagerUnregistration() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final JobID occupyingJobId = new JobID();

        // The gateway answers every slot request with a future that this test never completes.
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(gateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.any(AllocationID.class),
                Matchers.anyString(),
                Matchers.eq(rmId),
                Matchers.any(Time.class)))
            .thenReturn(new CompletableFuture<Acknowledge>());
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        final ResourceID taskExecutorId = ResourceID.generate();
        final SlotID occupiedSlotId = new SlotID(taskExecutorId, 0);
        final SlotID freeSlotId = new SlotID(taskExecutorId, 1);
        final AllocationID existingAllocationId = new AllocationID();
        final AllocationID requestedAllocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);

        // Slot 0 is reported with a job and allocation id, slot 1 is reported without either.
        final SlotReport report = new SlotReport(Arrays.asList(
            new SlotStatus(occupiedSlotId, profile, occupyingJobId, existingAllocationId),
            new SlotStatus(freeSlotId, profile)));
        final SlotRequest request =
            new SlotRequest(new JobID(), requestedAllocationId, profile, "foobar");

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(
                "The number registered slots does not equal the expected number.",
                2 == slotManager.getNumberRegisteredSlots());

            final TaskManagerSlot occupiedSlot = slotManager.getSlot(occupiedSlotId);
            final TaskManagerSlot freeSlot = slotManager.getSlot(freeSlotId);
            Assert.assertTrue(occupiedSlot.getState() == TaskManagerSlot.State.ALLOCATED);
            Assert.assertTrue(freeSlot.getState() == TaskManagerSlot.State.FREE);

            // Registering the request must move the previously free slot into PENDING.
            Assert.assertTrue(slotManager.registerSlotRequest(request));
            Assert.assertFalse(freeSlot.getState() == TaskManagerSlot.State.FREE);
            Assert.assertTrue(freeSlot.getState() == TaskManagerSlot.State.PENDING);

            final PendingSlotRequest pending = slotManager.getSlotRequest(requestedAllocationId);
            Assert.assertTrue(
                "The pending slot request should have been assigned to slot 2",
                pending.isAssigned());

            // Removing the task manager drops its slots and releases the assignment.
            slotManager.unregisterTaskManager(connection.getInstanceID());
            Assert.assertTrue(0 == slotManager.getNumberRegisteredSlots());
            Assert.assertFalse(pending.isAssigned());
        }
    }

    /**
     * Submits a slot request while no task manager is registered and checks that the slot
     * manager calls {@code allocateResource} on the resource actions with the requested profile.
     */
    @Test
    public void testSlotRequestWithoutFreeSlots() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceProfile requestedProfile = new ResourceProfile(42.0, 1337);
        final SlotRequest request =
            new SlotRequest(new JobID(), new AllocationID(), requestedProfile, "localhost");
        final ResourceActions actions = Mockito.mock(ResourceActions.class);

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerSlotRequest(request);

            Mockito.verify(actions).allocateResource(Matchers.eq(requestedProfile));
        }
    }

    /**
     * Checks that a failure while allocating a new resource surfaces to the caller of
     * {@code registerSlotRequest}: the mocked resource actions throw a
     * {@link ResourceManagerException} from {@code allocateResource}, and the test fails
     * unless registering the slot request throws a {@link ResourceManagerException}.
     */
    @Test
    public void testSlotRequestWithResourceAllocationFailure() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), resourceProfile, "localhost");

        ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        Mockito.doThrow(new ResourceManagerException("Test exception"))
            .when(resourceManagerActions)
            .allocateResource(Matchers.any(ResourceProfile.class));

        try (SlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions)) {
            slotManager.registerSlotRequest(slotRequest);
            Assert.fail("The slot request should have failed with a ResourceManagerException.");
        } catch (ResourceManagerException expected) {
            // Expected outcome: the allocation failure was reported to the caller, so there is
            // nothing further to check. Any other outcome reaches Assert.fail above.
        }
    }

    /**
     * Registers a task manager with one free slot and then issues a matching slot request.
     * The request must be accepted, forwarded to the task executor gateway with the request's
     * target address, and the slot must afterwards carry the request's allocation id.
     */
    @Test
    public void testSlotRequestWithFreeSlot() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        // Single source of truth for the address used in the request and in the verification.
        String targetAddress = "localhost";
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, targetAddress);
        ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);

        try (SlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions)) {
            // The gateway acknowledges the slot request with an already completed future.
            TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
            Mockito.when(taskExecutorGateway.requestSlot(
                    Matchers.eq(slotId),
                    Matchers.eq(jobId),
                    Matchers.eq(allocationId),
                    Matchers.anyString(),
                    Matchers.eq(resourceManagerId),
                    Matchers.any(Time.class)))
                .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
            TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);

            SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
            SlotReport slotReport = new SlotReport(slotStatus);
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);

            Assert.assertTrue(
                "The slot request should be accepted",
                slotManager.registerSlotRequest(slotRequest));

            Mockito.verify(taskExecutorGateway).requestSlot(
                Matchers.eq(slotId),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.eq(targetAddress),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());
        }
    }

    /**
     * Assigns a slot request to a free slot while the task executor's answer is still
     * outstanding, then unregisters the request. The request must no longer be known to the
     * slot manager and the slot must be back in the FREE state.
     */
    @Test
    public void testUnregisterPendingSlotRequest() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final AllocationID allocationId = new AllocationID();

        // The returned future is never completed by this test.
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(gateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.any(AllocationID.class),
                Matchers.anyString(),
                Matchers.eq(rmId),
                Matchers.any(Time.class)))
            .thenReturn(new CompletableFuture<Acknowledge>());

        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));
        final SlotRequest request = new SlotRequest(new JobID(), allocationId, profile, "foobar");
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);
            TaskManagerSlot slot = slotManager.getSlot(slotId);

            slotManager.registerSlotRequest(request);
            Assert.assertNotNull(slotManager.getSlotRequest(allocationId));
            Assert.assertTrue(slot.getState() == TaskManagerSlot.State.PENDING);

            slotManager.unregisterSlotRequest(allocationId);
            Assert.assertNull(slotManager.getSlotRequest(allocationId));

            // Look the slot up again before checking that it has been released.
            slot = slotManager.getSlot(slotId);
            Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
        }
    }

    /**
     * Issues a slot request before any slot is available and registers a matching task
     * manager afterwards. The request must first trigger exactly one resource allocation;
     * once the task manager is registered the request must be sent to its gateway with the
     * request's target address, and the slot must carry the request's allocation id.
     */
    @Test
    public void testFulfillingPendingSlotRequest() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        // Single source of truth for the address used in the request and in the verification.
        String targetAddress = "localhost";
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, targetAddress);
        ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);

        // The gateway acknowledges the slot request with an already completed future.
        TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot(
                Matchers.eq(slotId),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class)))
            .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);

        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        SlotReport slotReport = new SlotReport(slotStatus);

        try (SlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions)) {
            Assert.assertTrue(
                "The slot request should be accepted",
                slotManager.registerSlotRequest(slotRequest));
            Mockito.verify(resourceManagerActions, Mockito.times(1))
                .allocateResource(Matchers.eq(resourceProfile));

            slotManager.registerTaskManager(taskExecutorConnection, slotReport);

            Mockito.verify(taskExecutorGateway).requestSlot(
                Matchers.eq(slotId),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.eq(targetAddress),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());
        }
    }

    /**
     * Registers a task manager whose single slot is reported as allocated and exercises
     * {@code freeSlot}: a call with a different allocation id must leave the slot allocated,
     * while a call with the slot's own allocation id must free it and clear its allocation id.
     */
    @Test
    public void testFreeSlot() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceID taskExecutorId = ResourceID.generate();
        final JobID jobId = new JobID();
        final SlotID slotId = new SlotID(taskExecutorId, 0);
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);

        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        final SlotReport report =
            new SlotReport(new SlotStatus(slotId, profile, jobId, allocationId));

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());

            // Freeing with a freshly created, non-matching allocation id changes nothing.
            slotManager.freeSlot(slotId, new AllocationID());
            Assert.assertTrue(slot.getState() == TaskManagerSlot.State.ALLOCATED);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());

            // Freeing with the matching allocation id releases the slot.
            slotManager.freeSlot(slotId, allocationId);
            Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
            Assert.assertNull(slot.getAllocationId());
        }
    }

    /**
     * Registers two slot requests that share one allocation id while no slots exist. The
     * first must be accepted, the second rejected, and only a single resource allocation may
     * have been requested from the resource actions.
     */
    @Test
    public void testDuplicatePendingSlotRequest() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final AllocationID sharedAllocationId = new AllocationID();

        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest =
            new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest duplicateRequest =
            new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));
            Assert.assertFalse(slotManager.registerSlotRequest(duplicateRequest));
        }

        // Checked after the slot manager has been closed, as in the original flow.
        Mockito.verify(actions, Mockito.times(1))
            .allocateResource(Matchers.any(ResourceProfile.class));
    }

    /**
     * Registers a task manager whose slot report already lists a slot under a given
     * allocation id and checks that a slot request using that same allocation id is rejected.
     */
    @Test
    public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);

        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);

        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        // The reported slot and the later request use the same job id and allocation id.
        final SlotReport report =
            new SlotReport(new SlotStatus(slotId, profile, jobId, allocationId));
        final SlotRequest request = new SlotRequest(jobId, allocationId, profile, "foobar");

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);

            Assert.assertFalse(slotManager.registerSlotRequest(request));
        }
    }

    /**
     * Fulfils a slot request from a registered free slot and then submits a second request
     * with the same allocation id. The second request must be rejected, and no resource
     * allocation may have been requested from the resource actions at any point.
     */
    @Test
    public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final AllocationID sharedAllocationId = new AllocationID();

        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest =
            new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest duplicateRequest =
            new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        // The gateway acknowledges every slot request with an already completed future.
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(gateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.any(AllocationID.class),
                Matchers.anyString(),
                Matchers.eq(rmId),
                Matchers.any(Time.class)))
            .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        // The single reported slot has the profile of the first request.
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, firstProfile));

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());

            Assert.assertFalse(slotManager.registerSlotRequest(duplicateRequest));
        }

        Mockito.verify(actions, Mockito.never())
            .allocateResource(Matchers.any(ResourceProfile.class));
    }

    /**
     * Fulfils a slot request, frees the slot again and then submits a second request that
     * reuses the same allocation id. After the release the second request must be accepted
     * and the slot must carry the allocation id once more; no resource allocation may have
     * been requested from the resource actions.
     */
    @Test
    public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final AllocationID sharedAllocationId = new AllocationID();

        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest =
            new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest secondRequest =
            new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        // The gateway acknowledges every slot request with an already completed future.
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(gateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.any(AllocationID.class),
                Matchers.anyString(),
                Matchers.eq(rmId),
                Matchers.any(Time.class)))
            .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        // NOTE(review): the (2.0, 2) profile is presumably chosen so that the slot can serve
        // both requested profiles - confirm against the slot matching rules.
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final SlotReport report =
            new SlotReport(new SlotStatus(slotId, new ResourceProfile(2.0, 2)));

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());

            slotManager.freeSlot(slotId, sharedAllocationId);
            Assert.assertTrue(slot.getState() == TaskManagerSlot.State.FREE);
            Assert.assertNull(slot.getAllocationId());

            Assert.assertTrue(slotManager.registerSlotRequest(secondRequest));
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());
        }

        Mockito.verify(actions, Mockito.never())
            .allocateResource(Matchers.any(ResourceProfile.class));
    }

    /**
     * Sends a slot report for an instance id that was never registered and checks that
     * {@code reportSlotStatus} returns {@code false} and that the number of registered slots
     * stays at zero.
     */
    @Test
    public void testReceivingUnknownSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);

        final InstanceID unregisteredInstanceId = new InstanceID();
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            Assert.assertTrue(0 == slotManager.getNumberRegisteredSlots());

            Assert.assertFalse(slotManager.reportSlotStatus(unregisteredInstanceId, report));

            Assert.assertTrue(0 == slotManager.getNumberRegisteredSlots());
        }
    }

    /**
     * Registers a task manager with two free slots and then delivers a second slot report in
     * which the second slot carries a job id and allocation id. The report must be accepted,
     * both slots must stay registered and the second slot must show the reported allocation id.
     */
    @Test
    public void testUpdateSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();

        final ResourceID taskExecutorId = ResourceID.generate();
        final SlotID firstSlotId = new SlotID(taskExecutorId, 0);
        final SlotID secondSlotId = new SlotID(taskExecutorId, 1);
        final ResourceProfile profile = new ResourceProfile(1.0, 1);

        final SlotStatus firstFree = new SlotStatus(firstSlotId, profile);
        final SlotStatus secondFree = new SlotStatus(secondSlotId, profile);
        final SlotStatus secondAllocated = new SlotStatus(secondSlotId, profile, jobId, allocationId);

        final SlotReport initialReport = new SlotReport(Arrays.asList(firstFree, secondFree));
        // The update lists the changed second slot first, followed by the unchanged first slot.
        final SlotReport updatedReport = new SlotReport(Arrays.asList(secondAllocated, firstFree));

        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        try (SlotManager slotManager = this.createSlotManager(rmId, actions)) {
            Assert.assertTrue(0 == slotManager.getNumberRegisteredSlots());

            slotManager.registerTaskManager(connection, initialReport);
            final TaskManagerSlot firstSlot = slotManager.getSlot(firstSlotId);
            final TaskManagerSlot secondSlot = slotManager.getSlot(secondSlotId);
            Assert.assertTrue(2 == slotManager.getNumberRegisteredSlots());
            Assert.assertTrue(firstSlot.getState() == TaskManagerSlot.State.FREE);
            Assert.assertTrue(secondSlot.getState() == TaskManagerSlot.State.FREE);

            Assert.assertTrue(
                slotManager.reportSlotStatus(connection.getInstanceID(), updatedReport));

            Assert.assertTrue(2 == slotManager.getNumberRegisteredSlots());
            Assert.assertNotNull(slotManager.getSlot(firstSlotId));
            Assert.assertNotNull(slotManager.getSlot(secondSlotId));
            Assert.assertEquals(allocationId, slotManager.getSlot(secondSlotId).getAllocationId());
        }
    }

    /**
     * Registers a task manager with a slot manager whose last constructor argument is a
     * {@code tmTimeout}-millisecond time, and waits until {@code releaseResource} is called
     * exactly once on the resource actions for that task manager's instance id.
     *
     * <p>The registration is submitted through the executor passed to {@code start}. The
     * verification waits for at most one hundred times {@code tmTimeout}.
     */
    @Test
    public void testTaskManagerTimeout() throws Exception {
        final long tmTimeout = 500L;

        final ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();

        final TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);

        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        final SlotReport slotReport = new SlotReport(slotStatus);

        final ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();

        try (SlotManager slotManager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                Time.milliseconds(tmTimeout))) {
            slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

            mainThreadExecutor.execute(
                () -> slotManager.registerTaskManager(taskManagerConnection, slotReport));

            // Wait generously (100 x the configured timeout) for the release to be requested.
            Mockito.verify(resourceManagerActions, Mockito.timeout(100L * tmTimeout).times(1))
                .releaseResource(Matchers.eq(taskManagerConnection.getInstanceID()));
        }
    }

    /**
     * Registers a slot request with a slot manager whose third constructor argument is an
     * {@code allocationTimeout}-millisecond time, while no slots are registered, and waits
     * until {@code notifyAllocationFailure} is called exactly once for the request's job id
     * and allocation id with a {@link TimeoutException}.
     *
     * <p>The request is registered through the executor passed to {@code start}. Any
     * exception or assertion failure raised there is captured and rethrown on the test thread
     * so that it fails the test instead of being lost on the executor thread.
     */
    @Test
    public void testSlotRequestTimeout() throws Exception {
        final long allocationTimeout = 50L;

        final ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");

        final ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();

        try (SlotManager slotManager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                Time.milliseconds(allocationTimeout),
                TestingUtils.infiniteTime())) {
            slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

            // Holds the first failure observed on the executor thread, if any.
            final AtomicReference<Throwable> registrationFailure = new AtomicReference<>();
            mainThreadExecutor.execute(() -> {
                try {
                    Assert.assertTrue(slotManager.registerSlotRequest(slotRequest));
                } catch (Exception | AssertionError e) {
                    // AssertionError is not an Exception, so it has to be caught explicitly.
                    registrationFailure.compareAndSet(null, e);
                }
            });

            // Wait generously (100 x the configured timeout) for the failure notification.
            Mockito.verify(resourceManagerActions, Mockito.timeout(100L * allocationTimeout).times(1))
                .notifyAllocationFailure(
                    Matchers.eq(jobId),
                    Matchers.eq(allocationId),
                    Matchers.any(TimeoutException.class));

            // Only Exception and AssertionError instances are ever stored above.
            final Throwable failure = registrationFailure.get();
            if (failure instanceof AssertionError) {
                throw (AssertionError) failure;
            }
            if (failure != null) {
                throw (Exception) failure;
            }
        }
    }

    /**
     * Verifies that a slot request whose first {@code requestSlot} call fails is sent to the task
     * executor a second time and ends up allocated.
     *
     * <p>The mocked gateway hands out two futures in order: the test completes the first one
     * exceptionally with a {@link SlotAllocationException} and the second one with an
     * {@link Acknowledge}. Afterwards the slot targeted by the latest call must be
     * {@code ALLOCATED} for the request's allocation id; if that is a different slot than the one
     * of the failed attempt, the failed slot must be {@code FREE} again.
     */
    @Test
    public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
        final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();

        // First call returns slotRequestFuture1, every later call returns slotRequestFuture2.
        final TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.any(ResourceManagerId.class),
                Matchers.any(Time.class)))
            .thenReturn(slotRequestFuture1, slotRequestFuture2);
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);

        final ResourceID resourceId = ResourceID.generate();
        final SlotID slotId1 = new SlotID(resourceId, 0);
        final SlotID slotId2 = new SlotID(resourceId, 1);
        final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));

        try (SlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions)) {
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            slotManager.registerSlotRequest(slotRequest);

            final ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            Mockito.verify(taskExecutorGateway, Mockito.times(1)).requestSlot(
                slotIdCaptor.capture(),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            // Remember the slot of the first attempt before failing that attempt.
            final TaskManagerSlot failedSlot = slotManager.getSlot(slotIdCaptor.getValue());
            slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception."));

            Mockito.verify(taskExecutorGateway, Mockito.times(2)).requestSlot(
                slotIdCaptor.capture(),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));
            slotRequestFuture2.complete(Acknowledge.get());

            // getValue() yields the most recently captured slot id, i.e. the retry target.
            final TaskManagerSlot slot = slotManager.getSlot(slotIdCaptor.getValue());
            Assert.assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
            Assert.assertEquals(allocationId, slot.getAllocationId());

            if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
                Assert.assertEquals(TaskManagerSlot.State.FREE, failedSlot.getState());
            }
        }
    }

    /**
     * Verifies that a slot report arriving while a slot request is still in flight makes the
     * slot manager send the request to the other slot.
     *
     * <p>The first {@code requestSlot} call returns a future this test never completes. A new slot
     * report then declares the requested slot as allocated for a different job and allocation id
     * while the other slot stays free. A second {@code requestSlot} call is expected within
     * {@code verifyTimeout} milliseconds; it must target the free slot, and that slot must end up
     * {@code ALLOCATED} for the original allocation id.
     */
    @Test
    public void testSlotReportWhileActiveSlotRequest() throws Exception {
        final long verifyTimeout = 10000L;
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();

        // First call returns the never-completed future, later calls are acknowledged at once.
        final TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.any(JobID.class),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.any(ResourceManagerId.class),
                Matchers.any(Time.class)))
            .thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);

        final ResourceID resourceId = ResourceID.generate();
        final SlotID slotId1 = new SlotID(resourceId, 0);
        final SlotID slotId2 = new SlotID(resourceId, 1);
        final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));

        final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
        try (SlotManager slotManager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime())) {
            slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

            final CompletableFuture<Void> registrationFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        slotManager.registerTaskManager(taskManagerConnection, slotReport);
                        return null;
                    },
                    mainThreadExecutor)
                .thenAccept(
                    value -> {
                        try {
                            slotManager.registerSlotRequest(slotRequest);
                        } catch (SlotManagerException e) {
                            throw new RuntimeException("Could not register slots.", e);
                        }
                    });

            // Surfaces any exception thrown while registering.
            registrationFuture.get();

            final ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            Mockito.verify(taskExecutorGateway, Mockito.times(1)).requestSlot(
                slotIdCaptor.capture(),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            // Either slot may have been picked first; the other one is the free slot.
            final SlotID requestedSlotId = slotIdCaptor.getValue();
            final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;

            final CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(
                () -> slotManager.getSlot(freeSlotId).getState() == TaskManagerSlot.State.FREE,
                mainThreadExecutor);
            Assert.assertTrue(freeSlotFuture.get());

            // Report the requested slot as taken by an unrelated job/allocation.
            final SlotStatus newSlotStatus1 = new SlotStatus(requestedSlotId, resourceProfile, new JobID(), new AllocationID());
            final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
            final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));

            final CompletableFuture<Boolean> reportSlotStatusFuture = CompletableFuture.supplyAsync(
                () -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
                mainThreadExecutor);
            Assert.assertTrue(reportSlotStatusFuture.get());

            Mockito.verify(taskExecutorGateway, Mockito.timeout(verifyTimeout).times(2)).requestSlot(
                slotIdCaptor.capture(),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            // The retry must go to whichever slot was free, not to a fixed slot: asserting
            // against slotId2 would fail whenever the first request happened to pick slotId2.
            final SlotID requestedSlotId2 = slotIdCaptor.getValue();
            Assert.assertEquals(freeSlotId, requestedSlotId2);

            final CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(
                () -> slotManager.getSlot(requestedSlotId2),
                mainThreadExecutor);
            final TaskManagerSlot slot = requestedSlotFuture.get();
            Assert.assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
            Assert.assertEquals(allocationId, slot.getAllocationId());
        }
    }

    /**
     * Verifies that a task manager is reported for release once its only used slot has been freed.
     *
     * <p>A slot request is registered first and the task manager afterwards, so the request is
     * expected to reach the gateway via {@code requestSlot}. While the slot is allocated the task
     * manager must not be idle. After {@code freeSlot} it must be idle, and
     * {@code releaseResource} must be invoked for its instance id within {@code verifyTimeout}
     * milliseconds. The slot manager is constructed with {@code taskManagerTimeout} milliseconds
     * as its last time argument.
     */
    @Test
    public void testTimeoutForUnusedTaskManager() throws Exception {
        final long taskManagerTimeout = 50L;
        final long verifyTimeout = 500L;
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceActions resourceManagerActions = Mockito.mock(ResourceActions.class);
        final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
        final ResourceID resourceId = ResourceID.generate();
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");

        final TaskExecutorGateway taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
        Mockito.when(taskExecutorGateway.requestSlot(
                Matchers.any(SlotID.class),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class)))
            .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);

        final SlotID slotId1 = new SlotID(resourceId, 0);
        final SlotID slotId2 = new SlotID(resourceId, 1);
        final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));

        final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
        try (SlotManager slotManager = new SlotManager(
                scheduledExecutor,
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
            slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

            final CompletableFuture<Void> registrationFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        try {
                            return slotManager.registerSlotRequest(slotRequest);
                        } catch (SlotManagerException e) {
                            throw new CompletionException(e);
                        }
                    },
                    mainThreadExecutor)
                .thenAccept(value -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));

            // Await the registration so a failure inside it is reported with its real cause
            // instead of showing up only as a verification timeout below.
            registrationFuture.get(verifyTimeout, TimeUnit.MILLISECONDS);

            final ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
            Mockito.verify(taskExecutorGateway, Mockito.timeout(verifyTimeout)).requestSlot(
                slotIdArgumentCaptor.capture(),
                Matchers.eq(jobId),
                Matchers.eq(allocationId),
                Matchers.anyString(),
                Matchers.eq(resourceManagerId),
                Matchers.any(Time.class));

            final CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
                () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
                mainThreadExecutor);
            Assert.assertFalse(idleFuture.get());

            final SlotID slotId = slotIdArgumentCaptor.getValue();
            final CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
                () -> slotManager.getSlot(slotId),
                mainThreadExecutor);
            final TaskManagerSlot slot = slotFuture.get();
            Assert.assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
            Assert.assertEquals(allocationId, slot.getAllocationId());

            // Free the slot, then query idleness once the free has been executed.
            final CompletableFuture<Boolean> idleFuture2 = CompletableFuture
                .runAsync(() -> slotManager.freeSlot(slotId, allocationId), mainThreadExecutor)
                .thenApply(value -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
            Assert.assertTrue(idleFuture2.get());

            Mockito.verify(resourceManagerActions, Mockito.timeout(verifyTimeout).times(1))
                .releaseResource(Matchers.eq(taskManagerConnection.getInstanceID()));
        }
    }

    /**
     * Checks that a {@code releaseResource} notification for a registered task executor leaves
     * its slot registered; the slot count only drops to zero after
     * {@code unregisterTaskManager} is called.
     */
    @Test
    public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        final Time taskManagerTimeout = Time.milliseconds(10L);
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions actions = Mockito.mock(ResourceActions.class);
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(gateway);

        // A single slot with index 0 on a freshly generated resource id.
        final SlotID onlySlotId = new SlotID(ResourceID.generate(), 0);
        final SlotStatus onlySlotStatus = new SlotStatus(onlySlotId, new ResourceProfile(1.0, 1));
        final SlotReport report = new SlotReport(onlySlotStatus);

        try (SlotManager manager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                taskManagerTimeout)) {
            manager.start(rmId, Executors.directExecutor(), actions);
            manager.registerTaskManager(connection, report);

            final long slotsAfterRegistration = manager.getNumberRegisteredSlots();
            Assert.assertEquals(1L, slotsAfterRegistration);

            // Wait up to twenty times the configured timeout for at least one release call.
            final long releaseWaitMillis = taskManagerTimeout.toMilliseconds() * 20L;
            Mockito.verify(actions, Mockito.timeout(releaseWaitMillis).atLeast(1))
                .releaseResource(Matchers.eq(connection.getInstanceID()));

            final long slotsAfterRelease = manager.getNumberRegisteredSlots();
            Assert.assertEquals(1L, slotsAfterRelease);

            manager.unregisterTaskManager(connection.getInstanceID());

            final long slotsAfterUnregister = manager.getNumberRegisteredSlots();
            Assert.assertEquals(0L, slotsAfterUnregister);
        }
    }

    /**
     * Builds a {@link SlotManager} on the default scheduled executor with all three time
     * arguments set to {@code TestingUtils.infiniteTime()}, and starts it on a direct executor.
     *
     * @param resourceManagerId id handed to {@code SlotManager.start}
     * @param resourceManagerActions actions handed to {@code SlotManager.start}
     * @return the started slot manager
     */
    private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
        final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
        final Time firstTimeout = TestingUtils.infiniteTime();
        final Time secondTimeout = TestingUtils.infiniteTime();
        final Time thirdTimeout = TestingUtils.infiniteTime();

        final SlotManager startedSlotManager =
            new SlotManager(scheduledExecutor, firstTimeout, secondTimeout, thirdTimeout);
        startedSlotManager.start(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
        return startedSlotManager;
    }
}

