/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for the asynchronous calls a {@link Task} forwards to its invokable: checkpoint
 * triggers, checkpoint-complete notifications and stop requests.
 *
 * <p>The task is only handed the invokable's class name, so the tests and the invokable
 * instance communicate through the static latches and counters declared here. They are
 * re-created before every test.
 */
public class TaskAsyncCallTest extends TestLogger {

    /** Checkpoint id at which the invokable triggers its completion latches. */
    private static int numCalls;

    /** Triggered by the invokable as soon as {@code invoke()} starts running. */
    private static OneShotLatch awaitLatch;

    /**
     * Triggered when checkpoint number {@code numCalls} was triggered in order, and also when
     * {@code invoke()} exits because of an ordering error.
     */
    private static OneShotLatch triggerLatch;

    /**
     * Triggered when a checkpoint-complete notification arrives while the last triggered
     * checkpoint id equals {@code numCalls}, and also when {@code invoke()} exits because of
     * an ordering error.
     */
    private static OneShotLatch notifyCheckpointCompleteLatch;

    /**
     * Triggered when {@link ContextClassLoaderInterceptingInvokable#stop()} runs, and also
     * when {@code invoke()} exits because of an ordering error.
     */
    private static OneShotLatch stopLatch;

    /**
     * Context class loaders recorded by {@link ContextClassLoaderInterceptingInvokable}.
     * Wrapped in a synchronized list because it is written by the invokable's callbacks and
     * read by the test thread.
     */
    private static final List<ClassLoader> classLoaders =
            Collections.synchronizedList(new ArrayList<ClassLoader>());

    /** Resets all state shared between the test thread and the invokable. */
    @Before
    public void createQueuesAndActors() {
        numCalls = 1000;
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
        notifyCheckpointCompleteLatch = new OneShotLatch();
        stopLatch = new OneShotLatch();
        classLoaders.clear();
    }

    /**
     * Fires {@code numCalls} checkpoint triggers at a running task and verifies that the
     * invokable saw them in ascending order, i.e. the task neither failed nor was canceled.
     */
    @Test
    public void testCheckpointCallsInOrder() throws Exception {
        Task task = createTask(CheckpointsInOrderInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();

            // Do not issue calls before the invokable is actually running.
            awaitLatch.await();

            for (int i = 1; i <= numCalls; ++i) {
                task.triggerCheckpointBarrier(
                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
            }

            triggerLatch.await();

            Assert.assertFalse(task.isCanceledOrFailed());

            ExecutionState currentState = task.getExecutionState();
            Assert.assertThat(
                    currentState,
                    org.hamcrest.Matchers.isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
        }
    }

    /**
     * Same as {@link #testCheckpointCallsInOrder()}, but interleaves every checkpoint trigger
     * with the matching checkpoint-complete notification.
     */
    @Test
    public void testMixedAsyncCallsInOrder() throws Exception {
        Task task = createTask(CheckpointsInOrderInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();

            // Do not issue calls before the invokable is actually running.
            awaitLatch.await();

            for (int i = 1; i <= numCalls; ++i) {
                task.triggerCheckpointBarrier(
                        i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
                task.notifyCheckpointComplete(i);
            }

            triggerLatch.await();

            Assert.assertFalse(task.isCanceledOrFailed());

            ExecutionState currentState = task.getExecutionState();
            Assert.assertThat(
                    currentState,
                    org.hamcrest.Matchers.isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
        }
    }

    /**
     * Verifies that stopping a task whose invokable does not implement {@link StoppableTask}
     * is rejected with an {@link UnsupportedOperationException}.
     */
    @Test
    public void testThrowExceptionIfStopInvokedWithNotStoppableTask() throws Exception {
        Task task = createTask(CheckpointsInOrderInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();
            awaitLatch.await();

            try {
                task.stopExecution();
                Assert.fail("Expected exception not thrown");
            } catch (UnsupportedOperationException e) {
                Assert.assertThat(
                        e.getMessage(),
                        org.hamcrest.Matchers.containsString("Stopping not supported by task"));
            }
        }
    }

    /**
     * Verifies that checkpoint triggers, checkpoint-complete notifications and stop calls all
     * run with the user-code class loader installed as the thread's context class loader.
     */
    @Test
    public void testSetsUserCodeClassLoader() throws Exception {
        // A single checkpoint is enough to make the invokable trigger its latches.
        numCalls = 1;

        Task task = createTask(ContextClassLoaderInterceptingInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();
            awaitLatch.await();

            task.triggerCheckpointBarrier(
                    1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
            task.notifyCheckpointComplete(1L);
            task.stopExecution();

            triggerLatch.await();
            notifyCheckpointCompleteLatch.await();
            stopLatch.await();

            // One recorded class loader per call type at the very least.
            Assert.assertThat(
                    classLoaders,
                    org.hamcrest.Matchers.hasSize(org.hamcrest.Matchers.greaterThanOrEqualTo(3)));
            Assert.assertThat(
                    classLoaders,
                    org.hamcrest.Matchers.everyItem(
                            org.hamcrest.Matchers.instanceOf(TestUserCodeClassLoader.class)));
        }
    }

    /**
     * Builds a {@link Task} that runs the given invokable class. Most collaborators are
     * Mockito mocks; the library cache hands out a {@link TestUserCodeClassLoader} for every job.
     *
     * @param invokableClass the invokable the task should execute
     * @return a task that has not been started yet
     * @throws Exception if one of the task's collaborators cannot be constructed
     */
    private Task createTask(Class<? extends AbstractInvokable> invokableClass) throws Exception {
        BlobCacheService blobService = new BlobCacheService(
                Mockito.mock(PermanentBlobCache.class),
                Mockito.mock(TransientBlobCache.class));

        LibraryCacheManager libCache = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libCache.getClassLoader(Matchers.any(JobID.class)))
                .thenReturn(new TestUserCodeClassLoader());

        ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionProducerStateChecker =
                Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = Mockito.mock(Executor.class);
        TaskEventDispatcher taskEventDispatcher = Mockito.mock(TaskEventDispatcher.class);

        NetworkEnvironment networkEnvironment = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
        Mockito.when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
        Mockito.when(networkEnvironment.createKvStateTaskRegistry(
                        Matchers.any(JobID.class), Matchers.any(JobVertexID.class)))
                .thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

        TaskMetricGroup taskMetricGroup = Mockito.mock(TaskMetricGroup.class);
        Mockito.when(taskMetricGroup.getIOMetricGroup())
                .thenReturn(Mockito.mock(TaskIOMetricGroup.class));

        JobInformation jobInformation = new JobInformation(
                new JobID(),
                "Job Name",
                new SerializedValue<>(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        TaskInformation taskInformation = new TaskInformation(
                new JobVertexID(),
                "Test Task",
                1,
                1,
                invokableClass.getName(),
                new Configuration());

        return new Task(
                jobInformation,
                taskInformation,
                new ExecutionAttemptID(),
                new AllocationID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                networkEnvironment,
                Mockito.mock(BroadcastVariableManager.class),
                new TestTaskStateManager(),
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(InputSplitProvider.class),
                Mockito.mock(CheckpointResponder.class),
                blobService,
                libCache,
                Mockito.mock(FileCache.class),
                new TestingTaskManagerRuntimeInfo(),
                taskMetricGroup,
                consumableNotifier,
                partitionProducerStateChecker,
                executor);
    }

    /**
     * Try-with-resources helper that cancels the task on close and waits (up to five
     * seconds) for its executing thread to terminate.
     */
    private static class TaskCleaner implements AutoCloseable {

        private final Task task;

        private TaskCleaner(Task task) {
            this.task = task;
        }

        @Override
        public void close() throws Exception {
            this.task.cancelExecution();
            this.task.getExecutingThread().join(5000L);
        }
    }

    /** Marker class loader; lets the test recognise the loader handed out by the library cache. */
    private static class TestUserCodeClassLoader extends ClassLoader {
        public TestUserCodeClassLoader() {
            super(ClassLoader.getSystemClassLoader());
        }
    }

    /**
     * Stoppable variant of {@link CheckpointsInOrderInvokable} that records the calling
     * thread's context class loader on every asynchronous call it receives.
     */
    public static class ContextClassLoaderInterceptingInvokable
            extends CheckpointsInOrderInvokable
            implements StoppableTask {

        public ContextClassLoaderInterceptingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            classLoaders.add(Thread.currentThread().getContextClassLoader());
            return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            classLoaders.add(Thread.currentThread().getContextClassLoader());
            super.notifyCheckpointComplete(checkpointId);
        }

        @Override
        public void stop() {
            classLoaders.add(Thread.currentThread().getContextClassLoader());
            stopLatch.trigger();
        }
    }

    /**
     * Invokable that blocks in {@link #invoke()} and checks that checkpoint ids arrive as
     * 1, 2, 3, ... and that every completion notification refers to the most recently
     * triggered checkpoint. The first violation is stored in {@code error}, which wakes up
     * {@code invoke()} and makes it fail.
     */
    public static class CheckpointsInOrderInvokable extends AbstractInvokable {

        /**
         * Id of the last checkpoint that was triggered.
         * NOTE(review): the increment below is not atomic; this presumably relies on the task
         * delivering asynchronous calls one at a time - confirm against Task.
         */
        private volatile long lastCheckpointId = 0L;

        /** First ordering violation that was detected, or {@code null} if there was none. */
        private volatile Exception error;

        public CheckpointsInOrderInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Signals that the invokable is running and then blocks until an ordering error is
         * reported. It never returns normally: it leaves either by rethrowing that error or
         * through an exception thrown by {@code wait()}.
         *
         * @throws Exception the recorded ordering error, or whatever {@code wait()} throws
         */
        @Override
        public void invoke() throws Exception {
            awaitLatch.trigger();

            synchronized (this) {
                while (this.error == null) {
                    wait();
                }
            }

            // The loop above is only left once an error was recorded. Release every latch
            // first so that a test blocked on one of them can finish instead of hanging.
            triggerLatch.trigger();
            notifyCheckpointCompleteLatch.trigger();
            stopLatch.trigger();

            throw this.error;
        }

        /**
         * Advances the expected checkpoint id and compares it with the triggered one.
         * Triggers {@code triggerLatch} when checkpoint {@code numCalls} arrives in order;
         * records an error and wakes up {@link #invoke()} on the first mismatch.
         *
         * @return always {@code true}
         */
        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            ++this.lastCheckpointId;
            if (checkpointMetaData.getCheckpointId() == this.lastCheckpointId) {
                if (this.lastCheckpointId == (long) numCalls) {
                    triggerLatch.trigger();
                }
            } else if (this.error == null) {
                this.error = new Exception("calls out of order");
                synchronized (this) {
                    notifyAll();
                }
            }
            return true;
        }

        /** Not expected to be called by these tests; always throws. */
        @Override
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Not expected to be called by these tests; always throws. */
        @Override
        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /**
         * Checks that the notification refers to the most recently triggered checkpoint.
         * Records an error and wakes up {@link #invoke()} on the first mismatch; otherwise
         * triggers {@code notifyCheckpointCompleteLatch} once the last triggered checkpoint
         * id equals {@code numCalls}.
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            if (checkpointId != this.lastCheckpointId && this.error == null) {
                this.error = new Exception("calls out of order");
                synchronized (this) {
                    notifyAll();
                }
            } else if (this.lastCheckpointId == (long) numCalls) {
                notifyCheckpointCompleteLatch.trigger();
            }
        }
    }
}

