/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link SystemProcessingTimeService}.
 *
 * <p>Each test creates its own service instance together with a dedicated lock object and
 * shuts the service down again in a {@code finally} block, so that a failing assertion does
 * not leave a running service behind.
 */
public class SystemProcessingTimeServiceTest extends TestLogger {

    /**
     * Checks that a one-shot timer callback is invoked while the lock object handed to the
     * service is held, and that no task remains scheduled once the timer has fired.
     */
    @Test
    public void testTriggerHoldsLock() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        try {
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            ScheduledFuture<?> future = timer.registerTimer(
                    System.currentTimeMillis(),
                    timestamp -> Assert.assertTrue(Thread.holdsLock(lock)));
            future.get();

            Assert.assertEquals(0L, timer.getNumTasksScheduled());
            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that a fixed-rate callback is invoked while the lock object handed to the
     * service is held.
     *
     * <p>The callback only records whether the lock is held and always triggers the latch;
     * the assertion itself runs on the test thread. An assertion failing inside the callback
     * would skip the {@code trigger()} call and leave the test blocked on {@code await()}
     * forever instead of failing.
     */
    @Test
    public void testScheduleAtFixedRateHoldsLock() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        final OneShotLatch awaitCallback = new OneShotLatch();
        final AtomicBoolean lockHeldInCallback = new AtomicBoolean(false);
        try {
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            ScheduledFuture<?> future = timer.scheduleAtFixedRate(timestamp -> {
                lockHeldInCallback.set(Thread.holdsLock(lock));
                awaitCallback.trigger();
            }, 0L, 100L);

            awaitCallback.await();
            future.cancel(true);

            Assert.assertTrue(lockHeldInCallback.get());
            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that a fixed-rate callback is invoked repeatedly: the test only completes once
     * the callback has counted the latch down {@code countDown} times.
     */
    @Test(timeout=10000L)
    public void testScheduleAtFixedRate() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;
        final int countDown = 3;
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        final CountDownLatch countDownLatch = new CountDownLatch(countDown);
        try {
            timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period);
            countDownLatch.await();
            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that quiescing the service and awaiting pending work cancels a fixed-rate
     * future, and that scheduling after quiesce still returns a non-null future without
     * increasing the number of scheduled tasks or reporting an error.
     */
    @Test
    public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final long period = 10L;
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        try {
            ScheduledFuture<?> scheduledFuture = timer.scheduleAtFixedRate(timestamp -> {}, 0L, period);
            Assert.assertFalse(scheduledFuture.isDone());

            timer.quiesce();
            timer.awaitPendingAfterQuiesce();

            try {
                scheduledFuture.get();
                Assert.fail("scheduled future is not cancelled");
            } catch (CancellationException expected) {
                // expected: the periodic future has been cancelled
            }

            // This callback would report an error through errorRef if it were ever executed.
            scheduledFuture = timer.scheduleAtFixedRate(timestamp -> {
                throw new Exception("Test exception.");
            }, 0L, 100L);

            Assert.assertNotNull(scheduledFuture);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());
            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks the state of the service after {@code shutdownService()} is called while a
     * timer callback is still running: the service reports itself terminated, further
     * registrations fail with {@link IllegalStateException}, and the running callback's
     * interruption is reported to the exception handler.
     */
    @Test
    public void testImmediateShutdown() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        try {
            Assert.assertFalse(timer.isTerminated());

            final OneShotLatch latch = new OneShotLatch();
            // The callback signals that it started and then sleeps far longer than the test runs.
            timer.registerTimer(System.currentTimeMillis(), timestamp -> {
                latch.trigger();
                Thread.sleep(100000000L);
            });
            latch.await();

            timer.shutdownService();

            synchronized (lock) {
                Assert.assertTrue(timer.isTerminated());
            }

            try {
                timer.registerTimer(System.currentTimeMillis() + 1000L, timestamp -> {});
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // expected: registration after shutdown is rejected
            }

            try {
                timer.scheduleAtFixedRate(timestamp -> {}, 0L, 100L);
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // expected: scheduling after shutdown is rejected
            }

            Assert.assertNotNull(errorRef.get());
            Assert.assertTrue(errorRef.get().getCause() instanceof InterruptedException);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that after {@code quiesce()} and {@code awaitPendingAfterQuiesce()} a callback
     * that was already running has finished, and that a timer registered afterwards returns
     * a non-null future but is neither counted as scheduled nor reports an error.
     */
    @Test
    public void testQuiescing() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        try {
            final OneShotLatch latch = new OneShotLatch();
            final ReentrantLock scopeLock = new ReentrantLock();

            // The callback holds scopeLock for its whole duration, so the test thread can
            // only acquire it once the callback has completed.
            timer.registerTimer(timer.getCurrentProcessingTime() + 20L, timestamp -> {
                scopeLock.lock();
                try {
                    latch.trigger();
                    Thread.sleep(5L);
                } finally {
                    scopeLock.unlock();
                }
            });

            latch.await();
            timer.quiesce();
            timer.awaitPendingAfterQuiesce();

            Assert.assertTrue(scopeLock.tryLock());

            // This callback would report an error through errorRef if it were ever executed.
            ScheduledFuture<?> future = timer.registerTimer(
                    timer.getCurrentProcessingTime() - 5L,
                    timestamp -> {
                        throw new Exception("test");
                    });

            Assert.assertNotNull(future);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());
            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that cancelling the future of a one-shot timer and of a fixed-rate timer brings
     * the number of scheduled tasks back from one to zero.
     */
    @Test
    public void testFutureCancellation() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createTimer(errorRef, lock);
        try {
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            // Timer far in the future: it stays pending until it is cancelled.
            ScheduledFuture<?> future =
                    timer.registerTimer(System.currentTimeMillis() + 100000000L, timestamp -> {});
            Assert.assertEquals(1L, timer.getNumTasksScheduled());
            future.cancel(false);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            // Same check for a periodic task with a very long initial delay.
            future = timer.scheduleAtFixedRate(timestamp -> {}, 10000000000L, 50L);
            Assert.assertEquals(1L, timer.getNumTasksScheduled());
            future.cancel(false);
            Assert.assertEquals(0L, timer.getNumTasksScheduled());

            rethrowAsyncError(errorRef);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Checks that an exception thrown by a one-shot timer callback is handed to the
     * {@link AsyncExceptionHandler} the service was created with.
     */
    @Test
    public void testExceptionReporting() throws InterruptedException {
        final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
        final OneShotLatch latch = new OneShotLatch();
        final Object lock = new Object();
        final SystemProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
                (message, exception) -> {
                    exceptionWasThrown.set(true);
                    latch.trigger();
                },
                lock);
        try {
            timeServiceProvider.registerTimer(System.currentTimeMillis(), timestamp -> {
                throw new Exception("Exception in Timer");
            });

            latch.await();
            Assert.assertTrue(exceptionWasThrown.get());
        } finally {
            // Shut down like every other test so the service is not left running.
            timeServiceProvider.shutdownService();
        }
    }

    /**
     * Checks that an exception thrown by a fixed-rate callback is handed to the
     * {@link AsyncExceptionHandler} the service was created with.
     */
    @Test
    public void testExceptionReportingScheduleAtFixedRate() throws InterruptedException {
        final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
        final OneShotLatch latch = new OneShotLatch();
        final Object lock = new Object();
        final SystemProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
                (message, exception) -> {
                    exceptionWasThrown.set(true);
                    latch.trigger();
                },
                lock);
        try {
            timeServiceProvider.scheduleAtFixedRate(timestamp -> {
                throw new Exception("Exception in Timer");
            }, 0L, 100L);

            latch.await();
            Assert.assertTrue(exceptionWasThrown.get());
        } finally {
            // Shut down like every other test so the service (and its periodic task) is
            // not left running.
            timeServiceProvider.shutdownService();
        }
    }

    /**
     * Checks {@code shutdownAndAwaitPending}: the first call is expected to return
     * {@code false} within its timeout because the callback is still blocked, the second
     * call is expected to return {@code true} once the callback has been released.
     */
    @Test
    public void testShutdownAndWaitPending() {
        final Object lock = new Object();
        final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
        final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean check = new AtomicBoolean(true);
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService((message, exception) -> {}, lock);
        try {
            timeService.scheduleAtFixedRate(timestamp -> {
                waitUntilTimerStarted.trigger();

                // This latch is never triggered by the test, so the wait can only end via
                // interruption; returning normally marks the check as failed.
                try {
                    blockUntilTerminationInterrupts.await();
                    check.set(false);
                } catch (InterruptedException expected) {
                    // expected: the first shutdown attempt interrupts this wait
                }

                // This wait must end through the trigger below; an interruption here marks
                // the check as failed.
                try {
                    blockUntilTriggered.await();
                } catch (InterruptedException ignore) {
                    check.set(false);
                }
            }, 0L, 10L);

            try {
                waitUntilTimerStarted.await();
            } catch (InterruptedException e) {
                Assert.fail("Unexpected interruption.");
            }

            Assert.assertFalse(timeService.isTerminated());

            // The callback is still blocked on the second latch, so this must time out.
            try {
                Assert.assertFalse(timeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Assert.fail("Unexpected interruption.");
            }

            // Release the callback; now the pending work can complete.
            blockUntilTriggered.trigger();

            try {
                Assert.assertTrue(timeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Assert.fail("Unexpected interruption.");
            }

            Assert.assertTrue(check.get());
            Assert.assertTrue(timeService.isTerminated());
        } finally {
            // If an assertion failed above, make sure the callback is not left blocked and
            // the service is not left running.
            blockUntilTriggered.trigger();
            timeService.shutdownService();
        }
    }

    /**
     * Creates a service whose asynchronous errors are stored in {@code errorRef}.
     *
     * @param errorRef reference passed to the {@code ReferenceSettingExceptionHandler}
     * @param lock lock object handed to the service
     * @return the newly created service
     */
    private static SystemProcessingTimeService createTimer(AtomicReference<Throwable> errorRef, Object lock) {
        return new SystemProcessingTimeService(
                new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(errorRef), lock);
    }

    /**
     * Fails the calling test if an asynchronous error has been stored in {@code errorRef}.
     *
     * @param errorRef reference that may hold an error recorded by the exception handler
     * @throws Exception wrapping the stored error, if there is one
     */
    private static void rethrowAsyncError(AtomicReference<Throwable> errorRef) throws Exception {
        final Throwable error = errorRef.get();
        if (error != null) {
            throw new Exception(error);
        }
    }
}

