/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.RobustActorSystem;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import com.typesafe.config.Config;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Tests leader election of {@link TestingJobManager} actors that are configured through
 * {@link ZooKeeperTestUtils#createZooKeeperHAConfig}.
 *
 * <p>All test methods share one actor system and one {@link TestingServer}, both created in
 * {@link #setup()} and released in {@link #teardown()}.
 */
public class JobManagerLeaderElectionTest extends TestLogger {

    /** Ask timeout used for the "notify when leader" requests sent to a JobManager. */
    private static final Timeout ASK_TIMEOUT = new Timeout(TestingUtils.TESTING_DURATION());

    /** Upper bound on how long a test waits for a JobManager to answer the leader request. */
    private static final FiniteDuration LEADER_WAIT = new FiniteDuration(5L, TimeUnit.MINUTES);

    private static ActorSystem actorSystem;
    private static TestingServer testingServer;

    /** Per-test scratch directory; its root path is handed to the HA configuration. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Creates the shared actor system and the shared ZooKeeper testing server. */
    @BeforeClass
    public static void setup() throws Exception {
        actorSystem = RobustActorSystem.create(
                "TestingActorSystem", TestingUtils.getDefaultTestingActorSystemConfig());
        testingServer = new TestingServer();
    }

    /**
     * Shuts down the shared actor system and stops the testing server.
     *
     * <p>The server is stopped in a {@code finally} block so that a failing actor system
     * shutdown cannot leave the server running.
     */
    @AfterClass
    public static void teardown() throws Exception {
        try {
            if (actorSystem != null) {
                JavaTestKit.shutdownActorSystem(actorSystem);
            }
        } finally {
            if (testingServer != null) {
                testingServer.stop();
            }
        }
    }

    /**
     * Starts a single JobManager and waits until its "notify when leader" request completes.
     */
    @Test
    public void testLeaderElection() throws Exception {
        Configuration configuration = createHaConfiguration();
        ActorRef jm = null;
        try {
            jm = actorSystem.actorOf(createJobManagerProps(configuration));
            awaitLeadership(jm);
        } finally {
            // jm is still null if creating the props or the actor failed; the reference is
            // handed to stopActor either way, exactly as the previous cleanup did.
            TestingUtils.stopActor(jm);
        }
    }

    /**
     * Starts a JobManager, waits for its leader notification, then starts a second JobManager,
     * sends a {@link PoisonPill} to the first one and waits for the second one's leader
     * notification.
     */
    @Test
    public void testLeaderReelection() throws Exception {
        Configuration configuration = createHaConfiguration();
        ActorRef jm = null;
        ActorRef jm2 = null;
        try {
            jm = actorSystem.actorOf(createJobManagerProps(configuration));
            awaitLeadership(jm);

            // The second JobManager must exist before the first one is stopped.
            jm2 = actorSystem.actorOf(createJobManagerProps(configuration));
            jm.tell(PoisonPill.getInstance(), ActorRef.noSender());
            awaitLeadership(jm2);
        } finally {
            // Stop the first JobManager as well: if the test fails before the PoisonPill is
            // sent, nothing else would stop it and it would stay alive in the shared actor
            // system for the remaining tests.
            TestingUtils.stopActor(jm);
            TestingUtils.stopActor(jm2);
        }
    }

    /**
     * Builds the high-availability configuration for one test from the shared testing server's
     * connect string and this test's temporary folder.
     */
    private Configuration createHaConfiguration() throws Exception {
        return ZooKeeperTestUtils.createZooKeeperHAConfig(
                testingServer.getConnectString(), tempFolder.getRoot().getPath());
    }

    /**
     * Asks the given JobManager to notify when it is leader and blocks until that request
     * completes or {@link #LEADER_WAIT} elapses.
     *
     * @param jobManager the JobManager actor to query
     * @throws Exception if waiting times out or is interrupted
     */
    private static void awaitLeadership(ActorRef jobManager) throws Exception {
        Future<Object> leaderFuture = Patterns.ask(
                jobManager, TestingJobManagerMessages.getNotifyWhenLeader(), ASK_TIMEOUT);
        Await.ready(leaderFuture, LEADER_WAIT);
    }

    /**
     * Creates the {@link Props} for a {@link TestingJobManager} wired with standalone job graph
     * and checkpoint recovery components and a leader election service chosen from the
     * configuration's high-availability mode.
     *
     * <p>Side effects: sets {@link BlobServerOptions#CLEANUP_INTERVAL} to {@code 1} on the
     * passed configuration and starts a new {@link BlobServer}.
     *
     * <p>NOTE(review): neither the Curator client nor the blob server started here is closed
     * anywhere in this class. Closing them needs to be ordered after the JobManager actor has
     * fully stopped - confirm the intended ownership before adding cleanup.
     *
     * @param configuration configuration to build the JobManager from; modified as described
     * @return props that create a new {@code TestingJobManager}
     * @throws Exception if one of the services cannot be created or started
     */
    private Props createJobManagerProps(Configuration configuration) throws Exception {
        // Declared as the interface: the two branches assign different implementations.
        LeaderElectionService leaderElectionService;
        if (HighAvailabilityMode.fromConfig(configuration) == HighAvailabilityMode.NONE) {
            leaderElectionService = new StandaloneLeaderElectionService();
        } else {
            CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
            leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
        }

        StandaloneSubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
        StandaloneCheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();

        configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
        BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
        blobServer.start();

        // The arguments are positional and are matched against a TestingJobManager constructor
        // when the actor is created, so their order must not change.
        return Props.create(
                TestingJobManager.class,
                configuration,
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                new InstanceManager(),
                new Scheduler((Executor) TestingUtils.defaultExecutionContext()),
                blobServer,
                new BlobLibraryCacheManager(
                        blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
                ActorRef.noSender(),
                new NoRestartStrategy.NoRestartStrategyFactory(),
                AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
                leaderElectionService,
                submittedJobGraphStore,
                checkpointRecoveryFactory,
                AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
                UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                Option.empty());
    }
}

