/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.PoisonPill;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Promise;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class JobClientActorRecoveryITCase
extends TestLogger {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    public static TestingServer zkServer;

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer();
        zkServer.start();
    }

    public static void teardown() throws Exception {
        if (zkServer != null) {
            zkServer.stop();
            zkServer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobClientRecovery() throws Exception {
        File rootFolder = this.tempFolder.getRoot();
        Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), rootFolder.getPath());
        config.setInteger("local.number-jobmanager", 2);
        config.setInteger("local.number-taskmanager", 1);
        final TestingCluster cluster = new TestingCluster(config);
        cluster.start();
        JobVertex blockingVertex = new JobVertex("Blocking Vertex");
        blockingVertex.setInvokableClass(BlockingTask.class);
        blockingVertex.setParallelism(1);
        final JobGraph jobGraph = new JobGraph("Blocking Test Job", new JobVertex[]{blockingVertex});
        Promise.DefaultPromise promise = new Promise.DefaultPromise();
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        try {
            Thread submitter = new Thread(new Runnable((Promise)promise){
                final /* synthetic */ Promise val$promise;
                {
                    this.val$promise = promise;
                }

                @Override
                public void run() {
                    try {
                        JobExecutionResult result = cluster.submitJobAndWait(jobGraph, false);
                        this.val$promise.success((Object)result);
                    }
                    catch (Exception e) {
                        this.val$promise.failure((Throwable)e);
                    }
                }
            });
            submitter.start();
            Object object = BlockingTask.waitLock;
            synchronized (object) {
                while (BlockingTask.HasBlockedExecution < 1 && deadline.hasTimeLeft()) {
                    BlockingTask.waitLock.wait(deadline.timeLeft().toMillis());
                }
            }
            if (deadline.isOverdue()) {
                Assert.fail((String)"The job has not blocked within the given deadline.");
            }
            ActorGateway gateway = cluster.getLeaderGateway(deadline.timeLeft());
            gateway.tell(TestingJobManagerMessages.getDisablePostStop());
            gateway.tell((Object)PoisonPill.getInstance());
            Await.result((Awaitable)promise.future(), (Duration)deadline.timeLeft());
        }
        finally {
            cluster.stop();
        }
    }

    public static class BlockingTask
    extends AbstractInvokable {
        private static volatile int BlockExecution = 1;
        private static volatile int HasBlockedExecution = 0;
        private static Object waitLock = new Object();

        public BlockingTask(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            if (BlockExecution > 0) {
                --BlockExecution;
                Object object = waitLock;
                synchronized (object) {
                    ++HasBlockedExecution;
                    waitLock.notifyAll();
                }
            }
        }
    }
}

