/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class UnorderedStreamElementQueueTest
extends TestLogger {
    private static final long timeout = 10000L;
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        }
        catch (InterruptedException interrupted) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testCompletionOrder() throws Exception {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, (Executor)executor, operatorActions);
        StreamRecordQueueEntry record1 = new StreamRecordQueueEntry(new StreamRecord((Object)1, 0L));
        StreamRecordQueueEntry record2 = new StreamRecordQueueEntry(new StreamRecord((Object)2, 1L));
        WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L));
        StreamRecordQueueEntry record3 = new StreamRecordQueueEntry(new StreamRecord((Object)3, 3L));
        StreamRecordQueueEntry record4 = new StreamRecordQueueEntry(new StreamRecord((Object)4, 4L));
        WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L));
        StreamRecordQueueEntry record5 = new StreamRecordQueueEntry(new StreamRecord((Object)5, 6L));
        StreamRecordQueueEntry record6 = new StreamRecordQueueEntry(new StreamRecord((Object)6, 7L));
        List<StreamElementQueueEntry> entries = Arrays.asList(record1, record2, watermark1, record3, record4, watermark2, record5, record6);
        for (StreamElementQueueEntry entry : entries) {
            queue.put(entry);
        }
        Assert.assertTrue((8 == queue.size() ? 1 : 0) != 0);
        CompletableFuture<AsyncResult> firstPoll = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.poll();
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        record3.complete(Collections.emptyList());
        Thread.sleep(10L);
        Assert.assertFalse((boolean)firstPoll.isDone());
        record2.complete(Collections.emptyList());
        Assert.assertEquals((Object)record2, (Object)firstPoll.get());
        CompletableFuture<AsyncResult> secondPoll = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.poll();
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        record6.complete(Collections.emptyList());
        record4.complete(Collections.emptyList());
        Thread.sleep(10L);
        Assert.assertFalse((boolean)secondPoll.isDone());
        record1.complete(Collections.emptyList());
        Assert.assertEquals((Object)record1, (Object)secondPoll.get());
        Assert.assertEquals((Object)watermark1, (Object)queue.poll());
        HashSet<StreamRecordQueueEntry> expected = new HashSet<StreamRecordQueueEntry>(2);
        expected.add(record3);
        expected.add(record4);
        HashSet<AsyncResult> actual = new HashSet<AsyncResult>(2);
        actual.add(queue.poll());
        actual.add(queue.poll());
        Assert.assertEquals(expected, actual);
        Assert.assertEquals((Object)watermark2, (Object)queue.poll());
        Assert.assertEquals((Object)record6, (Object)queue.poll());
        Assert.assertTrue((1 == queue.size() ? 1 : 0) != 0);
        CompletableFuture<AsyncResult> thirdPoll = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.poll();
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)thirdPoll.isDone());
        record5.complete(Collections.emptyList());
        Assert.assertEquals((Object)record5, (Object)thirdPoll.get());
        Assert.assertTrue((boolean)queue.isEmpty());
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }
}

