/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;

public class SchedulerIsolatedTasksTest
extends SchedulerTestBase {
    @Test
    public void testScheduleImmediately() throws Exception {
        Assert.assertEquals((long)0L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
        this.testingSlotProvider.addTaskManager(2);
        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(2);
        Assert.assertEquals((long)5L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
        LogicalSlot s1 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        LogicalSlot s2 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        LogicalSlot s3 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        LogicalSlot s4 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        LogicalSlot s5 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5));
        try {
            this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
            Assert.fail((String)"Scheduler accepted scheduling request without available resource.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
        }
        s3.releaseSlot();
        s4.releaseSlot();
        Assert.assertEquals((long)2L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
        LogicalSlot s6 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        LogicalSlot s7 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
        s1.releaseSlot();
        s2.releaseSlot();
        s5.releaseSlot();
        s6.releaseSlot();
        s7.releaseSlot();
        Assert.assertEquals((long)5L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
        s1.releaseSlot();
        s2.releaseSlot();
        s5.releaseSlot();
        s6.releaseSlot();
        s7.releaseSlot();
        Assert.assertEquals((long)5L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testScheduleQueueing() throws Exception {
        int NUM_INSTANCES = 50;
        int NUM_SLOTS_PER_INSTANCE = 3;
        int NUM_TASKS_TO_SCHEDULE = 2000;
        for (int i = 0; i < 50; ++i) {
            this.testingSlotProvider.addTaskManager((int)(Math.random() * 3.0) + 1);
        }
        int totalSlots = this.testingSlotProvider.getNumberOfAvailableSlots();
        ArrayList<CompletableFuture> allAllocatedSlots = new ArrayList<CompletableFuture>();
        final HashSet toRelease = new HashSet();
        final AtomicBoolean errored = new AtomicBoolean(false);
        Runnable disposer = new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Enabled aggressive block sorting
             * Enabled unnecessary exception pruning
             * Enabled aggressive exception aggregation
             */
            @Override
            public void run() {
                try {
                    for (int recycled = 0; recycled < 2000; ++recycled) {
                        Set set = toRelease;
                        synchronized (set) {
                            while (toRelease.isEmpty()) {
                                toRelease.wait();
                            }
                            Iterator iter = toRelease.iterator();
                            LogicalSlot next = (LogicalSlot)iter.next();
                            iter.remove();
                            next.releaseSlot();
                            continue;
                        }
                    }
                    return;
                }
                catch (Throwable t) {
                    errored.set(true);
                }
            }
        };
        Thread disposeThread = new Thread(disposer);
        disposeThread.start();
        for (int i = 0; i < 2000; ++i) {
            CompletableFuture future = this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime());
            future.thenAcceptAsync(slot -> {
                Set set = toRelease;
                synchronized (set) {
                    toRelease.add(slot);
                    toRelease.notifyAll();
                }
            }, (Executor)TestingUtils.defaultExecutionContext());
            allAllocatedSlots.add(future);
        }
        disposeThread.join();
        Assert.assertFalse((String)"The slot releasing thread caused an error.", (boolean)errored.get());
        ArrayList slotsAfter = new ArrayList();
        for (CompletableFuture future : allAllocatedSlots) {
            slotsAfter.add(future.get());
        }
        Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(slotsAfter.toArray()));
        Assert.assertEquals((String)"All slots should be available.", (long)totalSlots, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testScheduleWithDyingInstances() throws Exception {
        TaskManagerLocation taskManagerLocation1 = this.testingSlotProvider.addTaskManager(2);
        TaskManagerLocation taskManagerLocation2 = this.testingSlotProvider.addTaskManager(2);
        TaskManagerLocation taskManagerLocation3 = this.testingSlotProvider.addTaskManager(1);
        ArrayList slots = new ArrayList();
        slots.add(this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get());
        slots.add(this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get());
        slots.add(this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get());
        slots.add(this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get());
        slots.add(this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get());
        this.testingSlotProvider.releaseTaskManager(taskManagerLocation2.getResourceID());
        for (LogicalSlot slot : slots) {
            if (slot.getTaskManagerLocation().getResourceID().equals((Object)taskManagerLocation2.getResourceID())) {
                Assert.assertFalse((boolean)slot.isAlive());
            } else {
                Assert.assertTrue((boolean)slot.isAlive());
            }
            slot.releaseSlot();
        }
        Assert.assertEquals((long)3L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
        this.testingSlotProvider.releaseTaskManager(taskManagerLocation1.getResourceID());
        this.testingSlotProvider.releaseTaskManager(taskManagerLocation3.getResourceID());
        try {
            this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
            Assert.fail((String)"Scheduler served a slot from a dead instance");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
        }
        catch (Exception e) {
            Assert.fail((String)"Wrong exception type.");
        }
        Assert.assertEquals((long)0L, (long)this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    @Test
    public void testSchedulingLocation() throws Exception {
        int index;
        TaskManagerLocation taskManagerLocation1 = this.testingSlotProvider.addTaskManager(2);
        TaskManagerLocation taskManagerLocation2 = this.testingSlotProvider.addTaskManager(2);
        TaskManagerLocation taskManagerLocation3 = this.testingSlotProvider.addTaskManager(2);
        LogicalSlot s1 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(new Instance[0])), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
        ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID();
        List<TaskManagerLocation> taskManagerLocations = Arrays.asList(taskManagerLocation1, taskManagerLocation2, taskManagerLocation3);
        for (index = 0; index < taskManagerLocations.size() && !Objects.equals(taskManagerLocations.get(index).getResourceID(), firstResourceId); ++index) {
        }
        TaskManagerLocation first = taskManagerLocations.get(index);
        TaskManagerLocation second = taskManagerLocations.get((index + 1) % taskManagerLocations.size());
        TaskManagerLocation third = taskManagerLocations.get((index + 2) % taskManagerLocations.size());
        LogicalSlot s2 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(s1.getTaskManagerLocation())), false, SchedulerIsolatedTasksTest.slotProfileForLocation(s1.getTaskManagerLocation()), TestingUtils.infiniteTime()).get();
        Assert.assertEquals((Object)first.getResourceID(), (Object)s2.getTaskManagerLocation().getResourceID());
        LogicalSlot s3 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, second)), false, SchedulerIsolatedTasksTest.slotProfileForLocation(first, second), TestingUtils.infiniteTime()).get();
        Assert.assertEquals((Object)second.getResourceID(), (Object)s3.getTaskManagerLocation().getResourceID());
        LogicalSlot s4 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, SchedulerIsolatedTasksTest.slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get();
        LogicalSlot s5 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, SchedulerIsolatedTasksTest.slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get();
        Assert.assertEquals((Object)third.getResourceID(), (Object)s4.getTaskManagerLocation().getResourceID());
        Assert.assertEquals((Object)third.getResourceID(), (Object)s5.getTaskManagerLocation().getResourceID());
        LogicalSlot s6 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, SchedulerIsolatedTasksTest.slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get();
        Assert.assertEquals((Object)second.getResourceID(), (Object)s6.getTaskManagerLocation().getResourceID());
        s2.releaseSlot();
        s6.releaseSlot();
        LogicalSlot s7 = (LogicalSlot)this.testingSlotProvider.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, SchedulerIsolatedTasksTest.slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get();
        Assert.assertEquals((Object)first.getResourceID(), (Object)s7.getTaskManagerLocation().getResourceID());
        Assert.assertEquals((long)1L, (long)this.testingSlotProvider.getNumberOfUnconstrainedAssignments());
        Assert.assertTrue((1 == this.testingSlotProvider.getNumberOfNonLocalizedAssignments() || 1 == this.testingSlotProvider.getNumberOfHostLocalizedAssignments() ? 1 : 0) != 0);
        Assert.assertEquals((long)5L, (long)this.testingSlotProvider.getNumberOfLocalizedAssignments());
    }

    private static SlotProfile slotProfileForLocation(TaskManagerLocation ... location) {
        return new SlotProfile(ResourceProfile.UNKNOWN, Arrays.asList(location), Collections.emptyList());
    }
}

