/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.instance;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.AvailableSlotsTest;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link SlotPool}, driven through its {@link SlotPoolGateway}.
 *
 * <p>Each test starts a fresh {@link SlotPool} on a {@link TestingRpcService}, connects it to a
 * mocked {@link ResourceManagerGateway}, and shuts the pool down again in a {@code finally}
 * block so that a failing assertion does not leave the pool running.
 */
public class SlotPoolTest extends TestLogger {

    /** Upper bound used for slot requests, Mockito verifications and blocking future reads. */
    private final Time timeout = Time.seconds(10L);

    private RpcService rpcService;

    private JobID jobId;

    /** Creates a fresh RPC service and job id for every test. */
    @Before
    public void setUp() throws Exception {
        rpcService = new TestingRpcService();
        jobId = new JobID();
    }

    /** Stops the RPC service created in {@link #setUp()}, if it was created at all. */
    @After
    public void tearDown() throws Exception {
        // setUp() may have failed before the service was assigned; do not add an NPE on top.
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    /**
     * Requests a single slot, answers the resulting resource manager request with a slot offer
     * and checks the properties of the {@link SimpleSlot} handed out by the pool.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);

            ScheduledUnit task = Mockito.mock(ScheduledUnit.class);
            CompletableFuture<SimpleSlot> future =
                slotPoolGateway.allocateSlot(task, AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            Assert.assertFalse(future.isDone());

            // Wait until the pool has forwarded the request to the resource manager.
            ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
            Mockito.verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()))
                .requestSlot(
                    Matchers.any(JobMasterId.class),
                    slotRequestArgumentCaptor.capture(),
                    Matchers.any(Time.class));
            SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();

            // Offer a slot carrying the allocation id of the captured request.
            AllocatedSlot allocatedSlot = createAllocatedSlot(
                resourceID,
                slotRequest.getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());

            SimpleSlot slot = future.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future.isDone());
            Assert.assertTrue(slot.isAlive());
            Assert.assertEquals(resourceID, slot.getTaskManagerID());
            Assert.assertEquals(jobId, slot.getJobID());
            Assert.assertEquals(slotPool.getSlotOwner(), slot.getOwner());
            Assert.assertEquals(
                slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()),
                slot);
        }
        finally {
            slotPool.shutDown();
        }
    }

    /**
     * Issues two slot requests but offers only one slot. The second request must stay pending
     * until the first slot is released, and must then be served by that returned slot.
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            ResourceID resourceID = new ResourceID("resource");
            // Registered through the gateway, like in every other test of this class, instead of
            // invoking the method on the SlotPool instance directly.
            slotPoolGateway.registerTaskManager(resourceID);

            CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            Assert.assertFalse(future1.isDone());
            Assert.assertFalse(future2.isDone());

            // Both requests have to reach the resource manager before a slot is offered.
            ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
            Mockito.verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
                .requestSlot(
                    Matchers.any(JobMasterId.class),
                    slotRequestArgumentCaptor.capture(),
                    Matchers.any(Time.class));
            List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();

            // Only the first captured request is answered with an offer.
            AllocatedSlot allocatedSlot = createAllocatedSlot(
                resourceID,
                slotRequests.get(0).getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());

            SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());
            Assert.assertFalse(future2.isDone());

            // Returning the first slot is expected to complete the still pending second request.
            slot1.releaseSlot();
            SimpleSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future2.isDone());

            Assert.assertNotEquals(slot1, slot2);
            Assert.assertTrue(slot1.isReleased());
            Assert.assertTrue(slot2.isAlive());
            Assert.assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
            Assert.assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
            Assert.assertEquals(
                slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()),
                slot2);
        }
        finally {
            slotPool.shutDown();
        }
    }

    /**
     * Allocates a slot, releases it and then issues a second request, which is expected to be
     * answered from the pool without a further slot offer.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);

            CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            Assert.assertFalse(future1.isDone());

            ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
            Mockito.verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()))
                .requestSlot(
                    Matchers.any(JobMasterId.class),
                    slotRequestArgumentCaptor.capture(),
                    Matchers.any(Time.class));
            SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();

            AllocatedSlot allocatedSlot = createAllocatedSlot(
                resourceID,
                slotRequest.getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());

            SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());

            // Hand the slot back before asking for another one.
            slot1.releaseSlot();

            CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            SimpleSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future2.isDone());

            Assert.assertNotEquals(slot1, slot2);
            Assert.assertTrue(slot1.isReleased());
            Assert.assertTrue(slot2.isAlive());
            Assert.assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
            Assert.assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
        }
        finally {
            slotPool.shutDown();
        }
    }

    /**
     * Checks which slot offers the pool accepts: an offer from an unregistered task manager is
     * rejected, whereas an offer with an unknown allocation id, the requested offer, a repeated
     * offer of the same slot and an offer of an already released slot are all accepted.
     */
    @Test
    public void testOfferSlot() throws Exception {
        ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
        SlotPool slotPool = new SlotPool(rpcService, jobId);
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);

            CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);
            Assert.assertFalse(future.isDone());

            ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
            Mockito.verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()))
                .requestSlot(
                    Matchers.any(JobMasterId.class),
                    slotRequestArgumentCaptor.capture(),
                    Matchers.any(Time.class));
            SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();

            // A slot located on a task manager that was never registered is refused.
            AllocatedSlot invalid = createAllocatedSlot(
                new ResourceID("unregistered"),
                slotRequest.getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertFalse(slotPoolGateway.offerSlot(invalid).get());

            // A slot with an allocation id nobody asked for is still accepted.
            AllocatedSlot notRequested = createAllocatedSlot(
                resourceID,
                new AllocationID(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(notRequested).get());

            // The slot matching the pending request is accepted and completes the request.
            AllocatedSlot allocatedSlot = createAllocatedSlot(
                resourceID,
                slotRequest.getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
            SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(slot.isAlive());

            // Offering the very same slot again is accepted and leaves the handed-out slot alive.
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
            Assert.assertTrue(slot.isAlive());

            // Once the slot has been released, the same offer is accepted as well.
            slot.releaseSlot();
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
        }
        finally {
            slotPool.shutDown();
        }
    }

    /**
     * Releases a task manager that holds an allocated slot. The slot handed out from it must end
     * up released, and a second, still pending request must not be completed by that release.
     */
    @Test
    public void testReleaseResource() throws Exception {
        ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
        // Completed as soon as the pool has processed the return of a slot.
        final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
        SlotPool slotPool = new SlotPool(rpcService, jobId) {

            @Override
            public void returnAllocatedSlot(Slot slot) {
                super.returnAllocatedSlot(slot);
                slotReturnFuture.complete(true);
            }
        };
        try {
            SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
            ResourceID resourceID = new ResourceID("resource");
            slotPoolGateway.registerTaskManager(resourceID);

            CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);

            // Capture the first request before the second one is issued, so that the default
            // "exactly once" verification still holds.
            ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
            Mockito.verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()))
                .requestSlot(
                    Matchers.any(JobMasterId.class),
                    slotRequestArgumentCaptor.capture(),
                    Matchers.any(Time.class));
            SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();

            CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(
                Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null, timeout);

            AllocatedSlot allocatedSlot = createAllocatedSlot(
                resourceID,
                slotRequest.getAllocationId(),
                jobId,
                AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            Assert.assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());

            SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(future1.isDone());
            Assert.assertFalse(future2.isDone());

            slotPoolGateway.releaseTaskManager(resourceID);

            // Bounded wait: if the slot is never returned the test fails with a
            // TimeoutException instead of blocking the whole test run indefinitely.
            slotReturnFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(slot1.isReleased());

            // Brief pause so that an erroneous completion of the second request would have a
            // chance to become visible before it is asserted to be still pending.
            Thread.sleep(10L);
            Assert.assertFalse(future2.isDone());
        }
        finally {
            slotPool.shutDown();
        }
    }

    /**
     * Creates a {@link ResourceManagerGateway} mock whose {@code requestSlot} call returns a
     * mocked {@link CompletableFuture} configured with {@link Mockito#RETURNS_MOCKS}.
     *
     * @return the stubbed resource manager gateway mock
     */
    private static ResourceManagerGateway createResourceManagerGatewayMock() {
        ResourceManagerGateway resourceManagerGateway = Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(
                resourceManagerGateway.requestSlot(
                    Matchers.any(JobMasterId.class),
                    Matchers.any(SlotRequest.class),
                    Matchers.any(Time.class)))
            .thenReturn(Mockito.mock(CompletableFuture.class, Mockito.RETURNS_MOCKS));
        return resourceManagerGateway;
    }

    /**
     * Starts the given slot pool with a freshly generated {@link JobMasterId}, connects it to the
     * given resource manager gateway and returns the pool's own gateway.
     *
     * @param slotPool the slot pool to start
     * @param resourceManagerGateway the resource manager gateway the pool is connected to
     * @return the self gateway of the started slot pool
     * @throws Exception if starting the slot pool fails
     */
    private static SlotPoolGateway setupSlotPool(SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception {
        final String jobManagerAddress = "foobar";
        slotPool.start(JobMasterId.generate(), jobManagerAddress);
        slotPool.connectToResourceManager(resourceManagerGateway);
        return slotPool.getSelfGateway(SlotPoolGateway.class);
    }

    /**
     * Builds an {@link AllocatedSlot} with slot number {@code 0} whose mocked
     * {@link TaskManagerLocation} reports the given resource id and whose
     * {@link TaskManagerGateway} is a plain mock.
     *
     * @param resourceId resource id reported by the mocked task manager location
     * @param allocationId allocation id of the slot
     * @param jobId job the slot is allocated for
     * @param resourceProfile resource profile of the slot
     * @return the newly created allocated slot
     */
    static AllocatedSlot createAllocatedSlot(ResourceID resourceId, AllocationID allocationId, JobID jobId, ResourceProfile resourceProfile) {
        TaskManagerLocation mockTaskManagerLocation = Mockito.mock(TaskManagerLocation.class);
        Mockito.when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
        TaskManagerGateway mockTaskManagerGateway = Mockito.mock(TaskManagerGateway.class);
        return new AllocatedSlot(allocationId, jobId, mockTaskManagerLocation, 0, resourceProfile, mockTaskManagerGateway);
    }
}

