/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

public class BlockingCheckpointsTest {
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        Configuration taskConfig = new Configuration();
        StreamConfig cfg = new StreamConfig(taskConfig);
        cfg.setStreamOperator((StreamOperator)new TestOperator());
        cfg.setOperatorID(new OperatorID());
        cfg.setStateBackend((AbstractStateBackend)new LockingStreamStateBackend());
        Task task = BlockingCheckpointsTest.createTask(taskConfig);
        task.startTaskThread();
        IN_CHECKPOINT_LATCH.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertNull((Object)task.getFailureCause());
    }

    private static Task createTask(Configuration taskConfig) throws IOException {
        JobInformation jobInformation = new JobInformation(new JobID(), "test job name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "test task name", 1, 11, TestStreamTask.class.getName(), taskConfig);
        TaskKvStateRegistry mockKvRegistry = (TaskKvStateRegistry)Mockito.mock(TaskKvStateRegistry.class);
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.createKvStateTaskRegistry((JobID)Matchers.any(JobID.class), (JobVertexID)Matchers.any(JobVertexID.class))).thenReturn((Object)mockKvRegistry);
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        return new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, null, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), network, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, (LibraryCacheManager)new BlobLibraryCacheManager((PermanentBlobService)blobService.getPermanentBlobService(), FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]), new FileCache(new String[]{EnvironmentInformation.getTemporaryFileDirectory()}), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(), (TaskMetricGroup)new UnregisteredTaskMetricsGroup(), (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class), Executors.directExecutor());
    }

    public static final class TestStreamTask
    extends OneInputStreamTask<Object, Object> {
        public void init() {
        }

        protected void run() throws Exception {
            this.triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forCheckpoint(), new CheckpointMetrics());
        }

        protected void cleanup() {
        }

        protected void cancelTask() {
        }
    }

    private static final class TestOperator
    extends StreamFilter<Object> {
        private static final long serialVersionUID = 1L;

        public TestOperator() {
            super((FilterFunction)new FilterFunction<Object>(){

                public boolean filter(Object value) {
                    return false;
                }
            });
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            OperatorStateCheckpointOutputStream outStream = context.getRawOperatorStateOutput();
            IN_CHECKPOINT_LATCH.trigger();
            outStream.write(1);
        }
    }

    private static final class LockingOutputStream
    extends CheckpointStreamFactory.CheckpointStateOutputStream {
        private final Object lock = new Object();
        private volatile boolean closed;

        private LockingOutputStream() {
        }

        public StreamStateHandle closeAndGetHandle() throws IOException {
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void write(int b) throws IOException {
            Object object = this.lock;
            synchronized (object) {
                while (!this.closed) {
                    try {
                        this.lock.wait();
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() throws IOException {
            Object object = this.lock;
            synchronized (object) {
                this.closed = true;
                this.lock.notifyAll();
            }
        }

        public long getPos() {
            return 0L;
        }

        public void flush() {
        }

        public void sync() {
        }
    }

    private static final class LockingOutputStreamFactory
    implements CheckpointStreamFactory {
        private LockingOutputStreamFactory() {
        }

        public CheckpointStreamFactory.CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
            return new LockingOutputStream();
        }

        public void close() {
        }
    }

    private static class LockingStreamStateBackend
    extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;

        private LockingStreamStateBackend() {
        }

        public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
            return new LockingOutputStreamFactory();
        }

        public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
            throw new UnsupportedOperationException();
        }

        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
            throw new UnsupportedOperationException();
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
            return new DefaultOperatorStateBackend(((Object)((Object)this)).getClass().getClassLoader(), new ExecutionConfig(), true);
        }
    }
}

