/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Identify;
import akka.pattern.Patterns;
import akka.serialization.JavaSerializer;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.dispatcher.DispatcherHATest;
import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class ZooKeeperHAJobManagerTest
extends TestLogger {
    @ClassRule
    public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static final FiniteDuration TIMEOUT = FiniteDuration.apply((long)10L, (TimeUnit)TimeUnit.SECONDS);
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() throws Exception {
        Future terminationFuture = system.terminate();
        Await.ready((Awaitable)terminationFuture, (Duration)TIMEOUT);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();){
            CuratorFramework client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
            TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
            highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
            highAvailabilityServices.setSubmittedJobGraphStore((SubmittedJobGraphStore)ZooKeeperUtils.createSubmittedJobGraphs((CuratorFramework)client, (Configuration)configuration));
            highAvailabilityServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory());
            CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
            ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs((CuratorFramework)otherClient, (Configuration)configuration);
            otherSubmittedJobGraphStore.start((SubmittedJobGraphStore.SubmittedJobGraphListener)NoOpSubmittedJobGraphListener.INSTANCE);
            ActorRef jobManagerActorRef = null;
            try {
                jobManagerActorRef = (ActorRef)JobManager.startJobManagerActors((Configuration)configuration, (ActorSystem)system, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)highAvailabilityServices, (MetricRegistry)NoOpMetricRegistry.INSTANCE, (Option)Option.empty(), TestingJobManager.class, MemoryArchivist.class)._1();
                this.waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
                AkkaActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
                leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
                JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
                JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
                Await.result((Awaitable)jobManager.ask((Object)submitJobMessage, TIMEOUT), (Duration)TIMEOUT);
                Collection jobIds = otherSubmittedJobGraphStore.getJobIds();
                JobID jobId = nonEmptyJobGraph.getJobID();
                Assert.assertThat((Object)jobIds, (Matcher)Matchers.contains((Object[])new JobID[]{jobId}));
                leaderElectionService.notLeader();
                Await.result((Awaitable)jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), (Duration)TIMEOUT);
                SubmittedJobGraph recoveredJobGraph = (SubmittedJobGraph)JavaSerializer.currentSystem().withValue((ExtendedActorSystem)system, () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
                Assert.assertThat((Object)recoveredJobGraph, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
                otherSubmittedJobGraphStore.removeJobGraph(jobId);
                jobIds = otherSubmittedJobGraphStore.getJobIds();
                Assert.assertThat((Object)jobIds, (Matcher)Matchers.not((Matcher)Matchers.contains((Object[])new JobID[]{jobId})));
            }
            catch (Throwable throwable) {
                client.close();
                otherClient.close();
                if (jobManagerActorRef != null) {
                    ActorUtils.stopActor(jobManagerActorRef);
                }
                throw throwable;
            }
            client.close();
            otherClient.close();
            if (jobManagerActorRef != null) {
                ActorUtils.stopActor((ActorRef)jobManagerActorRef);
            }
        }
    }

    private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, TimeoutException {
        Await.ready((Awaitable)Patterns.ask((ActorRef)jobManagerActorRef, (Object)new Identify((Object)42), (long)timeout.toMillis()), (Duration)timeout);
    }
}

