/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

public class BackPressureStatsTrackerITCase
extends TestLogger {
    private static NetworkBufferPool networkBufferPool;
    private static ActorSystem testActorSystem;
    private static BufferPool testBufferPool;

    @BeforeClass
    public static void setup() {
        testActorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
        networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)testActorSystem);
        networkBufferPool.destroy();
    }

    @Test
    public void testBackPressuredProducer() throws Exception {
        new JavaTestKit(testActorSystem){
            {
                Buffer buffer;
                super(x0);
                final FiniteDuration deadline = new FiniteDuration(60L, TimeUnit.SECONDS);
                final JobGraph jobGraph = new JobGraph(new JobVertex[0]);
                int parallelism = 4;
                final JobVertex task = new JobVertex("Task");
                task.setInvokableClass(BackPressuredTask.class);
                task.setParallelism(4);
                jobGraph.addVertex(task);
                final Configuration config = new Configuration();
                HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices((Configuration)config, (Executor)TestingUtils.defaultExecutor());
                ActorGateway jobManger = null;
                ActorGateway taskManager = null;
                testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
                final ArrayList<Buffer> buffers = new ArrayList<Buffer>();
                while ((buffer = testBufferPool.requestBuffer()) != null) {
                    buffers.add(buffer);
                }
                try {
                    jobManger = TestingUtils.createJobManager(testActorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), config, highAvailabilityServices);
                    config.setInteger("taskmanager.numberOfTaskSlots", 4);
                    taskManager = TestingUtils.createTaskManager(testActorSystem, highAvailabilityServices, config, true, true);
                    final ActorGateway jm = jobManger;
                    new JavaTestKit.Within(deadline){

                        protected void run() {
                            try {
                                int i;
                                OperatorBackPressureStats stats;
                                AkkaActorGateway testActor = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                                JobClient.submitJobDetached((JobManagerGateway)new AkkaJobManagerGateway(jm), (Configuration)config, (JobGraph)jobGraph, (Time)Time.milliseconds((long)deadline.toMillis()), (ClassLoader)ClassLoader.getSystemClassLoader());
                                jm.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), (ActorGateway)testActor);
                                this.expectMsgEquals(new TestingJobManagerMessages.AllVerticesRunning(jobGraph.getJobID()));
                                jm.tell((Object)new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), (ActorGateway)testActor);
                                TestingJobManagerMessages.ExecutionGraphFound executionGraphResponse = (TestingJobManagerMessages.ExecutionGraphFound)this.expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                                ExecutionGraph executionGraph = (ExecutionGraph)executionGraphResponse.executionGraph();
                                ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
                                StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator((Executor)testActorSystem.dispatcher(), 60000L);
                                BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(coordinator, 100000, 20, Time.milliseconds((long)10L));
                                int numAttempts = 10;
                                int nextSampleId = 0;
                                for (int attempt = 0; attempt < numAttempts; ++attempt) {
                                    try {
                                        stats = BackPressureStatsTrackerITCase.this.triggerStatsSample(statsTracker, vertex);
                                        Assert.assertEquals((long)(nextSampleId + attempt), (long)stats.getSampleId());
                                        Assert.assertEquals((long)4L, (long)stats.getNumberOfSubTasks());
                                        Assert.assertEquals((double)1.0, (double)stats.getMaxBackPressureRatio(), (double)0.0);
                                        for (i = 0; i < 4; ++i) {
                                            Assert.assertEquals((double)1.0, (double)stats.getBackPressureRatio(i), (double)0.0);
                                        }
                                        nextSampleId = stats.getSampleId() + 1;
                                        break;
                                    }
                                    catch (Throwable t) {
                                        if (attempt == numAttempts - 1) {
                                            throw t;
                                        }
                                        Thread.sleep(500L);
                                        continue;
                                    }
                                }
                                for (Buffer buf : buffers) {
                                    buf.recycle();
                                    Assert.assertTrue((boolean)buf.isRecycled());
                                }
                                while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
                                    Thread.sleep(100L);
                                }
                                for (int attempt = 0; attempt < numAttempts; ++attempt) {
                                    try {
                                        stats = BackPressureStatsTrackerITCase.this.triggerStatsSample(statsTracker, vertex);
                                        Assert.assertEquals((long)(nextSampleId + attempt), (long)stats.getSampleId());
                                        Assert.assertEquals((long)4L, (long)stats.getNumberOfSubTasks());
                                        for (i = 0; i < 4; ++i) {
                                            Assert.assertEquals((double)0.0, (double)stats.getBackPressureRatio(i), (double)0.0);
                                        }
                                        break;
                                    }
                                    catch (Throwable t) {
                                        if (attempt == numAttempts - 1) {
                                            throw t;
                                        }
                                        Thread.sleep(500L);
                                        continue;
                                    }
                                }
                                jm.tell((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), (ActorGateway)testActor);
                                jm.tell((Object)new JobManagerMessages.CancelJob(jobGraph.getJobID()));
                                this.expectMsgEquals(true);
                                statsTracker.invalidateOperatorStatsCache();
                                Assert.assertFalse((String)"Unexpected trigger", (boolean)statsTracker.triggerStackTraceSample(vertex));
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(jobManger);
                    TestingUtils.stopActor(taskManager);
                    highAvailabilityServices.closeAndCleanupAllData();
                    testBufferPool.lazyDestroy();
                    throw throwable;
                }
                TestingUtils.stopActor(jobManger);
                TestingUtils.stopActor(taskManager);
                highAvailabilityServices.closeAndCleanupAllData();
                testBufferPool.lazyDestroy();
            }
        };
    }

    private OperatorBackPressureStats triggerStatsSample(BackPressureStatsTracker statsTracker, ExecutionJobVertex vertex) throws InterruptedException {
        Optional stats;
        statsTracker.invalidateOperatorStatsCache();
        Assert.assertTrue((String)"Failed to trigger", (boolean)statsTracker.triggerStackTraceSample(vertex));
        Thread.sleep(200L);
        while (!(stats = statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
            Thread.sleep(10L);
        }
        return (OperatorBackPressureStats)stats.get();
    }

    public static class BackPressuredTask
    extends AbstractInvokable {
        public void invoke() throws Exception {
            while (true) {
                Buffer buffer = testBufferPool.requestBufferBlocking();
                buffer.recycle();
                new CountDownLatch(1).await();
            }
        }
    }
}

