/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class DispatcherHATest
extends TestLogger {
    // Sentinel put into the fencing-token queue in place of a null fencing token
    // (see HATestingDispatcher#setFencingToken); built from the all-zero UUID.
    private static final DispatcherId NULL_FENCING_TOKEN = DispatcherId.fromUuid((UUID)new UUID(0L, 0L));
    // Timeout (10 seconds) used for dispatcher calls and endpoint termination in the tests.
    private static final Time timeout = Time.seconds((long)10L);
    // RPC service shared by all tests; created in setupClass() and stopped in teardownClass().
    private static TestingRpcService rpcService;
    // Per-test fatal error handler; created in setup() and checked for errors in teardown().
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /** Creates the RPC service that is shared by every test of this class. */
    @BeforeClass
    public static void setupClass() {
        DispatcherHATest.rpcService = new TestingRpcService();
    }

    /** Installs a fresh fatal error handler before each test. */
    @Before
    public void setup() {
        testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /**
     * Rethrows any error recorded by the fatal error handler during the test.
     *
     * @throws Exception whatever {@code rethrowError()} raises
     */
    @After
    public void teardown() throws Exception {
        if (testingFatalErrorHandler == null) {
            // Nothing was installed for this test, so there is nothing to check.
            return;
        }
        testingFatalErrorHandler.rethrowError();
    }

    /**
     * Stops the shared RPC service once all tests of this class have run.
     *
     * <p>The static reference is cleared in a {@code finally} block so that it is
     * released even when stopping the service fails or the wait is interrupted.
     *
     * @throws ExecutionException if the stop future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for the stop future
     */
    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService == null) {
            return;
        }
        try {
            rpcService.stopService().get();
        } finally {
            // Drop the reference regardless of the outcome; the failure still propagates.
            rpcService = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Grants leadership to the dispatcher and revokes it right away while the job graph
     * store's {@code getJobIds()} call is held back by a latch.
     *
     * <p>Checks that the first recorded fencing token is {@link #NULL_FENCING_TOKEN}, then
     * waits until {@code getJobIds()} has been entered, lets it proceed, and finally checks
     * that the dispatcher reports zero jobs.
     */
    @Test
    public void testGrantingRevokingLeadership() throws Exception {
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();

        // Store whose getJobIds() signals entry and then blocks until released.
        final SubmittedJobGraph recoverableJob = new SubmittedJobGraph(DispatcherHATest.createNonEmptyJobGraph(), null);
        final OneShotLatch enterGetJobIdsLatch = new OneShotLatch();
        final OneShotLatch proceedGetJobIdsLatch = new OneShotLatch();
        haServices.setSubmittedJobGraphStore(
            new BlockingSubmittedJobGraphStore(recoverableJob, enterGetJobIdsLatch, proceedGetJobIdsLatch));

        final TestingLeaderElectionService leaderElection = new TestingLeaderElectionService();
        haServices.setDispatcherLeaderElectionService(leaderElection);

        final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<DispatcherId>(2);
        final HATestingDispatcher dispatcher = createHADispatcher(haServices, fencingTokens);
        dispatcher.start();

        try {
            // Grant and immediately revoke leadership.
            leaderElection.isLeader(UUID.randomUUID());
            leaderElection.notLeader();

            final DispatcherId firstFencingToken = fencingTokens.take();
            Assert.assertThat(firstFencingToken, Matchers.equalTo(NULL_FENCING_TOKEN));

            // Wait until getJobIds() has been entered, then release it.
            enterGetJobIdsLatch.await();
            proceedGetJobIdsLatch.trigger();

            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(0));
        } finally {
            RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
        }
    }

    /**
     * Builds an {@link HATestingDispatcher} on the shared RPC service with a random endpoint id.
     *
     * @param highAvailabilityServices HA services handed to the dispatcher
     * @param fencingTokens queue into which the dispatcher records every fencing token it is given
     * @return the newly created (not yet started) dispatcher
     * @throws Exception if one of the components or the dispatcher cannot be constructed
     */
    @Nonnull
    private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception {
        final Configuration configuration = new Configuration();
        final String endpointId = UUID.randomUUID().toString();
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        final BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
        final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);

        return new HATestingDispatcher(
            rpcService,
            endpointId,
            configuration,
            highAvailabilityServices,
            resourceManagerGateway,
            blobServer,
            heartbeatServices,
            UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
            null, // no metric query service path
            new MemoryArchivedExecutionGraphStore(),
            new TestingJobManagerRunnerFactory(
                new CompletableFuture<JobGraph>(),
                new CompletableFuture<ArchivedExecutionGraph>(),
                CompletableFuture.completedFuture(null)),
            testingFatalErrorHandler,
            fencingTokens);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a job to a leading dispatcher and then revokes its leadership.
     *
     * <p>Checks that the dispatcher reports one job after the submission, that the null
     * fencing token is recorded after the revocation, and that the dispatcher reports zero
     * jobs afterwards.
     */
    @Test
    public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());

        final TestingLeaderElectionService leaderElection = new TestingLeaderElectionService();
        haServices.setDispatcherLeaderElectionService(leaderElection);

        final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<DispatcherId>(2);
        final HATestingDispatcher dispatcher = createHADispatcher(haServices, fencingTokens);
        dispatcher.start();

        try {
            // Grant leadership and wait for the grant to complete.
            final DispatcherId expectedDispatcherId = DispatcherId.generate();
            leaderElection.isLeader(expectedDispatcherId.toUUID()).get();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(expectedDispatcherId)));

            // Submit a job through the dispatcher's own gateway and wait for the acknowledgement.
            final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
            gateway.submitJob(DispatcherHATest.createNonEmptyJobGraph(), timeout).get();
            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(1));

            // Revoking leadership must reset the fencing token and leave no jobs behind.
            leaderElection.notLeader();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(NULL_FENCING_TOKEN)));
            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(0));
        } finally {
            RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
        }
    }

    /**
     * Creates a job graph with a single vertex named "NoOp vertex" that runs
     * {@link NoOpInvokable}, with queued scheduling allowed.
     *
     * @return the new job graph
     */
    @Nonnull
    public static JobGraph createNonEmptyJobGraph() {
        final JobVertex vertex = new JobVertex("NoOp vertex");
        vertex.setInvokableClass(NoOpInvokable.class);

        final JobVertex[] vertices = new JobVertex[]{vertex};
        final JobGraph graph = new JobGraph(vertices);
        graph.setAllowQueuedScheduling(true);
        return graph;
    }

    /**
     * {@link SubmittedJobGraphStore} holding exactly one job graph whose
     * {@link #getJobIds()} call signals that it has been entered and then blocks until
     * the test allows it to continue.
     */
    private static class BlockingSubmittedJobGraphStore
    implements SubmittedJobGraphStore {
        /** The single job graph this store knows about. */
        @Nonnull
        private final SubmittedJobGraph submittedJobGraph;
        /** Triggered as soon as {@link #getJobIds()} is entered. */
        @Nonnull
        private final OneShotLatch enterGetJobIdsLatch;
        /** Awaited inside {@link #getJobIds()} before the ids are returned. */
        @Nonnull
        private final OneShotLatch proceedGetJobIdsLatch;
        /** Set by {@link #start} and cleared by {@link #stop}; not read within this class. */
        private boolean isStarted;

        private BlockingSubmittedJobGraphStore(@Nonnull SubmittedJobGraph submittedJobGraph, @Nonnull OneShotLatch enterGetJobIdsLatch, @Nonnull OneShotLatch proceedGetJobIdsLatch) {
            this.submittedJobGraph = submittedJobGraph;
            this.enterGetJobIdsLatch = enterGetJobIdsLatch;
            this.proceedGetJobIdsLatch = proceedGetJobIdsLatch;
        }

        /** Marks the store as started; the listener is ignored. */
        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener) throws Exception {
            isStarted = true;
        }

        /** Marks the store as stopped. */
        public void stop() throws Exception {
            isStarted = false;
        }

        /**
         * Returns the stored job graph.
         *
         * @param jobId must equal the id of the stored job graph, otherwise the
         *     {@code checkArgument} precondition fails
         */
        @Nullable
        public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
            final boolean isKnownJob = jobId.equals(submittedJobGraph.getJobId());
            Preconditions.checkArgument(isKnownJob);
            return submittedJobGraph;
        }

        /** Not supported by this store. */
        public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
            throw new UnsupportedOperationException("Should not be called.");
        }

        /** Not supported by this store. */
        public void removeJobGraph(JobID jobId) throws Exception {
            throw new UnsupportedOperationException("Should not be called.");
        }

        /** Intentionally a no-op. */
        public void releaseJobGraph(JobID jobId) throws Exception {
        }

        /**
         * Signals entry, waits for the go-ahead, and then returns the id of the stored job graph.
         */
        public Collection<JobID> getJobIds() throws Exception {
            enterGetJobIdsLatch.trigger();
            proceedGetJobIdsLatch.await();
            final JobID onlyJobId = submittedJobGraph.getJobId();
            return Collections.singleton(onlyJobId);
        }
    }

    /**
     * {@link TestingDispatcher} that records every fencing token it is given in a queue
     * supplied by the test, and exposes the number of jobs it currently lists.
     */
    private static class HATestingDispatcher
    extends TestingDispatcher {
        /** Receives each fencing token passed to {@link #setFencingToken}. */
        @Nonnull
        private final BlockingQueue<DispatcherId> fencingTokens;

        HATestingDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull BlockingQueue<DispatcherId> fencingTokens) throws Exception {
            super(
                rpcService,
                endpointId,
                configuration,
                highAvailabilityServices,
                resourceManagerGateway,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricQueryServicePath,
                archivedExecutionGraphStore,
                jobManagerRunnerFactory,
                fatalErrorHandler);
            this.fencingTokens = fencingTokens;
        }

        /**
         * Returns the size of the job listing, evaluated via
         * {@code callAsyncWithoutFencing}.
         *
         * @param timeout timeout used for both the listing and the asynchronous call
         * @return future holding the number of listed jobs
         */
        @VisibleForTesting
        CompletableFuture<Integer> getNumberJobs(Time timeout) {
            return callAsyncWithoutFencing(() -> listJobs(timeout).get().size(), timeout);
        }

        /**
         * Forwards the token to the superclass and then offers it to the queue; a
         * {@code null} token is recorded as {@code NULL_FENCING_TOKEN}.
         */
        protected void setFencingToken(@Nullable DispatcherId newFencingToken) {
            super.setFencingToken(newFencingToken);

            final DispatcherId recordedToken = newFencingToken != null ? newFencingToken : NULL_FENCING_TOKEN;
            fencingTokens.offer(recordedToken);
        }
    }
}

