/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for the retry and timeout helpers of {@link FutureUtils}.
 */
public class FutureUtilsTest extends TestLogger {

    /**
     * Checks that a retried operation which fails until its tenth invocation eventually
     * completes the retry future with the successful value.
     */
    @Test
    public void testRetrySuccess() throws Exception {
        final int retries = 10;
        final AtomicInteger invocations = new AtomicInteger(0);

        CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    // Only the invocation number `retries` succeeds, every earlier one fails.
                    if (invocations.incrementAndGet() == retries) {
                        return true;
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            retries,
            TestingUtils.defaultExecutor());

        Assert.assertTrue(retryFuture.get());
        Assert.assertEquals(retries, invocations.get());
    }

    /**
     * Checks that an operation which always fails makes the retry future complete with a
     * {@link FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryFailure() throws Throwable {
        final int retries = 3;

        CompletableFuture<?> retryFuture = FutureUtils.retry(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            retries,
            TestingUtils.defaultExecutor());

        try {
            retryFuture.get();
        } catch (ExecutionException ee) {
            // Unwrap so that the `expected` attribute sees the actual failure cause.
            throw ExceptionUtils.stripExecutionException(ee);
        }
    }

    /**
     * Checks that cancelling the retry future while the second attempt is still running
     * marks the future as cancelled and that no third attempt is counted.
     */
    @Test
    public void testRetryCancellation() throws Exception {
        final int retries = 10;
        final AtomicInteger invocations = new AtomicInteger(0);
        final OneShotLatch notificationLatch = new OneShotLatch();
        final OneShotLatch waitLatch = new OneShotLatch();
        final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);

        CompletableFuture<?> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    if (invocations.incrementAndGet() == 2) {
                        // Signal the test thread and park until it has cancelled the future.
                        notificationLatch.trigger();
                        try {
                            waitLatch.await();
                        } catch (InterruptedException e) {
                            // Recorded here and re-thrown by the test thread at the end.
                            atomicThrowable.compareAndSet(null, e);
                        }
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            retries,
            TestingUtils.defaultExecutor());

        // Wait until the second attempt is in flight.
        notificationLatch.await();
        Assert.assertFalse(retryFuture.isDone());

        // Cancel first, then release the blocked attempt.
        retryFuture.cancel(false);
        waitLatch.trigger();

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertEquals(2, invocations.get());

        Throwable failure = atomicThrowable.get();
        if (failure != null) {
            throw new FlinkException("Exception occurred in the retry operation.", failure);
        }
    }

    /**
     * Checks that an always failing operation makes the delayed retry future complete with a
     * {@link FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryWithDelayFailure() throws Throwable {
        CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            3,
            Time.milliseconds(1L),
            TestingUtils.defaultScheduledExecutor());

        try {
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException ee) {
            // Unwrap so that the `expected` attribute sees the actual failure cause.
            throw ExceptionUtils.stripExecutionException(ee);
        }
    }

    /**
     * Checks that a delayed retry succeeds after four failed attempts and that the whole
     * run takes at least {@code retries * delay}.
     */
    @Test
    public void testRetryWithDelay() throws Exception {
        final int retries = 4;
        final Time delay = Time.milliseconds(50L);
        final AtomicInteger countDown = new AtomicInteger(retries);

        // Start the clock BEFORE kicking off the operation; starting it afterwards would leave
        // part of the run unmeasured. nanoTime is used because it is monotonic.
        final long startNanos = System.nanoTime();

        CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
            () -> {
                // The counter starts at `retries`, so the first `retries` calls fail.
                if (countDown.getAndDecrement() == 0) {
                    return CompletableFuture.completedFuture(true);
                }
                return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
            },
            retries,
            delay,
            TestingUtils.defaultScheduledExecutor());

        Boolean result = retryFuture.get();
        final long elapsedNanos = System.nanoTime() - startNanos;
        final long minimumNanos = TimeUnit.MILLISECONDS.toNanos(retries * delay.toMilliseconds());

        Assert.assertTrue(result);
        Assert.assertTrue(
            "The completion time should be at least rertries times delay between retries.",
            elapsedNanos >= minimumNanos);
    }

    /**
     * Checks that cancelling a delayed retry future also cancels the scheduled future that
     * the (mocked) executor handed out for the pending retry.
     */
    @Test
    public void testRetryWithDelayCancellation() {
        final ScheduledFuture<?> scheduledFutureMock = Mockito.mock(ScheduledFuture.class);
        final ScheduledExecutor scheduledExecutorMock = Mockito.mock(ScheduledExecutor.class);

        // schedule(...) never runs the task, it only hands back the mocked future.
        Mockito.doReturn(scheduledFutureMock)
            .when(scheduledExecutorMock)
            .schedule(Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.any(TimeUnit.class));

        // execute(...) runs the given runnable directly in the calling thread.
        Mockito.doAnswer(invocation -> {
            invocation.getArgumentAt(0, Runnable.class).run();
            return null;
        }).when(scheduledExecutorMock).execute(Matchers.any(Runnable.class));

        CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            1,
            TestingUtils.infiniteTime(),
            scheduledExecutorMock);

        Assert.assertFalse(retryFuture.isDone());
        Mockito.verify(scheduledExecutorMock)
            .schedule(Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.any(TimeUnit.class));

        retryFuture.cancel(false);

        Assert.assertTrue(retryFuture.isCancelled());
        Mockito.verify(scheduledFutureMock).cancel(Matchers.anyBoolean());
    }

    /**
     * Checks that a future which is never completed by the test ends up failed with a
     * {@link TimeoutException} once {@code orTimeout} has been applied to it.
     */
    @Test
    public void testOrTimeout() throws Exception {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long timeout = 10L;

        FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);

        try {
            // Bounded wait: if the timeout never fires, get(...) throws instead of blocking forever.
            future.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
            // Without this, a normal return from get(...) would let the test pass unnoticed.
            Assert.fail("Expected the future to be completed exceptionally with a TimeoutException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
        }
    }
}

