/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class ScheduleWithCoLocationHintTest
extends TestLogger {
    @Test
    public void scheduleAllSharedAndCoLocated() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint c1 = new CoLocationConstraint(ccg);
            CoLocationConstraint c2 = new CoLocationConstraint(ccg);
            CoLocationConstraint c3 = new CoLocationConstraint(ccg);
            CoLocationConstraint c4 = new CoLocationConstraint(ccg);
            CoLocationConstraint c5 = new CoLocationConstraint(ccg);
            CoLocationConstraint c6 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
            SimpleSlot s5 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get();
            SimpleSlot s6 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get();
            SimpleSlot s7 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get();
            SimpleSlot s8 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
            SimpleSlot s9 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
            SimpleSlot s10 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get();
            SimpleSlot s11 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get();
            SimpleSlot s12 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get();
            Assert.assertNotNull((Object)s1);
            Assert.assertNotNull((Object)s2);
            Assert.assertNotNull((Object)s3);
            Assert.assertNotNull((Object)s4);
            Assert.assertNotNull((Object)s5);
            Assert.assertNotNull((Object)s6);
            Assert.assertNotNull((Object)s7);
            Assert.assertNotNull((Object)s8);
            Assert.assertNotNull((Object)s9);
            Assert.assertNotNull((Object)s10);
            Assert.assertNotNull((Object)s11);
            Assert.assertNotNull((Object)s12);
            Assert.assertEquals((long)2L, (long)s1.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s2.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s3.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s4.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s5.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s6.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s7.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s8.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s9.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s10.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s11.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s12.getRoot().getNumberLeaves());
            Assert.assertEquals((Object)s1.getTaskManagerID(), (Object)s5.getTaskManagerID());
            Assert.assertEquals((Object)s2.getTaskManagerID(), (Object)s6.getTaskManagerID());
            Assert.assertEquals((Object)s3.getTaskManagerID(), (Object)s7.getTaskManagerID());
            Assert.assertEquals((Object)s4.getTaskManagerID(), (Object)s10.getTaskManagerID());
            Assert.assertEquals((Object)s8.getTaskManagerID(), (Object)s11.getTaskManagerID());
            Assert.assertEquals((Object)s9.getTaskManagerID(), (Object)s12.getTaskManagerID());
            Assert.assertEquals((Object)c1.getLocation(), (Object)s1.getTaskManagerLocation());
            Assert.assertEquals((Object)c2.getLocation(), (Object)s2.getTaskManagerLocation());
            Assert.assertEquals((Object)c3.getLocation(), (Object)s3.getTaskManagerLocation());
            Assert.assertEquals((Object)c4.getLocation(), (Object)s4.getTaskManagerLocation());
            Assert.assertEquals((Object)c5.getLocation(), (Object)s8.getTaskManagerLocation());
            Assert.assertEquals((Object)c6.getLocation(), (Object)s9.getTaskManagerLocation());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfUnconstrainedAssignments());
            s1.releaseSlot();
            s2.releaseSlot();
            s3.releaseSlot();
            s4.releaseSlot();
            s7.releaseSlot();
            s10.releaseSlot();
            s11.releaseSlot();
            s12.releaseSlot();
            Assert.assertTrue((scheduler.getNumberOfAvailableSlots() >= 1 ? 1 : 0) != 0);
            SimpleSlot single = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(new JobVertexID(), 0, 1)), false, Collections.emptyList()).get();
            Assert.assertNotNull((Object)single);
            s1.releaseSlot();
            s2.releaseSlot();
            s3.releaseSlot();
            s5.releaseSlot();
            s6.releaseSlot();
            s7.releaseSlot();
            s8.releaseSlot();
            s9.releaseSlot();
            s11.releaseSlot();
            s12.releaseSlot();
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)6L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)7L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void scheduleWithIntermediateRelease() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            JobVertexID jid4 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(i1);
            scheduler.newInstanceAvailable(i2);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
            SimpleSlot sSolo = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get();
            ResourceID taskManager = s1.getTaskManagerID();
            s1.releaseSlot();
            s2.releaseSlot();
            sSolo.releaseSlot();
            SimpleSlot sNew = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
            Assert.assertEquals((Object)taskManager, (Object)sNew.getTaskManagerID());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void scheduleWithReleaseNoResource() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(i1);
            scheduler.newInstanceAvailable(i2);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
            s1.releaseSlot();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get();
            try {
                scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get();
                Assert.fail((String)"Scheduled even though no resource was available.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
            }
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)3L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void scheduleMixedCoLocationSlotSharing() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            JobVertexID jid4 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            Assert.assertEquals((long)4L, (long)scheduler.getNumberOfAvailableSlots());
            CoLocationGroup grp = new CoLocationGroup();
            CoLocationConstraint clc1 = new CoLocationConstraint(grp);
            CoLocationConstraint clc2 = new CoLocationConstraint(grp);
            CoLocationConstraint clc3 = new CoLocationConstraint(grp);
            CoLocationConstraint clc4 = new CoLocationConstraint(grp);
            SlotSharingGroup shareGroup = new SlotSharingGroup();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList());
            SimpleSlot s21 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
            SimpleSlot s22 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get();
            SimpleSlot s23 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get();
            SimpleSlot s24 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
            SimpleSlot s31 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get();
            SimpleSlot s32 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get();
            SimpleSlot s33 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get();
            SimpleSlot s34 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get();
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList());
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList());
            Assert.assertEquals((Object)s21.getTaskManagerID(), (Object)s34.getTaskManagerID());
            Assert.assertEquals((Object)s22.getTaskManagerID(), (Object)s31.getTaskManagerID());
            Assert.assertEquals((Object)s23.getTaskManagerID(), (Object)s32.getTaskManagerID());
            Assert.assertEquals((Object)s24.getTaskManagerID(), (Object)s33.getTaskManagerID());
            Assert.assertEquals((long)4L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)12L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testGetsNonLocalFromSharingGroupFirst() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation loc1 = i1.getTaskManagerLocation();
            TaskManagerLocation loc2 = i2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
            CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
            SimpleSlot s5 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
            SimpleSlot s6 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
            Assert.assertEquals((long)3L, (long)s1.getRoot().getNumberLeaves());
            Assert.assertEquals((long)3L, (long)s2.getRoot().getNumberLeaves());
            Assert.assertEquals((Object)s1.getTaskManagerID(), (Object)s3.getTaskManagerID());
            Assert.assertEquals((Object)s2.getTaskManagerID(), (Object)s4.getTaskManagerID());
            Assert.assertEquals((Object)s1.getTaskManagerID(), (Object)s5.getTaskManagerID());
            Assert.assertEquals((Object)s2.getTaskManagerID(), (Object)s6.getTaskManagerID());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)5L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfUnconstrainedAssignments());
            s1.releaseSlot();
            s2.releaseSlot();
            s3.releaseSlot();
            s4.releaseSlot();
            s5.releaseSlot();
            s6.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSlotReleasedInBetween() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation loc1 = i1.getTaskManagerLocation();
            TaskManagerLocation loc2 = i2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
            CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
            s1.releaseSlot();
            s2.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfSlots());
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
            Assert.assertEquals((Object)i1.getTaskManagerID(), (Object)s3.getTaskManagerID());
            Assert.assertEquals((Object)i2.getTaskManagerID(), (Object)s4.getTaskManagerID());
            s3.releaseSlot();
            s4.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)4L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSlotReleasedInBetweenAndNoNewLocal() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jidx = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation loc1 = i1.getTaskManagerLocation();
            TaskManagerLocation loc2 = i2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
            CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get();
            s1.releaseSlot();
            s2.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfSlots());
            SimpleSlot sa = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jidx, 0, 2, new TaskManagerLocation[0])), false, Collections.emptyList()).get();
            SimpleSlot sb = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jidx, 1, 2, new TaskManagerLocation[0])), false, Collections.emptyList()).get();
            try {
                scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get();
                Assert.fail((String)"should not be able to find a resource");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof NoResourceAvailableException));
            }
            catch (Exception e) {
                Assert.fail((String)"wrong exception");
            }
            sa.releaseSlot();
            sb.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfUnconstrainedAssignments());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testScheduleOutOfOrder() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation loc1 = i1.getTaskManagerLocation();
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
            CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get();
            Assert.assertEquals((long)2L, (long)s1.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s2.getRoot().getNumberLeaves());
            Assert.assertEquals((Object)s1.getTaskManagerID(), (Object)s3.getTaskManagerID());
            Assert.assertEquals((Object)s2.getTaskManagerID(), (Object)s4.getTaskManagerID());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)3L, (long)scheduler.getNumberOfLocalizedAssignments());
            Assert.assertEquals((long)1L, (long)scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals((long)0L, (long)scheduler.getNumberOfUnconstrainedAssignments());
            s1.releaseSlot();
            s2.releaseSlot();
            s3.releaseSlot();
            s4.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup((AbstractID)jid1));
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup((AbstractID)jid2));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void nonColocationFollowsCoLocation() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
            Instance i1 = SchedulerTestUtils.getRandomInstance(1);
            Instance i2 = SchedulerTestUtils.getRandomInstance(1);
            TaskManagerLocation loc1 = i1.getTaskManagerLocation();
            TaskManagerLocation loc2 = i2.getTaskManagerLocation();
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i1);
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationGroup ccg = new CoLocationGroup();
            CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
            CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
            SimpleSlot s1 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get();
            SimpleSlot s2 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get();
            SimpleSlot s3 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
            SimpleSlot s4 = (SimpleSlot)scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get();
            Assert.assertEquals((long)2L, (long)s1.getRoot().getNumberLeaves());
            Assert.assertEquals((long)2L, (long)s2.getRoot().getNumberLeaves());
            s1.releaseSlot();
            s2.releaseSlot();
            s3.releaseSlot();
            s4.releaseSlot();
            Assert.assertEquals((long)2L, (long)scheduler.getNumberOfAvailableSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfSlots());
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup((AbstractID)jid1));
            Assert.assertEquals((long)0L, (long)sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup((AbstractID)jid2));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

