/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class StackTraceSampleCoordinatorITCase
extends TestLogger {
    private static ActorSystem testActorSystem;

    @BeforeClass
    public static void setup() {
        testActorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)testActorSystem);
    }

    @Test
    public void testTaskClearedWhileSampling() throws Exception {
        new JavaTestKit(testActorSystem){
            {
                final FiniteDuration deadline = new FiniteDuration(60L, TimeUnit.SECONDS);
                final JobGraph jobGraph = new JobGraph(new JobVertex[0]);
                boolean parallelism = true;
                final JobVertex task = new JobVertex("Task");
                task.setInvokableClass(BlockingNoOpInvokable.class);
                task.setParallelism(1);
                jobGraph.addVertex(task);
                final Configuration config = new Configuration();
                HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices((Configuration)config, (Executor)TestingUtils.defaultExecutor());
                ActorGateway jobManger = null;
                ActorGateway taskManager = null;
                try {
                    jobManger = TestingUtils.createJobManager(testActorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), config, highAvailabilityServices);
                    config.setInteger("taskmanager.numberOfTaskSlots", 1);
                    taskManager = TestingUtils.createTaskManager(testActorSystem, highAvailabilityServices, config, true, true);
                    final ActorGateway jm = jobManger;
                    new JavaTestKit.Within(deadline){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        protected void run() {
                            try {
                                AkkaActorGateway testActor = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                                int maxAttempts = 10;
                                int sleepTime = 100;
                                int i = 0;
                                while (i < maxAttempts) {
                                    JobClient.submitJobDetached((JobManagerGateway)new AkkaJobManagerGateway(jm), (Configuration)config, (JobGraph)jobGraph, (Time)Time.milliseconds((long)deadline.toMillis()), (ClassLoader)ClassLoader.getSystemClassLoader());
                                    jm.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), (ActorGateway)testActor);
                                    this.expectMsgEquals(new TestingJobManagerMessages.AllVerticesRunning(jobGraph.getJobID()));
                                    jm.tell((Object)new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), (ActorGateway)testActor);
                                    TestingJobManagerMessages.ExecutionGraphFound executionGraphResponse = (TestingJobManagerMessages.ExecutionGraphFound)this.expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                                    ExecutionGraph executionGraph = (ExecutionGraph)executionGraphResponse.executionGraph();
                                    ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
                                    StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator((Executor)testActorSystem.dispatcher(), 60000L);
                                    CompletableFuture sampleFuture = coordinator.triggerStackTraceSample(vertex.getTaskVertices(), 2147470000, Time.milliseconds((long)10L), 0);
                                    Thread.sleep(sleepTime);
                                    Future removeFuture = jm.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), this.remaining());
                                    jm.tell((Object)new JobManagerMessages.CancelJob(jobGraph.getJobID()));
                                    try {
                                        sampleFuture.get(this.remaining().toMillis(), TimeUnit.MILLISECONDS);
                                        break;
                                    }
                                    catch (Throwable throwable) {
                                    }
                                    finally {
                                        Await.ready((Awaitable)removeFuture, (Duration)this.remaining());
                                    }
                                    ++i;
                                    sleepTime *= 2;
                                }
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(jobManger);
                    TestingUtils.stopActor(taskManager);
                    highAvailabilityServices.closeAndCleanupAllData();
                    throw throwable;
                }
                TestingUtils.stopActor(jobManger);
                TestingUtils.stopActor(taskManager);
                highAvailabilityServices.closeAndCleanupAllData();
            }
        };
    }
}

