/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

public class PendingCheckpointTest {
    /**
     * Map that is copied into every {@link PendingCheckpoint} built by the
     * {@code createPendingCheckpoint} helpers. It is filled once, in the static initializer of this
     * class, with a single mocked {@link ExecutionVertex} keyed by {@link #ATTEMPT_ID}.
     */
    private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<ExecutionAttemptID, ExecutionVertex>();
    /** Key of the single entry in {@link #ACK_TASKS}; the tests pass it to {@code acknowledgeTask}. */
    private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
    /** JUnit rule that supplies the temporary folders the tests use as checkpoint target directories. */
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Verifies {@code canBeSubsumed()} for two property sets that differ only in the first
     * constructor flag, and that {@code abortSubsumed()} throws {@link IllegalStateException} on
     * the checkpoint that reports it cannot be subsumed.
     */
    @Test
    public void testCanBeSubsumed() throws Exception {
        // First flag set.
        // NOTE(review): presumably this is the "forced" flag - confirm against CheckpointProperties.
        CheckpointProperties firstFlagSet =
                new CheckpointProperties(true, true, false, false, false, false, false, false);
        PendingCheckpoint checkpoint = createPendingCheckpoint(firstFlagSet, "ignored");
        Assert.assertFalse(checkpoint.canBeSubsumed());

        // Trying to subsume it anyway has to be rejected with an IllegalStateException.
        boolean rejected = false;
        try {
            checkpoint.abortSubsumed();
        } catch (IllegalStateException expected) {
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Did not throw expected Exception");
        }

        // Identical properties except for the cleared first flag: subsuming is allowed.
        CheckpointProperties firstFlagCleared =
                new CheckpointProperties(false, true, false, false, false, false, false, false);
        checkpoint = createPendingCheckpoint(firstFlagCleared, "ignored");
        Assert.assertTrue(checkpoint.canBeSubsumed());
    }

    /**
     * Verifies how many entries show up in the target directory: none after acknowledging the
     * task, exactly one after {@code finalizeCheckpointExternalized()}, and no additional entry
     * when a second checkpoint without a target directory is acknowledged and finalized through
     * {@code finalizeCheckpointNonExternalized()}.
     */
    @Test
    public void testPersistExternally() throws Exception {
        final File targetDir = tmpFolder.newFolder();

        // Checkpoint that is given the temporary folder as its target directory.
        CheckpointProperties withTargetProps =
                new CheckpointProperties(false, true, false, false, false, false, false, false);
        PendingCheckpoint withTarget = createPendingCheckpoint(withTargetProps, targetDir.getAbsolutePath());

        withTarget.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assert.assertEquals(0L, (long) targetDir.listFiles().length);

        withTarget.finalizeCheckpointExternalized();
        Assert.assertEquals(1L, (long) targetDir.listFiles().length);

        // Checkpoint without a target directory: the folder content must stay at one entry.
        CheckpointProperties withoutTargetProps =
                new CheckpointProperties(false, false, false, true, true, true, true, true);
        PendingCheckpoint withoutTarget = createPendingCheckpoint(withoutTargetProps, null);

        withoutTarget.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assert.assertEquals(1L, (long) targetDir.listFiles().length);

        withoutTarget.finalizeCheckpointNonExternalized();
        Assert.assertEquals(1L, (long) targetDir.listFiles().length);
    }

    /**
     * Verifies that the completion future is not done right after creation and is done after each
     * of {@code abortDeclined()}, {@code abortExpired()}, {@code abortSubsumed()} and a successful
     * {@code finalizeCheckpointExternalized()}. Also verifies that both finalize methods throw
     * {@link IllegalStateException} on a checkpoint on which {@code acknowledgeTask} was never
     * called.
     */
    @Test
    public void testCompletionFuture() throws Exception {
        final CheckpointProperties props =
                new CheckpointProperties(false, true, false, false, false, false, false, false);

        // Declined checkpoint.
        PendingCheckpoint declined = createPendingCheckpoint(props, "ignored");
        CompletableFuture<?> declinedFuture = declined.getCompletionFuture();
        Assert.assertFalse(declinedFuture.isDone());
        declined.abortDeclined();
        Assert.assertTrue(declinedFuture.isDone());

        // Expired checkpoint.
        PendingCheckpoint expired = createPendingCheckpoint(props, "ignored");
        CompletableFuture<?> expiredFuture = expired.getCompletionFuture();
        Assert.assertFalse(expiredFuture.isDone());
        expired.abortExpired();
        Assert.assertTrue(expiredFuture.isDone());

        // Subsumed checkpoint.
        PendingCheckpoint subsumed = createPendingCheckpoint(props, "ignored");
        CompletableFuture<?> subsumedFuture = subsumed.getCompletionFuture();
        Assert.assertFalse(subsumedFuture.isDone());
        subsumed.abortSubsumed();
        Assert.assertTrue(subsumedFuture.isDone());

        // Fully acknowledged checkpoint that is finalized into a real temporary folder.
        String target = tmpFolder.newFolder().getAbsolutePath();
        PendingCheckpoint finalized = createPendingCheckpoint(props, target);
        CompletableFuture<?> finalizedFuture = finalized.getCompletionFuture();
        Assert.assertFalse(finalizedFuture.isDone());
        finalized.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assert.assertTrue(finalized.isFullyAcknowledged());
        finalized.finalizeCheckpointExternalized();
        Assert.assertTrue(finalizedFuture.isDone());

        // Checkpoint that never saw an acknowledgement: both finalize variants must be rejected.
        PendingCheckpoint unacknowledged = createPendingCheckpoint(props, "ignored");
        CompletableFuture<?> unacknowledgedFuture = unacknowledged.getCompletionFuture();
        Assert.assertFalse(unacknowledgedFuture.isDone());

        boolean nonExternalizedRejected = false;
        try {
            unacknowledged.finalizeCheckpointNonExternalized();
        } catch (IllegalStateException expected) {
            nonExternalizedRejected = true;
        }
        if (!nonExternalizedRejected) {
            Assert.fail("Did not throw expected Exception");
        }

        boolean externalizedRejected = false;
        try {
            unacknowledged.finalizeCheckpointExternalized();
        } catch (IllegalStateException expected) {
            externalizedRejected = true;
        }
        if (!externalizedRejected) {
            Assert.fail("Did not throw expected Exception");
        }
    }

    /**
     * Verifies, for each of {@code abortDeclined()}, {@code abortError(...)},
     * {@code abortExpired()} and {@code abortSubsumed()}, that the injected operator state has
     * received exactly one {@code discardState()} call once the commands buffered in the
     * {@code QueueExecutor} have been run. The mock is reset between the scenarios so every
     * verification counts only the calls of its own scenario.
     */
    @Test
    public void testAbortDiscardsState() throws Exception {
        final CheckpointProperties props =
                new CheckpointProperties(false, true, false, false, false, false, false, false);
        final QueueExecutor queuedExecutor = new QueueExecutor();

        final OperatorState operatorState = Mockito.mock(OperatorState.class);
        Mockito.doNothing().when(operatorState).registerSharedStates(Matchers.any(SharedStateRegistry.class));

        final String targetDir = tmpFolder.newFolder().getAbsolutePath();

        // Scenario 1: declined.
        PendingCheckpoint checkpoint = createPendingCheckpoint(props, targetDir, queuedExecutor);
        setTaskState(checkpoint, operatorState);
        checkpoint.abortDeclined();
        queuedExecutor.runQueuedCommands();
        Mockito.verify(operatorState, Mockito.times(1)).discardState();

        // Scenario 2: aborted because of an error.
        Mockito.reset(operatorState);
        checkpoint = createPendingCheckpoint(props, targetDir, queuedExecutor);
        setTaskState(checkpoint, operatorState);
        checkpoint.abortError(new Exception("Expected Test Exception"));
        queuedExecutor.runQueuedCommands();
        Mockito.verify(operatorState, Mockito.times(1)).discardState();

        // Scenario 3: expired.
        Mockito.reset(operatorState);
        checkpoint = createPendingCheckpoint(props, targetDir, queuedExecutor);
        setTaskState(checkpoint, operatorState);
        checkpoint.abortExpired();
        queuedExecutor.runQueuedCommands();
        Mockito.verify(operatorState, Mockito.times(1)).discardState();

        // Scenario 4: subsumed.
        Mockito.reset(operatorState);
        checkpoint = createPendingCheckpoint(props, targetDir, queuedExecutor);
        setTaskState(checkpoint, operatorState);
        checkpoint.abortSubsumed();
        queuedExecutor.runQueuedCommands();
        Mockito.verify(operatorState, Mockito.times(1)).discardState();
    }

    /**
     * Verifies which methods of the registered {@link PendingCheckpointStats} callback are
     * invoked: one {@code reportSubtaskStats} call on acknowledgement, one
     * {@code reportCompletedCheckpoint} call on finalization, and one
     * {@code reportFailedCheckpoint} call for each of the four abort methods. Every scenario uses
     * its own mock and its own checkpoint.
     */
    @Test
    public void testPendingCheckpointStatsCallbacks() throws Exception {
        // Acknowledge, then finalize.
        PendingCheckpointStats completedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint completed = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        completed.setStatsCallback(completedStats);
        completed.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Mockito.verify(completedStats, Mockito.times(1))
                .reportSubtaskStats(Matchers.any(JobVertexID.class), Matchers.any(SubtaskStateStats.class));
        completed.finalizeCheckpointNonExternalized();
        Mockito.verify(completedStats, Mockito.times(1))
                .reportCompletedCheckpoint(Matchers.any(String.class));

        // Subsumed.
        PendingCheckpointStats subsumedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint subsumed = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        subsumed.setStatsCallback(subsumedStats);
        subsumed.abortSubsumed();
        Mockito.verify(subsumedStats, Mockito.times(1))
                .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));

        // Declined.
        PendingCheckpointStats declinedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint declined = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        declined.setStatsCallback(declinedStats);
        declined.abortDeclined();
        Mockito.verify(declinedStats, Mockito.times(1))
                .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));

        // Aborted because of an error.
        PendingCheckpointStats erroredStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint errored = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        errored.setStatsCallback(erroredStats);
        errored.abortError(new Exception("Expected test error"));
        Mockito.verify(erroredStats, Mockito.times(1))
                .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));

        // Expired.
        PendingCheckpointStats expiredStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint expired = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        expired.setStatsCallback(expiredStats);
        expired.abortExpired();
        Mockito.verify(expiredStats, Mockito.times(1))
                .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));
    }

    /**
     * Verifies that acknowledging the task with a {@code null} state snapshot leaves
     * {@code getOperatorStates()} empty.
     */
    @Test
    public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
        PendingCheckpoint checkpoint = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        CheckpointMetrics metrics = Mockito.mock(CheckpointMetrics.class);

        checkpoint.acknowledgeTask(ATTEMPT_ID, null, metrics);

        Assert.assertTrue(checkpoint.getOperatorStates().isEmpty());
    }

    /**
     * Verifies that acknowledging the task with a non-null (mocked) state snapshot leaves
     * {@code getOperatorStates()} non-empty.
     */
    @Test
    public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
        PendingCheckpoint checkpoint = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
        TaskStateSnapshot snapshot = Mockito.mock(TaskStateSnapshot.class);
        CheckpointMetrics metrics = Mockito.mock(CheckpointMetrics.class);

        checkpoint.acknowledgeTask(ATTEMPT_ID, snapshot, metrics);

        Assert.assertFalse(checkpoint.getOperatorStates().isEmpty());
    }

    /**
     * Verifies that {@code setCancellerHandle} returns {@code false} on an already discarded
     * checkpoint, returns {@code true} on a fresh one, and that a subsequent
     * {@code abortDeclined()} calls {@code cancel(false)} on the registered handle.
     */
    @Test
    public void testSetCanceller() {
        final CheckpointProperties props =
                new CheckpointProperties(false, false, false, true, true, true, true, true);

        // A checkpoint that was aborted first refuses a canceller handle.
        PendingCheckpoint alreadyAborted = createPendingCheckpoint(props, null);
        alreadyAborted.abortDeclined();
        Assert.assertTrue(alreadyAborted.isDiscarded());
        Assert.assertFalse(alreadyAborted.setCancellerHandle(Mockito.mock(ScheduledFuture.class)));

        // A fresh checkpoint accepts the handle and cancels it when it is aborted.
        PendingCheckpoint fresh = createPendingCheckpoint(props, null);
        ScheduledFuture<?> cancellerHandle = Mockito.mock(ScheduledFuture.class);
        Assert.assertTrue(fresh.setCancellerHandle(cancellerHandle));
        fresh.abortDeclined();
        Mockito.verify(cancellerHandle).cancel(false);
    }

    /**
     * Builds a {@link PendingCheckpoint} through the three-argument overload, passing the executor
     * returned by {@code Executors.directExecutor()}.
     *
     * @param props properties handed to the checkpoint
     * @param targetDirectory target directory handed to the checkpoint; some tests pass {@code null}
     * @return the newly created pending checkpoint
     */
    private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
        Executor direct = Executors.directExecutor();
        return createPendingCheckpoint(props, targetDirectory, direct);
    }

    /**
     * Builds a {@link PendingCheckpoint} with a fresh {@link JobID}, the fixed numeric arguments
     * {@code 0L} and {@code 1L}, and a private copy of {@code ACK_TASKS}, so that nothing the
     * checkpoint does to its map can affect the shared static map.
     *
     * @param props properties handed to the checkpoint
     * @param targetDirectory target directory handed to the checkpoint; some tests pass {@code null}
     * @param executor executor handed to the checkpoint
     * @return the newly created pending checkpoint
     */
    private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory, Executor executor) {
        HashMap<ExecutionAttemptID, ExecutionVertex> tasksToAcknowledge = new HashMap<>();
        tasksToAcknowledge.putAll(ACK_TASKS);

        JobID jobId = new JobID();
        return new PendingCheckpoint(jobId, 0L, 1L, tasksToAcknowledge, props, targetDirectory, executor);
    }

    /**
     * Injects an operator state into a pending checkpoint by reflection: reads the private
     * {@code operatorStates} field of {@link PendingCheckpoint} and stores {@code state} in that
     * map under a newly created {@link OperatorID}.
     *
     * @param pending checkpoint whose internal map is modified
     * @param state operator state to store
     * @throws NoSuchFieldException if {@link PendingCheckpoint} declares no field named {@code operatorStates}
     * @throws IllegalAccessException if the field value cannot be read
     */
    static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
        Field operatorStatesField = PendingCheckpoint.class.getDeclaredField("operatorStates");
        operatorStatesField.setAccessible(true);

        // The field's generic type is not visible through reflection, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Map<OperatorID, OperatorState> operatorStates =
                (Map<OperatorID, OperatorState>) operatorStatesField.get(pending);
        operatorStates.put(new OperatorID(), state);
    }

    // Fills ACK_TASKS with one mocked execution vertex registered under ATTEMPT_ID.
    static {
        // Job vertex mock that reports a single, freshly created operator id.
        ExecutionJobVertex mockedJobVertex = Mockito.mock(ExecutionJobVertex.class);
        PowerMockito.when(mockedJobVertex.getOperatorIDs())
                .thenReturn(Collections.singletonList(new OperatorID()));

        // Execution vertex mock: max parallelism 128, one parallel subtask, owned by the job vertex above.
        ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
        PowerMockito.when(mockedVertex.getMaxParallelism()).thenReturn(128);
        PowerMockito.when(mockedVertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
        PowerMockito.when(mockedVertex.getJobVertex()).thenReturn(mockedJobVertex);

        ACK_TASKS.put(ATTEMPT_ID, mockedVertex);
    }

    /**
     * {@link Executor} for tests that never runs anything on its own. Submitted commands are
     * buffered in submission order and only run when {@link #runQueuedCommands()} is called, on
     * the thread that makes that call. The class does no synchronization of its own.
     */
    private static final class QueueExecutor implements Executor {
        /** Commands submitted through {@link #execute(Runnable)} that have not been run yet. */
        private final Queue<Runnable> queue = new ArrayDeque<Runnable>(4);

        private QueueExecutor() {
        }

        /**
         * Buffers the command instead of running it.
         *
         * @param command command to run on the next {@link #runQueuedCommands()} call
         * @throws NullPointerException if {@code command} is {@code null} (rejected by {@link ArrayDeque})
         */
        @Override
        public void execute(Runnable command) {
            this.queue.add(command);
        }

        /**
         * Runs every buffered command once, in submission order, and removes each command from the
         * buffer before running it.
         *
         * <p>Draining matters for two reasons: a command must not be run again by a later call
         * when the same executor is reused across several scenarios, and a command that submits a
         * further command while running must not break the loop (iterating the deque while it is
         * being modified would fail with a {@code ConcurrentModificationException}). Commands
         * submitted during the run are executed by the same call.
         */
        public void runQueuedCommands() {
            Runnable next;
            // poll() returns null only when the deque is empty; ArrayDeque holds no null elements.
            while ((next = this.queue.poll()) != null) {
                next.run();
            }
        }
    }
}

