/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class FencedRpcEndpointTest
extends TestLogger {
    /** Timeout handed to RPC calls and used to bound blocking waits on futures in these tests. */
    private static final Time timeout = Time.seconds(10L);

    /** RPC service shared by all test methods; assigned in {@code setup()}, stopped in {@code teardown()}. */
    private static RpcService rpcService;

    /** Creates the {@link TestingRpcService} that all tests of this class share. */
    @BeforeClass
    public static void setup() {
        FencedRpcEndpointTest.rpcService = new TestingRpcService();
    }

    /**
     * Stops the shared RPC service, if one was created, and waits up to {@code timeout} for its
     * termination future to complete.
     *
     * @throws ExecutionException if the termination future completed exceptionally
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the service did not terminate within {@code timeout}
     */
    @AfterClass
    public static void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        if (rpcService == null) {
            // No service was created, so there is nothing to stop.
            return;
        }

        rpcService.stopService();
        rpcService.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks how the fencing token of an endpoint can be set.
     *
     * <p>An endpoint created without a token reports {@code null} through both the endpoint and
     * its self gateway. A direct {@code setFencingToken} call from the test thread is expected to
     * throw an {@link AssertionError} and leave the token unchanged, whereas
     * {@code setFencingTokenInMainThread} makes the new token visible through both the endpoint
     * and the self gateway.
     */
    @Test
    public void testFencingTokenSetting() throws Exception {
        final String value = "foobar";
        final FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, value);
        final FencedTestingGateway fencedGateway = fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);

        try {
            fencedTestingEndpoint.start();

            Assert.assertNull(fencedGateway.getFencingToken());
            Assert.assertNull(fencedTestingEndpoint.getFencingToken());

            final UUID newFencingToken = UUID.randomUUID();

            // Record the outcome in a flag instead of calling Assert.fail inside the try block:
            // fail() itself throws an AssertionError, which the catch clause would swallow and
            // thereby hide the very failure it is supposed to report.
            boolean rejectedOutsideMainThread = false;
            try {
                fencedTestingEndpoint.setFencingToken(newFencingToken);
            } catch (AssertionError expected) {
                rejectedOutsideMainThread = true;
            }
            Assert.assertTrue(
                    "Fencing token can only be set from within the main thread.",
                    rejectedOutsideMainThread);

            // The rejected call must not have changed the token.
            Assert.assertNull(fencedTestingEndpoint.getFencingToken());

            final CompletableFuture<Acknowledge> setFencingFuture =
                    fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);

            // Wait for the token change to complete.
            setFencingFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            Assert.assertEquals(newFencingToken, fencedGateway.getFencingToken());
            Assert.assertEquals(newFencingToken, fencedTestingEndpoint.getFencingToken());
        } finally {
            fencedTestingEndpoint.shutDown();
            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFencing() throws Exception {
        UUID fencingToken = UUID.randomUUID();
        UUID wrongFencingToken = UUID.randomUUID();
        String value = "barfoo";
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "barfoo", fencingToken);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway properFencedGateway = (FencedTestingGateway)rpcService.connect(fencedTestingEndpoint.getAddress(), (Serializable)fencingToken, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            FencedTestingGateway wronglyFencedGateway = (FencedTestingGateway)rpcService.connect(fencedTestingEndpoint.getAddress(), (Serializable)wrongFencingToken, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals((Object)"barfoo", (Object)properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            try {
                wronglyFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"This should fail since we have the wrong fencing token.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
            }
            UUID newFencingToken = UUID.randomUUID();
            CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                properFencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"This should fail since we have the wrong fencing token by now.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
            }
        }
        finally {
            fencedTestingEndpoint.shutDown();
            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRemoteAndSelfGateways() throws Exception {
        UUID initialFencingToken = UUID.randomUUID();
        UUID newFencingToken = UUID.randomUUID();
        String value = "foobar";
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", initialFencingToken);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway selfGateway = (FencedTestingGateway)fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
            FencedTestingGateway remoteGateway = (FencedTestingGateway)rpcService.connect(fencedTestingEndpoint.getAddress(), (Serializable)initialFencingToken, FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals((Object)initialFencingToken, (Object)selfGateway.getFencingToken());
            Assert.assertEquals((Object)initialFencingToken, (Object)remoteGateway.getFencingToken());
            Assert.assertEquals((Object)"foobar", (Object)selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            Assert.assertEquals((Object)"foobar", (Object)remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals((Object)newFencingToken, (Object)selfGateway.getFencingToken());
            Assert.assertNotEquals((Object)newFencingToken, (Object)remoteGateway.getFencingToken());
            Assert.assertEquals((Object)"foobar", (Object)selfGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            try {
                remoteGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"This should have failed because we don't have the right fencing token.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
            }
        }
        finally {
            fencedTestingEndpoint.shutDown();
            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMainThreadExecutorUnderChangingFencingToken() throws Exception {
        Time shortTimeout = Time.milliseconds((long)100L);
        UUID initialFencingToken = UUID.randomUUID();
        String value = "foobar";
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", initialFencingToken);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway selfGateway = (FencedTestingGateway)fencedTestingEndpoint.getSelfGateway(FencedTestingGateway.class);
            CompletableFuture<Acknowledge> mainThreadExecutorComputation = selfGateway.triggerMainThreadExecutorComputation(timeout);
            UUID newFencingToken = UUID.randomUUID();
            CompletableFuture<Acknowledge> newFencingTokenFuture = fencedTestingEndpoint.setFencingTokenInMainThread(newFencingToken, timeout);
            newFencingTokenFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            CompletableFuture<Acknowledge> triggerFuture = selfGateway.triggerComputationLatch(timeout);
            triggerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                mainThreadExecutorComputation.get(shortTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"The MainThreadExecutor computation should be able to complete because it was filtered out leading to a timeout exception.");
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
        finally {
            fencedTestingEndpoint.shutDown();
            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnfencedRemoteGateway() throws Exception {
        UUID initialFencingToken = UUID.randomUUID();
        String value = "foobar";
        FencedTestingEndpoint fencedTestingEndpoint = new FencedTestingEndpoint(rpcService, "foobar", initialFencingToken);
        try {
            fencedTestingEndpoint.start();
            FencedTestingGateway unfencedGateway = (FencedTestingGateway)rpcService.connect(fencedTestingEndpoint.getAddress(), FencedTestingGateway.class).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                unfencedGateway.foobar(timeout).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.fail((String)"This should have failed because we have an unfenced gateway.");
            }
            catch (ExecutionException e) {
                Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof RpcException));
            }
            try {
                unfencedGateway.getFencingToken();
                Assert.fail((String)"We should not be able to call getFencingToken on an unfenced gateway.");
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
        }
        finally {
            fencedTestingEndpoint.shutDown();
            fencedTestingEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Fenced RPC endpoint used by the tests in this class.
     *
     * <p>It answers {@code foobar} with a fixed value, can run a latch-gated computation whose
     * continuation is scheduled on the main thread executor, and offers a helper to change its
     * fencing token through an unfenced asynchronous call.
     */
    private static class FencedTestingEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestingGateway {

        /** Latch that gates the computation started by {@code triggerMainThreadExecutorComputation}. */
        private final OneShotLatch computationLatch = new OneShotLatch();

        /** Value returned by {@code foobar}. */
        private final String value;

        /**
         * Creates an endpoint whose initial fencing token is {@code null}.
         *
         * @param rpcService RPC service the endpoint is created with
         * @param value value returned by {@code foobar}
         */
        protected FencedTestingEndpoint(RpcService rpcService, String value) {
            this(rpcService, value, null);
        }

        /**
         * Creates an endpoint with the given initial fencing token.
         *
         * @param rpcService RPC service the endpoint is created with
         * @param value value returned by {@code foobar}
         * @param initialFencingToken fencing token installed during construction, may be {@code null}
         */
        protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) {
            super(rpcService);
            this.value = value;

            // Temporarily register the constructing thread as the main thread so that
            // setFencingToken can be called here; the registration is cleared again in any case.
            final Thread constructingThread = Thread.currentThread();
            currentMainThread.set(constructingThread);
            try {
                setFencingToken(initialFencingToken);
            } finally {
                currentMainThread.set(null);
            }
        }

        /** Returns an already completed future holding this endpoint's value. */
        @Override
        public CompletableFuture<String> foobar(Time timeout) {
            return CompletableFuture.completedFuture(value);
        }

        /**
         * Waits for the computation latch on the RPC service's executor and then maps the result
         * to an {@link Acknowledge} on the main thread executor.
         */
        @Override
        public CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(Time timeout) {
            final CompletableFuture<String> latchGatedValue = CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            computationLatch.await();
                        } catch (InterruptedException e) {
                            throw new CompletionException(new FlinkException("Waiting on latch failed.", e));
                        }
                        return value;
                    },
                    getRpcService().getExecutor());

            // The continuation is handed to the main thread executor.
            return latchGatedValue.thenApplyAsync(ignored -> Acknowledge.get(), getMainThreadExecutor());
        }

        /** Triggers the computation latch and returns an already completed acknowledgement. */
        @Override
        public CompletableFuture<Acknowledge> triggerComputationLatch(Time timeout) {
            computationLatch.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        /**
         * Sets the fencing token through {@code callAsyncWithoutFencing}.
         *
         * @param fencingToken new fencing token
         * @param timeout timeout passed to {@code callAsyncWithoutFencing}
         * @return future that yields an {@link Acknowledge} once the token has been set
         */
        public CompletableFuture<Acknowledge> setFencingTokenInMainThread(UUID fencingToken, Time timeout) {
            return callAsyncWithoutFencing(
                    () -> {
                        setFencingToken(fencingToken);
                        return Acknowledge.get();
                    },
                    timeout);
        }
    }

    /**
     * Fenced RPC gateway with {@link UUID} fencing tokens, implemented by
     * {@code FencedTestingEndpoint}.
     */
    public interface FencedTestingGateway extends FencedRpcGateway<UUID> {

        /**
         * Requests the endpoint's value.
         *
         * @param timeout RPC timeout for the call
         * @return future holding the value
         */
        CompletableFuture<String> foobar(@RpcTimeout Time timeout);

        /**
         * Starts the endpoint's latch-gated computation.
         *
         * @param timeout RPC timeout for the call
         * @return future of the computation's acknowledgement
         */
        CompletableFuture<Acknowledge> triggerMainThreadExecutorComputation(@RpcTimeout Time timeout);

        /**
         * Triggers the latch on which the latch-gated computation waits.
         *
         * @param timeout RPC timeout for the call
         * @return future acknowledging the trigger
         */
        CompletableFuture<Acknowledge> triggerComputationLatch(@RpcTimeout Time timeout);
    }
}

