/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class TaskCancelAsyncProducerConsumerITCase
extends TestLogger {
    private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
    private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
    private static volatile Thread ASYNC_PRODUCER_THREAD;
    private static volatile Thread ASYNC_CONSUMER_THREAD;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCancelAsyncProducerAndConsumer() throws Exception {
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        TestingCluster flink = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 1);
            config.setInteger("taskmanager.numberOfTaskSlots", 1);
            config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
            flink = new TestingCluster(config, true);
            flink.start();
            JobVertex producer = new JobVertex("AsyncProducer");
            producer.setParallelism(1);
            producer.setInvokableClass(AsyncProducer.class);
            JobVertex consumer = new JobVertex("AsyncConsumer");
            consumer.setParallelism(1);
            consumer.setInvokableClass(AsyncConsumer.class);
            consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            SlotSharingGroup slot = new SlotSharingGroup(new JobVertexID[]{producer.getID(), consumer.getID()});
            producer.setSlotSharingGroup(slot);
            consumer.setSlotSharingGroup(slot);
            JobGraph jobGraph = new JobGraph(new JobVertex[]{producer, consumer});
            ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
            flink.submitJobDetached(jobGraph);
            Object msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
            Future runningFuture = jobManager.ask(msg, deadline.timeLeft());
            Await.ready((Awaitable)runningFuture, (Duration)deadline.timeLeft());
            msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
            Future cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
            boolean producerBlocked = false;
            for (int i = 0; i < 50; ++i) {
                Thread thread = ASYNC_PRODUCER_THREAD;
                if (thread != null && thread.isAlive()) {
                    StackTraceElement[] stackTrace = thread.getStackTrace();
                    producerBlocked = this.isInBlockingBufferRequest(stackTrace);
                }
                if (producerBlocked) break;
                Thread.sleep(500L);
            }
            Assert.assertTrue((String)("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace())), (boolean)producerBlocked);
            boolean consumerWaiting = false;
            for (int i = 0; i < 50; ++i) {
                Thread thread = ASYNC_CONSUMER_THREAD;
                if (thread != null && thread.isAlive()) {
                    boolean bl = consumerWaiting = thread.getState() == Thread.State.WAITING;
                }
                if (consumerWaiting) break;
                Thread.sleep(500L);
            }
            Assert.assertTrue((String)"Consumer thread is not blocked.", (boolean)consumerWaiting);
            msg = new JobManagerMessages.CancelJob(jobGraph.getJobID());
            Future cancelFuture = jobManager.ask(msg, deadline.timeLeft());
            Await.ready((Awaitable)cancelFuture, (Duration)deadline.timeLeft());
            Await.ready((Awaitable)cancelledFuture, (Duration)deadline.timeLeft());
            Assert.assertNotNull((Object)ASYNC_PRODUCER_EXCEPTION);
            Assert.assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
            Assert.assertNotNull((Object)ASYNC_CONSUMER_EXCEPTION);
            Assert.assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
        }
        finally {
            if (flink != null) {
                flink.stop();
            }
        }
    }

    private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
        return stackTrace.length >= 3 && stackTrace[0].getMethodName().equals("wait") && stackTrace[1].getMethodName().equals("requestBuffer") && stackTrace[2].getMethodName().equals("requestBufferBlocking");
    }

    public static class AsyncConsumer
    extends AbstractInvokable {
        public void invoke() throws Exception {
            ConsumerThread consumer = new ConsumerThread(this.getEnvironment().getInputGate(0));
            ASYNC_CONSUMER_THREAD = consumer;
            consumer.start();
            while (consumer.isAlive()) {
                try {
                    consumer.join();
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        private static class ConsumerThread
        extends Thread {
            private final InputGate inputGate;

            public ConsumerThread(InputGate inputGate) {
                this.inputGate = inputGate;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        this.inputGate.getNextBufferOrEvent();
                    }
                }
                catch (Exception e) {
                    ASYNC_CONSUMER_EXCEPTION = e;
                    return;
                }
            }
        }
    }

    public static class AsyncProducer
    extends AbstractInvokable {
        public void invoke() throws Exception {
            ProducerThread producer = new ProducerThread(this.getEnvironment().getWriter(0));
            ASYNC_PRODUCER_THREAD = producer;
            producer.start();
            while (producer.isAlive()) {
                try {
                    producer.join();
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        private static class ProducerThread
        extends Thread {
            private final RecordWriter<LongValue> recordWriter;

            public ProducerThread(ResultPartitionWriter partitionWriter) {
                this.recordWriter = new RecordWriter(partitionWriter);
            }

            @Override
            public void run() {
                LongValue current = new LongValue(0L);
                try {
                    while (true) {
                        current.setValue(current.getValue() + 1L);
                        this.recordWriter.emit((IOReadableWritable)current);
                        this.recordWriter.flush();
                    }
                }
                catch (Exception e) {
                    ASYNC_PRODUCER_EXCEPTION = e;
                    return;
                }
            }
        }
    }
}

