/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Verifies that, once a checkpointing-enabled job has reached a terminal state on a local
 * mini cluster, its {@link CheckpointCoordinator} is either absent or reports itself as shut
 * down. One test runs {@link FailingBlockingInvokable}, the other runs {@link BlockingInvokable}.
 */
public class CoordinatorShutdownTest extends TestLogger {

    /**
     * Runs a job whose only vertex executes {@link FailingBlockingInvokable} with restarts
     * disabled, and checks the checkpoint coordinator after the job has terminated.
     *
     * @throws Exception if the cluster interaction fails; propagated unchanged so the test
     *         report keeps the original exception and stack trace
     */
    @Test
    public void testCoordinatorShutsDownOnFailure() throws Exception {
        LocalFlinkMiniCluster cluster = createSingleSlotCluster();
        try {
            cluster.start();

            JobGraph testGraph = createCheckpointedJobGraph(FailingBlockingInvokable.class);

            // Disable restarts so the job goes terminal instead of being rescheduled.
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setRestartStrategy(RestartStrategies.noRestart());
            testGraph.setExecutionConfig(executionConfig);

            ExecutionGraph graph = submitAndFetchExecutionGraph(cluster, testGraph);

            FailingBlockingInvokable.unblock();

            assertCoordinatorShutDownAfterTermination(graph);
        }
        finally {
            cluster.stop();
        }
    }

    /**
     * Runs a job whose only vertex executes {@link BlockingInvokable}, releases it, and checks
     * the checkpoint coordinator after the job has terminated.
     *
     * @throws Exception if the cluster interaction fails; propagated unchanged so the test
     *         report keeps the original exception and stack trace
     */
    @Test
    public void testCoordinatorShutsDownOnSuccess() throws Exception {
        LocalFlinkMiniCluster cluster = createSingleSlotCluster();
        try {
            cluster.start();

            JobGraph testGraph = createCheckpointedJobGraph(BlockingInvokable.class);
            ExecutionGraph graph = submitAndFetchExecutionGraph(cluster, testGraph);

            BlockingInvokable.unblock();

            assertCoordinatorShutDownAfterTermination(graph);
        }
        finally {
            cluster.stop();
        }
    }

    /**
     * Builds (but does not start) a local mini cluster configured with one task manager
     * and one task slot.
     */
    private static LocalFlinkMiniCluster createSingleSlotCluster() throws Exception {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 1);
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        return new LocalFlinkMiniCluster(config, true);
    }

    /**
     * Builds a single-vertex job graph named "test job" with snapshot settings attached.
     *
     * @param invokableClass the task class the single vertex should run
     * @return the job graph, with the vertex ID used for all three vertex lists of the
     *         checkpointing settings
     */
    private static JobGraph createCheckpointedJobGraph(Class<? extends AbstractInvokable> invokableClass) {
        JobVertex vertex = new JobVertex("Test Vertex");
        vertex.setInvokableClass(invokableClass);
        List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());

        // NOTE(review): the numeric arguments presumably are interval, timeout, min pause and
        // max concurrent checkpoints - confirm against CheckpointCoordinatorConfiguration.
        CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
                5000L,
                60000L,
                0L,
                Integer.MAX_VALUE,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                true);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex});
        jobGraph.setSnapshotSettings(
                new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, coordinatorConfig, null));
        return jobGraph;
    }

    /**
     * Submits the job to the cluster's leading job manager, waits for the submit reply, then
     * requests the job and returns its execution graph.
     *
     * @param cluster  a started mini cluster
     * @param jobGraph the job to submit
     * @return the non-null execution graph reported by the job manager
     * @throws Exception if the gateway lookup or either ask/await fails or times out
     */
    private static ExecutionGraph submitAndFetchExecutionGraph(LocalFlinkMiniCluster cluster, JobGraph jobGraph) throws Exception {
        ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
        FiniteDuration timeout = new FiniteDuration(60L, TimeUnit.SECONDS);

        JobManagerMessages.SubmitJob submitMessage =
                new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT);
        Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
        Await.result(submitFuture, timeout);

        Future<Object> jobRequestFuture =
                jmGateway.ask(new JobManagerMessages.RequestJob(jobGraph.getJobID()), timeout);
        JobManagerMessages.JobFound jobFound =
                (JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout);

        ExecutionGraph graph = (ExecutionGraph) jobFound.executionGraph();
        Assert.assertNotNull(graph);
        return graph;
    }

    /**
     * Blocks until the execution graph is terminal, then asserts that it has no checkpoint
     * coordinator or that the coordinator reports being shut down.
     */
    private static void assertCoordinatorShutDownAfterTermination(ExecutionGraph graph) throws Exception {
        graph.waitUntilTerminal();

        CheckpointCoordinator coord = graph.getCheckpointCoordinator();
        Assert.assertTrue(
                "Checkpoint coordinator was not shut down after the job terminated",
                coord == null || coord.isShutdown());
    }

    /**
     * Task that waits on a class-wide latch in {@link #invoke()} until {@link #unblock()}
     * triggers it. The latch is static, so it is shared by every instance of this class.
     */
    public static class BlockingInvokable
    extends AbstractInvokable {
        private static final OneShotLatch LATCH = new OneShotLatch();

        public BlockingInvokable(Environment environment) {
            super(environment);
        }

        /** Waits on the shared latch and returns once the wait completes. */
        @Override
        public void invoke() throws Exception {
            LATCH.await();
        }

        /** Triggers the shared latch that {@link #invoke()} waits on. */
        public static void unblock() {
            LATCH.trigger();
        }
    }
}

