/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import akka.dispatch.Futures;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={StreamTask.class})
@PowerMockIgnore(value={"org.apache.log4j.*"})
public class StreamTaskTest
extends TestLogger {
    private static OneShotLatch syncLatch;

    @Test
    public void testEarlyCanceling() throws Exception {
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new SlowlyDeserializingOperator());
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, new Configuration());
        TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener();
        task.registerExecutionListener((TaskExecutionStateListener)testingExecutionStateListener);
        task.startTaskThread();
        Future<ExecutionState> running = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
        ExecutionState executionState = (ExecutionState)Await.result(running, (Duration)deadline.timeLeft());
        if (executionState != ExecutionState.RUNNING) {
            Assert.fail((String)("Task entered state " + task.getExecutionState() + " with error " + ExceptionUtils.stringifyException((Throwable)task.getFailureCause())));
        }
        task.cancelExecution();
        Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
        executionState = (ExecutionState)Await.result(canceling, (Duration)deadline.timeLeft());
        Assert.assertTrue((executionState == ExecutionState.CANCELING || executionState == ExecutionState.CANCELED ? 1 : 0) != 0);
        task.getExecutingThread().join(deadline.timeLeft().toMillis());
        Assert.assertFalse((String)"Task did not cancel", (boolean)task.getExecutingThread().isAlive());
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendLoadingAndClosing() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new MockSourceFunction()));
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = false;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)StateBackendTestSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify((Object)StateBackendTestSource.keyedStateBackend)).close();
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendClosingOnFailure() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new MockSourceFunction()));
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = true;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)StateBackendTestSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify((Object)StateBackendTestSource.keyedStateBackend)).close();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationNotBlockedOnLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelLockingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationFailsWithBlockingLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelFailingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testFailingCheckpointStreamOperator() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        Environment mockEnvironment = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)mockEnvironment.getTaskInfo()).thenReturn((Object)mockTaskInfo);
        StreamTask streamTask = (StreamTask)Mockito.mock(StreamTask.class, (Answer)Mockito.CALLS_REAL_METHODS);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        streamTask.setEnvironment(mockEnvironment);
        StreamOperator streamOperator1 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator2 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator3 = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorSnapshotResult operatorSnapshotResult1 = (OperatorSnapshotResult)Mockito.mock(OperatorSnapshotResult.class);
        OperatorSnapshotResult operatorSnapshotResult2 = (OperatorSnapshotResult)Mockito.mock(OperatorSnapshotResult.class);
        Exception testException = new Exception("Test exception");
        Mockito.when((Object)streamOperator1.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult1);
        Mockito.when((Object)streamOperator2.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult2);
        Mockito.when((Object)streamOperator3.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenThrow(new Throwable[]{testException});
        OperatorID operatorID1 = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        OperatorID operatorID3 = new OperatorID();
        Mockito.when((Object)streamOperator1.getOperatorID()).thenReturn((Object)operatorID1);
        Mockito.when((Object)streamOperator2.getOperatorID()).thenReturn((Object)operatorID2);
        Mockito.when((Object)streamOperator3.getOperatorID()).thenReturn((Object)operatorID3);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator1, streamOperator2, streamOperator3};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        try {
            streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
            Assert.fail((String)"Expected test exception here.");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
        }
        ((OperatorSnapshotResult)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotResult)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
    }

    @Test
    public void testFailingAsyncCheckpointRunnable() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        Environment mockEnvironment = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)mockEnvironment.getTaskInfo()).thenReturn((Object)mockTaskInfo);
        StreamTask streamTask = (StreamTask)Mockito.mock(StreamTask.class, (Answer)Mockito.CALLS_REAL_METHODS);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        streamTask.setEnvironment(mockEnvironment);
        StreamOperator streamOperator1 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator2 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator3 = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorSnapshotResult operatorSnapshotResult1 = (OperatorSnapshotResult)Mockito.mock(OperatorSnapshotResult.class);
        OperatorSnapshotResult operatorSnapshotResult2 = (OperatorSnapshotResult)Mockito.mock(OperatorSnapshotResult.class);
        OperatorSnapshotResult operatorSnapshotResult3 = (OperatorSnapshotResult)Mockito.mock(OperatorSnapshotResult.class);
        RunnableFuture failingFuture = (RunnableFuture)Mockito.mock(RunnableFuture.class);
        Mockito.when(failingFuture.get()).thenThrow(new Throwable[]{new ExecutionException(new Exception("Test exception"))});
        Mockito.when((Object)operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn((Object)failingFuture);
        Mockito.when((Object)streamOperator1.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult1);
        Mockito.when((Object)streamOperator2.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult2);
        Mockito.when((Object)streamOperator3.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult3);
        OperatorID operatorID1 = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        OperatorID operatorID3 = new OperatorID();
        Mockito.when((Object)streamOperator1.getOperatorID()).thenReturn((Object)operatorID1);
        Mockito.when((Object)streamOperator2.getOperatorID()).thenReturn((Object)operatorID2);
        Mockito.when((Object)streamOperator3.getOperatorID()).thenReturn((Object)operatorID3);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator1, streamOperator2, streamOperator3};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"asyncOperationsThreadPool", (Object)new DirectExecutorService());
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
        ((StreamTask)Mockito.verify((Object)streamTask)).handleAsyncException(Matchers.anyString(), (Throwable)Matchers.any(Throwable.class));
        ((OperatorSnapshotResult)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotResult)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
        ((OperatorSnapshotResult)Mockito.verify((Object)operatorSnapshotResult3)).cancel();
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
        final OneShotLatch completeAcknowledge = new OneShotLatch();
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        Environment mockEnvironment = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)mockEnvironment.getTaskInfo()).thenReturn((Object)mockTaskInfo);
        ((Environment)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                acknowledgeCheckpointLatch.trigger();
                completeAcknowledge.await();
                return null;
            }
        }).when((Object)mockEnvironment)).acknowledgeCheckpoint(Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        StreamTask streamTask = (StreamTask)Mockito.mock(StreamTask.class, (Answer)Mockito.CALLS_REAL_METHODS);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        streamTask.setEnvironment(mockEnvironment);
        StreamOperator streamOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStateHandle.class);
        OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult((RunnableFuture)new DoneFuture((Object)managedKeyedStateHandle), (RunnableFuture)new DoneFuture((Object)rawKeyedStateHandle), (RunnableFuture)new DoneFuture((Object)managedOperatorStateHandle), (RunnableFuture)new DoneFuture((Object)rawOperatorStateHandle));
        Mockito.when((Object)streamOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        StreamStateHandle streamStateHandle = (StreamStateHandle)Mockito.mock(StreamStateHandle.class);
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = (CheckpointStreamFactory.CheckpointStateOutputStream)Mockito.mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
        Mockito.when((Object)outStream.closeAndGetHandle()).thenReturn((Object)streamStateHandle);
        CheckpointStreamFactory mockStreamFactory = (CheckpointStreamFactory)Mockito.mock(CheckpointStreamFactory.class);
        Mockito.when((Object)mockStreamFactory.createCheckpointStateOutputStream(Matchers.anyLong(), Matchers.anyLong())).thenReturn((Object)outStream);
        AbstractStateBackend mockStateBackend = (AbstractStateBackend)Mockito.mock(AbstractStateBackend.class);
        Mockito.when((Object)mockStateBackend.createStreamFactory((JobID)Matchers.any(JobID.class), Matchers.anyString())).thenReturn((Object)mockStreamFactory);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"asyncOperationsThreadPool", (Object)Executors.newFixedThreadPool(1));
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)streamTask, (String)"stateBackend", (Object)mockStateBackend);
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
        acknowledgeCheckpointLatch.await();
        ArgumentCaptor subtaskStateCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
        ((Environment)Mockito.verify((Object)mockEnvironment)).acknowledgeCheckpoint(Matchers.eq((long)42L), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)subtaskStateCaptor.capture());
        TaskStateSnapshot subtaskStates = (TaskStateSnapshot)subtaskStateCaptor.getValue();
        OperatorSubtaskState subtaskState = (OperatorSubtaskState)((Map.Entry)subtaskStates.getSubtaskStateMappings().iterator().next()).getValue();
        Assert.assertEquals(Collections.singletonList(managedKeyedStateHandle), (Object)subtaskState.getManagedKeyedState());
        Assert.assertEquals(Collections.singletonList(rawKeyedStateHandle), (Object)subtaskState.getRawKeyedState());
        Assert.assertEquals(Collections.singletonList(managedOperatorStateHandle), (Object)subtaskState.getManagedOperatorState());
        Assert.assertEquals(Collections.singletonList(rawOperatorStateHandle), (Object)subtaskState.getRawOperatorState());
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        streamTask.cancel();
        completeAcknowledge.trigger();
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        final OneShotLatch createSubtask = new OneShotLatch();
        final OneShotLatch completeSubtask = new OneShotLatch();
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        Environment mockEnvironment = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)mockEnvironment.getTaskInfo()).thenReturn((Object)mockTaskInfo);
        PowerMockito.whenNew(OperatorSubtaskState.class).withArguments((Object)Matchers.anyCollectionOf(OperatorStateHandle.class), new Object[]{Matchers.anyCollectionOf(OperatorStateHandle.class), Matchers.anyCollectionOf(KeyedStateHandle.class), Matchers.anyCollectionOf(KeyedStateHandle.class)}).thenAnswer((Answer)new Answer<OperatorSubtaskState>(){

            public OperatorSubtaskState answer(InvocationOnMock invocation) throws Throwable {
                createSubtask.trigger();
                completeSubtask.await();
                Object[] arguments = invocation.getArguments();
                return new OperatorSubtaskState((OperatorStateHandle)arguments[0], (OperatorStateHandle)arguments[1], (KeyedStateHandle)arguments[2], (KeyedStateHandle)arguments[3]);
            }
        });
        StreamTask streamTask = (StreamTask)Mockito.mock(StreamTask.class, (Answer)Mockito.CALLS_REAL_METHODS);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        streamTask.setEnvironment(mockEnvironment);
        StreamOperator streamOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorID operatorID = new OperatorID();
        Mockito.when((Object)streamOperator.getOperatorID()).thenReturn((Object)operatorID);
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStateHandle.class);
        OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult((RunnableFuture)new DoneFuture((Object)managedKeyedStateHandle), (RunnableFuture)new DoneFuture((Object)rawKeyedStateHandle), (RunnableFuture)new DoneFuture((Object)managedOperatorStateHandle), (RunnableFuture)new DoneFuture((Object)rawOperatorStateHandle));
        Mockito.when((Object)streamOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)operatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        StreamStateHandle streamStateHandle = (StreamStateHandle)Mockito.mock(StreamStateHandle.class);
        CheckpointStreamFactory.CheckpointStateOutputStream outStream = (CheckpointStreamFactory.CheckpointStateOutputStream)Mockito.mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
        Mockito.when((Object)outStream.closeAndGetHandle()).thenReturn((Object)streamStateHandle);
        CheckpointStreamFactory mockStreamFactory = (CheckpointStreamFactory)Mockito.mock(CheckpointStreamFactory.class);
        Mockito.when((Object)mockStreamFactory.createCheckpointStateOutputStream(Matchers.anyLong(), Matchers.anyLong())).thenReturn((Object)outStream);
        AbstractStateBackend mockStateBackend = (AbstractStateBackend)Mockito.mock(AbstractStateBackend.class);
        Mockito.when((Object)mockStateBackend.createStreamFactory((JobID)Matchers.any(JobID.class), Matchers.anyString())).thenReturn((Object)mockStreamFactory);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"asyncOperationsThreadPool", (Object)executor);
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)streamTask, (String)"stateBackend", (Object)mockStateBackend);
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
        createSubtask.await();
        streamTask.cancel();
        completeSubtask.trigger();
        executor.shutdown();
        if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
            Assert.fail((String)"Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
        }
        ((Environment)Mockito.verify((Object)mockEnvironment, (VerificationMode)Mockito.never())).acknowledgeCheckpoint(Matchers.eq((long)42L), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle)).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle)).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle)).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle)).discardState();
    }

    @Test
    public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        Environment mockEnvironment = (Environment)Mockito.mock(Environment.class);
        final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
        final ArrayList checkpointResult = new ArrayList(1);
        ((Environment)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                SubtaskState subtaskState = (SubtaskState)invocationOnMock.getArgumentAt(2, SubtaskState.class);
                checkpointResult.add(subtaskState);
                checkpointCompletedLatch.trigger();
                return null;
            }
        }).when((Object)mockEnvironment)).acknowledgeCheckpoint(Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        Mockito.when((Object)mockEnvironment.getTaskInfo()).thenReturn((Object)mockTaskInfo);
        StreamTask streamTask = (StreamTask)Mockito.mock(StreamTask.class, (Answer)Mockito.CALLS_REAL_METHODS);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        streamTask.setEnvironment(mockEnvironment);
        StreamOperator statelessOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorID operatorID = new OperatorID();
        Mockito.when((Object)statelessOperator.getOperatorID()).thenReturn((Object)operatorID);
        OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult();
        Mockito.when((Object)statelessOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn((Object)statelessOperatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{statelessOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)streamTask, (String)"asyncOperationsThreadPool", (Object)Executors.newCachedThreadPool());
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
        checkpointCompletedLatch.await(30L, TimeUnit.SECONDS);
        streamTask.cancel();
        Assert.assertNull(checkpointResult.get(0));
    }

    @Test
    public void testOperatorClosingBeforeStopRunning() throws Throwable {
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator((StreamOperator)new BlockingCloseStreamOperator());
        streamConfig.setOperatorID(new OperatorID());
        MockEnvironment mockEnvironment = new MockEnvironment("Test Task", 32768L, new MockInputSplitProvider(), 1, taskConfiguration, new ExecutionConfig());
        NoOpStreamTask streamTask = new NoOpStreamTask((Environment)mockEnvironment);
        AtomicReference<Object> atomicThrowable = new AtomicReference<Object>(null);
        CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(() -> {
            try {
                streamTask.invoke();
            }
            catch (Exception e) {
                atomicThrowable.set(e);
            }
        }, TestingUtils.defaultExecutor());
        BlockingCloseStreamOperator.IN_CLOSE.await();
        Assert.assertTrue((boolean)streamTask.isRunning());
        BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
        invokeFuture.get();
        Assert.assertFalse((boolean)streamTask.isRunning());
        if (atomicThrowable.get() != null) {
            throw (Throwable)atomicThrowable.get();
        }
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception {
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)StreamTaskTest.class.getClassLoader());
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor)Mockito.mock(Executor.class);
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.getResultPartitionManager()).thenReturn((Object)partitionManager);
        Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        Mockito.when((Object)network.createKvStateTaskRegistry((JobID)Matchers.any(JobID.class), (JobVertexID)Matchers.any(JobVertexID.class))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        JobInformation jobInformation = new JobInformation(new JobID(), "Job Name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, invokable.getName(), taskConfig.getConfiguration());
        return new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, new TaskStateSnapshot(), (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), network, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, libCache, (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[]{System.getProperty("java.io.tmpdir")}), (TaskMetricGroup)new UnregisteredTaskMetricsGroup(), consumableNotifier, partitionProducerStateChecker, executor);
    }

    private static final class LockHolder
    extends Thread
    implements Closeable {
        private final OneShotLatch trigger;
        private final Object lock;
        private volatile boolean canceled;

        private LockHolder(Object lock, OneShotLatch trigger) {
            this.lock = lock;
            this.trigger = trigger;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Object object = this.lock;
            synchronized (object) {
                while (!this.canceled) {
                    this.trigger.trigger();
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        @Override
        public void close() {
            this.canceled = true;
            this.interrupt();
        }
    }

    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        protected void init() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void run() throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            holder.start();
            try (LockHolder holder = new LockHolder(lock, latch);){
                this.getCancelables().registerCloseable((Closeable)holder);
                latch.await();
                syncLatch.trigger();
                Object object = lock;
                synchronized (object) {
                }
            }
        }

        protected void cleanup() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }
    }

    public static class CancelLockingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OneShotLatch latch = new OneShotLatch();
        private LockHolder holder;

        protected void init() {
        }

        protected void run() throws Exception {
            this.holder = new LockHolder(this.getCheckpointLock(), this.latch);
            this.holder.start();
            this.latch.await();
            syncLatch.trigger();
            try {
                Thread.sleep(100000000L);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }

        protected void cleanup() {
            this.holder.close();
        }

        protected void cancelTask() {
            this.holder.cancel();
        }
    }

    public static class StateBackendTestSource
    extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        private static volatile boolean fail;
        private static volatile OperatorStateBackend operatorStateBackend;
        private static volatile AbstractKeyedStateBackend keyedStateBackend;

        protected void init() throws Exception {
            operatorStateBackend = this.createOperatorStateBackend((StreamOperator)Mockito.mock(StreamOperator.class), null);
            keyedStateBackend = this.createKeyedStateBackend((TypeSerializer)Mockito.mock(TypeSerializer.class), 4, (KeyGroupRange)Mockito.mock(KeyGroupRange.class));
        }

        protected void run() throws Exception {
            if (fail) {
                throw new RuntimeException();
            }
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }

    public static final class MockStateBackend
    implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1L;

        public AbstractStateBackend createFromConfig(Configuration config) {
            AbstractStateBackend stateBackendMock = (AbstractStateBackend)Mockito.mock(AbstractStateBackend.class);
            try {
                Mockito.when((Object)stateBackendMock.createOperatorStateBackend((Environment)Mockito.any(Environment.class), (String)Mockito.any(String.class))).thenAnswer((Answer)new Answer<OperatorStateBackend>(){

                    public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                        return (OperatorStateBackend)Mockito.mock(OperatorStateBackend.class);
                    }
                });
                Mockito.when((Object)stateBackendMock.createKeyedStateBackend((Environment)Mockito.any(Environment.class), (JobID)Mockito.any(JobID.class), (String)Mockito.any(String.class), (TypeSerializer)Mockito.any(TypeSerializer.class), ((Integer)Mockito.any(Integer.TYPE)).intValue(), (KeyGroupRange)Mockito.any(KeyGroupRange.class), (TaskKvStateRegistry)Mockito.any(TaskKvStateRegistry.class))).thenAnswer((Answer)new Answer<AbstractKeyedStateBackend>(){

                    public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                        return (AbstractKeyedStateBackend)Mockito.mock(AbstractKeyedStateBackend.class);
                    }
                });
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return stateBackendMock;
        }
    }

    private static class MockSourceFunction
    implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        public void cancel() {
        }
    }

    private static class SlowlyDeserializingOperator
    extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<Long>> collector) throws Exception {
            while (!this.canceled) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long delay = 500L;
            long deadline = System.currentTimeMillis() + delay;
            do {
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((delay = deadline - System.currentTimeMillis()) > 0L);
        }
    }

    private static class TestingExecutionStateListener
    implements TaskExecutionStateListener {
        private ExecutionState executionState = null;
        private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = new PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>>(1, new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>(){

            @Override
            public int compare(Tuple2<ExecutionState, Promise<ExecutionState>> o1, Tuple2<ExecutionState, Promise<ExecutionState>> o2) {
                return ((ExecutionState)o1.f0).ordinal() - ((ExecutionState)o2.f0).ordinal();
            }
        });

        private TestingExecutionStateListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) {
            PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = this.priorityQueue;
            synchronized (priorityQueue) {
                if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) {
                    return Futures.successful((Object)executionState);
                }
                Promise.DefaultPromise promise = new Promise.DefaultPromise();
                this.priorityQueue.offer((Tuple2<ExecutionState, Promise<ExecutionState>>)Tuple2.of((Object)executionState, (Object)promise));
                return promise.future();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState) {
            PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue = this.priorityQueue;
            synchronized (priorityQueue) {
                this.executionState = taskExecutionState.getExecutionState();
                while (!this.priorityQueue.isEmpty() && ((ExecutionState)this.priorityQueue.peek().f0).ordinal() <= this.executionState.ordinal()) {
                    Promise promise = (Promise)this.priorityQueue.poll().f1;
                    promise.success((Object)this.executionState);
                }
            }
        }
    }

    private static class BlockingCloseStreamOperator
    extends AbstractStreamOperator<Void> {
        private static final long serialVersionUID = -9042150529568008847L;
        public static final OneShotLatch IN_CLOSE = new OneShotLatch();
        public static final OneShotLatch FINISH_CLOSE = new OneShotLatch();

        private BlockingCloseStreamOperator() {
        }

        public void close() throws Exception {
            IN_CLOSE.trigger();
            FINISH_CLOSE.await();
            super.close();
        }
    }

    private static class NoOpStreamTask<T, OP extends StreamOperator<T>>
    extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) {
            this.setEnvironment(environment);
        }

        protected void init() throws Exception {
        }

        protected void run() throws Exception {
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }
}

