/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import akka.actor.ActorSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for the {@link SlotPool} that run it as a started RPC endpoint on top of an
 * Akka-backed {@link RpcService}.
 *
 * <p>Every test issues a slot request with {@link #fastTimeout} and then checks how the pool
 * cleans up once that request has timed out.
 */
public class SlotPoolRpcTest extends TestLogger {

    /** RPC service shared by all tests; created in {@link #setup()}, released in {@link #shutdown()}. */
    private static RpcService rpcService;

    /** Timeout used for terminating RPC components and for the test-only async state queries. */
    private static final Time timeout = Time.seconds(10L);

    /** Deliberately tiny timeout passed to the slot requests so that they time out. */
    private static final Time fastTimeout = Time.milliseconds(1L);

    // ------------------------------------------------------------------------
    //  setup / teardown
    // ------------------------------------------------------------------------

    /** Creates the local actor system and the RPC service used by all tests. */
    @BeforeClass
    public static void setup() {
        ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
        rpcService = new AkkaRpcService(actorSystem, Time.seconds(10L));
    }

    /** Terminates the shared RPC service, if it was created. */
    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, timeout);
            rpcService = null;
        }
    }

    // ------------------------------------------------------------------------
    //  tests
    // ------------------------------------------------------------------------

    /**
     * A slot request issued directly on a pool that was never connected to a resource manager
     * must fail with a {@link TimeoutException}.
     */
    @Test
    public void testSlotAllocationNoResourceManager() throws Exception {
        JobID jid = new JobID();
        SlotPool pool = new SlotPool(
                rpcService,
                jid,
                LocationPreferenceSchedulingStrategy.getInstance(),
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());
        try {
            pool.start(JobMasterId.generate(), "foobar");

            CompletableFuture<?> future = pool.allocateSlot(
                    new SlotRequestId(),
                    new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    true,
                    fastTimeout);

            assertFailsWithTimeout(future, "We expected an ExecutionException.");
        } finally {
            RpcUtils.terminateRpcEndpoint(pool, timeout);
        }
    }

    /**
     * Without a resource manager connection, a timed-out slot request must trigger the pool's
     * pending-request timeout hook and leave no request waiting for the resource manager.
     */
    @Test
    public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
        JobID jid = new JobID();
        TestingSlotPool pool = new TestingSlotPool(
                rpcService,
                jid,
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());
        try {
            CompletableFuture<SlotRequestId> timeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete);
            pool.start(JobMasterId.generate(), "foobar");
            SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);

            SlotRequestId requestId = new SlotRequestId();
            CompletableFuture<?> future = slotPoolGateway.allocateSlot(
                    requestId,
                    new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    true,
                    fastTimeout);

            assertFailsWithTimeout(future, "We expected a TimeoutException.");

            // wait until the pool has processed the timeout before inspecting its state
            timeoutFuture.get();

            Assert.assertEquals(0L, pool.getNumberOfWaitingForResourceRequests().get().intValue());
        } finally {
            RpcUtils.terminateRpcEndpoint(pool, timeout);
        }
    }

    /**
     * With a resource manager connected, a timed-out slot request must trigger the pool's
     * pending-request timeout hook and be removed from the pending requests.
     */
    @Test
    public void testSlotAllocationTimeout() throws Exception {
        JobID jid = new JobID();
        TestingSlotPool pool = new TestingSlotPool(
                rpcService,
                jid,
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());
        try {
            pool.start(JobMasterId.generate(), "foobar");
            SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);

            CompletableFuture<SlotRequestId> slotRequestTimeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete);

            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            pool.connectToResourceManager(resourceManagerGateway);

            SlotRequestId requestId = new SlotRequestId();
            CompletableFuture<?> future = slotPoolGateway.allocateSlot(
                    requestId,
                    new DummyScheduledUnit(),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    true,
                    fastTimeout);

            assertFailsWithTimeout(future, "We expected a TimeoutException.");

            // wait until the pool has processed the timeout before inspecting its state
            slotRequestTimeoutFuture.get();

            Assert.assertEquals(0L, pool.getNumberOfPendingRequests().get().intValue());
        } finally {
            RpcUtils.terminateRpcEndpoint(pool, timeout);
        }
    }

    /**
     * A slot that is offered for a request that has already timed out must still be accepted
     * and show up among the pool's available slots.
     */
    @Test
    public void testExtraSlotsAreKept() throws Exception {
        JobID jid = new JobID();
        TestingSlotPool pool = new TestingSlotPool(
                rpcService,
                jid,
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());
        try {
            pool.start(JobMasterId.generate(), "foobar");
            SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);

            // capture the allocation id the pool sends to the resource manager
            CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            resourceManagerGateway.setRequestSlotConsumer(
                    slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

            CompletableFuture<SlotRequestId> slotRequestTimeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete);

            pool.connectToResourceManager(resourceManagerGateway);

            SlotRequestId requestId = new SlotRequestId();
            CompletableFuture<?> future = slotPoolGateway.allocateSlot(
                    requestId,
                    new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
                    SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                    true,
                    fastTimeout);

            assertFailsWithTimeout(future, "We expected a TimeoutException.");

            // wait until the pool has processed the timeout before inspecting its state
            slotRequestTimeoutFuture.get();

            Assert.assertEquals(0L, pool.getNumberOfPendingRequests().get().intValue());

            // offer a slot carrying the allocation id of the request that already timed out
            AllocationID allocationId = allocationIdFuture.get();
            SlotOffer slotOffer = new SlotOffer(allocationId, 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();

            slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

            Assert.assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
            Assert.assertTrue(pool.containsAvailableSlot(allocationId).get());
        } finally {
            RpcUtils.terminateRpcEndpoint(pool, timeout);
        }
    }

    /**
     * A slot request issued through the pool's slot provider that times out must lead to a
     * {@code releaseSlot} call on the pool and leave no pending request behind.
     */
    @Test
    public void testProviderAndOwnerSlotAllocationTimeout() throws Exception {
        JobID jid = new JobID();
        TestingSlotPool pool = new TestingSlotPool(
                rpcService,
                jid,
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());

        CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>();
        pool.setReleaseSlotConsumer(releaseSlotFuture::complete);

        try {
            pool.start(JobMasterId.generate(), "foobar");

            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            pool.connectToResourceManager(resourceManagerGateway);

            CompletableFuture<?> future = pool.getSlotProvider().allocateSlot(
                    new DummyScheduledUnit(),
                    true,
                    SlotProfile.noRequirements(),
                    fastTimeout);

            assertFailsWithTimeout(future, "We expected a TimeoutException.");

            // wait for the release call before inspecting the pool's state
            releaseSlotFuture.get();

            Assert.assertEquals(0L, pool.getNumberOfPendingRequests().get().intValue());
        } finally {
            RpcUtils.terminateRpcEndpoint(pool, timeout);
        }
    }

    // ------------------------------------------------------------------------
    //  helpers
    // ------------------------------------------------------------------------

    /**
     * Blocks on {@code future} and asserts that it completes exceptionally with a
     * {@link TimeoutException} as the (stripped) cause.
     *
     * @param future the future expected to fail
     * @param messageIfCompleted failure message used when the future completes normally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void assertFailsWithTimeout(CompletableFuture<?> future, String messageIfCompleted)
            throws InterruptedException {
        try {
            future.get();
            Assert.fail(messageIfCompleted);
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
        }
    }

    /**
     * {@link SlotPool} subclass that notifies optional callbacks when a slot is released or a
     * pending request times out, and exposes a few state queries via {@code callAsync}.
     */
    private static final class TestingSlotPool extends SlotPool {

        /** Callback invoked after {@code releaseSlot} has delegated to the super class; may be unset. */
        private volatile Consumer<SlotRequestId> releaseSlotConsumer;

        /** Callback invoked before {@code timeoutPendingSlotRequest} delegates to the super class; may be unset. */
        private volatile Consumer<SlotRequestId> timeoutPendingSlotRequestConsumer;

        public TestingSlotPool(RpcService rpcService, JobID jobId, Clock clock, Time rpcTimeout, Time idleSlotTimeout) {
            super(
                    rpcService,
                    jobId,
                    LocationPreferenceSchedulingStrategy.getInstance(),
                    clock,
                    rpcTimeout,
                    idleSlotTimeout);
        }

        /** Registers the callback for slot releases; must not be {@code null}. */
        public void setReleaseSlotConsumer(Consumer<SlotRequestId> releaseSlotConsumer) {
            this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer);
        }

        /** Registers the callback for pending-request timeouts; must not be {@code null}. */
        public void setTimeoutPendingSlotRequestConsumer(Consumer<SlotRequestId> timeoutPendingSlotRequestConsumer) {
            this.timeoutPendingSlotRequestConsumer = Preconditions.checkNotNull(timeoutPendingSlotRequestConsumer);
        }

        @Override
        public CompletableFuture<Acknowledge> releaseSlot(
                SlotRequestId slotRequestId,
                @Nullable SlotSharingGroupId slotSharingGroupId,
                @Nullable Throwable cause) {
            // read the volatile field once so the null check and the call see the same callback
            final Consumer<SlotRequestId> currentReleaseSlotConsumer = this.releaseSlotConsumer;

            final CompletableFuture<Acknowledge> acknowledgeCompletableFuture =
                    super.releaseSlot(slotRequestId, slotSharingGroupId, cause);

            // notify only after the super class has handled the release
            if (currentReleaseSlotConsumer != null) {
                currentReleaseSlotConsumer.accept(slotRequestId);
            }

            return acknowledgeCompletableFuture;
        }

        @Override
        protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
            // read the volatile field once so the null check and the call see the same callback
            final Consumer<SlotRequestId> currentTimeoutPendingSlotRequestConsumer =
                    this.timeoutPendingSlotRequestConsumer;

            // notify first, then let the super class handle the timeout
            if (currentTimeoutPendingSlotRequestConsumer != null) {
                currentTimeoutPendingSlotRequestConsumer.accept(slotRequestId);
            }

            super.timeoutPendingSlotRequest(slotRequestId);
        }

        /** Asynchronously checks whether the allocated slots contain the given allocation id. */
        CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
            return this.callAsync(() -> this.getAllocatedSlots().contains(allocationId), timeout);
        }

        /** Asynchronously checks whether the available slots contain the given allocation id. */
        CompletableFuture<Boolean> containsAvailableSlot(AllocationID allocationId) {
            return this.callAsync(() -> this.getAvailableSlots().contains(allocationId), timeout);
        }

        /** Asynchronously returns the size of the pool's pending requests. */
        CompletableFuture<Integer> getNumberOfPendingRequests() {
            return this.callAsync(() -> this.getPendingRequests().size(), timeout);
        }

        /** Asynchronously returns the number of requests waiting for the resource manager. */
        CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() {
            return this.callAsync(() -> this.getWaitingForResourceManager().size(), timeout);
        }
    }
}

