/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSample;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

public class BackPressureStatsTrackerTest
extends TestLogger {
    @Test
    public void testTriggerStackTraceSample() throws Exception {
        CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<StackTraceSample>();
        StackTraceSampleCoordinator sampleCoordinator = (StackTraceSampleCoordinator)Mockito.mock(StackTraceSampleCoordinator.class);
        Mockito.when((Object)sampleCoordinator.triggerStackTraceSample((ExecutionVertex[])Matchers.any(ExecutionVertex[].class), Matchers.anyInt(), (Time)Matchers.any(Time.class), Matchers.anyInt())).thenReturn(sampleFuture);
        ExecutionGraph graph = (ExecutionGraph)Mockito.mock(ExecutionGraph.class);
        Mockito.when((Object)graph.getState()).thenReturn((Object)JobStatus.RUNNING);
        Mockito.when((Object)graph.getFutureExecutor()).thenReturn((Object)new Executor(){

            @Override
            public void execute(Runnable runnable) {
                runnable.run();
            }
        });
        ExecutionVertex[] taskVertices = new ExecutionVertex[4];
        ExecutionJobVertex jobVertex = (ExecutionJobVertex)Mockito.mock(ExecutionJobVertex.class);
        Mockito.when((Object)jobVertex.getJobId()).thenReturn((Object)new JobID());
        Mockito.when((Object)jobVertex.getJobVertexId()).thenReturn((Object)new JobVertexID());
        Mockito.when((Object)jobVertex.getGraph()).thenReturn((Object)graph);
        Mockito.when((Object)jobVertex.getTaskVertices()).thenReturn((Object)taskVertices);
        taskVertices[0] = this.mockExecutionVertex(jobVertex, 0);
        taskVertices[1] = this.mockExecutionVertex(jobVertex, 1);
        taskVertices[2] = this.mockExecutionVertex(jobVertex, 2);
        taskVertices[3] = this.mockExecutionVertex(jobVertex, 3);
        int numSamples = 100;
        Time delayBetweenSamples = Time.milliseconds((long)100L);
        BackPressureStatsTracker tracker = new BackPressureStatsTracker(sampleCoordinator, 9999, numSamples, delayBetweenSamples);
        Assert.assertTrue((String)"Failed to trigger", (boolean)tracker.triggerStackTraceSample(jobVertex));
        ((StackTraceSampleCoordinator)Mockito.verify((Object)sampleCoordinator)).triggerStackTraceSample((ExecutionVertex[])Matchers.eq((Object)taskVertices), Matchers.eq((int)numSamples), (Time)Matchers.eq((Object)delayBetweenSamples), Matchers.eq((int)3));
        Assert.assertFalse((String)"Unexpected trigger", (boolean)tracker.triggerStackTraceSample(jobVertex));
        Assert.assertTrue((!tracker.getOperatorBackPressureStats(jobVertex).isPresent() ? 1 : 0) != 0);
        ((StackTraceSampleCoordinator)Mockito.verify((Object)sampleCoordinator)).triggerStackTraceSample((ExecutionVertex[])Matchers.eq((Object)taskVertices), Matchers.eq((int)numSamples), (Time)Matchers.eq((Object)delayBetweenSamples), Matchers.eq((int)3));
        Assert.assertTrue((!tracker.getOperatorBackPressureStats(jobVertex).isPresent() ? 1 : 0) != 0);
        HashMap traces = new HashMap();
        for (ExecutionVertex vertex : taskVertices) {
            ArrayList<StackTraceElement[]> taskTraces = new ArrayList<StackTraceElement[]>();
            for (int i = 0; i < taskVertices.length; ++i) {
                taskTraces.add(this.createStackTrace(i <= vertex.getParallelSubtaskIndex()));
            }
            traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
        }
        int sampleId = 1231;
        int endTime = 841;
        StackTraceSample sample = new StackTraceSample(sampleId, 0L, (long)endTime, traces);
        sampleFuture.complete(sample);
        Assert.assertTrue((boolean)tracker.getOperatorBackPressureStats(jobVertex).isPresent());
        OperatorBackPressureStats stats = (OperatorBackPressureStats)tracker.getOperatorBackPressureStats(jobVertex).get();
        Assert.assertEquals((long)sampleId, (long)stats.getSampleId());
        Assert.assertEquals((long)endTime, (long)stats.getEndTimestamp());
        Assert.assertEquals((long)taskVertices.length, (long)stats.getNumberOfSubTasks());
        for (int i = 0; i < taskVertices.length; ++i) {
            double ratio = stats.getBackPressureRatio(i);
            Assert.assertEquals((double)((double)(i + 1) / 4.0), (double)ratio, (double)0.0);
        }
    }

    private StackTraceElement[] createStackTrace(boolean isBackPressure) {
        if (isBackPressure) {
            return new StackTraceElement[]{new StackTraceElement("org.apache.flink.runtime.io.network.buffer.LocalBufferPool", "requestBufferBlocking", "LocalBufferPool.java", 133)};
        }
        return Thread.currentThread().getStackTrace();
    }

    private ExecutionVertex mockExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex) {
        Execution exec = (Execution)Mockito.mock(Execution.class);
        Mockito.when((Object)exec.getAttemptId()).thenReturn((Object)new ExecutionAttemptID());
        JobVertexID id = jobVertex.getJobVertexId();
        ExecutionVertex vertex = (ExecutionVertex)Mockito.mock(ExecutionVertex.class);
        Mockito.when((Object)vertex.getJobvertexId()).thenReturn((Object)id);
        Mockito.when((Object)vertex.getCurrentExecutionAttempt()).thenReturn((Object)exec);
        Mockito.when((Object)vertex.getParallelSubtaskIndex()).thenReturn((Object)subTaskIndex);
        return vertex;
    }
}

