/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSample;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for {@link StackTraceSampleCoordinator}: triggering, collecting, cancelling and timing
 * out stack trace samples, and shutting the coordinator down.
 *
 * <p>A new coordinator is created before every test (see {@link #init()}). The tests rely on the
 * first sample triggered on such a coordinator having the id {@link #FIRST_SAMPLE_ID}.
 */
public class StackTraceSampleCoordinatorTest
extends TestLogger {
    /** Sample id the tests expect for the first sample triggered on a fresh coordinator. */
    private static final int FIRST_SAMPLE_ID = 0;

    /** Sample timeout (in milliseconds) handed to the coordinator created in {@link #init()}. */
    private static final long DEFAULT_SAMPLE_TIMEOUT_MILLIS = 60000L;

    /** Number of samples requested per task by every test. */
    private static final int NUM_SAMPLES = 1;

    /** Delay between samples requested by every test. */
    private static final Time DELAY_BETWEEN_SAMPLES = Time.milliseconds(100L);

    /** Maximum stack trace depth requested by every test. */
    private static final int MAX_STACK_TRACE_DEPTH = 0;

    /** Actor system shared by all tests; its dispatcher is used as the coordinator's executor. */
    private static ActorSystem system;

    /** Coordinator under test, re-created for every test method. */
    private StackTraceSampleCoordinator coord;

    @BeforeClass
    public static void setUp() throws Exception {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (system != null) {
            system.terminate();
        }
    }

    @Before
    public void init() throws Exception {
        coord = new StackTraceSampleCoordinator(system.dispatcher(), DEFAULT_SAMPLE_TIMEOUT_MILLIS);
    }

    /**
     * Triggers a sample for four running tasks and checks that a request is sent to every task,
     * that the sample future completes only after the last task has reported its traces, and
     * that the completed sample contains the reported traces for every task.
     */
    @Test
    public void testTriggerStackTraceSample() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[] {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };

        CompletableFuture<StackTraceSample> sampleFuture = triggerSample(vertices);

        // Every task must have received exactly the request parameters we triggered with.
        for (ExecutionVertex vertex : vertices) {
            Mockito.verify(vertex.getCurrentExecutionAttempt()).requestStackTraceSample(
                Matchers.eq(FIRST_SAMPLE_ID),
                Matchers.eq(NUM_SAMPLES),
                Matchers.eq(DELAY_BETWEEN_SAMPLES),
                Matchers.eq(MAX_STACK_TRACE_DEPTH),
                Matchers.any(Time.class));
        }
        Assert.assertFalse(sampleFuture.isDone());

        StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
        List<StackTraceElement[]> traces = new ArrayList<>();
        traces.add(stackTraceSample);
        traces.add(stackTraceSample);
        traces.add(stackTraceSample);

        // The future must stay pending until the very last task has reported.
        for (int i = 0; i < vertices.length; ++i) {
            coord.collectStackTraces(FIRST_SAMPLE_ID, attemptIdOf(vertices[i]), traces);
            boolean lastTask = i == vertices.length - 1;
            Assert.assertEquals(lastTask, sampleFuture.isDone());
        }

        StackTraceSample sample = sampleFuture.get();
        Assert.assertEquals(FIRST_SAMPLE_ID, sample.getSampleId());
        Assert.assertTrue(sample.getEndTime() >= sample.getStartTime());

        Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
        for (ExecutionVertex vertex : vertices) {
            List<StackTraceElement[]> sampleTraces = tracesByTask.get(attemptIdOf(vertex));
            Assert.assertNotNull("Task not found", sampleTraces);
            Assert.assertEquals(traces, sampleTraces);
        }

        Assert.assertEquals(0, coord.getNumberOfPendingSamples());

        // A late report for the already completed sample must not throw.
        coord.collectStackTraces(FIRST_SAMPLE_ID, attemptIdOf(vertices[0]), traces);
    }

    /**
     * Triggering a sample that includes a task in state {@code DEPLOYING} must immediately yield
     * a future failed with an {@link IllegalStateException}.
     */
    @Test
    public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[] {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
        };

        CompletableFuture<StackTraceSample> sampleFuture = triggerSample(vertices);

        Assert.assertTrue(sampleFuture.isDone());
        Assert.assertTrue(expectFailure(sampleFuture) instanceof IllegalStateException);
    }

    /**
     * If the request to one of the tasks fails, the sample future must complete exceptionally
     * with a {@link RuntimeException} as its cause.
     */
    @Test(timeout=1000L)
    public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[] {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
        };

        CompletableFuture<StackTraceSample> sampleFuture = triggerSample(vertices);

        Assert.assertTrue(expectFailure(sampleFuture) instanceof RuntimeException);
    }

    /**
     * The task's request future is failed with a {@link TimeoutException} after {@code timeout}
     * milliseconds. The sample future must then complete exceptionally, with a nested cause
     * whose message contains {@code "Timeout"}, and a late report for that sample must not throw.
     */
    @Test(timeout=1000L)
    public void testTriggerStackTraceSampleTimeout() throws Exception {
        int timeout = 100;
        coord = new StackTraceSampleCoordinator(system.dispatcher(), timeout);
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        try {
            ExecutionVertex[] vertices = new ExecutionVertex[] {
                mockExecutionVertexWithTimeout(
                    new ExecutionAttemptID(), ExecutionState.RUNNING, scheduledExecutorService, timeout)
            };

            CompletableFuture<StackTraceSample> sampleFuture = triggerSample(vertices);

            // Give the scheduled failure time to fire, then poll for completion.
            Thread.sleep(timeout * 2);
            boolean success = false;
            for (int i = 0; i < 10; ++i) {
                if (sampleFuture.isDone()) {
                    success = true;
                    break;
                }
                Thread.sleep(timeout);
            }
            Assert.assertTrue("Sample did not time out", success);

            Throwable cause = expectFailure(sampleFuture);
            Assert.assertTrue(cause.getCause().getMessage().contains("Timeout"));

            // A late report for the failed sample must not throw.
            coord.collectStackTraces(FIRST_SAMPLE_ID, attemptIdOf(vertices[0]), new ArrayList<>());
        }
        finally {
            scheduledExecutorService.shutdownNow();
        }
    }

    /** Collecting traces for a sample that was never triggered must not throw. */
    @Test
    public void testCollectStackTraceForUnknownSample() throws Exception {
        coord.collectStackTraces(FIRST_SAMPLE_ID, new ExecutionAttemptID(), new ArrayList<>());
    }

    /** Cancelling a pending sample must complete its future and leave no pending samples. */
    @Test
    public void testCancelStackTraceSample() throws Exception {
        CompletableFuture<StackTraceSample> sampleFuture = triggerSample(singleRunningVertex());
        Assert.assertFalse(sampleFuture.isDone());

        coord.cancelStackTraceSample(FIRST_SAMPLE_ID, null);

        Assert.assertTrue(sampleFuture.isDone());
        Assert.assertEquals(0, coord.getNumberOfPendingSamples());
    }

    /** Collecting traces for a sample that has been cancelled must not throw. */
    @Test
    public void testCollectStackTraceForCanceledSample() throws Exception {
        triggerCancelAndCollect();
    }

    /**
     * Collecting traces for a discarded pending sample must not throw. This test currently
     * exercises the same steps as {@link #testCollectStackTraceForCanceledSample()}.
     */
    @Test
    public void testCollectForDiscardedPendingSample() throws Exception {
        triggerCancelAndCollect();
    }

    /**
     * Reporting traces for a task that is not part of the pending sample must throw an
     * {@link IllegalArgumentException}.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testCollectStackTraceForUnknownTask() throws Exception {
        triggerSample(singleRunningVertex());
        coord.collectStackTraces(FIRST_SAMPLE_ID, new ExecutionAttemptID(), new ArrayList<>());
    }

    /**
     * Shutting the coordinator down must complete all pending sample futures, and any sample
     * triggered afterwards must fail immediately.
     */
    @Test
    public void testShutDown() throws Exception {
        ExecutionVertex[] vertices = singleRunningVertex();
        List<CompletableFuture<StackTraceSample>> sampleFutures = new ArrayList<>();
        sampleFutures.add(triggerSample(vertices));
        sampleFutures.add(triggerSample(vertices));
        for (CompletableFuture<StackTraceSample> pending : sampleFutures) {
            Assert.assertFalse(pending.isDone());
        }

        coord.shutDown();

        for (CompletableFuture<StackTraceSample> completed : sampleFutures) {
            Assert.assertTrue(completed.isDone());
        }

        CompletableFuture<StackTraceSample> afterShutDown = triggerSample(vertices);
        Assert.assertTrue(afterShutDown.isDone());
        // Only the fact that the future failed matters here; the cause is not inspected.
        expectFailure(afterShutDown);
    }

    /** Triggers a sample on the coordinator under test using the shared default parameters. */
    private CompletableFuture<StackTraceSample> triggerSample(ExecutionVertex[] vertices) {
        return coord.triggerStackTraceSample(
            vertices, NUM_SAMPLES, DELAY_BETWEEN_SAMPLES, MAX_STACK_TRACE_DEPTH);
    }

    /** Triggers a sample for one running task, cancels it and then reports traces for it. */
    private void triggerCancelAndCollect() {
        ExecutionVertex[] vertices = singleRunningVertex();
        CompletableFuture<StackTraceSample> sampleFuture = triggerSample(vertices);
        Assert.assertFalse(sampleFuture.isDone());

        coord.cancelStackTraceSample(FIRST_SAMPLE_ID, null);
        Assert.assertTrue(sampleFuture.isDone());

        // Must be silently accepted even though the sample is gone.
        coord.collectStackTraces(FIRST_SAMPLE_ID, attemptIdOf(vertices[0]), new ArrayList<>());
    }

    /** Creates a one-element array holding a running task whose sample request succeeds. */
    private ExecutionVertex[] singleRunningVertex() {
        return new ExecutionVertex[] {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
    }

    /** Returns the attempt id of the vertex's current execution attempt. */
    private static ExecutionAttemptID attemptIdOf(ExecutionVertex vertex) {
        return vertex.getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Waits for the future and returns the cause of its {@link ExecutionException}.
     *
     * @param future future that is expected to complete exceptionally
     * @return the cause carried by the {@link ExecutionException}
     * @throws AssertionError with message {@code "Expected exception."} if the future
     *     completes normally
     */
    private static Throwable expectFailure(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get();
        }
        catch (ExecutionException expected) {
            return expected.getCause();
        }
        throw new AssertionError("Expected exception.");
    }

    /**
     * Creates a mocked vertex whose execution answers sample requests immediately.
     *
     * @param executionId attempt id reported by the mocked execution
     * @param state execution state reported by the mocked execution
     * @param sendSuccess {@code true} to answer requests with a completed future holding a
     *     mocked response, {@code false} to answer with a future failed with "Send failed."
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionId, ExecutionState state, boolean sendSuccess) {
        CompletableFuture<StackTraceSampleResponse> responseFuture;
        if (sendSuccess) {
            responseFuture = CompletableFuture.completedFuture(Mockito.mock(StackTraceSampleResponse.class));
        } else {
            responseFuture = new CompletableFuture<>();
            responseFuture.completeExceptionally(new Exception("Send failed."));
        }
        return mockVertex(executionId, state, responseFuture);
    }

    /**
     * Creates a mocked vertex whose execution answers sample requests with a future that is
     * failed with a {@link TimeoutException} after the given delay.
     *
     * @param executionId attempt id reported by the mocked execution
     * @param state execution state reported by the mocked execution
     * @param scheduledExecutorService executor used to schedule the delayed failure
     * @param timeout delay in milliseconds before the response future is failed
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertexWithTimeout(ExecutionAttemptID executionId, ExecutionState state, ScheduledExecutorService scheduledExecutorService, int timeout) {
        final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>();
        ExecutionVertex vertex = mockVertex(executionId, state, future);
        // Block-bodied lambda so that the Runnable overload of schedule() is selected.
        scheduledExecutorService.schedule(() -> {
            future.completeExceptionally(new TimeoutException("Timeout"));
        }, (long) timeout, TimeUnit.MILLISECONDS);
        return vertex;
    }

    /**
     * Builds a mocked vertex around a mocked execution that reports the given attempt id and
     * state and answers every sample request with {@code responseFuture}.
     */
    private static ExecutionVertex mockVertex(ExecutionAttemptID executionId, ExecutionState state, CompletableFuture<StackTraceSampleResponse> responseFuture) {
        Execution exec = Mockito.mock(Execution.class);
        Mockito.when(exec.getAttemptId()).thenReturn(executionId);
        Mockito.when(exec.getState()).thenReturn(state);
        Mockito.when(exec.requestStackTraceSample(
                Matchers.anyInt(),
                Matchers.anyInt(),
                Matchers.any(Time.class),
                Matchers.anyInt(),
                Matchers.any(Time.class)))
            .thenReturn(responseFuture);

        ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
        Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
        return vertex;
    }
}

