/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TaskTest
extends TestLogger {
    // Awaited by the tests after starting a task thread; triggered from inside a test
    // invokable (e.g. InvokableUnInterruptibleBlockingInvoke.invoke()). Re-created in setup().
    private static OneShotLatch awaitLatch;
    // Triggered by the tests to release a test invokable that checks it
    // (e.g. InvokableUnInterruptibleBlockingInvoke.invoke()). Re-created in setup().
    private static OneShotLatch triggerLatch;
    // Class-level temporary folder; used to create the BLOB storage directory in
    // testExecutionFailsInBlobsMissing(). Assigned in the static initializer of this class.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER;

    /**
     * Replaces both shared latches with fresh instances before each test.
     *
     * <p>Also called directly by {@code testTriggerPartitionStateUpdate()} between scenarios.
     */
    @Before
    public void setup() {
        triggerLatch = new OneShotLatch();
        awaitLatch = new OneShotLatch();
    }

    /**
     * Runs a default task synchronously and checks that it goes from CREATED to FINISHED
     * without a failure cause, reporting RUNNING and then FINISHED to the task manager actions.
     */
    @Test
    public void testRegularExecution() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setTaskManagerActions(actions)
            .build();

        // state before the task has been run
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        // executes the task on the calling thread
        task.run();

        // state after the run
        Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());
        Assert.assertNull(task.getInvokable());

        // the listener must have seen both transitions, in this order
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FINISHED, task, null);
    }

    /**
     * Cancels a task before it is run: it must be CANCELING right after the cancel call and
     * CANCELED (with no invokable) once {@code run()} has returned.
     */
    @Test
    public void testCancelRightAway() throws Exception {
        final Task task = new TaskBuilder().build();

        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        task.run();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getInvokable());
    }

    /**
     * Fails a task externally before it is run: the state must be FAILED immediately and
     * still FAILED after {@code run()} has returned.
     */
    @Test
    public void testFailExternallyRightAway() throws Exception {
        final Task task = new TaskBuilder().build();

        task.failExternally(new Exception("fail externally"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        task.run();
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
    }

    /**
     * Runs a task against a plain Mockito mock of {@link LibraryCacheManager} and expects it
     * to fail with a cause whose message mentions "classloader".
     */
    @Test
    public void testLibraryCacheRegistrationFailed() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        // an unstubbed mock is used as the library cache manager
        final LibraryCacheManager mockedLibraryCache = Mockito.mock(LibraryCacheManager.class);
        final Task task = new TaskBuilder()
            .setTaskManagerActions(actions)
            .setLibraryCacheManager(mockedLibraryCache)
            .build();

        // state before the task has been run
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        task.run();

        // the task must have failed with a classloader-related message
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("classloader"));
        Assert.assertNull(task.getInvokable());

        actions.validateListenerMessage(
            ExecutionState.FAILED, task, new Exception("No user code classloader available."));
    }

    /**
     * Runs a task that requires a jar BLOB key which was never uploaded and expects the task
     * to fail with a "Failed to fetch BLOB" message.
     *
     * <p>The BLOB server and the permanent BLOB cache created for this test are closed in
     * {@code finally} blocks so that they are released even when an assertion fails.
     */
    @Test
    public void testExecutionFailsInBlobsMissing() throws Exception {
        final PermanentBlobKey missingKey = new PermanentBlobKey();

        final Configuration config = new Configuration();
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);

        final BlobServer blobServer = new BlobServer(config, (BlobStore)new VoidBlobStore());
        try {
            blobServer.start();
            final InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());

            final PermanentBlobCache permanentBlobCache =
                new PermanentBlobCache(config, (BlobView)new VoidBlobStore(), serverAddress);
            try {
                final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
                    (PermanentBlobService)permanentBlobCache,
                    FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
                    new String[0]);

                final Task task = new TaskBuilder()
                    .setRequiredJarFileBlobKeys(Collections.singletonList(missingKey))
                    .setLibraryCacheManager((LibraryCacheManager)libraryCacheManager)
                    .build();

                // state before the task has been run
                Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
                Assert.assertFalse(task.isCanceledOrFailed());
                Assert.assertNull(task.getFailureCause());

                task.run();

                // the task must have failed because the BLOB could not be fetched
                Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
                Assert.assertTrue(task.isCanceledOrFailed());
                Assert.assertNotNull(task.getFailureCause());
                Assert.assertNotNull(task.getFailureCause().getMessage());
                Assert.assertTrue(task.getFailureCause().getMessage().contains("Failed to fetch BLOB"));
                Assert.assertNull(task.getInvokable());
            } finally {
                permanentBlobCache.close();
            }
        } finally {
            blobServer.close();
        }
    }

    /**
     * Uses a mocked {@link NetworkEnvironment} whose {@code registerTask} throws a
     * {@code RuntimeException("buffers")} and expects the task to fail with that message.
     */
    @Test
    public void testExecutionFailsInNetworkRegistration() throws Exception {
        // collaborators handed to the task or returned by the mocked network environment
        final ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
        final NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        final PartitionProducerStateChecker producerStateChecker = Mockito.mock(PartitionProducerStateChecker.class);
        final TaskEventDispatcher eventDispatcher = Mockito.mock(TaskEventDispatcher.class);

        final NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.getResultPartitionManager()).thenReturn(partitionManager);
        Mockito.when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
        Mockito.when(network.getTaskEventDispatcher()).thenReturn(eventDispatcher);
        // registering the task with the network environment is the step that blows up
        Mockito.doThrow(new RuntimeException("buffers"))
            .when(network)
            .registerTask(ArgumentMatchers.any(Task.class));

        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setTaskManagerActions(actions)
            .setConsumableNotifier(consumableNotifier)
            .setPartitionProducerStateChecker(producerStateChecker)
            .setNetworkEnvironment(network)
            .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("buffers"));

        actions.validateListenerMessage(ExecutionState.FAILED, task, new RuntimeException("buffers"));
    }

    /**
     * Runs a task configured with {@code InvokableNonInstantiable} and expects it to fail
     * with a cause whose message mentions "instantiate".
     */
    @Test
    public void testInvokableInstantiationFailed() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setTaskManagerActions(actions)
            .setInvokable(InvokableNonInstantiable.class)
            .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("instantiate"));

        actions.validateListenerMessage(
            ExecutionState.FAILED,
            task,
            new FlinkException("Could not instantiate the task's invokable class."));
    }

    /**
     * Runs a task configured with {@code InvokableWithExceptionInInvoke} and expects it to
     * reach RUNNING and then fail with a cause whose message contains "test".
     */
    @Test
    public void testExecutionFailsInInvoke() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithExceptionInInvoke.class)
            .setTaskManagerActions(actions)
            .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        // RUNNING is reported first, then the failure
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Runs a task configured with {@code FailingInvokableWithChainedException} and expects
     * the reported failure cause to be an {@link IOException}.
     */
    @Test
    public void testFailWithWrappedException() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(FailingInvokableWithChainedException.class)
            .setTaskManagerActions(actions)
            .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());

        final Throwable failureCause = task.getFailureCause();
        Assert.assertTrue(failureCause instanceof IOException);

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new IOException("test"));
    }

    /**
     * Cancels a task while its invokable is inside {@code invoke()} and expects it to end up
     * CANCELED without a failure cause.
     */
    @Test
    public void testCancelDuringInvoke() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        // depending on timing the task may already have completed the cancellation
        final boolean cancelingOrCanceled = task.getExecutionState() == ExecutionState.CANCELING
            || task.getExecutionState() == ExecutionState.CANCELED;
        Assert.assertTrue(cancelingOrCanceled);

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Fails a task externally while its invokable is inside {@code invoke()} and expects it
     * to end up FAILED with the externally supplied cause.
     */
    @Test
    public void testFailExternallyDuringInvoke() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("test"));

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Cancels a task after it has already failed in {@code invoke()} and checks that the
     * state stays FAILED with the original cause.
     */
    @Test
    public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithExceptionInInvoke.class)
            .setTaskManagerActions(actions)
            .build();

        task.run();

        // this cancel call comes after the task has already run to failure
        task.cancelExecution();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Cancels a running task and only then releases its invokable
     * ({@code InvokableWithExceptionOnTrigger}); the task must end up CANCELED with no
     * failure cause.
     */
    @Test
    public void testExecutionFailsAfterCanceling() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithExceptionOnTrigger.class)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        // release the invokable only after the cancellation was requested
        triggerLatch.trigger();

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Fails a running task externally and only then releases its invokable
     * ({@code InvokableWithExceptionOnTrigger}); the external cause must remain the
     * reported failure cause.
     */
    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
        final QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithExceptionOnTrigger.class)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // release the invokable only after the task was marked failed
        triggerLatch.trigger();

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("external"));

        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("external"));
    }

    /**
     * Runs a task configured with {@code InvokableWithCancelTaskExceptionInInvoke}, with the
     * trigger latch already triggered, and expects the task to end up CANCELED.
     */
    @Test
    public void testCancelTaskException() throws Exception {
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
            .build();

        // the latch is triggered before the task is run on the calling thread
        triggerLatch.trigger();

        task.run();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
    }

    /**
     * Fails a running task externally and only then releases its invokable
     * ({@code InvokableWithCancelTaskExceptionInInvoke}); the task must stay FAILED with
     * the external cause.
     */
    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        final Task task = new TaskBuilder()
            .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // release the invokable only after the task was marked failed
        triggerLatch.trigger();

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("external"));
    }

    /**
     * Feeds every {@link ExecutionState} into {@code Task.onPartitionStateUpdate} for a task
     * forced into RUNNING and checks the task state that results from each producer state.
     *
     * <p>Four producer states (RUNNING, SCHEDULED, DEPLOYING, FINISHED) are expected to keep
     * the task RUNNING, and the partition request is expected to be re-triggered four times.
     */
    @Test
    public void testOnPartitionStateUpdate() throws Exception {
        final IntermediateDataSetID resultId = new IntermediateDataSetID();
        final ResultPartitionID partitionId = new ResultPartitionID();

        final SingleInputGate inputGate = Mockito.mock(SingleInputGate.class);
        Mockito.when(inputGate.getConsumedResultId()).thenReturn(resultId);

        final Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .build();

        // inject the mocked gate via reflection
        this.setInputGate(task, inputGate);

        // producer state -> expected task state; FAILED unless overridden below
        final HashMap<ExecutionState, ExecutionState> expectedTaskStates =
            new HashMap<ExecutionState, ExecutionState>(ExecutionState.values().length);
        for (ExecutionState producerState : ExecutionState.values()) {
            expectedTaskStates.put(producerState, ExecutionState.FAILED);
        }

        // producer states after which the task keeps running
        expectedTaskStates.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
        expectedTaskStates.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
        expectedTaskStates.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
        expectedTaskStates.put(ExecutionState.FINISHED, ExecutionState.RUNNING);

        // producer states after which the task is cancelled
        expectedTaskStates.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expectedTaskStates.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expectedTaskStates.put(ExecutionState.FAILED, ExecutionState.CANCELING);

        for (ExecutionState producerState : ExecutionState.values()) {
            // reset the task to RUNNING before every update
            this.setState(task, ExecutionState.RUNNING);
            task.onPartitionStateUpdate(resultId, partitionId, producerState);
            Assert.assertEquals(expectedTaskStates.get(producerState), task.getExecutionState());
        }

        Mockito.verify(inputGate, Mockito.times(4))
            .retriggerPartitionRequest(ArgumentMatchers.eq(partitionId.getPartitionId()));
    }

    /**
     * Exercises {@code Task.triggerPartitionProducerStateCheck} with a mocked
     * {@link PartitionProducerStateChecker} whose future is completed in five different ways:
     *
     * <ol>
     *   <li>exceptionally with {@link PartitionProducerDisposedException}: task is CANCELING</li>
     *   <li>exceptionally with any other exception: task is FAILED</li>
     *   <li>exceptionally with {@link TimeoutException}: task stays RUNNING and the partition
     *       request is re-triggered once</li>
     *   <li>normally with RUNNING: task stays RUNNING and the partition request is
     *       re-triggered once</li>
     * </ol>
     *
     * <p>Scenarios 3 and 4 start the task thread; it is interrupted and joined in a
     * {@code finally} block. The latches are reset via {@link #setup()} before each scenario.
     */
    @Test
    public void testTriggerPartitionStateUpdate() throws Exception {
        final IntermediateDataSetID resultId = new IntermediateDataSetID();
        final ResultPartitionID partitionId = new ResultPartitionID();

        final PartitionProducerStateChecker partitionChecker = Mockito.mock(PartitionProducerStateChecker.class);
        final TaskEventDispatcher taskEventDispatcher = Mockito.mock(TaskEventDispatcher.class);
        final NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();

        final NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.getResultPartitionManager()).thenReturn(Mockito.mock(ResultPartitionManager.class));
        Mockito.when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
        Mockito.when(network.createKvStateTaskRegistry(ArgumentMatchers.any(JobID.class), ArgumentMatchers.any(JobVertexID.class)))
            .thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

        // Scenario 1: the producer was disposed -> the task is cancelled.
        this.setup();
        Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setNetworkEnvironment(network)
            .setConsumableNotifier(consumableNotifier)
            .setPartitionProducerStateChecker(partitionChecker)
            .setExecutor(Executors.directExecutor())
            .build();
        CompletableFuture<ExecutionState> promise = new CompletableFuture<ExecutionState>();
        Mockito.when(partitionChecker.requestPartitionProducerState(
                ArgumentMatchers.eq(task.getJobID()), ArgumentMatchers.eq(resultId), ArgumentMatchers.eq(partitionId)))
            .thenReturn(promise);
        task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
        promise.completeExceptionally(new PartitionProducerDisposedException(partitionId));
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        // Scenario 2: any other failure of the state check -> the task fails.
        this.setup();
        task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setNetworkEnvironment(network)
            .setConsumableNotifier(consumableNotifier)
            .setPartitionProducerStateChecker(partitionChecker)
            .setExecutor(Executors.directExecutor())
            .build();
        promise = new CompletableFuture<ExecutionState>();
        Mockito.when(partitionChecker.requestPartitionProducerState(
                ArgumentMatchers.eq(task.getJobID()), ArgumentMatchers.eq(resultId), ArgumentMatchers.eq(partitionId)))
            .thenReturn(promise);
        task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
        promise.completeExceptionally(new RuntimeException("Any other exception"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Scenario 3: the state check times out -> keep running and re-trigger the request.
        this.setup();
        task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setNetworkEnvironment(network)
            .setConsumableNotifier(consumableNotifier)
            .setPartitionProducerStateChecker(partitionChecker)
            .setExecutor(Executors.directExecutor())
            .build();
        SingleInputGate inputGate = Mockito.mock(SingleInputGate.class);
        Mockito.when(inputGate.getConsumedResultId()).thenReturn(resultId);
        try {
            task.startTaskThread();
            awaitLatch.await();
            // inject the mocked gate only after the await latch was triggered
            this.setInputGate(task, inputGate);
            promise = new CompletableFuture<ExecutionState>();
            Mockito.when(partitionChecker.requestPartitionProducerState(
                    ArgumentMatchers.eq(task.getJobID()), ArgumentMatchers.eq(resultId), ArgumentMatchers.eq(partitionId)))
                .thenReturn(promise);
            task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
            promise.completeExceptionally(new TimeoutException());
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
            Mockito.verify(inputGate, Mockito.times(1))
                .retriggerPartitionRequest(ArgumentMatchers.eq(partitionId.getPartitionId()));
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }

        // Scenario 4: the producer is RUNNING -> keep running and re-trigger the request.
        this.setup();
        task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setNetworkEnvironment(network)
            .setConsumableNotifier(consumableNotifier)
            .setPartitionProducerStateChecker(partitionChecker)
            .setExecutor(Executors.directExecutor())
            .build();
        inputGate = Mockito.mock(SingleInputGate.class);
        Mockito.when(inputGate.getConsumedResultId()).thenReturn(resultId);
        try {
            task.startTaskThread();
            awaitLatch.await();
            // inject the mocked gate only after the await latch was triggered
            this.setInputGate(task, inputGate);
            promise = new CompletableFuture<ExecutionState>();
            Mockito.when(partitionChecker.requestPartitionProducerState(
                    ArgumentMatchers.eq(task.getJobID()), ArgumentMatchers.eq(resultId), ArgumentMatchers.eq(partitionId)))
                .thenReturn(promise);
            task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
            promise.complete(ExecutionState.RUNNING);
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
            Mockito.verify(inputGate, Mockito.times(1))
                .retriggerPartitionRequest(ArgumentMatchers.eq(partitionId.getPartitionId()));
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Cancels a task running {@code InvokableBlockingInCancel} with a cancellation interval
     * of 5 and a cancellation timeout of 60000, then waits for the task thread to terminate.
     *
     * <p>NOTE(review): {@code ProhibitFatalErrorTaskManagerActions} is defined outside this
     * view; presumably it fails the test if a fatal error is reported.
     */
    @Test
    public void testWatchDogInterruptsTask() throws Exception {
        final ProhibitFatalErrorTaskManagerActions actions = new ProhibitFatalErrorTaskManagerActions();

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60000L);

        final Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInCancel.class)
            .setTaskManagerConfig(config)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /**
     * Cancels a task running {@code InvokableInterruptibleSharedLockInInvokeAndCancel} with a
     * cancellation interval of 5 and a cancellation timeout of 50, then waits for the task
     * thread to terminate.
     *
     * <p>NOTE(review): {@code ProhibitFatalErrorTaskManagerActions} is defined outside this
     * view; presumably it fails the test if a fatal error is reported.
     */
    @Test
    public void testInterruptibleSharedLockInInvokeAndCancel() throws Exception {
        final ProhibitFatalErrorTaskManagerActions actions = new ProhibitFatalErrorTaskManagerActions();

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);

        final Task task = new TaskBuilder()
            .setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
            .setTaskManagerConfig(config)
            .setTaskManagerActions(actions)
            .build();

        // start the task thread and wait for the await latch to be triggered
        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /**
     * Cancels a task whose invokable ({@link InvokableUnInterruptibleBlockingInvoke}) ignores
     * interrupts, with a cancellation timeout of 50, and waits on the latch exposed by
     * {@code AwaitFatalErrorTaskManagerActions}.
     *
     * <p>The {@code finally} block triggers the trigger latch so that the invokable's loop
     * can end, then interrupts and joins the task thread.
     */
    @Test
    public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
        final AwaitFatalErrorTaskManagerActions actions = new AwaitFatalErrorTaskManagerActions();

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);

        final Task task = new TaskBuilder()
            .setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
            .setTaskManagerConfig(config)
            .setTaskManagerActions(actions)
            .build();

        try {
            task.startTaskThread();
            awaitLatch.await();

            task.cancelExecution();

            // blocks until the actions' latch is released
            actions.latch.await();
        } finally {
            // let the invokable leave its loop, then wait for the thread to finish
            triggerLatch.trigger();
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Checks which cancellation settings a task reports: before the task thread is started
     * it must report the values from the task manager configuration, and once the await
     * latch has been triggered it must report the values from the {@link ExecutionConfig}.
     *
     * <p>The task thread is interrupted and joined in a {@code finally} block so that it does
     * not outlive the test when one of the assertions fails.
     */
    @Test
    public void testTaskConfig() throws Exception {
        final long interval = 28218123L;
        final long timeout = interval + 19292L;

        // values from the task manager configuration
        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);

        // deliberately different values in the job's execution config
        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setTaskCancellationInterval(interval + 1337L);
        executionConfig.setTaskCancellationTimeout(timeout - 1337L);

        final Task task = new TaskBuilder()
            .setInvokable(InvokableBlockingInInvoke.class)
            .setTaskManagerConfig(config)
            .setExecutionConfig(executionConfig)
            .build();

        // before the task thread is started, the task manager values apply
        Assert.assertEquals(interval, task.getTaskCancellationInterval());
        Assert.assertEquals(timeout, task.getTaskCancellationTimeout());

        try {
            task.startTaskThread();
            awaitLatch.await();

            // once the invokable is running, the execution config values apply
            Assert.assertEquals(executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
            Assert.assertEquals(executionConfig.getTaskCancellationTimeout(), task.getTaskCancellationTimeout());
        } finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Reflectively installs the given input gate into a task by overwriting its private
     * {@code inputGates} array and {@code inputGatesById} map.
     *
     * @param task the task whose fields are overwritten
     * @param inputGate the gate to install; keyed by its consumed result ID in the map
     * @throws RuntimeException wrapping any exception raised while accessing the fields
     */
    private void setInputGate(Task task, SingleInputGate inputGate) {
        try {
            final Field gatesField = Task.class.getDeclaredField("inputGates");
            gatesField.setAccessible(true);
            gatesField.set(task, new SingleInputGate[] {inputGate});

            final HashMap<IntermediateDataSetID, SingleInputGate> gatesById =
                new HashMap<IntermediateDataSetID, SingleInputGate>(1);
            gatesById.put(inputGate.getConsumedResultId(), inputGate);

            final Field gatesByIdField = Task.class.getDeclaredField("inputGatesById");
            gatesByIdField.setAccessible(true);
            gatesByIdField.set(task, gatesById);
        } catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    /**
     * Reflectively overwrites the {@code executionState} field of the given task.
     *
     * @param task the task to modify
     * @param state the value written into the field
     * @throws RuntimeException if the field cannot be located or written
     */
    private void setState(Task task, ExecutionState state) {
        final Field stateField;
        try {
            stateField = Task.class.getDeclaredField("executionState");
            stateField.setAccessible(true);
            stateField.set(task, state);
        } catch (Exception failure) {
            throw new RuntimeException("Modifying the task state failed", failure);
        }
    }

    // Static initializer: assigns a fresh TemporaryFolder to the class-level TEMPORARY_FOLDER field.
    static {
        TEMPORARY_FOLDER = new TemporaryFolder();
    }

    /**
     * {@link WrappingRuntimeException} subclass used by the tests; it only forwards the cause
     * to the superclass constructor.
     */
    private static class TestWrappedException extends WrappingRuntimeException {

        private static final long serialVersionUID = 1L;

        /**
         * @param cause the throwable to wrap
         */
        TestWrappedException(@Nonnull Throwable cause) {
            super(cause);
        }
    }

    /**
     * Invokable whose {@code invoke()} keeps blocking until {@code triggerLatch} has been
     * triggered, ignoring every interrupt in between. {@code cancel()} does nothing.
     */
    public static final class InvokableUnInterruptibleBlockingInvoke extends AbstractInvokable {

        public InvokableUnInterruptibleBlockingInvoke(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and waits on this object's monitor, repeating until
         * {@code triggerLatch} is triggered.
         */
        public void invoke() {
            while (!triggerLatch.isTriggered()) {
                synchronized (this) {
                    awaitLatch.trigger();
                    try {
                        wait();
                    } catch (InterruptedException ignored) {
                        // Interrupts are ignored on purpose; the loop condition decides when to stop.
                    }
                }
            }
        }

        /** Intentionally empty: cancellation does not unblock {@link #invoke()}. */
        public void cancel() {
        }
    }

    /**
     * Invokable whose {@code invoke()} and {@code cancel()} synchronize on the same private lock.
     *
     * <p>{@code invoke()} blocks in {@code lock.wait()}, which releases the lock while waiting so
     * that {@code cancel()} can enter its own synchronized section. The wait is ended by an
     * interrupt, whose {@link InterruptedException} propagates out of {@code invoke()}.
     */
    public static final class InvokableInterruptibleSharedLockInInvokeAndCancel extends AbstractInvokable {

        /** Monitor shared by {@link #invoke()} and {@link #cancel()}. */
        private final Object lock = new Object();

        public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and then waits on the shared lock.
         *
         * @throws Exception in particular {@link InterruptedException} when the waiting thread
         *     is interrupted
         */
        public void invoke() throws Exception {
            synchronized (lock) {
                // Re-check the latch after every wakeup so a spurious wakeup does not end the task.
                while (!triggerLatch.isTriggered()) {
                    awaitLatch.trigger();
                    // Must wait on the monitor that is actually held: calling wait() on `this`
                    // while only owning `lock` throws IllegalMonitorStateException.
                    lock.wait();
                }
            }
        }

        /** Triggers {@code triggerLatch} while holding the shared lock. */
        public void cancel() {
            synchronized (lock) {
                triggerLatch.trigger();
            }
        }
    }

    /**
     * Invokable whose {@code cancel()} blocks on this object's monitor until {@code invoke()}
     * is interrupted and calls {@code notifyAll()}.
     */
    public static final class InvokableBlockingInCancel extends AbstractInvokable {

        public InvokableBlockingInCancel(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch}, waits for {@code triggerLatch} and then waits on this
         * object's monitor. When interrupted at either point, it notifies all threads waiting
         * on the monitor and returns.
         */
        public void invoke() {
            awaitLatch.trigger();
            try {
                triggerLatch.await();
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException ignored) {
                // The interrupt is the signal to wake up whoever is waiting on this monitor.
                synchronized (this) {
                    notifyAll();
                }
            }
        }

        /**
         * Triggers {@code triggerLatch} and then waits on this object's monitor.
         *
         * @throws Exception in particular {@link InterruptedException} if the wait is interrupted
         */
        public void cancel() throws Exception {
            synchronized (this) {
                triggerLatch.trigger();
                wait();
            }
        }
    }

    /**
     * Invokable that waits for {@code triggerLatch} and then always throws a
     * {@link CancelTaskException}.
     */
    public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {

        public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch}, waits for {@code triggerLatch} (ignoring anything thrown
         * while waiting) and finally throws {@link CancelTaskException}.
         */
        public void invoke() {
            awaitLatch.trigger();

            try {
                triggerLatch.await();
            } catch (Throwable ignored) {
                // Whatever ends the wait, the outcome below is the same.
            }

            throw new CancelTaskException();
        }
    }

    /**
     * Invokable that throws a {@link RuntimeException} once {@code triggerLatch} has been
     * awaited successfully; interrupts received while waiting are ignored.
     */
    public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable {

        public InvokableWithExceptionOnTrigger(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch}, retries {@code triggerLatch.await()} until it returns
         * without being interrupted, then throws {@code RuntimeException("test")}.
         */
        public void invoke() {
            awaitLatch.trigger();

            boolean released = false;
            while (!released) {
                try {
                    triggerLatch.await();
                    released = true;
                } catch (InterruptedException ignored) {
                    // An interrupt must not end the wait early; try again.
                }
            }

            throw new RuntimeException("test");
        }
    }

    /** Invokable whose {@code invoke()} signals {@code awaitLatch} and then waits on itself. */
    private static final class InvokableBlockingInInvoke extends AbstractInvokable {

        public InvokableBlockingInInvoke(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and waits on this object's monitor.
         *
         * @throws Exception in particular {@link InterruptedException} if the wait is interrupted
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            synchronized (this) {
                wait();
            }
        }
    }

    /**
     * Invokable that fails immediately with a {@link TestWrappedException} wrapping an
     * {@link IOException}.
     */
    private static final class FailingInvokableWithChainedException extends AbstractInvokable {

        public FailingInvokableWithChainedException(Environment environment) {
            super(environment);
        }

        /** Always throws {@code TestWrappedException} whose cause is {@code IOException("test")}. */
        public void invoke() {
            IOException rootCause = new IOException("test");
            throw new TestWrappedException(rootCause);
        }

        /** Intentionally empty. */
        public void cancel() {
        }
    }

    /** Invokable whose {@code invoke()} fails immediately with {@code Exception("test")}. */
    private static final class InvokableWithExceptionInInvoke extends AbstractInvokable {

        public InvokableWithExceptionInInvoke(Environment environment) {
            super(environment);
        }

        /**
         * @throws Exception always, with the message {@code "test"}
         */
        public void invoke() throws Exception {
            throw new Exception("test");
        }
    }

    /** Abstract invokable; being abstract, it cannot be instantiated directly. */
    private static abstract class InvokableNonInstantiable extends AbstractInvokable {

        public InvokableNonInstantiable(Environment environment) {
            super(environment);
        }
    }

    /**
     * Invokable whose {@code invoke()} returns immediately and whose {@code cancel()} fails
     * the test when called.
     */
    private static final class TestInvokableCorrect extends AbstractInvokable {

        public TestInvokableCorrect(Environment environment) {
            super(environment);
        }

        /** Does nothing and returns normally. */
        public void invoke() {
        }

        /** Fails the test: this invokable is not expected to be cancelled. */
        public void cancel() {
            Assert.fail("This should not be called");
        }
    }

    /**
     * Fluent builder that assembles {@link Task} instances for the tests.
     *
     * <p>Every collaborator starts out with a default (mostly Mockito mocks or no-op/test
     * implementations) and can be replaced through the corresponding setter before
     * {@link #build()} is called.
     */
    private final class TaskBuilder {
        private Class<? extends AbstractInvokable> invokable = TestInvokableCorrect.class;
        private TaskManagerActions taskManagerActions = Mockito.mock(TaskManagerActions.class);
        private LibraryCacheManager libraryCacheManager = Mockito.mock(LibraryCacheManager.class);
        private ResultPartitionConsumableNotifier consumableNotifier;
        private PartitionProducerStateChecker partitionProducerStateChecker;
        private NetworkEnvironment networkEnvironment;
        private Executor executor;
        private Configuration taskManagerConfig;
        private ExecutionConfig executionConfig;
        private Collection<PermanentBlobKey> requiredJarFileBlobKeys;

        /** Sets up the default collaborators and stubs the mocked ones. */
        private TaskBuilder() {
            // Defaults that need no stubbing.
            consumableNotifier = new NoOpResultPartitionConsumableNotifier();
            partitionProducerStateChecker = Mockito.mock(PartitionProducerStateChecker.class);
            executor = TestingUtils.defaultExecutor();
            taskManagerConfig = new Configuration();
            executionConfig = new ExecutionConfig();
            requiredJarFileBlobKeys = Collections.emptyList();

            // The mocked library cache hands out this class's own class loader for every job.
            Mockito.when(libraryCacheManager.getClassLoader(ArgumentMatchers.any(JobID.class)))
                    .thenReturn(getClass().getClassLoader());

            // Mocked network environment returning mocked sub-components.
            ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
            TaskEventDispatcher taskEventDispatcher = Mockito.mock(TaskEventDispatcher.class);
            networkEnvironment = Mockito.mock(NetworkEnvironment.class);
            Mockito.when(networkEnvironment.getResultPartitionManager())
                    .thenReturn(partitionManager);
            Mockito.when(networkEnvironment.getDefaultIOMode())
                    .thenReturn(IOManager.IOMode.SYNC);
            Mockito.when(networkEnvironment.createKvStateTaskRegistry(
                            ArgumentMatchers.any(JobID.class),
                            ArgumentMatchers.any(JobVertexID.class)))
                    .thenReturn(Mockito.mock(TaskKvStateRegistry.class));
            Mockito.when(networkEnvironment.getTaskEventDispatcher())
                    .thenReturn(taskEventDispatcher);
        }

        /** Sets the invokable class the task will run; returns this builder. */
        TaskBuilder setInvokable(Class<? extends AbstractInvokable> invokable) {
            this.invokable = invokable;
            return this;
        }

        /** Replaces the task manager actions; returns this builder. */
        TaskBuilder setTaskManagerActions(TaskManagerActions taskManagerActions) {
            this.taskManagerActions = taskManagerActions;
            return this;
        }

        /** Replaces the library cache manager; returns this builder. */
        TaskBuilder setLibraryCacheManager(LibraryCacheManager libraryCacheManager) {
            this.libraryCacheManager = libraryCacheManager;
            return this;
        }

        /** Replaces the result partition consumable notifier; returns this builder. */
        TaskBuilder setConsumableNotifier(ResultPartitionConsumableNotifier consumableNotifier) {
            this.consumableNotifier = consumableNotifier;
            return this;
        }

        /** Replaces the partition producer state checker; returns this builder. */
        TaskBuilder setPartitionProducerStateChecker(PartitionProducerStateChecker partitionProducerStateChecker) {
            this.partitionProducerStateChecker = partitionProducerStateChecker;
            return this;
        }

        /** Replaces the network environment; returns this builder. */
        TaskBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) {
            this.networkEnvironment = networkEnvironment;
            return this;
        }

        /** Replaces the executor handed to the task; returns this builder. */
        TaskBuilder setExecutor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /** Replaces the task manager configuration; returns this builder. */
        TaskBuilder setTaskManagerConfig(Configuration taskManagerConfig) {
            this.taskManagerConfig = taskManagerConfig;
            return this;
        }

        /** Replaces the execution config serialized into the job information; returns this builder. */
        TaskBuilder setExecutionConfig(ExecutionConfig executionConfig) {
            this.executionConfig = executionConfig;
            return this;
        }

        /** Replaces the required jar blob keys of the job information; returns this builder. */
        TaskBuilder setRequiredJarFileBlobKeys(Collection<PermanentBlobKey> requiredJarFileBlobKeys) {
            this.requiredJarFileBlobKeys = requiredJarFileBlobKeys;
            return this;
        }

        /**
         * Creates the {@link Task} from the current builder state, using fresh job, vertex and
         * execution attempt ids.
         *
         * @return the newly constructed task
         * @throws Exception if serializing the execution config or constructing the task fails
         */
        private Task build() throws Exception {
            final JobID jobId = new JobID();
            final JobVertexID jobVertexId = new JobVertexID();
            final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();

            SerializedValue<ExecutionConfig> serializedExecutionConfig =
                    new SerializedValue<ExecutionConfig>(executionConfig);

            JobInformation jobInformation = new JobInformation(
                    jobId,
                    "Test Job",
                    serializedExecutionConfig,
                    new Configuration(),
                    requiredJarFileBlobKeys,
                    Collections.emptyList());

            TaskInformation taskInformation = new TaskInformation(
                    jobVertexId,
                    "Test Task",
                    1,
                    1,
                    invokable.getName(),
                    new Configuration());

            BlobCacheService blobCacheService = new BlobCacheService(
                    Mockito.mock(PermanentBlobCache.class),
                    Mockito.mock(TransientBlobCache.class));

            TaskMetricGroup taskMetricGroup = Mockito.mock(TaskMetricGroup.class);
            Mockito.when(taskMetricGroup.getIOMetricGroup())
                    .thenReturn(Mockito.mock(TaskIOMetricGroup.class));

            return new Task(
                    jobInformation,
                    taskInformation,
                    executionAttemptId,
                    new AllocationID(),
                    0,
                    0,
                    Collections.emptyList(),
                    Collections.emptyList(),
                    0,
                    Mockito.mock(MemoryManager.class),
                    Mockito.mock(IOManager.class),
                    networkEnvironment,
                    Mockito.mock(BroadcastVariableManager.class),
                    new TestTaskStateManager(),
                    taskManagerActions,
                    new MockInputSplitProvider(),
                    new TestCheckpointResponder(),
                    blobCacheService,
                    libraryCacheManager,
                    Mockito.mock(FileCache.class),
                    new TestingTaskManagerRuntimeInfo(taskManagerConfig),
                    taskMetricGroup,
                    consumableNotifier,
                    partitionProducerStateChecker,
                    executor);
        }
    }

    /**
     * Task manager actions that trigger a latch when a fatal error is reported, so a test can
     * wait for that notification.
     */
    private static class AwaitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {

        /** Triggered by {@link #notifyFatalError(String, Throwable)}. */
        private final OneShotLatch latch = new OneShotLatch();

        private AwaitFatalErrorTaskManagerActions() {
        }

        /** Triggers the latch; the message and cause are not inspected. */
        @Override
        public void notifyFatalError(String message, Throwable cause) {
            latch.trigger();
        }
    }

    /** Task manager actions that throw as soon as a fatal error is reported. */
    private static class ProhibitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {

        private ProhibitFatalErrorTaskManagerActions() {
        }

        /**
         * @throws RuntimeException always; the message and cause are not inspected
         */
        @Override
        public void notifyFatalError(String message, Throwable cause) {
            throw new RuntimeException("Unexpected FatalError notification");
        }
    }

    private static class QueuedNoOpTaskManagerActions
    extends NoOpTaskManagerActions {
        private final BlockingQueue<TaskExecutionState> queue = new LinkedBlockingDeque<TaskExecutionState>();

        private QueuedNoOpTaskManagerActions() {
        }

        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            this.queue.offer(taskExecutionState);
        }

        private void validateListenerMessage(ExecutionState state, Task task, Throwable error) {
            try {
                TaskExecutionState taskState = this.queue.take();
                Assert.assertNotNull((String)"There is no additional listener message", (Object)state);
                Assert.assertEquals((Object)task.getJobID(), (Object)taskState.getJobID());
                Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
                Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
                Throwable t = taskState.getError(this.getClass().getClassLoader());
                if (error == null) {
                    Assert.assertNull((Object)t);
                } else {
                    Assert.assertEquals((Object)error.toString(), (Object)t.toString());
                }
            }
            catch (InterruptedException e) {
                Assert.fail((String)"interrupted");
            }
        }
    }
}

