/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;

public class SchedulerTestBase
extends TestLogger {
    /** Slot provider handed to subclasses; created in {@code setup()} and shut down in {@code teardown()}. */
    protected TestingSlotProvider testingSlotProvider;
    /** RPC service the slot pool is constructed with in {@code setup()}; stopped in {@code teardown()}. */
    private RpcService rpcService;

    /**
     * Creates a fresh RPC service and slot pool before each test, wraps the pool in a
     * {@link TestingSlotProvider} and starts the pool.
     *
     * @throws Exception if starting the slot pool fails
     */
    @Before
    public void setup() throws Exception {
        rpcService = new TestingRpcService();

        final JobID jobId = new JobID();
        final TestingSlotPool slotPool = new TestingSlotPool(
            rpcService,
            jobId,
            LocationPreferenceSchedulingStrategy.getInstance());

        // Publish the provider before starting the pool so teardown() can still
        // shut the pool down if start() throws.
        testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);

        final JobMasterId jobMasterId = JobMasterId.generate();
        final String jobManagerAddress = "localhost";
        slotPool.start(jobMasterId, jobManagerAddress);
    }

    /**
     * Shuts down the slot provider and then the RPC service after each test.
     *
     * <p>Both resources are released independently: a failure while shutting down the slot
     * provider no longer prevents the RPC service from being stopped. The first failure is
     * rethrown once both have been attempted; a second failure is attached to it as suppressed.
     *
     * @throws Exception the first failure encountered while releasing the resources
     */
    @After
    public void teardown() throws Exception {
        Exception failure = null;

        if (testingSlotProvider != null) {
            try {
                testingSlotProvider.shutdown();
            } catch (Exception e) {
                failure = e;
            } finally {
                testingSlotProvider = null;
            }
        }

        if (rpcService != null) {
            try {
                // Block until the service reports that it has stopped.
                rpcService.stopService().get();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            } finally {
                rpcService = null;
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * {@link SlotPool} subclass that exposes a few read-only counters of the pool's state to
     * tests. Every query is submitted through {@code callAsync} and returned as a future.
     */
    private static final class TestingSlotPool extends SlotPool {

        public TestingSlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
            super(rpcService, jobId, schedulingStrategy);
        }

        /**
         * @return future holding the size of the pool's available-slots collection
         */
        CompletableFuture<Integer> getNumberOfAvailableSlots() {
            return callAsync(() -> getAvailableSlots().size(), TestingUtils.infiniteTime());
        }

        /**
         * @param slotSharingGroupId slot sharing group to inspect
         * @return future holding the number of resolved root slots of the group's sharing manager;
         *     the computation fails with a {@link FlinkException} if no manager is registered
         */
        CompletableFuture<Integer> getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
            return callAsync(
                () -> requireSlotSharingManager(slotSharingGroupId).getResolvedRootSlots().size(),
                TestingUtils.infiniteTime());
        }

        /**
         * @param slotSharingGroupId slot sharing group to inspect
         * @param jobVertexId job vertex whose occupancy is checked
         * @return future holding the number of resolved root slots of the group that do not
         *     contain {@code jobVertexId}; the computation fails with a {@link FlinkException}
         *     if no manager is registered
         */
        CompletableFuture<Integer> getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexId) {
            return callAsync(
                () -> {
                    final SlotSharingManager manager = requireSlotSharingManager(slotSharingGroupId);

                    int availableSlots = 0;
                    for (SlotSharingManager.MultiTaskSlot rootSlot : manager.getResolvedRootSlots()) {
                        // A root slot is counted only if the vertex is not already placed in it.
                        if (!rootSlot.contains(jobVertexId)) {
                            availableSlots++;
                        }
                    }
                    return availableSlots;
                },
                TestingUtils.infiniteTime());
        }

        /**
         * Looks up the sharing manager registered for the given group.
         *
         * @throws FlinkException if no manager is registered under {@code slotSharingGroupId}
         */
        private SlotSharingManager requireSlotSharingManager(SlotSharingGroupId slotSharingGroupId) throws FlinkException {
            final SlotSharingManager manager = slotSharingManagers.get(slotSharingGroupId);
            if (manager == null) {
                throw new FlinkException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
            }
            return manager;
        }
    }

    /**
     * {@link TestingSlotProvider} backed by a {@link TestingSlotPool}.
     *
     * <p>Slot allocations are delegated to the pool's own {@link SlotProvider}; each completed
     * allocation increments a per-locality counter that tests can read back.
     */
    private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {

        private final TestingSlotPool slotPool;

        private final SlotProvider slotProvider;

        private final AtomicInteger numberOfLocalizedAssignments = new AtomicInteger();

        private final AtomicInteger numberOfNonLocalizedAssignments = new AtomicInteger();

        private final AtomicInteger numberOfUnconstrainedAssignments = new AtomicInteger();

        private final AtomicInteger numberOfHostLocalizedAssignments = new AtomicInteger();

        private TestingSlotPoolSlotProvider(TestingSlotPool slotPool) {
            this.slotPool = Preconditions.checkNotNull(slotPool);
            this.slotProvider = slotPool.getSlotProvider();
        }

        /**
         * Registers a new local task manager with the pool and offers {@code numberSlots} slots
         * for it, blocking until both calls have completed.
         *
         * @param numberSlots number of slots to offer
         * @return location of the newly registered task manager
         * @throws RuntimeException if registration or the slot offer fails
         * @throws IllegalStateException if the pool did not accept exactly {@code numberSlots} offers
         */
        @Override
        public TaskManagerLocation addTaskManager(int numberSlots) {
            final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            final ResourceID resourceId = taskManagerLocation.getResourceID();
            final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

            try {
                slotPoolGateway.registerTaskManager(resourceId).get();
            } catch (Exception e) {
                throw wrapFailure("Unexpected exception occurred. This indicates a programming bug.", e);
            }

            final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
            final Collection<SlotOffer> slotOffers = new ArrayList<>(numberSlots);

            for (int i = 0; i < numberSlots; i++) {
                // One offer per slot index, each with its own allocation id.
                slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.UNKNOWN));
            }

            final Collection<SlotOffer> acceptedSlotOffers;
            try {
                acceptedSlotOffers = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers).get();
            } catch (Exception e) {
                throw wrapFailure("Unexpected exception occurred. This indicates a programming bug.", e);
            }

            Preconditions.checkState(acceptedSlotOffers.size() == numberSlots);

            return taskManagerLocation;
        }

        /**
         * Releases the task manager with the given id from the pool, blocking until done.
         *
         * @throws RuntimeException if the release fails
         */
        @Override
        public void releaseTaskManager(ResourceID resourceId) {
            try {
                // NOTE(review): this calls the pool directly whereas addTaskManager goes through
                // the self gateway - confirm the direct call is intended.
                slotPool.releaseTaskManager(resourceId, null).get();
            } catch (Exception e) {
                throw wrapFailure("Should not have happened.", e);
            }
        }

        @Override
        public int getNumberOfAvailableSlots() {
            try {
                return slotPool.getNumberOfAvailableSlots().get();
            } catch (Exception e) {
                throw wrapFailure("Should not have happened.", e);
            }
        }

        @Override
        public int getNumberOfLocalizedAssignments() {
            return numberOfLocalizedAssignments.get();
        }

        @Override
        public int getNumberOfNonLocalizedAssignments() {
            return numberOfNonLocalizedAssignments.get();
        }

        @Override
        public int getNumberOfUnconstrainedAssignments() {
            return numberOfUnconstrainedAssignments.get();
        }

        @Override
        public int getNumberOfHostLocalizedAssignments() {
            return numberOfHostLocalizedAssignments.get();
        }

        @Override
        public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
            try {
                return slotPool.getNumberOfSharedSlots(slotSharingGroup.getSlotSharingGroupId()).get();
            } catch (Exception e) {
                throw wrapFailure("Should not have happened.", e);
            }
        }

        @Override
        public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
            try {
                return slotPool.getNumberOfAvailableSlotsForGroup(slotSharingGroup.getSlotSharingGroupId(), jobVertexId).get();
            } catch (Exception e) {
                throw wrapFailure("Should not have happened.", e);
            }
        }

        @Override
        public void shutdown() throws Exception {
            RpcUtils.terminateRpcEndpoint(slotPool, TestingUtils.TIMEOUT());
        }

        /**
         * Delegates the allocation to the pool's slot provider and, once a slot is returned,
         * bumps the counter matching the slot's locality.
         */
        @Override
        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, SlotProfile slotProfile, Time allocationTimeout) {
            // NOTE(review): slotRequestId is not forwarded to the delegate - confirm this is intended.
            return slotProvider
                .allocateSlot(task, allowQueued, slotProfile, allocationTimeout)
                .thenApply(this::recordAssignment);
        }

        /** Always acknowledges immediately; the arguments are not used. */
        @Override
        public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        /** Increments the counter for the slot's locality and returns the slot unchanged. */
        private LogicalSlot recordAssignment(LogicalSlot logicalSlot) {
            switch (logicalSlot.getLocality()) {
                case LOCAL:
                    numberOfLocalizedAssignments.incrementAndGet();
                    break;
                case UNCONSTRAINED:
                    numberOfUnconstrainedAssignments.incrementAndGet();
                    break;
                case NON_LOCAL:
                    numberOfNonLocalizedAssignments.incrementAndGet();
                    break;
                case HOST_LOCAL:
                    numberOfHostLocalizedAssignments.incrementAndGet();
                    break;
            }
            return logicalSlot;
        }

        /**
         * Wraps {@code cause} in a {@link RuntimeException}, restoring the thread's interrupt
         * status first when the cause is an {@link InterruptedException}.
         */
        private static RuntimeException wrapFailure(String message, Exception cause) {
            if (cause instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return new RuntimeException(message, cause);
        }
    }

    protected static interface TestingSlotProvider
    extends SlotProvider {
        public TaskManagerLocation addTaskManager(int var1);

        public void releaseTaskManager(ResourceID var1);

        public int getNumberOfAvailableSlots();

        public int getNumberOfLocalizedAssignments();

        public int getNumberOfNonLocalizedAssignments();

        public int getNumberOfUnconstrainedAssignments();

        public int getNumberOfHostLocalizedAssignments();

        public int getNumberOfSlots(SlotSharingGroup var1);

        public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup var1, JobVertexID var2);

        public void shutdown() throws Exception;
    }
}

