/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.TestActorRef;
import com.typesafe.config.Config;
import java.io.File;
import java.util.Collection;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobManagerHAJobGraphRecoveryITCase
extends TestLogger {
    /** ZooKeeper test environment shared by all tests of this class; shut down in {@link #tearDown()}. */
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    /** Overall time budget (5 minutes) from which each test derives its {@link Deadline}. */
    private static final FiniteDuration TestTimeOut = new FiniteDuration(5L, TimeUnit.MINUTES);
    /**
     * JUnit-managed temporary folder. Its root path is handed to
     * {@code ZooKeeperTestUtils.createZooKeeperHAConfig} by the tests and its
     * contents are inspected by the {@code verify*RecoveryState} helpers.
     */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Shuts down the shared ZooKeeper test environment once all tests of this
     * class have run.
     *
     * @throws Exception if shutting down the environment fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
    }

    /**
     * Runs before every test and calls {@code deleteAll()} on the shared
     * ZooKeeper environment (presumably wiping state left by an earlier test).
     *
     * @throws Exception if the call fails
     */
    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a blocking job to a single-JobManager/single-TaskManager HA
     * cluster, waits until it is {@code RUNNING}, stops the cluster and then
     * checks via {@link #verifyRecoveryState(Configuration)} that the recovery
     * data is still present.
     *
     * @throws Exception if the cluster, the submission or the verification fails
     */
    @Test
    public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
        final Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
                ZooKeeper.getConnectString(),
                this.tempFolder.getRoot().getPath());
        config.setInteger("local.number-jobmanager", 1);
        config.setInteger("local.number-taskmanager", 1);

        final TestingCluster cluster = new TestingCluster(config, false, false);
        try {
            final Deadline deadline = TestTimeOut.fromNow();
            cluster.start(true);

            // Practically unlimited restarts with a 100 ms delay between attempts.
            final ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100L));

            final JobGraph blockingJob = createBlockingJobGraph();
            blockingJob.setExecutionConfig(executionConfig);

            final ActorGateway leader = cluster.getLeaderGateway(deadline.timeLeft());
            leader.tell(new JobManagerMessages.SubmitJob(blockingJob, ListeningBehaviour.DETACHED));
            JobManagerActorTestUtils.waitForJobStatus(
                    blockingJob.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());
        } finally {
            // Stop the cluster while the job is still running.
            cluster.stop();
        }

        verifyRecoveryState(config);
    }

    /**
     * Checks what a non-detached client observes across a JobManager fail-over.
     *
     * <p>The test starts two JobManager processes and one TaskManager, submits a
     * blocking job with
     * {@link ListeningBehaviour#EXECUTION_RESULT_AND_STATE_CHANGES}, destroys the
     * leading JobManager process, waits for the new leader to report the job as
     * {@code RUNNING} again, cancels the job and finally asserts that the
     * recording client received exactly two {@code JobSubmitSuccess} messages.
     *
     * <p>Every cleanup step in the {@code finally} block is executed even if an
     * earlier step throws. Cleanup failures are added as suppressed exceptions
     * to the test failure if there is one, otherwise they fail the test.
     *
     * @throws Exception if the test body or the cleanup fails
     */
    @Test
    public void testClientNonDetachedListeningBehaviour() throws Exception {
        Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper.getConnectString(), this.tempFolder.getRoot().getPath());
        ActorSystem testSystem = null;
        JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
        LeaderRetrievalService leaderRetrievalService = null;
        ActorSystem taskManagerSystem = null;
        HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, TestingUtils.defaultExecutor(), HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
        // Remembered so that cleanup problems are attached to, not substituted for, the real failure.
        Throwable testFailure = null;
        try {
            Deadline deadline = TestTimeOut.fromNow();

            // Actor system hosting the recording test client.
            testSystem = AkkaUtils.createActorSystem(new Configuration(), new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("localhost", 0)));

            // Two JobManager processes: one becomes leader, the other takes over later.
            jobManagerProcess[0] = new JobManagerProcess(0, config);
            jobManagerProcess[1] = new JobManagerProcess(1, config);
            jobManagerProcess[0].startProcess();
            jobManagerProcess[1].startProcess();

            TestingListener leaderListener = new TestingListener();
            leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
            leaderRetrievalService.start(leaderListener);

            taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
            TaskManager.startTaskManagerComponentsAndActor(config, ResourceID.generate(), taskManagerSystem, highAvailabilityServices, new NoOpMetricRegistry(), "localhost", Option.<String>empty(), false, TaskManager.class);

            TestActorRef<RecordingTestClient> clientRef = TestActorRef.create(testSystem, Props.create(RecordingTestClient.class));
            JobGraph jobGraph = JobManagerHAJobGraphRecoveryITCase.createBlockingJobGraph();

            // Resolve the first leader.
            leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
            String leaderAddress = leaderListener.getAddress();
            UUID leaderId = leaderListener.getLeaderSessionID();
            AkkaActorGateway client = new AkkaActorGateway(clientRef, leaderId);
            ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testSystem, deadline.timeLeft());
            AkkaActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);

            // Poll the leader until it reports a non-zero number of slots.
            int numSlots = 0;
            while (numSlots == 0) {
                Future<Object> slotsFuture = leader.ask(JobManagerMessages.getRequestTotalNumberOfSlots(), deadline.timeLeft());
                numSlots = (Integer) Await.result(slotsFuture, deadline.timeLeft());
            }

            // Submit with the recording client as sender so that it receives the responses.
            leader.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);
            JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());

            // Kill whichever process currently holds leadership.
            JobManagerProcess leadingJobManagerProcess = jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress()) ? jobManagerProcess[0] : jobManagerProcess[1];
            leadingJobManagerProcess.destroy();

            // Resolve the new leader and wait for the job to be running there.
            leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
            String newLeaderAddress = leaderListener.getAddress();
            UUID newLeaderId = leaderListener.getLeaderSessionID();
            ActorRef newLeaderRef = AkkaUtils.getActorRef(newLeaderAddress, testSystem, deadline.timeLeft());
            leader = new AkkaActorGateway(newLeaderRef, newLeaderId);
            JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft());

            // Cancel the job and wait for the client to be told about the result.
            leader.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
            clientRef.underlyingActor().awaitJobResult(deadline.timeLeft().toMillis());

            int jobSubmitSuccessMessages = 0;
            for (Object received : clientRef.underlyingActor().getMessages()) {
                if (received instanceof JobManagerMessages.JobSubmitSuccess) {
                    ++jobSubmitSuccessMessages;
                }
            }
            Assert.assertEquals(2, jobSubmitSuccessMessages);
        }
        catch (Throwable t) {
            testFailure = t;
            t.printStackTrace();
            // Dump the logs of the external JobManager processes to aid debugging.
            for (JobManagerProcess process : jobManagerProcess) {
                if (process != null) {
                    process.printProcessLog();
                }
            }
            throw t;
        }
        finally {
            // Run EVERY cleanup step, even if an earlier one fails, so that no
            // process, actor system or HA data is leaked into subsequent tests.
            Exception cleanupFailure = null;
            for (JobManagerProcess process : jobManagerProcess) {
                if (process != null) {
                    try {
                        process.destroy();
                    }
                    catch (Exception e) {
                        cleanupFailure = collectCleanupFailure(cleanupFailure, e);
                    }
                }
            }
            if (leaderRetrievalService != null) {
                try {
                    leaderRetrievalService.stop();
                }
                catch (Exception e) {
                    cleanupFailure = collectCleanupFailure(cleanupFailure, e);
                }
            }
            if (taskManagerSystem != null) {
                try {
                    taskManagerSystem.shutdown();
                }
                catch (Exception e) {
                    cleanupFailure = collectCleanupFailure(cleanupFailure, e);
                }
            }
            if (testSystem != null) {
                try {
                    testSystem.shutdown();
                }
                catch (Exception e) {
                    cleanupFailure = collectCleanupFailure(cleanupFailure, e);
                }
            }
            try {
                highAvailabilityServices.closeAndCleanupAllData();
            }
            catch (Exception e) {
                cleanupFailure = collectCleanupFailure(cleanupFailure, e);
            }

            if (cleanupFailure != null) {
                if (testFailure != null) {
                    // Keep the original failure as the primary exception.
                    testFailure.addSuppressed(cleanupFailure);
                } else {
                    throw cleanupFailure;
                }
            }
        }
    }

    /**
     * Folds a newly caught cleanup exception into the ones collected so far.
     *
     * @param previous the first cleanup exception seen so far, or {@code null}
     * @param current the exception that was just caught
     * @return {@code current} if nothing was collected before, otherwise
     *         {@code previous} with {@code current} added as suppressed
     */
    private static Exception collectCleanupFailure(Exception previous, Exception current) {
        if (previous == null) {
            return current;
        }
        previous.addSuppressed(current);
        return previous;
    }

    /**
     * Builds the job used by the tests: a graph named "Blocking program" with a
     * single vertex "Blocking Vertex" whose invokable class is
     * {@link BlockingNoOpInvokable}.
     *
     * @return the newly created job graph
     */
    private static JobGraph createBlockingJobGraph() {
        final JobVertex blockingVertex = new JobVertex("Blocking Vertex");
        blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);

        final JobGraph blockingJob = new JobGraph("Blocking program");
        blockingJob.addVertex(blockingVertex);
        return blockingJob;
    }

    /**
     * Asserts that the recovery state has been cleaned up: the temporary folder
     * contains no files, and the ZooKeeper job graphs path exists, has had
     * children at some point during the test (child version != 0) and has no
     * children left.
     *
     * @param config configuration from which the ZooKeeper job graphs path is read
     * @throws Exception if ZooKeeper cannot be queried
     */
    private void verifyCleanRecoveryState(Configuration config) throws Exception {
        Collection<File> stateHandles = FileUtils.listFiles(this.tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        if (!stateHandles.isEmpty()) {
            Assert.fail("File state backend is not clean: " + stateHandles);
        }

        String currentJobsPath = config.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
        Stat stat = (Stat) ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
        // checkExists() yields null for a missing node; fail readably instead of with an NPE.
        Assert.assertNotNull("ZooKeeper path '" + currentJobsPath + "' does not exist.", stat);

        if (stat.getCversion() == 0) {
            Assert.fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during this test. What are you testing?");
        }
        if (stat.getNumChildren() != 0) {
            Assert.fail("ZooKeeper path '" + currentJobsPath + "' is not clean: " + ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
        }
    }

    /**
     * Asserts that recovery state is still present: the temporary folder
     * contains at least one file, and the ZooKeeper job graphs path exists, has
     * had children at some point during the test (child version != 0) and
     * still has at least one child.
     *
     * @param config configuration from which the ZooKeeper job graphs path is read
     * @throws Exception if ZooKeeper cannot be queried
     */
    private void verifyRecoveryState(Configuration config) throws Exception {
        Collection<File> stateHandles = FileUtils.listFiles(this.tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        if (stateHandles.isEmpty()) {
            Assert.fail("File state backend has been cleaned: " + stateHandles);
        }

        String currentJobsPath = config.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
        Stat stat = (Stat) ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
        // checkExists() yields null for a missing node; fail readably instead of with an NPE.
        Assert.assertNotNull("ZooKeeper path '" + currentJobsPath + "' does not exist.", stat);

        if (stat.getCversion() == 0) {
            Assert.fail("ZooKeeper state for '" + currentJobsPath + "' has not been modified during this test. What are you testing?");
        }
        if (stat.getNumChildren() == 0) {
            Assert.fail("ZooKeeper path '" + currentJobsPath + "' has been cleaned: " + ZooKeeper.getClient().getChildren().forPath(currentJobsPath));
        }
    }

    private static class RecordingTestClient
    extends UntypedActor {
        private final Queue<Object> messages = new ConcurrentLinkedQueue<Object>();
        private CountDownLatch jobResultLatch = new CountDownLatch(1);

        private RecordingTestClient() {
        }

        public void onReceive(Object message) throws Exception {
            if (message instanceof JobManagerMessages.LeaderSessionMessage) {
                message = ((JobManagerMessages.LeaderSessionMessage)message).message();
            }
            this.messages.add(message);
            if (message instanceof JobManagerMessages.JobResultFailure || message instanceof JobManagerMessages.JobResultSuccess) {
                this.jobResultLatch.countDown();
            }
        }

        public Queue<Object> getMessages() {
            return this.messages;
        }

        public void awaitJobResult(long timeout) throws InterruptedException {
            this.jobResultLatch.await(timeout, TimeUnit.MILLISECONDS);
        }
    }
}

