/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherHATest;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/**
 * Tests that run {@link TestingDispatcher} instances against a ZooKeeper-backed
 * submitted job graph store / high-availability services.
 *
 * <p>A fresh {@link ZooKeeperResource} is provided per test method, while the RPC
 * service, blob server and base configuration are shared by all tests of the class.
 */
public class ZooKeeperHADispatcherTest
extends TestLogger {
    /** Timeout applied to RPC calls and to the termination of RPC components. */
    private static final Time TIMEOUT = Time.seconds(10L);

    @Rule
    public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private static Configuration configuration;
    private static TestingRpcService rpcService;
    private static BlobServer blobServer;

    /** Supplies the current test method name, which becomes part of the dispatcher endpoint id. */
    @Rule
    public final TestName name = new TestName();

    private TestingFatalErrorHandler testingFatalErrorHandler;

    /**
     * Creates the class-wide configuration (with the HA storage path pointing into the
     * temporary folder), the RPC service and the blob server.
     *
     * @throws IOException if the temporary folder or the blob server cannot be created
     */
    @BeforeClass
    public static void setupClass() throws IOException {
        configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        rpcService = new TestingRpcService();
        blobServer = new BlobServer(configuration, new VoidBlobStore());
    }

    /**
     * Points the shared configuration at the ZooKeeper instance of the current test and
     * installs a fresh fatal error handler.
     */
    @Before
    public void setup() throws Exception {
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.zooKeeperResource.getConnectString());
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /** Rethrows any error that was reported to the fatal error handler during the test. */
    @After
    public void teardown() throws Exception {
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    /**
     * Shuts down the shared RPC service and blob server.
     *
     * <p>The blob server is closed in a {@code finally} block so that it is released even
     * when terminating the RPC service fails.
     */
    @AfterClass
    public static void teardownClass() throws Exception {
        try {
            if (rpcService != null) {
                RpcUtils.terminateRpcService(rpcService, TIMEOUT);
            }
        }
        finally {
            rpcService = null;
            if (blobServer != null) {
                blobServer.close();
                blobServer = null;
            }
        }
    }

    /**
     * Submits a job through a leading dispatcher, revokes its leadership, and then checks
     * that a second job graph store (backed by a separate Curator client) can recover and
     * remove the submitted job graph.
     */
    @Test
    public void testSubmittedJobGraphRelease() throws Exception {
        // Both Curator clients are managed resources so they are closed on every exit path,
        // including failures during the set-up below.
        try (TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
             CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
             CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration)) {
            testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));

            ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
            otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);

            TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
            testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);

            TestingDispatcher dispatcher = this.createDispatcher(
                testingHighAvailabilityServices,
                new TestingJobManagerRunnerFactory(new CompletableFuture<JobGraph>(), new CompletableFuture<ArchivedExecutionGraph>(), CompletableFuture.completedFuture(null)));

            try {
                dispatcher.start();

                DispatcherId expectedLeaderId = DispatcherId.generate();
                leaderElectionService.isLeader(expectedLeaderId.toUUID()).get();

                DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
                JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
                dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();

                JobID jobId = nonEmptyJobGraph.getJobID();
                Collection<JobID> jobIdsAfterSubmission = otherSubmittedJobGraphStore.getJobIds();
                Assert.assertThat(jobIdsAfterSubmission, Matchers.contains(jobId));

                leaderElectionService.notLeader();

                // Wait until the dispatcher reports the job as terminated before touching the other store.
                dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

                SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
                Assert.assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));

                // The other store must be able to remove the job graph now that leadership was revoked.
                otherSubmittedJobGraphStore.removeJobGraph(jobId);

                Collection<JobID> jobIdsAfterRemoval = otherSubmittedJobGraphStore.getJobIds();
                Assert.assertThat(jobIdsAfterRemoval, Matchers.not(Matchers.contains(jobId)));
            }
            finally {
                RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
            }
        }
    }

    /**
     * Runs a job to completion on a first dispatcher, then hands leadership to a second
     * dispatcher and checks that the second one lists no jobs.
     */
    @Test
    public void testStandbyDispatcherJobExecution() throws Exception {
        try (TestingHighAvailabilityServices haServices1 = new TestingHighAvailabilityServices();
             TestingHighAvailabilityServices haServices2 = new TestingHighAvailabilityServices();
             CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
            ZooKeeperSubmittedJobGraphStore submittedJobGraphStore1 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
            haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
            TestingLeaderElectionService leaderElectionService1 = new TestingLeaderElectionService();
            haServices1.setDispatcherLeaderElectionService(leaderElectionService1);

            ZooKeeperSubmittedJobGraphStore submittedJobGraphStore2 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
            haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
            TestingLeaderElectionService leaderElectionService2 = new TestingLeaderElectionService();
            haServices2.setDispatcherLeaderElectionService(leaderElectionService2);

            CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
            CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();

            TestingDispatcher dispatcher1 = this.createDispatcher(
                haServices1,
                new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
            TestingDispatcher dispatcher2 = this.createDispatcher(
                haServices2,
                new TestingJobManagerRunnerFactory(new CompletableFuture<JobGraph>(), new CompletableFuture<ArchivedExecutionGraph>(), CompletableFuture.completedFuture(null)));

            try {
                dispatcher1.start();
                dispatcher2.start();

                leaderElectionService1.isLeader(UUID.randomUUID()).get();

                DispatcherGateway dispatcherGateway1 = dispatcher1.getSelfGateway(DispatcherGateway.class);
                JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
                dispatcherGateway1.submitJob(jobGraph, TIMEOUT).get();

                CompletableFuture<JobResult> jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), TIMEOUT);

                jobGraphFuture.get();

                // Complete the job by handing a FINISHED execution graph to the runner factory's result future.
                resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());

                JobResult jobResult = jobResultFuture.get();
                Assert.assertThat(jobResult.isSuccess(), Matchers.is(true));

                dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();

                // Move leadership from the first to the second dispatcher.
                leaderElectionService1.notLeader();
                leaderElectionService2.isLeader(UUID.randomUUID()).get();

                DispatcherGateway dispatcherGateway2 = dispatcher2.getSelfGateway(DispatcherGateway.class);
                Assert.assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), Matchers.is(Matchers.empty()));
            }
            finally {
                terminateDispatchers(dispatcher1, dispatcher2);
            }
        }
    }

    /**
     * Starts two dispatchers on the same {@link ZooKeeperHaServices}, submits a job to the
     * current leader, shuts that leader down, and checks that the job graph handed to the
     * other dispatcher's job manager runner factory carries the submitted job's id.
     */
    @Test
    public void testStandbyDispatcherJobRecovery() throws Exception {
        try (CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
            ZooKeeperHaServices haServices = null;
            TestingDispatcher dispatcher1 = null;
            TestingDispatcher dispatcher2 = null;

            try {
                haServices = new ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, new VoidBlobStore());

                CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
                dispatcher1 = this.createDispatcher(
                    haServices,
                    new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<ArchivedExecutionGraph>(), CompletableFuture.completedFuture(null)));

                CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
                dispatcher2 = this.createDispatcher(
                    haServices,
                    new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<ArchivedExecutionGraph>(), CompletableFuture.completedFuture(null)));

                dispatcher1.start();
                dispatcher2.start();

                LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), TIMEOUT);

                DispatcherGateway dispatcherGateway = rpcService.connect(
                    leaderConnectionInfo.getAddress(),
                    DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
                    DispatcherGateway.class).get();

                JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
                dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();

                // Shut down whichever dispatcher is the leader; the remaining one is expected to receive the job graph.
                if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
                    dispatcher1.shutDown();
                    Assert.assertThat(jobGraphFuture2.get().getJobID(), Matchers.is(Matchers.equalTo(nonEmptyJobGraph.getJobID())));
                } else {
                    dispatcher2.shutDown();
                    Assert.assertThat(jobGraphFuture1.get().getJobID(), Matchers.is(Matchers.equalTo(nonEmptyJobGraph.getJobID())));
                }
            }
            finally {
                // Same clean-up order on success and failure: dispatcher1, dispatcher2, HA services.
                try {
                    terminateDispatchers(dispatcher1, dispatcher2);
                }
                finally {
                    if (haServices != null) {
                        haServices.close();
                    }
                }
            }
        }
    }

    /**
     * Terminates the given endpoints in order, skipping {@code null} entries. The second
     * endpoint is terminated even if terminating the first one throws.
     *
     * @param first endpoint to terminate first, may be {@code null}
     * @param second endpoint to terminate second, may be {@code null}
     * @throws Exception if terminating either endpoint fails
     */
    private static void terminateDispatchers(RpcEndpoint first, RpcEndpoint second) throws Exception {
        try {
            if (first != null) {
                RpcUtils.terminateRpcEndpoint(first, TIMEOUT);
            }
        }
        finally {
            if (second != null) {
                RpcUtils.terminateRpcEndpoint(second, TIMEOUT);
            }
        }
    }

    /**
     * Builds a {@link TestingDispatcher} on the shared RPC service, configuration and blob
     * server. The endpoint id combines the current test method name with a random UUID.
     *
     * @param highAvailabilityServices HA services handed to the dispatcher
     * @param jobManagerRunnerFactory factory handed to the dispatcher for creating job manager runners
     * @return the newly constructed (not yet started) dispatcher
     * @throws Exception if the dispatcher cannot be constructed
     */
    @Nonnull
    private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        String endpointId = "dispatcher_" + this.name.getMethodName() + UUID.randomUUID();
        return new TestingDispatcher(
            rpcService,
            endpointId,
            configuration,
            highAvailabilityServices,
            new TestingResourceManagerGateway(),
            blobServer,
            new HeartbeatServices(1000L, 1000L),
            UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
            null, // NOTE(review): optional constructor argument left unset; confirm its meaning against TestingDispatcher
            new MemoryArchivedExecutionGraphStore(),
            jobManagerRunnerFactory,
            this.testingFatalErrorHandler);
    }
}

