/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

public class AsyncCallsTest
extends TestLogger {
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static final Time timeout = Time.seconds((long)10L);
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds((long)10000L));

    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        TestEndpoint testEndpoint = new TestEndpoint((RpcService)akkaRpcService, lock);
        testEndpoint.start();
        TestGateway gateway = (TestGateway)testEndpoint.getSelfGateway(TestGateway.class);
        gateway.someCall();
        gateway.anotherCall();
        gateway.someCall();
        for (int i = 0; i < 10000; ++i) {
            testEndpoint.runAsync(new Runnable(){

                @Override
                public void run() {
                    boolean holdsLock = lock.tryLock();
                    if (holdsLock) {
                        lock.unlock();
                    } else {
                        concurrentAccess.set(true);
                    }
                }
            });
        }
        CompletableFuture result = testEndpoint.callAsync(() -> {
            boolean holdsLock = lock.tryLock();
            if (holdsLock) {
                lock.unlock();
            } else {
                concurrentAccess.set(true);
            }
            return "test";
        }, Time.seconds((long)30L));
        String str = (String)result.get(30L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)"test", (Object)str);
        Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)testEndpoint.hasConcurrentAccess());
        Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
        testEndpoint.shutDown();
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        final OneShotLatch latch = new OneShotLatch();
        long delay = 100L;
        TestEndpoint testEndpoint = new TestEndpoint((RpcService)akkaRpcService, lock);
        testEndpoint.start();
        testEndpoint.runAsync(new Runnable(){

            @Override
            public void run() {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
            }
        });
        long start = System.nanoTime();
        testEndpoint.scheduleRunAsync(new Runnable(){

            @Override
            public void run() {
                boolean holdsLock = lock.tryLock();
                if (holdsLock) {
                    lock.unlock();
                } else {
                    concurrentAccess.set(true);
                }
                latch.trigger();
            }
        }, 100L, TimeUnit.MILLISECONDS);
        latch.await();
        long stop = System.nanoTime();
        Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)testEndpoint.hasConcurrentAccess());
        Assert.assertFalse((String)"Rpc Endpoint had concurrent access", (boolean)concurrentAccess.get());
        Assert.assertTrue((String)"call was not properly delayed", ((stop - start) / 1000000L >= 100L ? 1 : 0) != 0);
    }

    @Test
    public void testRunAsyncWithFencing() throws Exception {
        Time shortTimeout = Time.milliseconds((long)100L);
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = new CompletableFuture();
        AsyncCallsTest.testRunAsync(endpoint -> {
            endpoint.runAsync(() -> resultFuture.complete(endpoint.getFencingToken()));
            return resultFuture;
        }, newFencingToken);
        try {
            resultFuture.get(shortTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"The async run operation should not complete since it is filtered out due to the changed fencing token.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    @Test
    public void testRunAsyncWithoutFencing() throws Exception {
        CompletableFuture resultFuture = new CompletableFuture();
        UUID newFencingToken = UUID.randomUUID();
        AsyncCallsTest.testRunAsync(endpoint -> {
            endpoint.runAsyncWithoutFencing(() -> resultFuture.complete(endpoint.getFencingToken()));
            return resultFuture;
        }, newFencingToken);
        Assert.assertEquals((Object)newFencingToken, resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
    }

    @Test
    public void testCallAsyncWithFencing() throws Exception {
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = AsyncCallsTest.testRunAsync(endpoint -> endpoint.callAsync(() -> true, timeout), newFencingToken);
        try {
            resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"The async call operation should fail due to the changed fencing token.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
        }
    }

    @Test
    public void testCallAsyncWithoutFencing() throws Exception {
        UUID newFencingToken = UUID.randomUUID();
        CompletableFuture resultFuture = AsyncCallsTest.testRunAsync(endpoint -> endpoint.callAsyncWithoutFencing(() -> true, timeout), newFencingToken);
        Assert.assertTrue((boolean)((Boolean)resultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> runAsyncCall, UUID newFencingToken) throws Exception {
        UUID initialFencingToken = UUID.randomUUID();
        OneShotLatch enterSetNewFencingToken = new OneShotLatch();
        OneShotLatch triggerSetNewFencingToken = new OneShotLatch();
        FencedTestEndpoint fencedTestEndpoint = new FencedTestEndpoint((RpcService)akkaRpcService, initialFencingToken, enterSetNewFencingToken, triggerSetNewFencingToken);
        FencedTestGateway fencedTestGateway = (FencedTestGateway)fencedTestEndpoint.getSelfGateway(FencedTestGateway.class);
        try {
            fencedTestEndpoint.start();
            CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestGateway.setNewFencingToken(newFencingToken, timeout);
            Assert.assertFalse((boolean)newFencingTokenFuture.isDone());
            Assert.assertEquals((Object)initialFencingToken, (Object)fencedTestEndpoint.getFencingToken());
            CompletableFuture<T> result = runAsyncCall.apply(fencedTestEndpoint);
            enterSetNewFencingToken.await();
            triggerSetNewFencingToken.trigger();
            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            CompletableFuture<T> completableFuture = result;
            return completableFuture;
        }
        finally {
            fencedTestEndpoint.shutDown();
            fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    public static class FencedTestEndpoint
    extends FencedRpcEndpoint<UUID>
    implements FencedTestGateway {
        private final OneShotLatch enteringSetNewFencingToken;
        private final OneShotLatch triggerSetNewFencingToken;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected FencedTestEndpoint(RpcService rpcService, UUID initialFencingToken, OneShotLatch enteringSetNewFencingToken, OneShotLatch triggerSetNewFencingToken) {
            super(rpcService);
            this.enteringSetNewFencingToken = enteringSetNewFencingToken;
            this.triggerSetNewFencingToken = triggerSetNewFencingToken;
            this.currentMainThread.set(Thread.currentThread());
            try {
                this.setFencingToken(initialFencingToken);
            }
            finally {
                this.currentMainThread.set(null);
            }
        }

        @Override
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, Time timeout) {
            this.enteringSetNewFencingToken.trigger();
            try {
                this.triggerSetNewFencingToken.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted.");
            }
            this.setFencingToken(fencingToken);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }

    public static interface FencedTestGateway
    extends FencedRpcGateway<UUID> {
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID var1, @RpcTimeout Time var2);
    }

    public static class TestEndpoint
    extends RpcEndpoint
    implements TestGateway {
        private final ReentrantLock lock;
        private volatile boolean concurrentAccess;

        public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
            super(rpcService);
            this.lock = lock;
        }

        @Override
        public void someCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        @Override
        public void anotherCall() {
            boolean holdsLock = this.lock.tryLock();
            if (holdsLock) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        public boolean hasConcurrentAccess() {
            return this.concurrentAccess;
        }
    }

    public static interface TestGateway
    extends RpcGateway {
        public void someCall();

        public void anotherCall();
    }
}

