/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SlotManagerTest
extends TestLogger {
    /**
     * Registers a task manager whose slot report contains two slots (indices 0 and 1, neither
     * carrying an allocation id) and checks that the slot manager then reports two registered
     * slots and can look both of them up by their slot ids.
     */
    @Test
    public void testTaskManagerRegistration() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder().build();

        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        final ResourceID taskManagerId = ResourceID.generate();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        // Both slots share the same resource profile and are reported without job or allocation.
        final SlotID firstSlotId = new SlotID(taskManagerId, 0);
        final SlotID secondSlotId = new SlotID(taskManagerId, 1);
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);
        final SlotReport report = new SlotReport(Arrays.asList(
            new SlotStatus(firstSlotId, profile),
            new SlotStatus(secondSlotId, profile)));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);

            final boolean bothSlotsRegistered = slotManager.getNumberRegisteredSlots() == 2;
            Assert.assertTrue("The number registered slots does not equal the expected number.", bothSlotsRegistered);

            Assert.assertNotNull(slotManager.getSlot(firstSlotId));
            Assert.assertNotNull(slotManager.getSlot(secondSlotId));
        }
    }

    /**
     * Checks the effect of unregistering a task manager: afterwards no slots are registered and a
     * pending slot request that had been assigned to one of its slots is no longer assigned.
     *
     * <p>The task manager reports slot 0 as allocated to a job and slot 1 without an allocation.
     * The slot request is expected to move slot 1 from FREE to PENDING before the task manager
     * is unregistered.
     */
    @Test
    public void testTaskManagerUnregistration() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions resourceActions = Mockito.mock(ResourceActions.class);
        final JobID reportedJobId = new JobID();

        // The gateway checks the resource manager id handed over with the request (tuple field f4)
        // and answers with a fresh future that this test never completes.
        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
            .setRequestSlotFunction(requestArgs -> {
                Assert.assertThat(requestArgs.f4, org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(rmId)));
                return new CompletableFuture<Acknowledge>();
            })
            .createTestingTaskExecutorGateway();

        final ResourceID taskManagerId = ResourceID.generate();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        final SlotID firstSlotId = new SlotID(taskManagerId, 0);
        final SlotID secondSlotId = new SlotID(taskManagerId, 1);
        final AllocationID reportedAllocationId = new AllocationID();
        final AllocationID requestedAllocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);

        final SlotReport report = new SlotReport(Arrays.asList(
            new SlotStatus(firstSlotId, profile, reportedJobId, reportedAllocationId),
            new SlotStatus(secondSlotId, profile)));
        final SlotRequest request = new SlotRequest(new JobID(), requestedAllocationId, profile, "foobar");

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(
                "The number registered slots does not equal the expected number.",
                slotManager.getNumberRegisteredSlots() == 2);

            final TaskManagerSlot firstSlot = slotManager.getSlot(firstSlotId);
            final TaskManagerSlot secondSlot = slotManager.getSlot(secondSlotId);
            Assert.assertTrue(TaskManagerSlot.State.ALLOCATED == firstSlot.getState());
            Assert.assertTrue(TaskManagerSlot.State.FREE == secondSlot.getState());

            // Registering the request is expected to claim the second slot.
            Assert.assertTrue(slotManager.registerSlotRequest(request));
            Assert.assertFalse(TaskManagerSlot.State.FREE == secondSlot.getState());
            Assert.assertTrue(TaskManagerSlot.State.PENDING == secondSlot.getState());

            final PendingSlotRequest pendingRequest = slotManager.getSlotRequest(requestedAllocationId);
            Assert.assertTrue("The pending slot request should have been assigned to slot 2", pendingRequest.isAssigned());

            slotManager.unregisterTaskManager(connection.getInstanceID());

            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 0);
            Assert.assertFalse(pendingRequest.isAssigned());
        }
    }

    /**
     * Registers a slot request with a slot manager that has no task manager registered and checks
     * that the allocate-resource consumer of the resource actions is invoked with the requested
     * resource profile.
     */
    @Test
    public void testSlotRequestWithoutFreeSlots() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceProfile requestedProfile = new ResourceProfile(42.0, 1337);
        final SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), requestedProfile, "localhost");

        // Completed with the value passed to the allocate-resource consumer.
        final CompletableFuture<ResourceProfile> allocationRequested = new CompletableFuture<>();
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceConsumer(allocationRequested::complete)
            .build();

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerSlotRequest(request);

            final ResourceProfile allocatedProfile = allocationRequested.get();
            Assert.assertThat(allocatedProfile, org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(requestedProfile)));
        }
    }

    /**
     * Expects a {@link ResourceManagerException} when a slot request is registered while the
     * allocate-resource function of the resource actions always throws that exception. The test
     * fails explicitly if the registration returns normally.
     */
    @Test
    public void testSlotRequestWithResourceAllocationFailure() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);
        final SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), profile, "localhost");

        // Allocation function that never returns a value and always throws.
        final FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> failingAllocation =
            ignoredProfile -> {
                throw new ResourceManagerException("Test exception");
            };
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceFunction(failingAllocation)
            .build();

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerSlotRequest(request);
            Assert.fail("The slot request should have failed with a ResourceManagerException.");
        } catch (ResourceManagerException expected) {
            // This is the outcome the test is looking for; nothing else to verify.
        }
    }

    /**
     * Registers a task manager with a single unallocated slot and then a matching slot request.
     * Checks that the request is accepted, that the task executor gateway receives a slot request
     * carrying the expected slot id, job id, allocation id, target address and resource manager
     * id, and that the slot ends up with the requested allocation id.
     */
    @Test
    public void testSlotRequestWithFreeSlot() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceID taskManagerId = ResourceID.generate();
        final JobID jobId = new JobID();
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final String targetAddress = "localhost";
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);
        final SlotRequest request = new SlotRequest(jobId, allocationId, profile, targetAddress);
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder().build();

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            // Captures the arguments of the slot request that reaches the gateway.
            final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> receivedRequest =
                new CompletableFuture<>();
            final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
                .setRequestSlotFunction(args -> {
                    receivedRequest.complete(Tuple5.of(args.f0, args.f1, args.f2, args.f3, args.f4));
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
            final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

            final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));
            slotManager.registerTaskManager(connection, report);

            final boolean accepted = slotManager.registerSlotRequest(request);
            Assert.assertTrue("The slot request should be accepted", accepted);

            Assert.assertThat(
                receivedRequest.get(),
                org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(
                    Tuple5.of(slotId, jobId, allocationId, targetAddress, rmId))));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());
        }
    }

    /**
     * Checks that unregistering a slot request removes it from the slot manager and returns the
     * slot it was pending on to the FREE state.
     *
     * <p>The gateway answers slot requests with a future that this test never completes, and the
     * slot is asserted to be PENDING before the request is unregistered.
     */
    @Test
    public void testUnregisterPendingSlotRequest() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions resourceActions = Mockito.mock(ResourceActions.class);
        final ResourceID taskManagerId = ResourceID.generate();
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final AllocationID allocationId = new AllocationID();

        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
            .setRequestSlotFunction(ignoredArgs -> new CompletableFuture<Acknowledge>())
            .createTestingTaskExecutorGateway();

        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));
        final SlotRequest request = new SlotRequest(new JobID(), allocationId, profile, "foobar");
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);
            final TaskManagerSlot pendingSlot = slotManager.getSlot(slotId);

            slotManager.registerSlotRequest(request);
            Assert.assertNotNull(slotManager.getSlotRequest(allocationId));
            Assert.assertTrue(TaskManagerSlot.State.PENDING == pendingSlot.getState());

            slotManager.unregisterSlotRequest(allocationId);
            Assert.assertNull(slotManager.getSlotRequest(allocationId));

            // Look the slot up again rather than reusing the earlier reference.
            final TaskManagerSlot releasedSlot = slotManager.getSlot(slotId);
            Assert.assertTrue(TaskManagerSlot.State.FREE == releasedSlot.getState());
        }
    }

    /**
     * Registers a slot request before any task manager is known, then registers a task manager
     * with a matching slot. Checks that the allocate-resource consumer was called exactly once
     * for the request, that the gateway then receives the slot request with the expected
     * arguments, and that the slot carries the requested allocation id.
     */
    @Test
    public void testFulfillingPendingSlotRequest() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceID taskManagerId = ResourceID.generate();
        final JobID jobId = new JobID();
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final String targetAddress = "localhost";
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);
        final SlotRequest request = new SlotRequest(jobId, allocationId, profile, targetAddress);

        // Counts how often the slot manager asks for a new resource.
        final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceConsumer(ignoredProfile -> allocateResourceCalls.incrementAndGet())
            .build();

        // Captures the arguments of the slot request that reaches the gateway.
        final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> receivedRequest =
            new CompletableFuture<>();
        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
            .setRequestSlotFunction(args -> {
                receivedRequest.complete(Tuple5.of(args.f0, args.f1, args.f2, args.f3, args.f4));
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .createTestingTaskExecutorGateway();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            final boolean accepted = slotManager.registerSlotRequest(request);
            Assert.assertTrue("The slot request should be accepted", accepted);
            Assert.assertThat(allocateResourceCalls.get(), org.hamcrest.Matchers.is(1));

            slotManager.registerTaskManager(connection, report);

            Assert.assertThat(
                receivedRequest.get(),
                org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(
                    Tuple5.of(slotId, jobId, allocationId, targetAddress, rmId))));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());
        }
    }

    /**
     * Checks that freeing a slot only takes effect when the matching allocation id is supplied.
     * A free call with an unrelated allocation id must leave the slot ALLOCATED with its original
     * allocation id. A free call with the right id must leave it FREE with no allocation id.
     */
    @Test
    public void testFreeSlot() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceID taskManagerId = ResourceID.generate();
        final JobID jobId = new JobID();
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(42.0, 1337);

        final ResourceActions resourceActions = Mockito.mock(ResourceActions.class);
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        // The slot is reported as already allocated to jobId / allocationId.
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile, jobId, allocationId));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());

            // Freeing with an unrelated allocation id must not change anything.
            final AllocationID unrelatedAllocationId = new AllocationID();
            slotManager.freeSlot(slotId, unrelatedAllocationId);
            Assert.assertTrue(TaskManagerSlot.State.ALLOCATED == slot.getState());
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                allocationId,
                slot.getAllocationId());

            // Freeing with the matching allocation id releases the slot.
            slotManager.freeSlot(slotId, allocationId);
            Assert.assertTrue(TaskManagerSlot.State.FREE == slot.getState());
            Assert.assertNull(slot.getAllocationId());
        }
    }

    /**
     * Registers two slot requests that share one allocation id while no slots are available.
     * The first must be accepted, the second rejected, and the allocate-resource consumer must
     * have been called exactly once.
     */
    @Test
    public void testDuplicatePendingSlotRequest() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();

        final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceConsumer(ignoredProfile -> allocateResourceCalls.incrementAndGet())
            .build();

        // Same allocation id, but different job, profile and target address.
        final AllocationID sharedAllocationId = new AllocationID();
        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest = new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest duplicateRequest = new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));
            Assert.assertFalse(slotManager.registerSlotRequest(duplicateRequest));
        }

        // Checked only after the slot manager has been closed.
        Assert.assertThat(allocateResourceCalls.get(), org.hamcrest.Matchers.is(1));
    }

    /**
     * Checks that a slot request is rejected when a registered task manager has already reported
     * a slot that is allocated under the same allocation id.
     */
    @Test
    public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions resourceActions = Mockito.mock(ResourceActions.class);

        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile profile = new ResourceProfile(1.0, 1);

        final ResourceID taskManagerId = ResourceID.generate();
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        // The reported slot already carries the job id and allocation id used by the request below.
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile, jobId, allocationId));
        final SlotRequest duplicateRequest = new SlotRequest(jobId, allocationId, profile, "foobar");

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);

            final boolean accepted = slotManager.registerSlotRequest(duplicateRequest);
            Assert.assertFalse(accepted);
        }
    }

    /**
     * Checks that a second slot request with an allocation id that has already been assigned to
     * a slot is rejected, and that the allocate-resource consumer is never called during the test.
     */
    @Test
    public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();

        final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceConsumer(ignoredProfile -> allocateResourceCalls.incrementAndGet())
            .build();

        // Two requests sharing one allocation id but differing in job, profile and target address.
        final AllocationID sharedAllocationId = new AllocationID();
        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest = new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest duplicateRequest = new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        final ResourceID taskManagerId = ResourceID.generate();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        // The single reported slot has the profile of the first request.
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, firstProfile));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());

            Assert.assertFalse(slotManager.registerSlotRequest(duplicateRequest));
        }

        // Checked only after the slot manager has been closed.
        Assert.assertThat(allocateResourceCalls.get(), org.hamcrest.Matchers.is(0));
    }

    /**
     * Checks that an allocation id can be reused once the slot holding it has been freed. After
     * the free call, a second request with the same allocation id must be accepted and the slot
     * must carry that allocation id again. The allocate-resource consumer must never be called.
     */
    @Test
    public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();

        final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceConsumer(ignoredProfile -> allocateResourceCalls.incrementAndGet())
            .build();

        final AllocationID sharedAllocationId = new AllocationID();
        final ResourceProfile firstProfile = new ResourceProfile(1.0, 2);
        final ResourceProfile secondProfile = new ResourceProfile(2.0, 1);
        final SlotRequest firstRequest = new SlotRequest(new JobID(), sharedAllocationId, firstProfile, "foobar");
        final SlotRequest secondRequest = new SlotRequest(new JobID(), sharedAllocationId, secondProfile, "barfoo");

        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        final ResourceID taskManagerId = ResourceID.generate();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        // The reported slot uses its own (2.0, 2) profile, distinct from both request profiles.
        final SlotID slotId = new SlotID(taskManagerId, 0);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, new ResourceProfile(2.0, 2)));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            slotManager.registerTaskManager(connection, report);
            Assert.assertTrue(slotManager.registerSlotRequest(firstRequest));

            final TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());

            slotManager.freeSlot(slotId, sharedAllocationId);
            Assert.assertTrue(TaskManagerSlot.State.FREE == slot.getState());
            Assert.assertNull(slot.getAllocationId());

            // The same allocation id is acceptable again now that the slot has been freed.
            Assert.assertTrue(slotManager.registerSlotRequest(secondRequest));
            Assert.assertEquals(
                "The slot has not been allocated to the expected allocation id.",
                sharedAllocationId,
                slot.getAllocationId());
        }

        // Checked only after the slot manager has been closed.
        Assert.assertThat(allocateResourceCalls.get(), org.hamcrest.Matchers.is(0));
    }

    /**
     * Checks that a slot report sent under an instance id that was never registered is rejected
     * (the report call returns {@code false}) and that the number of registered slots stays zero.
     */
    @Test
    public void testReceivingUnknownSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceActions resourceActions = Mockito.mock(ResourceActions.class);

        // Neither the instance id nor the slot's resource id is registered with the slot manager.
        final InstanceID unregisteredInstanceId = new InstanceID();
        final SlotID slotId = new SlotID(ResourceID.generate(), 0);
        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 0);

            final boolean reportAccepted = slotManager.reportSlotStatus(unregisteredInstanceId, report);
            Assert.assertFalse(reportAccepted);

            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 0);
        }
    }

    /**
     * Checks that a follow-up slot report from a registered task manager updates slot state.
     * Two slots are first registered without an allocation. A second report then lists slot 1 with
     * a job id and allocation id. Afterwards both slots must still be registered and slot 1 must
     * carry the reported allocation id.
     */
    @Test
    public void testUpdateSlotReport() throws Exception {
        final ResourceManagerId rmId = ResourceManagerId.generate();
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder().build();

        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();

        final ResourceID taskManagerId = ResourceID.generate();
        final SlotID firstSlotId = new SlotID(taskManagerId, 0);
        final SlotID secondSlotId = new SlotID(taskManagerId, 1);
        final ResourceProfile profile = new ResourceProfile(1.0, 1);

        final SlotStatus firstSlotUnallocated = new SlotStatus(firstSlotId, profile);
        final SlotStatus secondSlotUnallocated = new SlotStatus(secondSlotId, profile);
        final SlotStatus secondSlotAllocated = new SlotStatus(secondSlotId, profile, jobId, allocationId);

        final SlotReport initialReport = new SlotReport(Arrays.asList(firstSlotUnallocated, secondSlotUnallocated));
        // The update lists the slots in the opposite order, with slot 1 now allocated.
        final SlotReport updatedReport = new SlotReport(Arrays.asList(secondSlotAllocated, firstSlotUnallocated));

        final TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        try (SlotManager slotManager = createSlotManager(rmId, resourceActions)) {
            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 0);

            slotManager.registerTaskManager(connection, initialReport);
            final TaskManagerSlot firstSlot = slotManager.getSlot(firstSlotId);
            final TaskManagerSlot secondSlot = slotManager.getSlot(secondSlotId);

            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 2);
            Assert.assertTrue(TaskManagerSlot.State.FREE == firstSlot.getState());
            Assert.assertTrue(TaskManagerSlot.State.FREE == secondSlot.getState());

            final boolean reportAccepted = slotManager.reportSlotStatus(connection.getInstanceID(), updatedReport);
            Assert.assertTrue(reportAccepted);

            Assert.assertTrue(slotManager.getNumberRegisteredSlots() == 2);
            Assert.assertNotNull(slotManager.getSlot(firstSlotId));
            Assert.assertNotNull(slotManager.getSlot(secondSlotId));
            Assert.assertEquals(allocationId, slotManager.getSlot(secondSlotId).getAllocationId());
        }
    }

    /**
     * Checks that a registered task manager is released through the release-resource consumer of
     * the resource actions. The slot manager is built with a 10 ms value as its last constructor
     * argument, and the released instance id must equal that of the registered connection.
     *
     * <p>The test blocks on the release future without a timeout of its own.
     */
    @Test
    public void testTaskManagerTimeout() throws Exception {
        final long tmTimeout = 10L;

        // Completed with the instance id handed to the release-resource consumer.
        final CompletableFuture<InstanceID> releasedInstance = new CompletableFuture<>();
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setReleaseResourceConsumer((instanceId, ignoredCause) -> releasedInstance.complete(instanceId))
            .build();

        final ResourceManagerId rmId = ResourceManagerId.generate();
        final ResourceID taskManagerId = ResourceID.generate();
        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        final TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        final SlotID slotId = new SlotID(taskManagerId, 0);
        final ResourceProfile profile = new ResourceProfile(1.0, 1);
        final SlotReport report = new SlotReport(new SlotStatus(slotId, profile));

        final ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();

        try (SlotManager slotManager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                Time.milliseconds(tmTimeout))) {

            slotManager.start(rmId, mainThreadExecutor, resourceActions);

            // The registration is submitted to the executor that was passed to start().
            mainThreadExecutor.execute(() -> slotManager.registerTaskManager(connection, report));

            Assert.assertThat(
                releasedInstance.get(),
                org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(connection.getInstanceID())));
        }
    }

    /**
     * Checks that an allocation failure is reported for a slot request that is registered while
     * no slots are available. The slot manager is built with a 50 ms value as its third
     * constructor argument, and the failure notification must carry the request's job id and
     * allocation id.
     *
     * <p>The request is registered on the executor that was passed to {@code start()}. Any
     * {@link Throwable} raised there is captured and propagated to the test thread. This includes
     * the {@link AssertionError} of a failed assertion, which a {@code catch (Exception)} clause
     * would not catch.
     */
    @Test
    public void testSlotRequestTimeout() throws Exception {
        final long allocationTimeout = 50L;

        // Completed with (job id, allocation id) of the failure notification.
        final CompletableFuture<Tuple2<JobID, AllocationID>> failedAllocationFuture = new CompletableFuture<>();
        final TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
            .setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1)))
            .build();

        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");

        final ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();

        try (SlotManager slotManager = new SlotManager(
                TestingUtils.defaultScheduledExecutor(),
                TestingUtils.infiniteTime(),
                Time.milliseconds(allocationTimeout),
                TestingUtils.infiniteTime())) {

            slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);

            final AtomicReference<Throwable> registrationFailure = new AtomicReference<>(null);

            mainThreadExecutor.execute(() -> {
                try {
                    Assert.assertTrue(slotManager.registerSlotRequest(slotRequest));
                } catch (Throwable t) {
                    // Assert.assertTrue reports failure with an AssertionError, which is an Error
                    // and would slip past a catch of Exception and be lost on the executor thread.
                    registrationFailure.compareAndSet(null, t);
                    // Wake up the test thread so it fails right away instead of waiting for a
                    // failure notification that may never arrive.
                    failedAllocationFuture.completeExceptionally(t);
                }
            });

            Assert.assertThat(
                failedAllocationFuture.get(),
                org.hamcrest.Matchers.is(org.hamcrest.Matchers.equalTo(Tuple2.of(jobId, allocationId))));

            // Fallback for the case where the future had already been completed normally.
            final Throwable failure = registrationFailure.get();
            if (failure instanceof Error) {
                throw (Error) failure;
            }
            if (failure instanceof Exception) {
                throw (Exception) failure;
            }
            if (failure != null) {
                throw new AssertionError(failure);
            }
        }
    }

    /**
     * Checks that a slot request is sent to the task executor a second time after the
     * first {@code requestSlot} call fails, and that the retried request ends up
     * allocated.
     *
     * <p>The mocked gateway is stubbed to answer the first {@code requestSlot} call with
     * {@code slotRequestFuture1} and the next one with {@code slotRequestFuture2}; the
     * test completes both futures by hand.
     */
    @Test
    public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceActions resourceManagerActions = (ResourceActions)Mockito.mock(ResourceActions.class);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        CompletableFuture slotRequestFuture1 = new CompletableFuture();
        CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<Acknowledge>();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        // Consecutive stubbing: first answer is slotRequestFuture1, then slotRequestFuture2.
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.any(ResourceManagerId.class), (Time)Matchers.any(Time.class))).thenReturn(slotRequestFuture1, (Object[])new CompletableFuture[]{slotRequestFuture2});
        ResourceID resourceId = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
        // The task manager reports two slots with the same profile as the request.
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        try (SlotManager slotManager = this.createSlotManager(resourceManagerId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            slotManager.registerSlotRequest(slotRequest);
            ArgumentCaptor slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            // Exactly one requestSlot call so far; remember which slot it targeted.
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)1))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.eq((Object)resourceManagerId), (Time)Matchers.any(Time.class));
            TaskManagerSlot failedSlot = slotManager.getSlot((SlotID)slotIdCaptor.getValue());
            // Fail the first request; a second requestSlot call is expected as a result.
            slotRequestFuture1.completeExceptionally((Throwable)new SlotAllocationException("Test exception."));
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)2))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.eq((Object)resourceManagerId), (Time)Matchers.any(Time.class));
            // Acknowledge the retry and check the slot it targeted.
            slotRequestFuture2.complete(Acknowledge.get());
            TaskManagerSlot slot = slotManager.getSlot((SlotID)slotIdCaptor.getValue());
            Assert.assertTrue((slot.getState() == TaskManagerSlot.State.ALLOCATED ? 1 : 0) != 0);
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
            // If the retry went to a different slot, the slot of the failed attempt must be free again.
            if (!failedSlot.getSlotId().equals((Object)slot.getSlotId())) {
                Assert.assertTrue((failedSlot.getState() == TaskManagerSlot.State.FREE ? 1 : 0) != 0);
            }
        }
    }

    /**
     * Checks that a slot report arriving while a slot request is still in flight makes
     * the slot manager send the request to the other, still free slot.
     *
     * <p>The first {@code requestSlot} call is answered with a future that never
     * completes. The task manager then reports the targeted slot as allocated to a
     * different job, and the test expects a second {@code requestSlot} call for the
     * remaining free slot.
     *
     * @throws Exception if any of the asynchronous steps fails
     */
    @Test
    public void testSlotReportWhileActiveSlotRequest() throws Exception {
        long verifyTimeout = 10000L;
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        CompletableFuture slotRequestFuture1 = new CompletableFuture();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        // First call gets the pending future, the following call is acknowledged right away.
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.any(ResourceManagerId.class), (Time)Matchers.any(Time.class))).thenReturn(slotRequestFuture1, (Object[])new CompletableFuture[]{CompletableFuture.completedFuture(Acknowledge.get())});
        ResourceID resourceId = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());){
            slotManager.start(resourceManagerId, (Executor)mainThreadExecutor, (ResourceActions)resourceManagerActions);
            // Register the task manager first, then the slot request, both via the main thread executor.
            CompletionStage registrationFuture = CompletableFuture.supplyAsync(() -> {
                slotManager.registerTaskManager(taskManagerConnection, slotReport);
                return null;
            }, mainThreadExecutor).thenAccept(value -> {
                try {
                    slotManager.registerSlotRequest(slotRequest);
                }
                catch (SlotManagerException e) {
                    throw new RuntimeException("Could not register slots.", e);
                }
            });
            ((CompletableFuture)registrationFuture).get();
            ArgumentCaptor slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)1))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.eq((Object)resourceManagerId), (Time)Matchers.any(Time.class));
            // Work out which of the two slots was targeted and which one is still free.
            SlotID requestedSlotId = (SlotID)slotIdCaptor.getValue();
            SlotID freeSlotId = requestedSlotId.equals((Object)slotId1) ? slotId2 : slotId1;
            CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(() -> slotManager.getSlot(freeSlotId).getState() == TaskManagerSlot.State.FREE, mainThreadExecutor);
            Assert.assertTrue((boolean)freeSlotFuture.get());
            // Report the targeted slot as taken by an unrelated job/allocation.
            SlotStatus newSlotStatus1 = new SlotStatus(requestedSlotId, resourceProfile, new JobID(), new AllocationID());
            SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
            SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
            CompletableFuture<Boolean> reportSlotStatusFuture = CompletableFuture.supplyAsync(() -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport), mainThreadExecutor);
            Assert.assertTrue((boolean)reportSlotStatusFuture.get());
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.timeout((long)verifyTimeout).times(2))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (ResourceManagerId)Matchers.eq((Object)resourceManagerId), (Time)Matchers.any(Time.class));
            SlotID requestedSlotId2 = (SlotID)slotIdCaptor.getValue();
            // Compare against the slot computed as free rather than a fixed slot id, so the
            // check holds regardless of which slot the first request happened to pick.
            Assert.assertEquals((Object)freeSlotId, (Object)requestedSlotId2);
            CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(() -> slotManager.getSlot(requestedSlotId2), mainThreadExecutor);
            TaskManagerSlot slot = requestedSlotFuture.get();
            Assert.assertTrue((slot.getState() == TaskManagerSlot.State.ALLOCATED ? 1 : 0) != 0);
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
        }
    }

    /**
     * Checks that a task manager is released once it has become idle.
     *
     * <p>A slot request is registered before the task manager, so one of the two
     * reported slots gets allocated and the task manager is not idle. After the slot is
     * freed again the task manager must be idle, and the resource actions must be asked
     * to release it.
     *
     * @throws Exception if any of the asynchronous steps fails
     */
    @Test
    public void testTimeoutForUnusedTaskManager() throws Exception {
        long taskManagerTimeout = 50L;
        CompletableFuture releasedResourceFuture = new CompletableFuture();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)).build();
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
        ResourceID resourceId = ResourceID.generate();
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        CompletableFuture requestedSlotFuture = new CompletableFuture();
        // The gateway records the requested slot id and acknowledges immediately.
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple5 -> {
            requestedSlotFuture.complete(tuple5.f0);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, (TaskExecutorGateway)taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        // Use the named timeout instead of repeating the literal.
        try (SlotManager slotManager = new SlotManager(scheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.of((long)taskManagerTimeout, (TimeUnit)TimeUnit.MILLISECONDS));){
            slotManager.start(resourceManagerId, (Executor)mainThreadExecutor, (ResourceActions)resourceManagerActions);
            CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    return slotManager.registerSlotRequest(slotRequest);
                }
                catch (SlotManagerException e) {
                    throw new CompletionException(e);
                }
            }, mainThreadExecutor).thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
            // Wait for the registration chain so that a failure in it fails the test here
            // instead of leaving it blocked forever on requestedSlotFuture below.
            registrationFuture.get();
            SlotID slotId = (SlotID)requestedSlotFuture.get();
            CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()), mainThreadExecutor);
            Assert.assertFalse((boolean)idleFuture.get());
            CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(() -> slotManager.getSlot(slotId), mainThreadExecutor);
            TaskManagerSlot slot = slotFuture.get();
            Assert.assertTrue((slot.getState() == TaskManagerSlot.State.ALLOCATED ? 1 : 0) != 0);
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
            // Free the slot again; afterwards the task manager must be reported as idle.
            CompletionStage idleFuture2 = CompletableFuture.runAsync(() -> slotManager.freeSlot(slotId, allocationId), mainThreadExecutor).thenApply(value -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
            Assert.assertTrue((boolean)((Boolean)((CompletableFuture)idleFuture2).get()));
            Assert.assertThat(releasedResourceFuture.get(), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.equalTo((Object)taskManagerConnection.getInstanceID())));
        }
    }

    /**
     * Checks that the release request issued for a task manager does not by itself
     * drop the task manager's slots; they only disappear once the task manager is
     * explicitly unregistered.
     */
    @Test
    public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        Time taskManagerTimeout = Time.milliseconds((long)10L);
        // Give the release call a generous window relative to the configured timeout.
        long releaseWaitMillis = taskManagerTimeout.toMilliseconds() * 20L;
        ResourceManagerId rmId = ResourceManagerId.generate();
        ResourceID tmResourceId = ResourceID.generate();
        ResourceActions actionsMock = (ResourceActions)Mockito.mock(ResourceActions.class);
        TaskExecutorGateway gatewayMock = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        TaskExecutorConnection tmConnection = new TaskExecutorConnection(tmResourceId, gatewayMock);
        SlotReport singleSlotReport = new SlotReport(new SlotStatus(new SlotID(tmResourceId, 0), new ResourceProfile(1.0, 1)));
        try (SlotManager manager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), taskManagerTimeout);){
            manager.start(rmId, Executors.directExecutor(), actionsMock);
            manager.registerTaskManager(tmConnection, singleSlotReport);
            Assert.assertEquals((long)1L, (long)manager.getNumberRegisteredSlots());

            // Wait until the slot manager has asked for the task manager to be released.
            InstanceID tmInstanceId = tmConnection.getInstanceID();
            ((ResourceActions)Mockito.verify((Object)actionsMock, (VerificationMode)Mockito.timeout((long)releaseWaitMillis).atLeast(1))).releaseResource((InstanceID)Matchers.eq((Object)tmInstanceId), (Exception)Matchers.any(Exception.class));

            // The slot is still registered until the task manager is unregistered.
            Assert.assertEquals((long)1L, (long)manager.getNumberRegisteredSlots());
            manager.unregisterTaskManager(tmInstanceId);
            Assert.assertEquals((long)0L, (long)manager.getNumberRegisteredSlots());
        }
    }

    /**
     * Checks that a slot which the task manager reports as allocated is not handed to
     * a newly registered slot request.
     */
    @Test
    public void testReportAllocatedSlot() throws Exception {
        ResourceID tmResourceId = ResourceID.generate();
        TestingResourceActions actions = new TestingResourceActionsBuilder().build();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        TaskExecutorConnection tmConnection = new TaskExecutorConnection(tmResourceId, (TaskExecutorGateway)gateway);
        try (SlotManager manager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());){
            manager.start(ResourceManagerId.generate(), Executors.directExecutor(), (ResourceActions)actions);

            // Register the task manager with a single slot that carries no allocation.
            SlotID onlySlot = new SlotID(tmResourceId, 0);
            SlotReport freeReport = new SlotReport(new SlotStatus(onlySlot, ResourceProfile.UNKNOWN));
            manager.registerTaskManager(tmConnection, freeReport);
            Assert.assertThat((Object)manager.getNumberRegisteredSlots(), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.equalTo((Object)1)));

            // Now report the same slot as allocated to some job/allocation.
            SlotReport occupiedReport = new SlotReport(new SlotStatus(onlySlot, ResourceProfile.UNKNOWN, new JobID(), new AllocationID()));
            manager.reportSlotStatus(tmConnection.getInstanceID(), occupiedReport);

            // A fresh request must stay unassigned because the only slot is taken.
            AllocationID requestedAllocation = new AllocationID();
            SlotRequest request = new SlotRequest(new JobID(), requestedAllocation, ResourceProfile.UNKNOWN, "foobar");
            manager.registerSlotRequest(request);
            Assert.assertThat((Object)manager.getSlotRequest(requestedAllocation).isAssigned(), (Matcher)org.hamcrest.Matchers.is((Object)false));
        }
    }

    /**
     * Checks that a slot request whose first {@code requestSlot} call fails is sent
     * again for the same slot and allocation, and that the slot ends up allocated once
     * the second call is acknowledged.
     *
     * <p>The gateway's request function publishes every incoming request on
     * {@code requestSlotQueue} and answers with the next future taken from
     * {@code responseQueue}, which lets the test control each response by hand.
     */
    @Test
    public void testSlotRequestFailure() throws Exception {
        try (SlotManager slotManager = this.createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().build());){
            // The request is registered before any task manager exists.
            SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
            slotManager.registerSlotRequest(slotRequest);
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(1);
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
                try {
                    // Blocks until the test has queued a response future.
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException ignored) {
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
            // Queue the first response before registering, since the request function blocks on take().
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport);
            Tuple5 firstRequest = (Tuple5)requestSlotQueue.take();
            // Queue the second response, then fail the first one to provoke the retry.
            CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<Acknowledge>();
            responseQueue.offer(secondManualSlotRequestResponse);
            firstManualSlotRequestResponse.completeExceptionally((Throwable)new SlotAllocationException("Test exception"));
            Tuple5 secondRequest = (Tuple5)requestSlotQueue.take();
            // The retry must carry the same allocation id (f2) and target the same slot (f0).
            Assert.assertThat((Object)secondRequest.f2, (Matcher)org.hamcrest.Matchers.equalTo((Object)firstRequest.f2));
            Assert.assertThat((Object)secondRequest.f0, (Matcher)org.hamcrest.Matchers.equalTo((Object)firstRequest.f0));
            secondManualSlotRequestResponse.complete(Acknowledge.get());
            TaskManagerSlot slot = slotManager.getSlot((SlotID)secondRequest.f0);
            Assert.assertThat((Object)slot.getState(), (Matcher)org.hamcrest.Matchers.equalTo((Object)TaskManagerSlot.State.ALLOCATED));
            Assert.assertThat((Object)slot.getAllocationId(), (Matcher)org.hamcrest.Matchers.equalTo((Object)secondRequest.f2));
        }
    }

    /**
     * Checks the case where a first slot request times out on the slot manager side
     * while the task executor has in fact accepted it.
     *
     * <p>The first {@code requestSlot} call is failed with a {@link TimeoutException}.
     * The second request is then sent for the same slot and is rejected with a
     * {@code SlotOccupiedException} naming the first allocation. The slot must end up
     * allocated for the first allocation id.
     *
     * @throws Exception if taking from the request queue is interrupted
     */
    @Test
    public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
        try (SlotManager slotManager = this.createSlotManager(ResourceManagerId.generate(), new TestingResourceActionsBuilder().build());){
            JobID jobID = new JobID();
            SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
            slotManager.registerSlotRequest(slotRequest1);
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(1);
            // Every incoming request is published on requestSlotQueue and answered with the
            // next future the test has put on responseQueue.
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException ignored) {
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport);
            Tuple5 firstRequest = (Tuple5)requestSlotQueue.take();
            CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<Acknowledge>();
            responseQueue.offer(secondManualSlotRequestResponse);
            SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
            slotManager.registerSlotRequest(slotRequest2);
            // Fail the first allocation attempt on the slot manager side.
            firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
            Tuple5 secondRequest = (Tuple5)requestSlotQueue.take();
            // The task executor rejects the second request: the slot is held by the first allocation.
            secondManualSlotRequestResponse.completeExceptionally((Throwable)new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
            Assert.assertThat((Object)firstRequest.f2, (Matcher)org.hamcrest.Matchers.equalTo((Object)slotRequest1.getAllocationId()));
            Assert.assertThat((Object)secondRequest.f2, (Matcher)org.hamcrest.Matchers.equalTo((Object)slotRequest2.getAllocationId()));
            Assert.assertThat((Object)secondRequest.f0, (Matcher)org.hamcrest.Matchers.equalTo((Object)firstRequest.f0));
            // No further complete(...) on the second response: it is already completed
            // exceptionally, so completing it again would be a silent no-op.
            TaskManagerSlot slot = slotManager.getSlot((SlotID)secondRequest.f0);
            Assert.assertThat((Object)slot.getState(), (Matcher)org.hamcrest.Matchers.equalTo((Object)TaskManagerSlot.State.ALLOCATED));
            Assert.assertThat((Object)slot.getAllocationId(), (Matcher)org.hamcrest.Matchers.equalTo((Object)firstRequest.f2));
            Assert.assertThat((Object)slotManager.getNumberRegisteredSlots(), (Matcher)org.hamcrest.Matchers.is((Object)1));
        }
    }

    /**
     * Checks that unregistering a task manager reports an allocation failure for each
     * slot request that was served by that task manager.
     *
     * <p>Two requests of job 1 are registered before the first task manager (2 slots);
     * two requests of job 2 and one request of job 3 are registered before the second
     * task manager (3 slots). The test then unregisters both task managers in turn and
     * inspects the recorded failure notifications.
     */
    @Test
    public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {
        // Collects (jobId, allocationId) for every allocation failure notification.
        ArrayDeque allocationFailures = new ArrayDeque(5);
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setNotifyAllocationFailureConsumer(failureMessage -> allocationFailures.offer(Tuple2.of((Object)failureMessage.f0, (Object)failureMessage.f1))).build();
        try (SlotManager slotManager = this.createSlotManager(ResourceManagerId.generate(), resourceManagerActions);){
            Tuple2 allocationFailure;
            // Job 1: two requests, followed by a task manager offering two slots.
            JobID jobId1 = new JobID();
            SlotRequest slotRequest11 = this.createSlotRequest(jobId1);
            SlotRequest slotRequest12 = this.createSlotRequest(jobId1);
            slotManager.registerSlotRequest(slotRequest11);
            slotManager.registerSlotRequest(slotRequest12);
            ResourceID taskExecutorResourceId1 = ResourceID.generate();
            TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, (TaskExecutorGateway)testingTaskExecutorGateway1);
            SlotReport slotReport1 = this.createSlotReport(taskExecutorResourceId1, 2);
            slotManager.registerTaskManager(taskExecutionConnection1, slotReport1);
            // Jobs 2 and 3: three requests in total, followed by a task manager offering three slots.
            JobID jobId2 = new JobID();
            SlotRequest slotRequest21 = this.createSlotRequest(jobId2);
            SlotRequest slotRequest22 = this.createSlotRequest(jobId2);
            slotManager.registerSlotRequest(slotRequest21);
            slotManager.registerSlotRequest(slotRequest22);
            JobID jobId3 = new JobID();
            SlotRequest slotRequest31 = this.createSlotRequest(jobId3);
            slotManager.registerSlotRequest(slotRequest31);
            ResourceID taskExecutorResourceId2 = ResourceID.generate();
            TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, (TaskExecutorGateway)testingTaskExecutorGateway2);
            SlotReport slotReport2 = this.createSlotReport(taskExecutorResourceId2, 3);
            slotManager.registerTaskManager(taskExecutionConnection2, slotReport2);
            // Removing the first task manager must fail exactly the two job-1 allocations.
            slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());
            Assert.assertThat(allocationFailures, (Matcher)org.hamcrest.Matchers.hasSize((int)2));
            HashSet<Object> failedAllocations = new HashSet<Object>(2);
            // poll() drains the deque, so the size check further down only counts
            // notifications caused by the second unregistration.
            while ((allocationFailure = (Tuple2)allocationFailures.poll()) != null) {
                Assert.assertThat((Object)allocationFailure.f0, (Matcher)org.hamcrest.Matchers.equalTo((Object)jobId1));
                failedAllocations.add(allocationFailure.f1);
            }
            Assert.assertThat(failedAllocations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new AllocationID[]{slotRequest11.getAllocationId(), slotRequest12.getAllocationId()}));
            // Removing the second task manager must fail the three allocations of jobs 2 and 3.
            slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());
            Assert.assertThat(allocationFailures, (Matcher)org.hamcrest.Matchers.hasSize((int)3));
            Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = allocationFailures.stream().collect(Collectors.groupingBy(tuple -> (JobID)tuple.f0));
            Assert.assertThat(job2AndJob3FailedAllocationInfo.entrySet(), (Matcher)org.hamcrest.Matchers.hasSize((int)2));
            Set<AllocationID> job2FailedAllocations = this.extractFailedAllocationsForJob(jobId2, job2AndJob3FailedAllocationInfo);
            Set<AllocationID> job3FailedAllocations = this.extractFailedAllocationsForJob(jobId3, job2AndJob3FailedAllocationInfo);
            Assert.assertThat(job2FailedAllocations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new AllocationID[]{slotRequest21.getAllocationId(), slotRequest22.getAllocationId()}));
            Assert.assertThat(job3FailedAllocations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new AllocationID[]{slotRequest31.getAllocationId()}));
        }
    }

    /**
     * Collects the allocation ids of all failure entries recorded for one job.
     *
     * @param jobId2 job whose failed allocations are wanted; must be a key of the map
     * @param job2AndJob3FailedAllocationInfo failure entries grouped by job id
     * @return the set of allocation ids (field {@code f1}) of that job's entries
     */
    private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId2, Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo) {
        List<Tuple2<JobID, AllocationID>> failuresOfJob = job2AndJob3FailedAllocationInfo.get(jobId2);
        Set<AllocationID> allocationIds = new HashSet<AllocationID>();
        for (Tuple2<JobID, AllocationID> failure : failuresOfJob) {
            allocationIds.add((AllocationID)failure.f1);
        }
        return allocationIds;
    }

    /**
     * Builds a slot report with {@code numberSlots} slots that all use
     * {@link ResourceProfile#UNKNOWN}.
     *
     * @param taskExecutorResourceId resource id the slot ids are created for
     * @param numberSlots number of slots to put into the report
     * @return the slot report
     */
    @Nonnull
    private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
        ResourceProfile defaultProfile = ResourceProfile.UNKNOWN;
        return createSlotReport(taskExecutorResourceId, numberSlots, defaultProfile);
    }

    /**
     * Builds a slot report containing one slot status per slot index
     * {@code 0 .. numberSlots - 1}, each with the given resource profile.
     *
     * @param taskExecutorResourceId resource id the slot ids are created for
     * @param numberSlots number of slots to put into the report
     * @param resourceProfile profile assigned to every slot
     * @return the slot report
     */
    @Nonnull
    private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots, ResourceProfile resourceProfile) {
        HashSet<SlotStatus> statuses = new HashSet<SlotStatus>(numberSlots);
        int slotIndex = 0;
        while (slotIndex < numberSlots) {
            SlotID slotId = new SlotID(taskExecutorResourceId, slotIndex);
            statuses.add(new SlotStatus(slotId, resourceProfile));
            slotIndex++;
        }
        return new SlotReport(statuses);
    }

    /**
     * Creates a slot request for the given job using {@link ResourceProfile#UNKNOWN}.
     *
     * @param jobId job the request belongs to
     * @return the new slot request
     */
    @Nonnull
    private SlotRequest createSlotRequest(JobID jobId) {
        ResourceProfile defaultProfile = ResourceProfile.UNKNOWN;
        return createSlotRequest(jobId, defaultProfile);
    }

    /**
     * Creates a slot request with a freshly created allocation id and the fixed
     * target address {@code "foobar1"}.
     *
     * @param jobId job the request belongs to
     * @param resourceProfile resource profile requested for the slot
     * @return the new slot request
     */
    @Nonnull
    private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) {
        AllocationID freshAllocationId = new AllocationID();
        return new SlotRequest(jobId, freshAllocationId, resourceProfile, "foobar1");
    }

    /**
     * Creates a slot manager whose three time arguments are all
     * {@code TestingUtils.infiniteTime()} and starts it with a direct executor.
     *
     * @param resourceManagerId id passed to {@code start}
     * @param resourceManagerActions resource actions passed to {@code start}
     * @return the started slot manager; the caller is responsible for closing it
     */
    private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
        SlotManager startedManager = new SlotManager(
            TestingUtils.defaultScheduledExecutor(),
            TestingUtils.infiniteTime(),
            TestingUtils.infiniteTime(),
            TestingUtils.infiniteTime());
        startedManager.start(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
        return startedManager;
    }

    /**
     * Checks that a new resource is only requested when the slots already announced by
     * earlier resource requests are used up: with two slots per resource, the first
     * and third slot request trigger a resource request, the second one does not.
     */
    @Test
    public void testRequestNewResources() throws Exception {
        int numberSlots = 2;
        AtomicInteger allocateResourceCalls = new AtomicInteger(0);
        // Each resource request is counted and answered with numberSlots slots.
        TestingResourceActions countingActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(SlotManagerTest.convert((FunctionWithException<ResourceProfile, Integer, ResourceManagerException>)((FunctionWithException)ignored -> {
            allocateResourceCalls.incrementAndGet();
            return numberSlots;
        }))).build();
        try (SlotManager manager = this.createSlotManager(ResourceManagerId.generate(), countingActions);){
            JobID job = new JobID();

            // First request: nothing is pending yet, so one resource is requested.
            Assert.assertThat((Object)manager.registerSlotRequest(this.createSlotRequest(job)), (Matcher)org.hamcrest.Matchers.is((Object)true));
            Assert.assertThat((Object)allocateResourceCalls.get(), (Matcher)org.hamcrest.Matchers.is((Object)1));

            // Second request: served by the remaining pending slot, no new resource.
            Assert.assertThat((Object)manager.registerSlotRequest(this.createSlotRequest(job)), (Matcher)org.hamcrest.Matchers.is((Object)true));
            Assert.assertThat((Object)allocateResourceCalls.get(), (Matcher)org.hamcrest.Matchers.is((Object)1));
            Assert.assertThat((Object)manager.getNumberAssignedPendingTaskManagerSlots(), (Matcher)org.hamcrest.Matchers.is((Object)2));

            // Third request: all pending slots are assigned, so a second resource is requested.
            Assert.assertThat((Object)manager.registerSlotRequest(this.createSlotRequest(job)), (Matcher)org.hamcrest.Matchers.is((Object)true));
            Assert.assertThat((Object)allocateResourceCalls.get(), (Matcher)org.hamcrest.Matchers.is((Object)2));
        }
    }

    /**
     * Checks that unregistering a slot request gives its pending task manager slot
     * back: the number of pending slots stays the same while the number of assigned
     * pending slots drops to zero.
     */
    @Test
    public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception {
        int numberSlots = 2;
        // Every resource request announces numberSlots pending slots.
        TestingResourceActions actions = new TestingResourceActionsBuilder().setAllocateResourceFunction(SlotManagerTest.convert((FunctionWithException<ResourceProfile, Integer, ResourceManagerException>)((FunctionWithException)value -> numberSlots))).build();
        try (SlotManager manager = this.createSlotManager(ResourceManagerId.generate(), actions);){
            SlotRequest request = this.createSlotRequest(new JobID());
            Assert.assertThat((Object)manager.registerSlotRequest(request), (Matcher)org.hamcrest.Matchers.is((Object)true));
            Assert.assertThat((Object)manager.getNumberPendingTaskManagerSlots(), (Matcher)org.hamcrest.Matchers.is((Object)numberSlots));
            Assert.assertThat((Object)manager.getNumberAssignedPendingTaskManagerSlots(), (Matcher)org.hamcrest.Matchers.is((Object)1));

            // Withdraw the request again and re-check both counters.
            manager.unregisterSlotRequest(request.getAllocationId());
            Assert.assertThat((Object)manager.getNumberPendingTaskManagerSlots(), (Matcher)org.hamcrest.Matchers.is((Object)numberSlots));
            Assert.assertThat((Object)manager.getNumberAssignedPendingTaskManagerSlots(), (Matcher)org.hamcrest.Matchers.is((Object)0));
        }
    }

    /**
     * Checks that registering a task manager reduces the number of pending task manager slots.
     *
     * <p>The resource actions are configured to announce {@code numberSlots} slots per allocation.
     * After one slot request the test expects {@code numberSlots} pending slots, one of them
     * assigned, and no registered slots. It then registers a task manager reporting
     * {@code numberSlots - 1} slots and expects that many registered slots and a single
     * remaining pending slot.
     *
     * @throws Exception if creating, using or closing the slot manager fails
     */
    @Test
    public void testPendingTaskManagerSlotCompletion() throws Exception {
        final int numberSlots = 3;
        // The task manager deliberately reports one slot fewer than was announced.
        final int numberReportedSlots = numberSlots - 1;
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceFunction(SlotManagerTest.convert(value -> numberSlots))
            .build();

        try (SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
            final JobID jobId = new JobID();

            Assert.assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), org.hamcrest.Matchers.is(true));
            Assert.assertThat(slotManager.getNumberPendingTaskManagerSlots(), org.hamcrest.Matchers.is(numberSlots));
            Assert.assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), org.hamcrest.Matchers.is(1));
            Assert.assertThat(slotManager.getNumberRegisteredSlots(), org.hamcrest.Matchers.is(0));

            final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
            final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberReportedSlots);
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);

            Assert.assertThat(slotManager.getNumberRegisteredSlots(), org.hamcrest.Matchers.is(numberReportedSlots));
            Assert.assertThat(slotManager.getNumberPendingTaskManagerSlots(), org.hamcrest.Matchers.is(numberSlots - numberReportedSlots));
        }
    }

    /**
     * Builds a {@link TaskExecutorConnection} for tests from a gateway produced by a default
     * {@code TestingTaskExecutorGatewayBuilder} and a freshly generated {@link ResourceID}.
     *
     * @return a new connection with its own resource id
     */
    private TaskExecutorConnection createTaskExecutorConnection() {
        // The gateway is created before the resource id, matching the original call order.
        final TaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        final ResourceID resourceId = ResourceID.generate();
        return new TaskExecutorConnection(resourceId, gateway);
    }

    /**
     * Checks the pending slot bookkeeping when a task manager registers a slot whose resource
     * profile differs from the requested one.
     *
     * <p>A request for profile {@code (1.0, 1)} is registered, which the test expects to produce
     * {@code numberSlots} pending slots. A task manager then reports {@code numberOfferedSlots}
     * slots of profile {@code (2.0, 2)}. Afterwards the test expects those slots to be registered,
     * the number of pending slots to be unchanged, and no pending slot to be assigned.
     *
     * @throws Exception if creating, using or closing the slot manager fails
     */
    @Test
    public void testRegistrationOfDifferentSlot() throws Exception {
        final int numberSlots = 1;
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceFunction(SlotManagerTest.convert(value -> numberSlots))
            .build();

        try (SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
            final JobID jobId = new JobID();
            final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1);

            Assert.assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, requestedSlotProfile)), org.hamcrest.Matchers.is(true));
            Assert.assertThat(slotManager.getNumberPendingTaskManagerSlots(), org.hamcrest.Matchers.is(numberSlots));

            final int numberOfferedSlots = 1;
            final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
            // The offered profile intentionally differs from the requested one.
            final ResourceProfile offeredSlotProfile = new ResourceProfile(2.0, 2);
            final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, offeredSlotProfile);
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);

            Assert.assertThat(slotManager.getNumberRegisteredSlots(), org.hamcrest.Matchers.is(numberOfferedSlots));
            Assert.assertThat(slotManager.getNumberPendingTaskManagerSlots(), org.hamcrest.Matchers.is(numberSlots));
            Assert.assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), org.hamcrest.Matchers.is(0));
        }
    }

    /**
     * Checks that a slot reported with a job id and allocation id does not change the pending
     * task manager slot counts.
     *
     * <p>After one slot request, a task manager registers a single slot whose status carries a
     * job id and a fresh {@link AllocationID}. The test then expects one registered slot, while
     * the pending slot count stays at {@code numberSlots} and one pending slot remains assigned.
     *
     * @throws Exception if creating, using or closing the slot manager fails
     */
    @Test
    public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception {
        final int numberSlots = 1;
        final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
            .setAllocateResourceFunction(SlotManagerTest.convert(value -> numberSlots))
            .build();

        try (SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
            final JobID jobId = new JobID();

            Assert.assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), org.hamcrest.Matchers.is(true));

            final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
            final SlotID slotId = new SlotID(taskExecutorConnection.getResourceID(), 0);
            // The reported slot status already names a job and an allocation.
            final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.UNKNOWN, jobId, new AllocationID());
            final SlotReport slotReport = new SlotReport(slotStatus);
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);

            Assert.assertThat(slotManager.getNumberRegisteredSlots(), org.hamcrest.Matchers.is(1));
            Assert.assertThat(slotManager.getNumberPendingTaskManagerSlots(), org.hamcrest.Matchers.is(numberSlots));
            Assert.assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), org.hamcrest.Matchers.is(1));
        }
    }

    /**
     * Adapts a function that yields a slot count into one that yields a collection of resource
     * profiles.
     *
     * <p>The returned function calls {@code function} with the given profile and returns a new
     * list containing that same profile once per counted slot.
     *
     * @param function maps a resource profile to a number of slots
     * @return a function mapping a resource profile to that many copies of it
     */
    private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
        return requestedProfile -> {
            final int count = function.apply(requestedProfile);
            final List<ResourceProfile> profiles = new ArrayList<>(count);
            int remaining = count;
            while (remaining > 0) {
                profiles.add(requestedProfile);
                --remaining;
            }
            return profiles;
        };
    }
}

