/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;

public class SchedulerIsolatedTasksTest {
    @Test
    public void testAddAndRemoveInstance() {
        try {
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(2);
            Instance i2 = SchedulerTestUtils.getRandomInstance(2);
            Instance i3 = SchedulerTestUtils.getRandomInstance(2);
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(i2);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)4L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(i3);
            Assert.assertEquals((long)3L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfAvailableSlots());
            try {
                scheduler.newInstanceAvailable(i2);
                Assert.fail((String)"Scheduler accepted instance twice");
            }
            catch (IllegalArgumentException illegalArgumentException) {
                // empty catch block
            }
            Assert.assertEquals((long)3L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.instanceDied(i2);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)4L, (long)scheduler.getNumberOfAvailableSlots());
            try {
                scheduler.newInstanceAvailable(i2);
                Assert.fail((String)"Scheduler accepted dead instance");
            }
            catch (IllegalArgumentException illegalArgumentException) {
                // empty catch block
            }
            scheduler.instanceDied(i1);
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.instanceDied(i3);
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertFalse((boolean)i1.isAlive());
            Assert.assertFalse((boolean)i2.isAlive());
            Assert.assertFalse((boolean)i3.isAlive());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testScheduleImmediately() {
        try {
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfAvailableSlots());
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            SimpleSlot s5 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5));
            try {
                scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
                Assert.fail((String)"Scheduler accepted scheduling request without available resource.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
            }
            s3.releaseSlot();
            s4.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SimpleSlot s6 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            SimpleSlot s7 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
            Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
            s1.releaseSlot();
            s2.releaseSlot();
            s5.releaseSlot();
            s6.releaseSlot();
            s7.releaseSlot();
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfAvailableSlots());
            s1.releaseSlot();
            s2.releaseSlot();
            s5.releaseSlot();
            s6.releaseSlot();
            s7.releaseSlot();
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfAvailableSlots());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testScheduleQueueing() {
        int NUM_INSTANCES = 50;
        int NUM_SLOTS_PER_INSTANCE = 3;
        int NUM_TASKS_TO_SCHEDULE = 2000;
        try {
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            for (int i = 0; i < 50; ++i) {
                scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance((int)(Math.random() * 3.0) + 1));
            }
            Assert.assertEquals((long)50L, (long)scheduler.getNumberOfAvailableInstances());
            int totalSlots = scheduler.getNumberOfAvailableSlots();
            ArrayList<CompletableFuture> allAllocatedSlots = new ArrayList<CompletableFuture>();
            final HashSet toRelease = new HashSet();
            final AtomicBoolean errored = new AtomicBoolean(false);
            Runnable disposer = new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public void run() {
                    try {
                        for (int recycled = 0; recycled < 2000; ++recycled) {
                            Set set = toRelease;
                            synchronized (set) {
                                while (toRelease.isEmpty()) {
                                    toRelease.wait();
                                }
                                Iterator iter = toRelease.iterator();
                                SimpleSlot next = (SimpleSlot)iter.next();
                                iter.remove();
                                next.releaseSlot();
                                continue;
                            }
                        }
                        return;
                    }
                    catch (Throwable t) {
                        errored.set(true);
                    }
                }
            };
            Thread disposeThread = new Thread(disposer);
            disposeThread.start();
            for (int i = 0; i < 2000; ++i) {
                CompletableFuture future = scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), true, Collections.emptyList());
                future.thenAcceptAsync(slot -> {
                    Set set = toRelease;
                    synchronized (set) {
                        toRelease.add(slot);
                        toRelease.notifyAll();
                    }
                }, (Executor)TestingUtils.defaultExecutionContext());
                allAllocatedSlots.add(future);
            }
            disposeThread.join();
            Assert.assertFalse((String)"The slot releasing thread caused an error.", (boolean)errored.get());
            ArrayList slotsAfter = new ArrayList();
            for (CompletableFuture future : allAllocatedSlots) {
                slotsAfter.add(future.get());
            }
            Assert.assertEquals((String)"All instances should have available slots.", (long)50L, (long)scheduler.getNumberOfInstancesWithAvailableSlots());
            Assert.assertTrue((boolean)SchedulerTestUtils.areAllDistinct(slotsAfter.toArray()));
            Assert.assertEquals((String)"All slots should be available.", (long)totalSlots, (long)scheduler.getNumberOfAvailableSlots());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testScheduleWithDyingInstances() {
        try {
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(2);
            Instance i2 = SchedulerTestUtils.getRandomInstance(2);
            Instance i3 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(i1);
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i3);
            ArrayList slots = new ArrayList();
            slots.add(scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get());
            slots.add(scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get());
            slots.add(scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get());
            slots.add(scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get());
            slots.add(scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get());
            i2.markDead();
            for (SimpleSlot slot : slots) {
                if (slot.getOwner() == i2) {
                    Assert.assertTrue((boolean)slot.isCanceled());
                } else {
                    Assert.assertFalse((boolean)slot.isCanceled());
                }
                slot.releaseSlot();
            }
            Assert.assertEquals((long)3L, (long)scheduler.getNumberOfAvailableSlots());
            i1.markDead();
            i3.markDead();
            try {
                scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false, Collections.emptyList()).get();
                Assert.fail((String)"Scheduler served a slot from a dead instance");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
            }
            catch (Exception e) {
                Assert.fail((String)"Wrong exception type.");
            }
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfInstancesWithAvailableSlots());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSchedulingLocation() {
        try {
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(2);
            Instance i2 = SchedulerTestUtils.getRandomInstance(2);
            Instance i3 = SchedulerTestUtils.getRandomInstance(2);
            scheduler.newInstanceAvailable(i1);
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i3);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
            Instance first = (Instance)s1.getOwner();
            Instance second = first != i1 ? i1 : i2;
            Instance third = first == i3 ? i2 : i3;
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
            Assert.assertEquals((Object)first, (Object)s2.getOwner());
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
            Assert.assertEquals((Object)second, (Object)s3.getOwner());
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
            SimpleSlot s5 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
            Assert.assertEquals((Object)third, (Object)s4.getOwner());
            Assert.assertEquals((Object)third, (Object)s5.getOwner());
            SimpleSlot s6 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
            Assert.assertEquals((Object)second, (Object)s6.getOwner());
            s2.releaseSlot();
            s6.releaseSlot();
            SimpleSlot s7 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
            Assert.assertEquals((Object)first, (Object)s7.getOwner());
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfUnconstrainedAssignments());
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfLocalizedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

