/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.ManualClock;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlotPoolTest
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
    private final Time timeout = Time.seconds((long)10L);
    private RpcService rpcService;
    private JobID jobId;
    private TaskManagerLocation taskManagerLocation;
    private SimpleAckingTaskManagerGateway taskManagerGateway;
    private TestingResourceManagerGateway resourceManagerGateway;

    /**
     * Creates fresh fixtures before every test: a testing RPC service, a new job id, a local
     * task manager location and the testing gateways for the task manager and the resource
     * manager.
     */
    @Before
    public void setUp() throws Exception {
        rpcService = new TestingRpcService();
        jobId = new JobID();

        // Task manager side fixtures.
        taskManagerLocation = new LocalTaskManagerLocation();
        taskManagerGateway = new SimpleAckingTaskManagerGateway();

        // Resource manager side fixture.
        resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /** Terminates the RPC service created in {@link #setUp()}, passing the shared test timeout. */
    @After
    public void tearDown() throws Exception {
        RpcUtils.terminateRpcService(rpcService, timeout);
    }

    /**
     * Requests a single slot, answers the slot request that reaches the resource manager with a
     * matching slot offer and checks that the resulting logical slot is alive and reports the
     * offering task manager's location.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        final CompletableFuture<SlotRequest> pendingRequest = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(pendingRequest::complete);

        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);
            gateway.registerTaskManager(taskManagerLocation.getResourceID());

            final SlotRequestId requestId = new SlotRequestId();
            final CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(
                requestId,
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            // No slot has been offered yet, so the request must still be pending.
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest observedRequest =
                pendingRequest.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer matchingOffer = new SlotOffer(
                observedRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);

            final boolean offerAccepted =
                gateway.offerSlot(taskManagerLocation, taskManagerGateway, matchingOffer).get();
            Assert.assertTrue(offerAccepted);

            final LogicalSlot allocatedSlot = slotFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(slotFuture.isDone());
            Assert.assertTrue(allocatedSlot.isAlive());
            Assert.assertEquals(taskManagerLocation, allocatedSlot.getTaskManagerLocation());
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Issues two slot requests but offers only one slot. The first request is expected to be
     * fulfilled by the offer; after that slot is released, the second request is expected to be
     * fulfilled by the very same physical slot (same location, slot number and allocation id).
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(request -> {
            while (!slotRequestQueue.offer(request)) {
                // retry until the bounded queue accepts the request
            }
        });

        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);

            // Register through the gateway like every other test in this class does, instead of
            // invoking the endpoint method directly from the test thread.
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            final CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            Assert.assertFalse(future1.isDone());
            Assert.assertFalse(future2.isDone());

            final ArrayList<SlotRequest> slotRequests = new ArrayList<>(2);
            for (int i = 0; i < 2; ++i) {
                final SlotRequest received =
                    slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                // poll() returns null on timeout; fail with a clear message instead of a later NPE.
                Assert.assertNotNull(
                    "Timed out waiting for slot request #" + i + " to reach the resource manager.",
                    received);
                slotRequests.add(received);
            }

            final SlotOffer slotOffer = new SlotOffer(
                slotRequests.get(0).getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());
            Assert.assertFalse(future2.isDone());

            // Returning the first slot must make it available for the second request.
            slot1.releaseSlot();

            final LogicalSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future2.isDone());

            Assert.assertNotEquals(slot1, slot2);
            Assert.assertFalse(slot1.isAlive());
            Assert.assertTrue(slot2.isAlive());
            Assert.assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
            Assert.assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
            Assert.assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Allocates a slot, releases it and then issues a second request. The second request is
     * expected to be served from the slot that was just freed: a different logical slot with the
     * same task manager location and physical slot number.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            Assert.assertFalse(future1.isDone());

            final SlotRequest slotRequest =
                slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());

            // Return the slot to the pool before asking for another one.
            slot1.releaseSlot();

            final CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            final LogicalSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future2.isDone());

            Assert.assertNotEquals(slot1, slot2);
            Assert.assertFalse(slot1.isAlive());
            Assert.assertTrue(slot2.isAlive());
            Assert.assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
            Assert.assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
        } finally {
            // Await termination (as the other tests do) rather than firing shutDown() and
            // returning while the endpoint may still be running.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Exercises the accept/reject decisions of {@code offerSlot} for a sequence of offers: from an
     * unregistered task manager, for an allocation that was never requested, for the requested
     * allocation, duplicates of an in-use slot, conflicting offers that reuse the allocation id,
     * and the same set of offers again after the slot has been released.
     */
    @Test
    public void testOfferSlot() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);
            Assert.assertFalse(future.isDone());

            final SlotRequest slotRequest =
                slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);

            // An offer from a task manager that was never registered must be rejected.
            final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(
                slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());

            // An offer for an allocation that was not requested is still accepted.
            final SlotOffer nonRequestedSlotOffer = new SlotOffer(
                new AllocationID(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());

            // The requested slot is accepted and fulfils the pending request.
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            final LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(slot.isAlive());

            // Re-offering the identical slot while it is in use is accepted and leaves it alive.
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            Assert.assertTrue(slot.isAlive());

            // Same allocation id but a different slot index must be rejected.
            final SlotOffer anotherSlotOfferWithSameAllocationId = new SlotOffer(
                slotRequest.getAllocationId(),
                1,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertFalse(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, anotherSlotOfferWithSameAllocationId).get());

            // Same offer coming from a different task manager must be rejected.
            final TaskManagerLocation anotherTaskManagerLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(
                slotPoolGateway.offerSlot(anotherTaskManagerLocation, taskManagerGateway, slotOffer).get());

            // The same expectations hold once the slot has been released.
            slot.releaseSlot();
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            Assert.assertFalse(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, anotherSlotOfferWithSameAllocationId).get());
            Assert.assertFalse(
                slotPoolGateway.offerSlot(anotherTaskManagerLocation, taskManagerGateway, slotOffer).get());
        } finally {
            // Await termination (as the other tests do) rather than firing shutDown() and
            // returning while the endpoint may still be running.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Allocates one slot while a second request stays pending, then releases the task manager.
     * The payload assigned to the allocated slot is expected to be notified (its future
     * completes), the slot must no longer be alive, and the second request must remain pending.
     */
    @Test
    public void testReleaseResource() throws Exception {
        final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            final SlotRequest slotRequest =
                slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            final CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                new DummyScheduledUnit(),
                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                true,
                timeout);

            final SlotOffer slotOffer = new SlotOffer(
                slotRequest.getAllocationId(),
                0,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(
                slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

            final LogicalSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());
            Assert.assertFalse(future2.isDone());

            final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
            final DummyPayload dummyPayload = new DummyPayload(releaseFuture);
            slot1.tryAssignPayload(dummyPayload);

            slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID(), null);

            // Bounded wait: if the payload is never notified the test fails with a
            // TimeoutException instead of blocking the build forever.
            releaseFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertFalse(slot1.isAlive());

            // Short pause before re-checking that the second request was not (wrongly) completed.
            Thread.sleep(10L);
            Assert.assertFalse(future2.isDone());
        } finally {
            // Await termination (as the other tests do) rather than firing shutDown() and
            // returning while the endpoint may still be running.
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Fails the resource manager's answer to a slot request and checks that the slot future
     * fails and that the resource manager receives a cancellation for exactly the allocation id
     * it was asked for.
     */
    @Test
    public void testSlotRequestCancellationUponFailingRequest() throws Exception {
        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
        final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();

        resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
        resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
        resourceManagerGateway.setCancelSlotConsumer(cancelSlotFuture::complete);

        final ScheduledUnit scheduledUnit = new ScheduledUnit(new JobVertexID(), null, null);

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            final SlotProfile slotProfile = new SlotProfile(
                ResourceProfile.UNKNOWN,
                Collections.emptyList(),
                Collections.emptyList());

            final CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
                new SlotRequestId(),
                scheduledUnit,
                slotProfile,
                true,
                timeout);

            requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));

            try {
                slotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail("The slot future should not have been completed properly.");
            } catch (ExecutionException expected) {
                // Expected: the slot future must complete exceptionally. Only this exception
                // type is accepted so that a timeout or interruption is not mistaken for success.
            }

            // The cancellation must target the allocation id that was originally requested.
            Assert.assertEquals(
                requestSlotFutureAllocationId.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
                cancelSlotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
        } finally {
            try {
                RpcUtils.terminateRpcEndpoint(slotPool, timeout);
            } catch (Exception e) {
                // Best-effort cleanup: do not mask the actual test outcome.
                LOG.warn("Could not properly terminate the SlotPool.", e);
            }
        }
    }

    /**
     * Issues two slot requests and releases the first one before any slot arrives. The first
     * future is expected to fail with a {@link FlinkException} and its allocation to be cancelled
     * at the resource manager. A slot that is subsequently offered under the first allocation id
     * is expected to fulfil the second request, whose own allocation is then cancelled.
     */
    @Test
    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(
            request -> requestedAllocations.offer(request.getAllocationId()));

        final ArrayBlockingQueue<AllocationID> cancelledAllocations = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setCancelSlotConsumer(cancelledAllocations::offer);

        final SlotRequestId firstRequestId = new SlotRequestId();
        final SlotRequestId secondRequestId = new SlotRequestId();

        try {
            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);
            final ScheduledUnit task = new ScheduledUnit(new JobVertexID(), null, null);

            final CompletableFuture<LogicalSlot> firstSlotFuture =
                gateway.allocateSlot(firstRequestId, task, SlotProfile.noRequirements(), true, timeout);
            final AllocationID firstAllocation = requestedAllocations.take();

            final CompletableFuture<LogicalSlot> secondSlotFuture =
                gateway.allocateSlot(secondRequestId, task, SlotProfile.noRequirements(), true, timeout);
            final AllocationID secondAllocation = requestedAllocations.take();

            gateway.releaseSlot(firstRequestId, null, null);

            try {
                firstSlotFuture.get();
                Assert.fail("The first slot future should have failed because it was cancelled.");
            } catch (ExecutionException failure) {
                final Throwable rootCause = ExceptionUtils.stripExecutionException(failure);
                Assert.assertTrue(rootCause instanceof FlinkException);
            }

            Assert.assertEquals(firstAllocation, cancelledAllocations.take());

            final SlotOffer offerForFirstAllocation =
                new SlotOffer(firstAllocation, 0, ResourceProfile.UNKNOWN);

            gateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

            final boolean accepted =
                gateway.offerSlot(taskManagerLocation, taskManagerGateway, offerForFirstAllocation).get();
            Assert.assertTrue(accepted);

            // The offered slot must end up serving the still-pending second request ...
            Assert.assertEquals(firstAllocation, secondSlotFuture.get().getAllocationId());

            // ... which makes the second allocation obsolete.
            Assert.assertEquals(secondAllocation, cancelledAllocations.take());
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Offers two slots to the pool, shuts the pool down and checks that the task manager
     * gateway's free-slot function was invoked for exactly the offered allocation ids.
     */
    @Test
    public void testShutdownReleasesAllSlots() throws Exception {
        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());

            final int numSlotOffers = 2;

            final ArrayList<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; ++i) {
                slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.UNKNOWN));
            }

            // Records every allocation id for which the pool asks the task manager to free a slot.
            final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
            taskManagerGateway.setFreeSlotFunction((allocationID, cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffersFuture =
                slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
            final Collection<SlotOffer> acceptedSlotOffers =
                acceptedSlotOffersFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

            slotPool.shutDown();
            slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            // Collect the freed slots with a bounded, blocking wait per slot. This replaces an
            // unbounded busy-spin that burned CPU and never terminated if a slot was not freed.
            final ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; ++i) {
                final AllocationID freedSlot =
                    freedSlotQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull(
                    "Timed out waiting for slot " + (i + 1) + " of " + numSlotOffers + " to be freed.",
                    freedSlot);
                freedSlots.add(freedSlot);
            }

            MatcherAssert.assertThat(
                freedSlots,
                Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Drives the idle-slot check with a {@link ManualClock}. One slot is offered, the clock is
     * advanced to one millisecond short of the test timeout, a second slot is offered and the
     * clock is advanced by the remaining millisecond. After triggering the idle check, only the
     * first slot is expected to be freed on the task manager.
     */
    @Test
    public void testCheckIdleSlot() throws Exception {
        final ManualClock manualClock = new ManualClock();
        final SlotPool slotPool = new SlotPool(
            rpcService,
            jobId,
            LocationPreferenceSchedulingStrategy.getInstance(),
            manualClock,
            TestingUtils.infiniteTime(),
            timeout);

        try {
            // Captures the allocation ids the pool asks the task manager to free.
            final ArrayBlockingQueue<AllocationID> freedAllocations = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                try {
                    freedAllocations.put(allocationId);
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);

            final AllocationID expiredSlotID = new AllocationID();
            final AllocationID freshSlotID = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredSlotID, 0, ResourceProfile.UNKNOWN);
            final SlotOffer freshOffer = new SlotOffer(freshSlotID, 1, ResourceProfile.UNKNOWN);

            MatcherAssert.assertThat(
                gateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
                Matchers.is(Acknowledge.get()));

            MatcherAssert.assertThat(
                gateway.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer).get(),
                Matchers.is(true));

            // Age the first slot to just below the timeout before the second one arrives.
            manualClock.advanceTime(timeout.toMilliseconds() - 1L, TimeUnit.MILLISECONDS);

            MatcherAssert.assertThat(
                gateway.offerSlot(taskManagerLocation, taskManagerGateway, freshOffer).get(),
                Matchers.is(true));

            // One more millisecond: the first slot reaches the timeout, the second does not.
            manualClock.advanceTime(1L, TimeUnit.MILLISECONDS);

            slotPool.triggerCheckIdleSlot();

            final AllocationID freed =
                freedAllocations.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(freed, Matchers.is(expiredSlotID));
            MatcherAssert.assertThat(freedAllocations.isEmpty(), Matchers.is(true));
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Checks the pool's behaviour when freeing an idle slot on the task manager fails. The
     * free-slot function first answers with a failed future and then with a future that is still
     * pending. After the first (failing) idle check the slot is expected to be allocatable again;
     * after the second idle check and the release of the task manager, a new allocation is
     * expected not to complete within 10 milliseconds.
     */
    @Test
    public void testReleasingIdleSlotFailed() throws Exception {
        final ManualClock manualClock = new ManualClock();
        final SlotPool slotPool = new SlotPool(
            rpcService,
            jobId,
            LocationPreferenceSchedulingStrategy.getInstance(),
            manualClock,
            TestingUtils.infiniteTime(),
            timeout);

        try {
            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);

            final AllocationID expiredAllocationId = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN);

            // Scripted answers of the free-slot function, consumed in insertion order; once the
            // script is exhausted every call is acknowledged.
            final ArrayDeque<CompletableFuture<Acknowledge>> scriptedResponses = new ArrayDeque<>(2);
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                if (!scriptedResponses.isEmpty()) {
                    return scriptedResponses.pop();
                }
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            scriptedResponses.add(FutureUtils.completedExceptionally(new FlinkException("Test failure")));

            final CompletableFuture<Acknowledge> pendingResponse = new CompletableFuture<>();
            scriptedResponses.add(pendingResponse);

            MatcherAssert.assertThat(
                gateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
                Matchers.is(Acknowledge.get()));
            MatcherAssert.assertThat(
                gateway.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer).get(),
                Matchers.is(true));

            // First idle check: the free-slot call gets the scripted failure.
            manualClock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            CompletableFuture<LogicalSlot> allocation = allocateSlot(gateway, new SlotRequestId());

            // The slot whose release failed must still be handed out.
            final LogicalSlot reusedSlot = allocation.get();
            MatcherAssert.assertThat(reusedSlot.getAllocationId(), Matchers.is(expiredAllocationId));

            slotPool.getSlotOwner().returnAllocatedSlot(reusedSlot).get();

            // Second idle check: the free-slot call gets the still-pending response.
            manualClock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            allocation = allocateSlot(gateway, new SlotRequestId());

            gateway.releaseTaskManager(taskManagerLocation.getResourceID(), null).get();

            pendingResponse.completeExceptionally(new FlinkException("Second test exception"));

            try {
                allocation.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("Expected to fail with a timeout.");
            } catch (TimeoutException expected) {
                // expected: the allocation must not have been fulfilled
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Allocates five slots on a single task manager and then fails the allocations one by one.
     * Every failed allocation is expected to be freed on the task manager. The response to
     * {@code failAllocation} is expected to be empty for all but the last allocation, for which
     * it must carry the task manager's resource id.
     */
    @Test
    public void testFreeFailedSlots() throws Exception {
        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final int parallelism = 5;

            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(parallelism);
            resourceManagerGateway.setRequestSlotConsumer(
                request -> requestedAllocations.offer(request.getAllocationId()));

            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);

            final HashMap<SlotRequestId, CompletableFuture<LogicalSlot>> pendingSlots = new HashMap<>(parallelism);
            for (int i = 0; i < parallelism; ++i) {
                final SlotRequestId requestId = new SlotRequestId();
                pendingSlots.put(requestId, allocateSlot(gateway, requestId));
            }

            final ArrayList<SlotOffer> offers = new ArrayList<>(parallelism);
            for (int slotIndex = 0; slotIndex < parallelism; ++slotIndex) {
                offers.add(new SlotOffer(requestedAllocations.take(), slotIndex, ResourceProfile.UNKNOWN));
            }

            gateway.registerTaskManager(taskManagerLocation.getResourceID());
            gateway.offerSlots(taskManagerLocation, taskManagerGateway, offers);

            // Wait until every request has been fulfilled.
            FutureUtils.waitForAll(pendingSlots.values()).get();

            final ArrayBlockingQueue<AllocationID> freedAllocations = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationID, throwable) -> {
                freedAllocations.offer(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            final FlinkException failException = new FlinkException("Test fail exception");
            final int lastIndex = parallelism - 1;

            // All but the last failure: the response must not name a task executor.
            for (int i = 0; i < lastIndex; ++i) {
                final SlotOffer failedOffer = offers.get(i);
                final AllocationID failedAllocation = failedOffer.getAllocationId();

                MatcherAssert.assertThat(
                    gateway.failAllocation(failedAllocation, failException).get().isPresent(),
                    Matchers.is(false));
                MatcherAssert.assertThat(
                    freedAllocations.take(),
                    Matchers.is(Matchers.equalTo(failedOffer.getAllocationId())));
            }

            // Last failure: the response must carry the task manager's resource id.
            final SlotOffer lastOffer = offers.get(lastIndex);
            MatcherAssert.assertThat(
                gateway.failAllocation(lastOffer.getAllocationId(), failException).get().get(),
                Matchers.is(Matchers.equalTo(taskManagerLocation.getResourceID())));
            MatcherAssert.assertThat(
                freedAllocations.take(),
                Matchers.is(Matchers.equalTo(lastOffer.getAllocationId())));
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Fails the allocation that backs a still-pending slot request and checks that the response
     * to {@code failAllocation} is empty and that the slot future fails with the very exception
     * that was passed in.
     */
    @Test
    public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
        final SlotPool slotPool =
            new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());

        try {
            final CompletableFuture<AllocationID> requestedAllocation = new CompletableFuture<>();
            resourceManagerGateway.setRequestSlotConsumer(
                request -> requestedAllocation.complete(request.getAllocationId()));

            final SlotPoolGateway gateway = setupSlotPool(slotPool, resourceManagerGateway);

            final CompletableFuture<LogicalSlot> pendingSlot = allocateSlot(gateway, new SlotRequestId());
            final AllocationID allocationId = requestedAllocation.get();

            // Nothing has been offered, so the request must still be open.
            MatcherAssert.assertThat(pendingSlot.isDone(), Matchers.is(false));

            final FlinkException cause = new FlinkException("Fail pending slot request failure.");

            MatcherAssert.assertThat(
                gateway.failAllocation(allocationId, cause).get().isPresent(),
                Matchers.is(false));

            try {
                pendingSlot.get();
                Assert.fail("Expected a slot allocation failure.");
            } catch (ExecutionException failure) {
                final Throwable actualCause = ExceptionUtils.stripExecutionException(failure);
                MatcherAssert.assertThat(actualCause, Matchers.equalTo(cause));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(slotPool, timeout);
        }
    }

    /**
     * Requests a slot from the given gateway for a {@link DummyScheduledUnit}, using a slot
     * profile without requirements and the shared test timeout.
     *
     * @param slotPoolGateway gateway of the slot pool under test
     * @param slotRequestId id under which the slot is requested
     * @return the future returned by the gateway's {@code allocateSlot} call
     */
    private CompletableFuture<LogicalSlot> allocateSlot(SlotPoolGateway slotPoolGateway, SlotRequestId slotRequestId) {
        final ScheduledUnit dummyTask = new DummyScheduledUnit();
        final SlotProfile unconstrainedProfile = SlotProfile.noRequirements();

        return slotPoolGateway.allocateSlot(slotRequestId, dummyTask, unconstrainedProfile, true, timeout);
    }

    /**
     * Starts the given slot pool with a freshly generated {@link JobMasterId} and a dummy job
     * manager address, connects it to the resource manager gateway and returns the pool's own
     * gateway.
     *
     * @param slotPool slot pool to start
     * @param resourceManagerGateway resource manager gateway the pool is connected to
     * @return the self gateway of the slot pool
     * @throws Exception if starting the slot pool fails
     */
    private static SlotPoolGateway setupSlotPool(SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception {
        final String jobManagerAddress = "foobar";
        final JobMasterId jobMasterId = JobMasterId.generate();

        slotPool.start(jobMasterId, jobManagerAddress);
        slotPool.connectToResourceManager(resourceManagerGateway);

        return slotPool.getSelfGateway(SlotPoolGateway.class);
    }
}

