/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointTriggerResult;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorTest
extends TestLogger {
    /**
     * JUnit rule exposing a {@link TemporaryFolder} to the tests of this class.
     *
     * <p>NOTE(review): presumably used by tests that need on-disk scratch space (e.g. file-based
     * state handles) - confirm against the usages further down in this file.
     */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Verifies that triggering a checkpoint is refused when the tasks to trigger are bare mocks
     * on which nothing has been stubbed: {@code triggerCheckpoint} must return {@code false} and
     * neither a pending nor a retained checkpoint may exist afterwards.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            // Trigger vertices are plain mocks; the ack vertices come from the mocking helper.
            ExecutionVertex triggerVertex1 = Mockito.mock(ExecutionVertex.class);
            ExecutionVertex triggerVertex2 = Mockito.mock(ExecutionVertex.class);

            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing has happened yet.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be rejected ...
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ... and must leave no trace behind.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Verifies that triggering a checkpoint is refused when one of the tasks to trigger was
     * mocked in state {@link ExecutionState#FINISHED}: {@code triggerCheckpoint} must return
     * {@code false} and neither a pending nor a retained checkpoint may exist afterwards.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);

            // The second trigger vertex is created explicitly in the FINISHED state.
            JobVertexID jobVertexID2 = new JobVertexID();
            ExecutionVertex triggerVertex2 = mockExecutionVertex(
                    triggerAttemptID2,
                    jobVertexID2,
                    Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID2)),
                    1,
                    1,
                    ExecutionState.FINISHED,
                    new ExecutionState[0]);

            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing has happened yet.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be rejected ...
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ... and must leave no trace behind.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Verifies that triggering a checkpoint is refused when the tasks that have to acknowledge
     * it are bare mocks on which nothing has been stubbed: {@code triggerCheckpoint} must return
     * {@code false} and neither a pending nor a retained checkpoint may exist afterwards.
     */
    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);

            // Ack vertices are plain mocks; the trigger vertices come from the mocking helper.
            ExecutionVertex ackVertex1 = Mockito.mock(ExecutionVertex.class);
            ExecutionVertex ackVertex2 = Mockito.mock(ExecutionVertex.class);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Nothing has happened yet.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The trigger attempt must be rejected ...
            Assert.assertFalse(coord.triggerCheckpoint(timestamp, false));

            // ... and must leave no trace behind.
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers a single checkpoint over two tasks, acknowledges it from the second task (twice),
     * then declines it from the first task.
     *
     * <p>Expected outcome: the repeated acknowledgement neither completes nor discards the
     * checkpoint, the decline discards it, nothing is retained, no scheduled task is left, and
     * further decline messages for the already discarded checkpoint leave it discarded.
     */
    @Test
    public void testTriggerAndDeclineCheckpointSimple() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices act as trigger, ack and commit targets.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // Trigger the checkpoint; this must succeed and leave exactly one pending checkpoint.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);

            // Inspect the freshly created pending checkpoint.
            Assert.assertNotNull(checkpoint);
            Assert.assertEquals(checkpointId, checkpoint.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint.getJobId());
            Assert.assertEquals(2L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getOperatorStates().size());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // Both executions must have received the trigger call.
            Mockito.verify(vertex1.getCurrentExecutionAttempt())
                    .triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
            Mockito.verify(vertex2.getCurrentExecutionAttempt())
                    .triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());

            // Acknowledge from the second task only.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertEquals(1L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // The same acknowledgement a second time must not complete or discard anything.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // Decline from the first task: the checkpoint is discarded and cleaned up.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
            Assert.assertTrue(checkpoint.isDiscarded());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // Late decline messages for the discarded checkpoint must be tolerated.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
            Assert.assertTrue(checkpoint.isDiscarded());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers two concurrent checkpoints over two tasks and then declines only the first one.
     *
     * <p>Expected outcome: the first checkpoint is discarded while the second one stays pending
     * and untouched (same id, still two outstanding acknowledgements), and further decline
     * messages for the first checkpoint leave it discarded.
     */
    @Test
    public void testTriggerAndDeclineCheckpointComplex() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices act as trigger, ack and commit targets.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // Trigger two checkpoints back to back; both must be accepted.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            Assert.assertTrue(coord.triggerCheckpoint(timestamp + 2L, false));

            Assert.assertEquals(2L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(2L, coord.getNumScheduledTasks());

            // A typed iterator is required here: with a raw Iterator, next() yields Object and
            // getKey() cannot be resolved.
            Iterator<Map.Entry<Long, PendingCheckpoint>> pendingEntries =
                    coord.getPendingCheckpoints().entrySet().iterator();
            long checkpoint1Id = pendingEntries.next().getKey();
            long checkpoint2Id = pendingEntries.next().getKey();
            PendingCheckpoint checkpoint1 = coord.getPendingCheckpoints().get(checkpoint1Id);
            PendingCheckpoint checkpoint2 = coord.getPendingCheckpoints().get(checkpoint2Id);

            // Inspect the first pending checkpoint.
            Assert.assertNotNull(checkpoint1);
            Assert.assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint1.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint1.getJobId());
            Assert.assertEquals(2L, checkpoint1.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint1.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint1.getOperatorStates().size());
            Assert.assertFalse(checkpoint1.isDiscarded());
            Assert.assertFalse(checkpoint1.isFullyAcknowledged());

            // Inspect the second pending checkpoint.
            Assert.assertNotNull(checkpoint2);
            Assert.assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
            Assert.assertEquals(timestamp + 2L, checkpoint2.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint2.getJobId());
            Assert.assertEquals(2L, checkpoint2.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint2.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint2.getOperatorStates().size());
            Assert.assertFalse(checkpoint2.isDiscarded());
            Assert.assertFalse(checkpoint2.isFullyAcknowledged());

            // Each execution must have been triggered exactly once per checkpoint.
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpoint1Id),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpoint1Id),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpoint2Id),
                            Mockito.eq(timestamp + 2L),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpoint2Id),
                            Mockito.eq(timestamp + 2L),
                            Matchers.any(CheckpointOptions.class));

            // Decline the first checkpoint from one task; only that checkpoint is discarded.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
            Assert.assertTrue(checkpoint1.isDiscarded());

            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            // The remaining pending checkpoint must be the second one, unchanged.
            long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
            Assert.assertEquals(checkpoint2Id, checkpointIdNew);

            Assert.assertNotNull(checkpointNew);
            Assert.assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
            Assert.assertEquals(jid, checkpointNew.getJobId());
            Assert.assertEquals(2L, checkpointNew.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpointNew.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpointNew.getOperatorStates().size());
            Assert.assertFalse(checkpointNew.isDiscarded());
            Assert.assertFalse(checkpointNew.isFullyAcknowledged());
            Assert.assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());

            // Late decline messages for the discarded checkpoint must be tolerated.
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
            coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
            Assert.assertTrue(checkpoint1.isDiscarded());

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Triggers a checkpoint over two tasks and acknowledges it from both (the second task
     * acknowledges twice), then repeats the cycle with a second checkpoint that carries no state.
     *
     * <p>Expected outcome: shared state is registered only once the checkpoint is complete, the
     * completed checkpoint is retained with two operator states, and the second checkpoint
     * replaces it in the store (capacity 1), has no operator states, and results in a
     * {@code notifyCheckpointComplete} call on both executions.
     */
    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            // Both vertices act as trigger, ack and commit targets.
            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1),
                    null,
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // Trigger the first checkpoint; this must succeed.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));

            Assert.assertEquals(1L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(1L, coord.getNumScheduledTasks());

            long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
            PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);

            // Inspect the freshly created pending checkpoint.
            Assert.assertNotNull(checkpoint);
            Assert.assertEquals(checkpointId, checkpoint.getCheckpointId());
            Assert.assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
            Assert.assertEquals(jid, checkpoint.getJobId());
            Assert.assertEquals(2L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, checkpoint.getOperatorStates().size());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());

            // Both executions must have received exactly one trigger call.
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointId),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointId),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));

            // Mocked task state snapshots, each returning a mocked subtask state for the
            // operator id derived from its vertex.
            OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
            OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates1 = Mockito.mock(TaskStateSnapshot.class);
            TaskStateSnapshot taskOperatorSubtaskStates2 = Mockito.mock(TaskStateSnapshot.class);
            OperatorSubtaskState subtaskState1 = Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState2 = Mockito.mock(OperatorSubtaskState.class);
            Mockito.when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1))
                    .thenReturn(subtaskState1);
            Mockito.when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2))
                    .thenReturn(subtaskState2);

            // Acknowledge from the second task only.
            AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
                    jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1);
            Assert.assertEquals(1L, checkpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, checkpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());
            // Shared state must not be registered while the checkpoint is still pending.
            Mockito.verify(taskOperatorSubtaskStates2, Mockito.never())
                    .registerSharedStates(Matchers.any(SharedStateRegistry.class));

            // The same acknowledgement a second time must not complete or discard anything.
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1);
            Assert.assertFalse(checkpoint.isDiscarded());
            Assert.assertFalse(checkpoint.isFullyAcknowledged());
            Mockito.verify(subtaskState2, Mockito.never())
                    .registerSharedStates(Matchers.any(SharedStateRegistry.class));

            // Acknowledge from the first task: the pending checkpoint object now reports
            // discarded while one successful checkpoint is retained.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(
                    jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1));
            Assert.assertTrue(checkpoint.isDiscarded());

            Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            // Shared state of both subtasks is registered exactly once on completion.
            Mockito.verify(subtaskState1, Mockito.times(1))
                    .registerSharedStates(Matchers.any(SharedStateRegistry.class));
            Mockito.verify(subtaskState2, Mockito.times(1))
                    .registerSharedStates(Matchers.any(SharedStateRegistry.class));

            // Completion must not have caused additional trigger calls.
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointId),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointId),
                            Mockito.eq(timestamp),
                            Matchers.any(CheckpointOptions.class));

            CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jid, success.getJobId());
            Assert.assertEquals(timestamp, success.getTimestamp());
            Assert.assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
            Assert.assertEquals(2L, success.getOperatorStates().size());

            // Second round: trigger another checkpoint and acknowledge it without any state.
            long timestampNew = timestamp + 7L;
            coord.triggerCheckpoint(timestampNew, false);

            long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));

            Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, coord.getNumScheduledTasks());

            CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jid, successNew.getJobId());
            Assert.assertEquals(timestampNew, successNew.getTimestamp());
            Assert.assertEquals(checkpointIdNew, successNew.getCheckpointID());
            Assert.assertTrue(successNew.getOperatorStates().isEmpty());

            // The second checkpoint was triggered once per execution ...
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointIdNew),
                            Mockito.eq(timestampNew),
                            Matchers.any(CheckpointOptions.class));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .triggerCheckpoint(
                            Mockito.eq(checkpointIdNew),
                            Mockito.eq(timestampNew),
                            Matchers.any(CheckpointOptions.class));

            // ... and its completion was announced once per execution.
            Mockito.verify(vertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                    .notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));
            Mockito.verify(vertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                    .notifyCheckpointComplete(Mockito.eq(checkpointIdNew), Mockito.eq(timestampNew));

            coord.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack
            // trace, and avoid a null failure message when the exception carries none.
            throw new AssertionError("Test failed with an unexpected exception: " + e, e);
        }
    }

    /**
     * Runs two checkpoints that are pending at the same time and acknowledges them in an
     * interleaved order.
     *
     * <p>Checks that each checkpoint is triggered once on both trigger vertices, that a
     * checkpoint only completes once all three ack vertices have acknowledged it, that the
     * commit vertex is notified once per completed checkpoint, and that both completed
     * checkpoints are retained in trigger order.
     *
     * @throws Exception if the coordinator or a mock fails; propagated unchanged so the test
     *     report keeps the original stack trace
     */
    @Test
    public void testMultipleConcurrentCheckpoints() throws Exception {
        JobID jid = new JobID();
        long timestamp1 = System.currentTimeMillis();
        long timestamp2 = timestamp1 + 8617L;
        ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID1);
        ExecutionVertex triggerVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID2);
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID2);
        ExecutionVertex ackVertex3 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID3);
        ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jid,
            600000L,
            600000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{triggerVertex1, triggerVertex2},
            new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3},
            new ExecutionVertex[]{commitVertex},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // First checkpoint: must become pending and reach both trigger vertices.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp1, false));
            Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
            long checkpointId1 = pending1.getCheckpointId();
            Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointId1), Mockito.eq(timestamp1), Matchers.any(CheckpointOptions.class));
            Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointId1), Mockito.eq(timestamp1), Matchers.any(CheckpointOptions.class));

            // One of three acks for checkpoint 1 arrives before checkpoint 2 is triggered.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));

            // Second checkpoint: both are now pending concurrently.
            Assert.assertTrue(coord.triggerCheckpoint(timestamp2, false));
            Assert.assertEquals(2, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

            // The pending map has no guaranteed order, so pick whichever entry is not pending1.
            Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
            PendingCheckpoint cc1 = all.next();
            PendingCheckpoint cc2 = all.next();
            PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
            long checkpointId2 = pending2.getCheckpointId();
            Mockito.verify(triggerVertex1.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointId2), Mockito.eq(timestamp2), Matchers.any(CheckpointOptions.class));
            Mockito.verify(triggerVertex2.getCurrentExecutionAttempt(), Mockito.times(1))
                .triggerCheckpoint(Mockito.eq(checkpointId2), Mockito.eq(timestamp2), Matchers.any(CheckpointOptions.class));

            // Interleaved acks: checkpoint 1 gets its last two acks, checkpoint 2 gets two of three.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));

            // Checkpoint 1 is complete; checkpoint 2 is still waiting for ackAttemptID3.
            Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pending1.isDiscarded());
            Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointId1), Mockito.eq(timestamp1));

            // The final ack completes checkpoint 2 as well.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
            Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pending2.isDiscarded());
            Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(1))
                .notifyCheckpointComplete(Mockito.eq(checkpointId2), Mockito.eq(timestamp2));

            // Both completed checkpoints are retained, oldest first, with no operator state.
            List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
            CompletedCheckpoint sc1 = scs.get(0);
            Assert.assertEquals(checkpointId1, sc1.getCheckpointID());
            Assert.assertEquals(timestamp1, sc1.getTimestamp());
            Assert.assertEquals(jid, sc1.getJobId());
            Assert.assertTrue(sc1.getOperatorStates().isEmpty());
            CompletedCheckpoint sc2 = scs.get(1);
            Assert.assertEquals(checkpointId2, sc2.getCheckpointID());
            Assert.assertEquals(timestamp2, sc2.getTimestamp());
            Assert.assertEquals(jid, sc2.getJobId());
            Assert.assertTrue(sc2.getOperatorStates().isEmpty());
        }
        finally {
            // Shut the coordinator down even when an assertion above fails.
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Checks that a later checkpoint which completes first subsumes an earlier checkpoint
     * that is still pending.
     *
     * <p>Checkpoint 1 receives two of its three acknowledgements, checkpoint 2 receives all
     * three. The test then expects checkpoint 1 to be discarded together with the state that
     * was already reported for it, checkpoint 2 to be the only retained checkpoint, a late
     * acknowledgement for checkpoint 1 to have its state discarded, and the state of
     * checkpoint 2 to be discarded only after the coordinator is shut down.
     */
    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() {
        try {
            JobID jid = new JobID();
            long timestamp1 = System.currentTimeMillis();
            long timestamp2 = timestamp1 + 1552L;
            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID2);
            ExecutionVertex ackVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID2);
            ExecutionVertex ackVertex3 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID3);
            ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{triggerVertex1, triggerVertex2}, new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3}, new ExecutionVertex[]{commitVertex}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(10), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
            Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
            // Trigger checkpoint 1 and confirm it reached both trigger vertices.
            Assert.assertTrue((boolean)coord.triggerCheckpoint(timestamp1, false));
            Assert.assertEquals((long)1L, (long)coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pending1 = (PendingCheckpoint)coord.getPendingCheckpoints().values().iterator().next();
            long checkpointId1 = pending1.getCheckpointId();
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), Mockito.eq((long)timestamp1), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId1), Mockito.eq((long)timestamp1), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            // Build one state snapshot per ack vertex for checkpoint 1. The snapshots are spies
            // around real objects, the per-operator subtask states are mocks so that
            // discardState() calls can be counted later.
            OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)ackVertex1.getJobvertexId());
            OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)ackVertex2.getJobvertexId());
            OperatorID opID3 = OperatorID.fromJobVertexID((JobVertexID)ackVertex3.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates1_1 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates1_2 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates1_3 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            OperatorSubtaskState subtaskState1_1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState1_2 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState1_3 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates1_1.putSubtaskStateByOperatorID(opID1, subtaskState1_1);
            taskOperatorSubtaskStates1_2.putSubtaskStateByOperatorID(opID2, subtaskState1_2);
            taskOperatorSubtaskStates1_3.putSubtaskStateByOperatorID(opID3, subtaskState1_3);
            // First of three acks for checkpoint 1.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_2));
            // Trigger checkpoint 2 while checkpoint 1 is still pending.
            Assert.assertTrue((boolean)coord.triggerCheckpoint(timestamp2, false));
            Assert.assertEquals((long)2L, (long)coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
            // Identify the second pending checkpoint as the map entry that is not pending1.
            Iterator all = coord.getPendingCheckpoints().values().iterator();
            PendingCheckpoint cc1 = (PendingCheckpoint)all.next();
            PendingCheckpoint cc2 = (PendingCheckpoint)all.next();
            PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
            long checkpointId2 = pending2.getCheckpointId();
            // Same snapshot setup for checkpoint 2.
            TaskStateSnapshot taskOperatorSubtaskStates2_1 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates2_2 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            TaskStateSnapshot taskOperatorSubtaskStates2_3 = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
            OperatorSubtaskState subtaskState2_1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState2_2 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            OperatorSubtaskState subtaskState2_3 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates2_1.putSubtaskStateByOperatorID(opID1, subtaskState2_1);
            taskOperatorSubtaskStates2_2.putSubtaskStateByOperatorID(opID2, subtaskState2_2);
            taskOperatorSubtaskStates2_3.putSubtaskStateByOperatorID(opID3, subtaskState2_3);
            ((Execution)Mockito.verify((Object)triggerVertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), Mockito.eq((long)timestamp2), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.verify((Object)triggerVertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointId2), Mockito.eq((long)timestamp2), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            // Interleaved acks: checkpoint 2 receives all three, checkpoint 1 only reaches two
            // (ackAttemptID3 has not acknowledged checkpoint 1 yet).
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_3));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_1));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_1));
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), taskOperatorSubtaskStates2_2));
            // Both pending checkpoints are gone, but only one checkpoint is retained.
            Assert.assertTrue((boolean)pending1.isDiscarded());
            Assert.assertTrue((boolean)pending2.isDiscarded());
            Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals((long)1L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
            // State already reported for checkpoint 1 was discarded; state of checkpoint 2 was not.
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1_1, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1_2, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_1, (VerificationMode)Mockito.never())).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_2, (VerificationMode)Mockito.never())).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_3, (VerificationMode)Mockito.never())).discardState();
            // The retained checkpoint is checkpoint 2, carrying three operator states.
            List scs = coord.getSuccessfulCheckpoints();
            CompletedCheckpoint success = (CompletedCheckpoint)scs.get(0);
            Assert.assertEquals((long)checkpointId2, (long)success.getCheckpointID());
            Assert.assertEquals((long)timestamp2, (long)success.getTimestamp());
            Assert.assertEquals((Object)jid, (Object)success.getJobId());
            Assert.assertEquals((long)3L, (long)success.getOperatorStates().size());
            ((Execution)Mockito.verify((Object)commitVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId2), Mockito.eq((long)timestamp2));
            // A late ack for the already-discarded checkpoint 1 must have its state discarded.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1, new CheckpointMetrics(), taskOperatorSubtaskStates1_3));
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1_3, (VerificationMode)Mockito.times((int)1))).discardState();
            // The shutdown is part of the scenario: only after it is the state of the retained
            // checkpoint 2 expected to be discarded.
            coord.shutdown(JobStatus.FINISHED);
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_1, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_2, (VerificationMode)Mockito.times((int)1))).discardState();
            ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2_3, (VerificationMode)Mockito.times((int)1))).discardState();
        }
        catch (Exception e) {
            // NOTE(review): only the message reaches the JUnit failure; the stack trace goes to
            // stderr and the cause is dropped. Consider declaring "throws Exception" instead.
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /**
     * Checks that a checkpoint which does not receive all acknowledgements is cancelled on
     * its own, without any later checkpoint subsuming it.
     *
     * <p>Only one of the two ack vertices acknowledges. The test then polls for up to five
     * seconds until the pending checkpoint is discarded, and expects nothing to be retained,
     * the already reported state to be discarded once, and the commit vertex never to be
     * notified.
     *
     * @throws Exception if the coordinator fails or the polling sleep is interrupted;
     *     propagated unchanged so the test report keeps the original stack trace
     */
    @Test
    public void testCheckpointTimeoutIsolated() throws Exception {
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID2);
        ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
        // NOTE(review): the 200L argument is presumably the checkpoint timeout in
        // milliseconds - confirm against the CheckpointCoordinator constructor.
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jid,
            600000L,
            200L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{triggerVertex},
            new ExecutionVertex[]{ackVertex1, ackVertex2},
            new ExecutionVertex[]{commitVertex},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
            PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
            Assert.assertFalse(checkpoint.isDiscarded());

            // Acknowledge from the first ack vertex only; the second one never answers.
            OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStates1 = Mockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState subtaskState1 = Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), taskOperatorSubtaskStates1));

            // Poll until the checkpoint is discarded, no checkpoint is pending, or 5 s elapsed.
            long deadline = System.currentTimeMillis() + 5000L;
            do {
                Thread.sleep(250L);
            } while (!checkpoint.isDiscarded() && coord.getNumberOfPendingCheckpoints() > 0 && System.currentTimeMillis() < deadline);

            Assert.assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
            Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
            // The state that was reported before the cancellation must be cleaned up.
            Mockito.verify(subtaskState1, Mockito.times(1)).discardState();
            // A cancelled checkpoint must never be confirmed to the commit vertex.
            Mockito.verify(commitVertex.getCurrentExecutionAttempt(), Mockito.times(0)).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
        }
        finally {
            // Shut the coordinator down even when an assertion above fails.
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Checks that the coordinator tolerates acknowledge messages that do not match its
     * pending checkpoint: one for a different job, one for a checkpoint ID that is not
     * pending, and one from an execution attempt it does not know.
     *
     * <p>The test passes when none of the three messages makes the coordinator throw.
     *
     * @throws Exception if the coordinator fails; propagated unchanged so the test report
     *     keeps the original stack trace
     */
    @Test
    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID1);
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID2);
        ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jid,
            200000L,
            200000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{triggerVertex},
            new ExecutionVertex[]{ackVertex1, ackVertex2},
            new ExecutionVertex[]{commitVertex},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            long checkpointId = (Long)coord.getPendingCheckpoints().keySet().iterator().next();
            // Derive the "unknown" ID from the pending one. A hard-coded 1L would collide with
            // the real checkpoint, because the first ID handed out by a fresh
            // StandaloneCheckpointIDCounter is itself 1 (testMinTimeBetweenCheckpointsInterval
            // asserts exactly that), turning this message into a regular acknowledgement.
            long unknownCheckpointId = checkpointId + 1L;

            // Message for a different job.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));
            // Message for a checkpoint ID that is not pending.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, unknownCheckpointId));
            // Message from an execution attempt the coordinator does not know.
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
        }
        finally {
            // Shut the coordinator down even when a call above throws.
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Checks which acknowledged state the coordinator discards when messages are unknown,
     * duplicated, for another job, or arrive after the checkpoint was declined.
     *
     * <p>Expected by the assertions below: state from an unknown execution attempt of this
     * job is discarded; state from a different job is left untouched; a duplicate
     * acknowledgement from an already acknowledged task is not discarded; declining the
     * checkpoint discards the pending checkpoint and the state it already holds; and a late
     * acknowledgement for the declined checkpoint is discarded.
     *
     * @throws Exception if the coordinator or a mock fails
     */
    @Test
    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptId);
        ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
        ExecutionVertex ackVertex1 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptId1);
        ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
        ExecutionVertex ackVertex2 = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptId2);
        long timestamp = 1L;
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jobId,
            20000L,
            20000L,
            0L,
            1,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{triggerVertex},
            new ExecutionVertex[]{triggerVertex, ackVertex1, ackVertex2},
            new ExecutionVertex[0],
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(1),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            Assert.assertTrue(coord.triggerCheckpoint(timestamp, false));
            Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
            PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
            long checkpointId = pendingCheckpoint.getCheckpointId();

            // Regular acknowledgement from the trigger task: its state must be kept.
            OperatorID opIDtrigger = OperatorID.fromJobVertexID(triggerVertex.getJobvertexId());
            TaskStateSnapshot taskOperatorSubtaskStatesTrigger = Mockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState subtaskStateTrigger = Mockito.mock(OperatorSubtaskState.class);
            taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStatesTrigger));
            Mockito.verify(subtaskStateTrigger, Mockito.never()).discardState();

            // Acknowledgement from an unknown execution attempt of this job: state is discarded.
            TaskStateSnapshot unknownSubtaskState = Mockito.mock(TaskStateSnapshot.class);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState));
            Mockito.verify(unknownSubtaskState, Mockito.times(1)).discardState();

            // Acknowledgement for a different job: state is not touched.
            TaskStateSnapshot differentJobSubtaskState = Mockito.mock(TaskStateSnapshot.class);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState));
            Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

            // Duplicate acknowledgement from the trigger task: state is not discarded.
            TaskStateSnapshot triggerSubtaskState = Mockito.mock(TaskStateSnapshot.class);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState));
            Mockito.verify(triggerSubtaskState, Mockito.never()).discardState();

            // Reset the invocation count so the next verification only sees the decline.
            Mockito.reset(subtaskStateTrigger);
            coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Mockito.verify(subtaskStateTrigger, Mockito.times(1)).discardState();

            // Late acknowledgement for the declined checkpoint: state is discarded.
            TaskStateSnapshot ackSubtaskState = Mockito.mock(TaskStateSnapshot.class);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointId, new CheckpointMetrics(), ackSubtaskState));
            Mockito.verify(ackSubtaskState, Mockito.times(1)).discardState();

            // After the decline, a message for a different job is still left untouched.
            Mockito.reset(differentJobSubtaskState);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), differentJobSubtaskState));
            Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

            // After the decline, state from an unknown attempt is still discarded.
            TaskStateSnapshot unknownSubtaskState2 = Mockito.mock(TaskStateSnapshot.class);
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointId, new CheckpointMetrics(), unknownSubtaskState2));
            Mockito.verify(unknownSubtaskState2, Mockito.times(1)).discardState();
        }
        finally {
            // Shut the coordinator down like the other tests in this class do; previously
            // this test left it running.
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Checks that the checkpoint scheduler triggers checkpoints repeatedly, stops triggering
     * after {@code stopCheckpointScheduler()}, and can be started again.
     *
     * <p>Every trigger call on the mocked execution is checked for strictly increasing
     * checkpoint IDs and non-decreasing timestamps that are not earlier than the test start.
     * Each of the two phases waits up to 60 seconds for at least five trigger calls, stops
     * the scheduler, and tolerates at most one further call during the following 400 ms.
     *
     * @throws Exception if the coordinator fails or a sleep is interrupted; propagated
     *     unchanged so the test report keeps the original stack trace
     */
    @Test
    public void testPeriodicTriggering() throws Exception {
        JobID jid = new JobID();
        final long start = System.currentTimeMillis();
        ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
        ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
        ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
        ExecutionVertex ackVertex = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID);
        ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
        final AtomicInteger numCalls = new AtomicInteger();
        Execution execution = triggerVertex.getCurrentExecutionAttempt();
        // Count every trigger call and check that IDs and timestamps move forward.
        Mockito.doAnswer(new Answer<Void>(){
            private long lastId = -1L;
            private long lastTs = -1L;

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                long id = (Long)invocation.getArguments()[0];
                long ts = (Long)invocation.getArguments()[1];
                Assert.assertTrue(id > this.lastId);
                Assert.assertTrue(ts >= this.lastTs);
                Assert.assertTrue(ts >= start);
                this.lastId = id;
                this.lastTs = ts;
                numCalls.incrementAndGet();
                return null;
            }
        }).when(execution).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));
        // NOTE(review): the 10L argument is presumably the periodic checkpoint interval in
        // milliseconds - confirm against the CheckpointCoordinator constructor.
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jid,
            10L,
            200000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{triggerVertex},
            new ExecutionVertex[]{ackVertex},
            new ExecutionVertex[]{commitVertex},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            // Phase 1: start the scheduler and wait for at least five trigger calls.
            coord.startCheckpointScheduler();
            long timeout = System.currentTimeMillis() + 60000L;
            do {
                Thread.sleep(20L);
            } while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
            Assert.assertTrue(numCalls.get() >= 5);

            // After stopping, at most one call that was already in flight may still arrive.
            coord.stopCheckpointScheduler();
            int numCallsSoFar = numCalls.get();
            Thread.sleep(400L);
            Assert.assertTrue(numCallsSoFar == numCalls.get() || numCallsSoFar + 1 == numCalls.get());

            // Phase 2: the scheduler must be restartable and behave the same way.
            numCalls.set(0);
            coord.startCheckpointScheduler();
            timeout = System.currentTimeMillis() + 60000L;
            do {
                Thread.sleep(20L);
            } while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
            Assert.assertTrue(numCalls.get() >= 5);
            coord.stopCheckpointScheduler();
            numCallsSoFar = numCalls.get();
            Thread.sleep(400L);
            Assert.assertTrue(numCallsSoFar == numCalls.get() || numCallsSoFar + 1 == numCalls.get());
        }
        finally {
            // Shut the coordinator down even when an assertion above fails.
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Checks that the second periodic checkpoint is not triggered earlier than {@code delay}
     * milliseconds after the first one was acknowledged.
     *
     * <p>Trigger calls on the mocked execution are pushed into a blocking queue. The test
     * takes the first call, acknowledges checkpoint 1, takes the second call, and compares
     * the elapsed time (with one millisecond of tolerance) against {@code delay}.
     *
     * @throws Exception if the coordinator fails or waiting on the queue is interrupted
     */
    @Test
    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
        JobID jid = new JobID();
        ExecutionAttemptID attemptID = new ExecutionAttemptID();
        ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(attemptID);
        Execution executionAttempt = vertex.getCurrentExecutionAttempt();
        // Records the checkpoint ID (first argument) of every trigger call.
        LinkedBlockingQueue<Long> triggerCalls = new LinkedBlockingQueue<>();
        Mockito.doAnswer(invocation -> {
            triggerCalls.add((Long)invocation.getArguments()[0]);
            return null;
        }).when(executionAttempt).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointOptions.class));
        // Single source of truth for the pause; used for the coordinator and for the check.
        // NOTE(review): presumably the constructor's "minimum pause between checkpoints"
        // parameter in milliseconds - confirm against CheckpointCoordinator.
        long delay = 50L;
        CheckpointCoordinator coord = new CheckpointCoordinator(
            jid,
            2L,
            200000L,
            delay,
            1,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{vertex},
            new ExecutionVertex[]{vertex},
            new ExecutionVertex[]{vertex},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2),
            "dummy-path",
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);
        try {
            coord.startCheckpointScheduler();
            // Blocks until the first checkpoint has been triggered.
            Long firstCallId = triggerCalls.take();
            Assert.assertEquals(1L, firstCallId.longValue());
            AcknowledgeCheckpoint ackMsg = new AcknowledgeCheckpoint(jid, attemptID, 1L);
            // Take the reference time before the ack is delivered so the measured gap can
            // only be longer than the real one, never shorter.
            long ackTime = System.nanoTime();
            coord.receiveAcknowledgeMessage(ackMsg);
            Long nextCallId = triggerCalls.take();
            long nextCheckpointTime = System.nanoTime();
            Assert.assertEquals(2L, nextCallId.longValue());
            long delayMillis = (nextCheckpointTime - ackTime) / 1000000L;
            // Allow one millisecond of slack for the integer division above.
            if (delayMillis + 1L < delay) {
                Assert.fail("checkpoint came too early: delay was " + delayMillis + " but should have been at least " + delay);
            }
        }
        finally {
            coord.stopCheckpointScheduler();
            coord.shutdown(JobStatus.FINISHED);
        }
    }

    /** Delegates to {@code testMaxConcurrentAttempts(int)} with an argument of 1. */
    @Test
    public void testMaxConcurrentAttempts1() {
        final int maxConcurrentAttempts = 1;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /** Delegates to {@code testMaxConcurrentAttempts(int)} with an argument of 2. */
    @Test
    public void testMaxConcurrentAttempts2() {
        final int maxConcurrentAttempts = 2;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /** Delegates to {@code testMaxConcurrentAttempts(int)} with an argument of 5. */
    @Test
    public void testMaxConcurrentAttempts5() {
        final int maxConcurrentAttempts = 5;
        testMaxConcurrentAttempts(maxConcurrentAttempts);
    }

    /**
     * Triggers a savepoint on a coordinator with two mocked vertices, acknowledges it from both
     * tasks, and checks the resulting completed checkpoint; then repeats with a second savepoint
     * that is acknowledged without any task state.
     *
     * <p>The statement order matters: the Mockito verifications count interactions accumulated
     * up to the point where they run.
     */
    @Test
    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
        // Both vertices are used for all three vertex arrays passed to the coordinator.
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{vertex1, vertex2}, new ExecutionVertex[]{vertex1, vertex2}, new ExecutionVertex[]{vertex1, vertex2}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        // Nothing is pending or retained before the first trigger.
        Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        // Trigger the first savepoint; its future must stay open until all tasks acknowledge.
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        CompletableFuture savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
        Assert.assertFalse((boolean)savepointFuture.isDone());
        Assert.assertEquals((long)1L, (long)coord.getNumberOfPendingCheckpoints());
        // Inspect the single pending checkpoint that the trigger created.
        long checkpointId = (Long)coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
        PendingCheckpoint pending = (PendingCheckpoint)coord.getPendingCheckpoints().get(checkpointId);
        Assert.assertNotNull((Object)pending);
        Assert.assertEquals((long)checkpointId, (long)pending.getCheckpointId());
        Assert.assertEquals((long)timestamp, (long)pending.getCheckpointTimestamp());
        Assert.assertEquals((Object)jid, (Object)pending.getJobId());
        Assert.assertEquals((long)2L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals((long)0L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals((long)0L, (long)pending.getOperatorStates().size());
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.isFullyAcknowledged());
        Assert.assertFalse((boolean)pending.canBeSubsumed());
        // Build mocked task snapshots that return one mocked subtask state per operator.
        OperatorID opID1 = OperatorID.fromJobVertexID((JobVertexID)vertex1.getJobvertexId());
        OperatorID opID2 = OperatorID.fromJobVertexID((JobVertexID)vertex2.getJobvertexId());
        TaskStateSnapshot taskOperatorSubtaskStates1 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        TaskStateSnapshot taskOperatorSubtaskStates2 = (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class);
        OperatorSubtaskState subtaskState1 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState subtaskState2 = (OperatorSubtaskState)Mockito.mock(OperatorSubtaskState.class);
        Mockito.when((Object)taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn((Object)subtaskState1);
        Mockito.when((Object)taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn((Object)subtaskState2);
        // First acknowledgement (task 2): one of two tasks acknowledged, savepoint still open.
        AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2);
        Assert.assertEquals((long)1L, (long)pending.getNumberOfAcknowledgedTasks());
        Assert.assertEquals((long)1L, (long)pending.getNumberOfNonAcknowledgedTasks());
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.isFullyAcknowledged());
        Assert.assertFalse((boolean)savepointFuture.isDone());
        // Deliver the same acknowledgement a second time; the assertions expect no state change.
        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2);
        Assert.assertFalse((boolean)pending.isDiscarded());
        Assert.assertFalse((boolean)pending.isFullyAcknowledged());
        Assert.assertFalse((boolean)savepointFuture.isDone());
        // Acknowledgement from task 1 completes the savepoint: the pending checkpoint is expected
        // to report itself as discarded, the future to be done, and nothing to remain pending.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1));
        Assert.assertTrue((boolean)pending.isDiscarded());
        Assert.assertTrue((boolean)savepointFuture.isDone());
        Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
        // Each execution is notified exactly once, and each subtask state registers its shared
        // state exactly once.
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId), Mockito.eq((long)timestamp));
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointId), Mockito.eq((long)timestamp));
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.times((int)1))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
        // The completed checkpoint delivered through the future mirrors the pending one.
        CompletedCheckpoint success = (CompletedCheckpoint)savepointFuture.get();
        Assert.assertEquals((Object)jid, (Object)success.getJobId());
        Assert.assertEquals((long)timestamp, (long)success.getTimestamp());
        Assert.assertEquals((long)pending.getCheckpointId(), (long)success.getCheckpointID());
        Assert.assertEquals((long)2L, (long)success.getOperatorStates().size());
        // Second savepoint: acknowledged by both tasks without metrics or task state.
        long timestampNew = timestamp + 7L;
        savepointFuture = coord.triggerSavepoint(timestampNew, savepointDir);
        Assert.assertFalse((boolean)savepointFuture.isDone());
        long checkpointIdNew = (Long)coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
        Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)0L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        CompletedCheckpoint successNew = (CompletedCheckpoint)savepointFuture.get();
        Assert.assertEquals((Object)jid, (Object)successNew.getJobId());
        Assert.assertEquals((long)timestampNew, (long)successNew.getTimestamp());
        Assert.assertEquals((long)checkpointIdNew, (long)successNew.getCheckpointID());
        Assert.assertTrue((boolean)successNew.getOperatorStates().isEmpty());
        Assert.assertTrue((boolean)savepointFuture.isDone());
        // Completing the second savepoint must not have discarded the first savepoint's state.
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState1, (VerificationMode)Mockito.never())).discardState();
        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.never())).discardState();
        // The second savepoint was triggered on, and confirmed to, each execution exactly once.
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), Mockito.eq((long)timestampNew), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).triggerCheckpoint(Mockito.eq((long)checkpointIdNew), Mockito.eq((long)timestampNew), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
        ((Execution)Mockito.verify((Object)vertex1.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), Mockito.eq((long)timestampNew));
        ((Execution)Mockito.verify((Object)vertex2.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)1))).notifyCheckpointComplete(Mockito.eq((long)checkpointIdNew), Mockito.eq((long)timestampNew));
        coord.shutdown(JobStatus.FINISHED);
    }

    /**
     * Interleaves savepoints and regular checkpoints and asserts, via the pending/retained
     * counts, that completing a later checkpoint or savepoint removes older pending regular
     * checkpoints but leaves an older pending savepoint in place.
     *
     * <p>IDs are read from the shared counter via {@code getLast()} immediately after each
     * trigger, so the statement order must not change.
     */
    @Test
    public void testSavepointsAreNotSubsumed() throws Exception {
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
        ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);
        StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{vertex1, vertex2}, new ExecutionVertex[]{vertex1, vertex2}, new ExecutionVertex[]{vertex1, vertex2}, (CheckpointIDCounter)counter, (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(10), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        // Savepoint 1, followed by two regular checkpoints: three pending in total.
        CompletableFuture savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
        long savepointId1 = counter.getLast();
        Assert.assertEquals((long)1L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertTrue((boolean)coord.triggerCheckpoint(timestamp + 1L, false));
        Assert.assertEquals((long)2L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertTrue((boolean)coord.triggerCheckpoint(timestamp + 2L, false));
        long checkpointId2 = counter.getLast();
        Assert.assertEquals((long)3L, (long)coord.getNumberOfPendingCheckpoints());
        // Completing the second regular checkpoint leaves only savepoint 1 pending: the first
        // regular checkpoint is gone, while the savepoint is neither discarded nor completed.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId2));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId2));
        Assert.assertEquals((long)1L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)coord.getPendingCheckpoints().get(savepointId1)).isDiscarded());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        // Trigger a third regular checkpoint and a second savepoint: three pending again.
        Assert.assertTrue((boolean)coord.triggerCheckpoint(timestamp + 3L, false));
        long checkpointId3 = counter.getLast();
        Assert.assertEquals((long)2L, (long)coord.getNumberOfPendingCheckpoints());
        CompletableFuture savepointFuture2 = coord.triggerSavepoint(timestamp + 4L, savepointDir);
        long savepointId2 = counter.getLast();
        Assert.assertEquals((long)3L, (long)coord.getNumberOfPendingCheckpoints());
        // Completing savepoint 2 leaves two pending and does not change the retained count;
        // savepoint 1 is still pending and open.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId2));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId2));
        Assert.assertEquals((long)2L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertFalse((boolean)((PendingCheckpoint)coord.getPendingCheckpoints().get(savepointId1)).isDiscarded());
        Assert.assertFalse((boolean)savepointFuture1.isDone());
        Assert.assertTrue((boolean)savepointFuture2.isDone());
        // Savepoint 1 can still be completed after the later ones have finished.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, savepointId1));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, savepointId1));
        Assert.assertEquals((long)1L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)1L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertTrue((boolean)savepointFuture1.isDone());
        // Finally complete the third regular checkpoint; nothing is pending, two are retained.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId3));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId3));
        Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals((long)2L, (long)coord.getNumberOfRetainedSuccessfulCheckpoints());
    }

    /**
     * Starts the periodic checkpoint scheduler against a mocked trigger vertex and asserts that
     * exactly {@code maxConcurrentAttempts} calls are observed before any acknowledgement, and
     * that acknowledging checkpoint 1 results in exactly one additional call.
     *
     * <p>Calls to {@code triggerCheckpoint} and {@code notifyCheckpointComplete} on the trigger
     * vertex's execution are counted in a single shared counter.
     *
     * <p>Any exception is reported as a test failure. The coordinator is shut down in a
     * {@code finally} block so the periodic scheduler does not outlive a failed assertion.
     *
     * @param maxConcurrentAttempts the concurrent-checkpoint limit passed to the coordinator
     */
    private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
        try {
            JobID jid = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
            AtomicInteger numCalls = new AtomicInteger();
            Execution execution = triggerVertex.getCurrentExecutionAttempt();
            ((Execution)Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when((Object)execution)).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
            ((Execution)Mockito.doAnswer(invocation -> {
                numCalls.incrementAndGet();
                return null;
            }).when((Object)execution)).notifyCheckpointComplete(Matchers.anyLong(), Matchers.anyLong());
            CheckpointCoordinator coord = new CheckpointCoordinator(jid, 10L, 200000L, 0L, maxConcurrentAttempts, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{triggerVertex}, new ExecutionVertex[]{ackVertex}, new ExecutionVertex[]{commitVertex}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
            try {
                coord.startCheckpointScheduler();
                // Wait at least 100 ms, then until the limit is reached or 60 s have elapsed.
                long now = System.currentTimeMillis();
                long timeout = now + 60000L;
                long minDuration = now + 100L;
                do {
                    Thread.sleep(20L);
                } while ((now = System.currentTimeMillis()) < minDuration || numCalls.get() < maxConcurrentAttempts && now < timeout);
                Assert.assertEquals((long)maxConcurrentAttempts, (long)numCalls.get());
                ((Execution)Mockito.verify((Object)triggerVertex.getCurrentExecutionAttempt(), (VerificationMode)Mockito.times((int)maxConcurrentAttempts))).triggerCheckpoint(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class));
                // Acknowledging checkpoint 1 should allow one more call to come through.
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
                timeout = System.currentTimeMillis() + 60000L;
                do {
                    Thread.sleep(20L);
                    // Re-read the clock on every iteration; comparing a stale timestamp against
                    // the deadline would make this loop unbounded.
                } while (numCalls.get() < maxConcurrentAttempts + 1 && System.currentTimeMillis() < timeout);
                Assert.assertEquals((long)(maxConcurrentAttempts + 1), (long)numCalls.get());
                // Give the scheduler time to (incorrectly) fire again, then re-check the count.
                Thread.sleep(200L);
                Assert.assertEquals((long)(maxConcurrentAttempts + 1), (long)numCalls.get());
            }
            finally {
                coord.shutdown(JobStatus.FINISHED);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs the periodic scheduler with a concurrency limit of two, waits for checkpoints 1 and 2
     * to be pending, acknowledges checkpoint 2, and then asserts that checkpoints 3 and 4 become
     * the two pending checkpoints.
     *
     * <p>Any exception is reported as a test failure. The coordinator is shut down in a
     * {@code finally} block so the periodic scheduler does not outlive a failed assertion.
     */
    @Test
    public void testMaxConcurrentAttempsWithSubsumption() {
        try {
            int maxConcurrentAttempts = 2;
            JobID jid = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
            CheckpointCoordinator coord = new CheckpointCoordinator(jid, 10L, 200000L, 0L, maxConcurrentAttempts, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{triggerVertex}, new ExecutionVertex[]{ackVertex}, new ExecutionVertex[]{commitVertex}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
            try {
                coord.startCheckpointScheduler();
                // Wait at least 100 ms, then until the limit is reached or 60 s have elapsed.
                long now = System.currentTimeMillis();
                long timeout = now + 60000L;
                long minDuration = now + 100L;
                do {
                    Thread.sleep(20L);
                } while ((now = System.currentTimeMillis()) < minDuration || coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout);
                Assert.assertEquals((long)maxConcurrentAttempts, (long)coord.getNumberOfPendingCheckpoints());
                Assert.assertNotNull(coord.getPendingCheckpoints().get(1L));
                Assert.assertNotNull(coord.getPendingCheckpoints().get(2L));
                // Acknowledge the newer of the two pending checkpoints.
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
                // Wait (up to 60 s) for checkpoint 4 to show up among the pending checkpoints.
                long newTimeout = System.currentTimeMillis() + 60000L;
                do {
                    Thread.sleep(20L);
                } while (coord.getPendingCheckpoints().get(4L) == null && System.currentTimeMillis() < newTimeout);
                Assert.assertEquals((long)maxConcurrentAttempts, (long)coord.getNumberOfPendingCheckpoints());
                Assert.assertNotNull(coord.getPendingCheckpoints().get(3L));
                Assert.assertNotNull(coord.getPendingCheckpoints().get(4L));
            }
            finally {
                coord.shutdown(JobStatus.FINISHED);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Asserts that the periodic scheduler creates no pending checkpoint while the trigger
     * vertex's execution reports {@code CREATED}, and that a pending checkpoint appears within
     * ten seconds once the reported state is switched to {@code RUNNING}.
     *
     * <p>Any exception is reported as a test failure. The coordinator is shut down in a
     * {@code finally} block; without it the scheduler started here would keep running after
     * the test returns.
     */
    @Test
    public void testPeriodicSchedulingWithInactiveTasks() {
        try {
            JobID jid = new JobID();
            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
            ExecutionVertex triggerVertex = CheckpointCoordinatorTest.mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex = CheckpointCoordinatorTest.mockExecutionVertex(ackAttemptID);
            ExecutionVertex commitVertex = CheckpointCoordinatorTest.mockExecutionVertex(commitAttemptID);
            // The mocked execution reports whatever state this reference currently holds.
            AtomicReference<ExecutionState> currentState = new AtomicReference<ExecutionState>(ExecutionState.CREATED);
            Mockito.when((Object)triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(invocation -> (ExecutionState)currentState.get());
            CheckpointCoordinator coord = new CheckpointCoordinator(jid, 10L, 200000L, 0L, 2, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{triggerVertex}, new ExecutionVertex[]{ackVertex}, new ExecutionVertex[]{commitVertex}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
            try {
                coord.startCheckpointScheduler();
                // Let the scheduler run for a while; nothing may be triggered in state CREATED.
                Thread.sleep(200L);
                Assert.assertEquals((long)0L, (long)coord.getNumberOfPendingCheckpoints());
                currentState.set(ExecutionState.RUNNING);
                long timeout = System.currentTimeMillis() + 10000L;
                do {
                    Thread.sleep(20L);
                } while (System.currentTimeMillis() < timeout && coord.getNumberOfPendingCheckpoints() == 0);
                Assert.assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
            }
            finally {
                coord.shutdown(JobStatus.FINISHED);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Triggers five savepoints in a row on a coordinator configured with a concurrent-checkpoint
     * limit of one, asserts that none is done before acknowledgement, then acknowledges them
     * from the newest ID down to the oldest and asserts that every savepoint future is done.
     */
    @Test
    public void testConcurrentSavepoints() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptId = new ExecutionAttemptID();
        ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(attemptId);
        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator coord = new CheckpointCoordinator(jobId, 100000L, 200000L, 0L, 1, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, (CheckpointIDCounter)idCounter, (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);

        final int savepointCount = 5;
        List<CompletableFuture<?>> triggered = new ArrayList<>();
        String targetDir = this.tmpFolder.newFolder().getAbsolutePath();

        // Trigger all savepoints up front, using the loop index as the timestamp.
        for (long ts = 0L; ts < savepointCount; ts++) {
            triggered.add(coord.triggerSavepoint(ts, targetDir));
        }
        for (CompletableFuture<?> future : triggered) {
            Assert.assertFalse(future.isDone());
        }

        // Acknowledge in descending ID order, starting from the most recently issued ID.
        long newestId = idCounter.getLast();
        for (long id = newestId; id > newestId - savepointCount; id--) {
            coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, id));
        }

        for (CompletableFuture<?> future : triggered) {
            Assert.assertTrue(future.isDone());
        }
    }

    /**
     * Triggers two savepoints back to back on a coordinator configured with a very large
     * minimum pause (100000000 ms) and asserts that each returned future is still open,
     * i.e. that neither trigger attempt produced an already-completed future.
     */
    @Test
    public void testMinDelayBetweenSavepoints() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID attemptId = new ExecutionAttemptID();
        ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(attemptId);
        CheckpointCoordinator coord = new CheckpointCoordinator(jobId, 100000L, 200000L, 100000000L, 1, ExternalizedCheckpointSettings.none(), new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        String targetDir = this.tmpFolder.newFolder().getAbsolutePath();

        // Timestamps 0 and 1: trigger, then check the future before triggering the next one.
        for (long ts = 0L; ts < 2L; ts++) {
            CompletableFuture<?> savepoint = coord.triggerSavepoint(ts, targetDir);
            Assert.assertFalse("Did not trigger savepoint", savepoint.isDone());
        }
    }

    /**
     * Completes one checkpoint across two job vertices (parallelism 3 and 2), shuts the
     * recoverable store down with {@code SUSPENDED}, restores the latest checkpointed state and
     * verifies both the shared-state registration count and the restored state per vertex.
     */
    @Test
    public void testRestoreLatestCheckpointedState() throws Exception {
        AcknowledgeCheckpoint acknowledgeCheckpoint;
        TaskStateSnapshot subtaskState;
        int index;
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        ExecutionJobVertex jobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);
        // Collect the task vertices of both job vertices into one array for the coordinator.
        ArrayList<ExecutionVertex> allExecutionVertices = new ArrayList<ExecutionVertex>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
        RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)store, null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        // Trigger a single checkpoint and pick up its ID from the pending map.
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue((coord.getPendingCheckpoints().keySet().size() == 1 ? 1 : 0) != 0);
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        // Acknowledge every subtask of the first vertex with a mocked state for its key-group range.
        for (index = 0; index < jobVertex1.getParallelism(); ++index) {
            subtaskState = CheckpointCoordinatorTest.mockSubtaskState(jobVertexID1, index, (KeyGroupRange)keyGroupPartitions1.get(index));
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskState);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        // Same for the second vertex.
        for (index = 0; index < jobVertex2.getParallelism(); ++index) {
            subtaskState = CheckpointCoordinatorTest.mockSubtaskState(jobVertexID2, index, (KeyGroupRange)keyGroupPartitions2.get(index));
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskState);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        // Shut the store down as SUSPENDED before restoring from it.
        store.shutdown(JobStatus.SUSPENDED);
        HashMap<JobVertexID, ExecutionJobVertex> tasks = new HashMap<JobVertexID, ExecutionJobVertex>();
        tasks.put(jobVertexID1, jobVertex1);
        tasks.put(jobVertexID2, jobVertex2);
        coord.restoreLatestCheckpointedState(tasks, true, false);
        // Each subtask state must have registered its shared state twice by now
        // (presumably once on completion and once on restore; the test only checks the total).
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState2 : taskState.getStates()) {
                    ((OperatorSubtaskState)Mockito.verify((Object)subtaskState2, (VerificationMode)Mockito.times((int)2))).registerSharedStates((SharedStateRegistry)Matchers.any(SharedStateRegistry.class));
                }
            }
        }
        CheckpointCoordinatorTest.verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
        CheckpointCoordinatorTest.verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2);
    }

    /**
     * Completes a checkpoint for two job vertices with max parallelism 42 and 13, then attempts
     * to restore it onto job vertices with max parallelism 20 and 42. The test passes only if an
     * {@link IllegalStateException} is thrown; the trailing {@code Assert.fail} is reached only
     * when the restore unexpectedly succeeds.
     */
    @Test(expected=IllegalStateException.class)
    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
        AcknowledgeCheckpoint acknowledgeCheckpoint;
        TaskStateSnapshot taskOperatorSubtaskStates;
        OperatorSubtaskState operatorSubtaskState;
        KeyGroupsStateHandle keyGroupState;
        int index;
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        int parallelism2 = 2;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        ExecutionJobVertex jobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);
        // Collect the task vertices of both job vertices into one array for the coordinator.
        ArrayList<ExecutionVertex> allExecutionVertices = new ArrayList<ExecutionVertex>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        // Trigger a single checkpoint and pick up its ID from the pending map.
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue((coord.getPendingCheckpoints().keySet().size() == 1 ? 1 : 0) != 0);
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        // Acknowledge each subtask of the first vertex with a snapshot that carries only a keyed
        // state handle (third constructor argument); the other three arguments are null.
        for (index = 0; index < jobVertex1.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            operatorSubtaskState = new OperatorSubtaskState(null, null, (KeyedStateHandle)keyGroupState, null);
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        // Same for the second vertex.
        for (index = 0; index < jobVertex2.getParallelism(); ++index) {
            keyGroupState = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            operatorSubtaskState = new OperatorSubtaskState(null, null, (KeyedStateHandle)keyGroupState, null);
            taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        // Rebuild both job vertices with the same IDs and parallelism but different max parallelism.
        HashMap<JobVertexID, ExecutionJobVertex> tasks = new HashMap<JobVertexID, ExecutionJobVertex>();
        int newMaxParallelism1 = 20;
        int newMaxParallelism2 = 42;
        ExecutionJobVertex newJobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, newMaxParallelism1);
        ExecutionJobVertex newJobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID2, parallelism2, newMaxParallelism2);
        tasks.put(jobVertexID1, newJobVertex1);
        tasks.put(jobVertexID2, newJobVertex2);
        // Expected to throw; Assert.fail raises an AssertionError, which does not satisfy the
        // expected IllegalStateException, so a silent success still fails the test.
        coord.restoreLatestCheckpointedState(tasks, true, false);
        Assert.fail((String)"The restoration should have failed because the max parallelism changed.");
    }

    /**
     * Runs the shared rescaling scenario with {@code scaleOut == false}, i.e. the second
     * vertex is checkpointed with 13 subtasks and restored with 2.
     */
    @Test
    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
        final boolean scaleOut = false;
        testRestoreLatestCheckpointedStateWithChangingParallelism(scaleOut);
    }

    /**
     * Runs the shared rescaling scenario with {@code scaleOut == true}, i.e. the second
     * vertex is checkpointed with 2 subtasks and restored with 13.
     */
    @Test
    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
        // Must be true: passing false would silently repeat the scale-in scenario and
        // leave the scale-out path (2 -> 13 subtasks) completely untested.
        this.testRestoreLatestCheckpointedStateWithChangingParallelism(true);
    }

    /**
     * Runs the topology-change recovery scenario with scale type {@code 0}, which restores
     * the second vertex with a parallelism of 20 instead of the checkpointed 10.
     */
    @Test
    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
        final int increaseParallelism = 0;
        testStateRecoveryWithTopologyChange(increaseParallelism);
    }

    /**
     * Runs the topology-change recovery scenario with scale type {@code 1}, which restores
     * the second vertex with a parallelism of 8 instead of the checkpointed 10.
     */
    @Test
    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
        final int decreaseParallelism = 1;
        testStateRecoveryWithTopologyChange(decreaseParallelism);
    }

    /**
     * Runs the topology-change recovery scenario with scale type {@code 2}, which keeps the
     * second vertex at its checkpointed parallelism of 10.
     */
    @Test
    public void testStateRecoveryWhenTopologyChange() throws Exception {
        final int sameParallelism = 2;
        testStateRecoveryWithTopologyChange(sameParallelism);
    }

    /**
     * Shared scenario for the rescaling tests: completes one checkpoint for two job vertices
     * and then restores it onto a graph in which the second vertex has a different
     * parallelism.
     *
     * <p>Vertex 1 keeps parallelism 3 (max parallelism 42). Vertex 2 (max parallelism 13) is
     * checkpointed with parallelism 2 and restored with 13 when {@code scaleOut} is true, and
     * checkpointed with 13 and restored with 2 when it is false.
     *
     * @param scaleOut {@code true} to grow vertex 2 from 2 to 13 subtasks, {@code false} to
     *     shrink it from 13 to 2
     * @throws Exception if any step of the scenario throws
     */
    private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut) throws Exception {
        JobID jid = new JobID();
        long timestamp = System.currentTimeMillis();
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        int parallelism1 = 3;
        // Old and new parallelism of vertex 2 are mirrored: 2 -> 13 (scale out) or 13 -> 2 (scale in).
        int parallelism2 = scaleOut ? 2 : 13;
        int maxParallelism1 = 42;
        int maxParallelism2 = 13;
        int newParallelism2 = scaleOut ? 13 : 2;
        ExecutionJobVertex jobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex jobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID2, parallelism2, maxParallelism2);
        // The execution vertices of both job vertices are handed to the coordinator for all
        // three of its vertex-array arguments.
        ArrayList<ExecutionVertex> allExecutionVertices = new ArrayList<ExecutionVertex>(parallelism1 + parallelism2);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1), null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        // Trigger a checkpoint and expect exactly one pending checkpoint as a result.
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue((coord.getPendingCheckpoints().keySet().size() == 1 ? 1 : 0) != 0);
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
        // Key-group ranges per subtask under the ORIGINAL parallelism of each vertex.
        List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        // Vertex 1: acknowledge every subtask with managed operator state plus managed and
        // raw keyed state; the raw operator state slot is left null.
        for (int index = 0; index < jobVertex1.getParallelism(); ++index) {
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID1, index, 2, 8, false);
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID1, (KeyGroupRange)keyGroupPartitions1.get(index), true);
            OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, null, (KeyedStateHandle)keyedStateBackend, (KeyedStateHandle)keyedStateRaw);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID1), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        // Vertex 2: acknowledge every subtask with all four kinds of state and remember the
        // operator state handles so they can be compared after the restore.
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesBackend = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        ArrayList<ChainedStateHandle<OperatorStateHandle>> expectedOpStatesRaw = new ArrayList<ChainedStateHandle<OperatorStateHandle>>(jobVertex2.getParallelism());
        for (int index = 0; index < jobVertex2.getParallelism(); ++index) {
            KeyGroupsStateHandle keyedStateBackend = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), false);
            KeyGroupsStateHandle keyedStateRaw = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID2, (KeyGroupRange)keyGroupPartitions2.get(index), true);
            OperatorStateHandle opStateBackend = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false);
            OperatorStateHandle opStateRaw = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true);
            expectedOpStatesBackend.add(new ChainedStateHandle(Collections.singletonList(opStateBackend)));
            expectedOpStatesRaw.add(new ChainedStateHandle(Collections.singletonList(opStateRaw)));
            OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(opStateBackend, opStateRaw, (KeyedStateHandle)keyedStateBackend, (KeyedStateHandle)keyedStateRaw);
            TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot();
            taskOperatorSubtaskStates.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)jobVertexID2), operatorSubtaskState);
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
        // After all acknowledgements exactly one checkpoint must have completed.
        List completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals((long)1L, (long)completedCheckpoints.size());
        // Build the graph to restore onto: vertex 1 unchanged, vertex 2 with the new parallelism.
        HashMap<JobVertexID, ExecutionJobVertex> tasks = new HashMap<JobVertexID, ExecutionJobVertex>();
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        ExecutionJobVertex newJobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);
        ExecutionJobVertex newJobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID2, newParallelism2, maxParallelism2);
        tasks.put(jobVertexID1, newJobVertex1);
        tasks.put(jobVertexID2, newJobVertex2);
        coord.restoreLatestCheckpointedState(tasks, true, false);
        // Vertex 1 did not change, so it is checked against its original key-group partitions.
        CheckpointCoordinatorTest.verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesBackend = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIDs = newJobVertex2.getOperatorIDs();
            // Regenerate the keyed state for the NEW key-group range of subtask i; the
            // generator is seeded from the vertex id and key-group index, so this yields the
            // values the restored subtask is expected to hold.
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID2, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            TaskStateSnapshot taskStateHandles = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
            // The last entry of the operator id list is treated as the head operator; only
            // its keyed state is compared below.
            int headOpIndex = operatorIDs.size() - 1;
            ArrayList<Collection> allParallelManagedOpStates = new ArrayList<Collection>(operatorIDs.size());
            ArrayList<Collection> allParallelRawOpStates = new ArrayList<Collection>(operatorIDs.size());
            for (int idx = 0; idx < operatorIDs.size(); ++idx) {
                OperatorID operatorID = (OperatorID)operatorIDs.get(idx);
                OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
                Collection opStateBackend = opState.getManagedOperatorState();
                Collection opStateRaw = opState.getRawOperatorState();
                allParallelManagedOpStates.add(opStateBackend);
                allParallelRawOpStates.add(opStateRaw);
                if (idx != headOpIndex) continue;
                Collection keyedStateBackend = opState.getManagedKeyedState();
                Collection keyGroupStateRaw = opState.getRawKeyedState();
                CheckpointCoordinatorTest.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
                CheckpointCoordinatorTest.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
            }
            actualOpStatesBackend.add(allParallelManagedOpStates);
            actualOpStatesRaw.add(allParallelRawOpStates);
        }
        // Operator state of vertex 2: compare what was checkpointed with what was restored.
        CheckpointCoordinatorTest.comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend);
        CheckpointCoordinatorTest.comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
    }

    /**
     * Creates a new {@link JobVertexID} and pairs it with the {@link OperatorID} derived from it.
     *
     * @return tuple of (job vertex id, matching operator id)
     */
    private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
        JobVertexID vertexId = new JobVertexID();
        OperatorID derivedOperatorId = OperatorID.fromJobVertexID(vertexId);
        return new Tuple2<>(vertexId, derivedOperatorId);
    }

    /**
     * Shared scenario for the topology-change tests: builds a completed checkpoint by hand
     * for four operators (id1..id4) and restores it onto two new job vertices whose operator
     * lists differ from the checkpointed ones.
     *
     * <p>The new first vertex lists the operators (id2, id1, id5), the new second vertex
     * lists (id6, id3); id5 and id6 have no checkpointed state. The new parallelism of the
     * second vertex depends on {@code scaleType}: {@code 0} restores with 20 subtasks,
     * {@code 1} with 8, any other value keeps the checkpointed 10.
     *
     * @param scaleType selector for the new parallelism of the second vertex (see above)
     * @throws Exception if any step of the scenario throws
     */
    public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
        // Checkpointed operators id1/id2 share parallelism1, id3/id4 share parallelism2.
        Tuple2<JobVertexID, OperatorID> id1 = CheckpointCoordinatorTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id2 = CheckpointCoordinatorTest.generateIDPair();
        int parallelism1 = 10;
        int maxParallelism1 = 64;
        Tuple2<JobVertexID, OperatorID> id3 = CheckpointCoordinatorTest.generateIDPair();
        Tuple2<JobVertexID, OperatorID> id4 = CheckpointCoordinatorTest.generateIDPair();
        int parallelism2 = 10;
        int maxParallelism2 = 64;
        List keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)parallelism2);
        HashMap<Object, OperatorState> operatorStates = new HashMap<Object, OperatorState>();
        // id1 and id2 only carry operator state (managed + raw); both keyed slots are null.
        for (Tuple2 id : Arrays.asList(id1, id2)) {
            OperatorState taskState = new OperatorState((OperatorID)id.f1, parallelism1, maxParallelism1);
            operatorStates.put(id.f1, taskState);
            for (int index = 0; index < taskState.getParallelism(); ++index) {
                OperatorStateHandle subManagedOperatorState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false);
                OperatorStateHandle subRawOperatorState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true);
                OperatorSubtaskState subtaskState = new OperatorSubtaskState(subManagedOperatorState, subRawOperatorState, null, null);
                taskState.putState(index, subtaskState);
            }
        }
        // id3 and id4 carry operator state; keyed state is generated for id3 only. The
        // expected operator state handles are recorded per operator (index 0 = id3, 1 = id4).
        ArrayList expectedManagedOperatorStates = new ArrayList();
        ArrayList expectedRawOperatorStates = new ArrayList();
        for (Tuple2 id : Arrays.asList(id3, id4)) {
            OperatorState operatorState = new OperatorState((OperatorID)id.f1, parallelism2, maxParallelism2);
            operatorStates.put(id.f1, operatorState);
            ArrayList<ChainedStateHandle> expectedManagedOperatorState = new ArrayList<ChainedStateHandle>();
            ArrayList<ChainedStateHandle> expectedRawOperatorState = new ArrayList<ChainedStateHandle>();
            expectedManagedOperatorStates.add(expectedManagedOperatorState);
            expectedRawOperatorStates.add(expectedRawOperatorState);
            for (int index = 0; index < operatorState.getParallelism(); ++index) {
                OperatorStateHandle subManagedOperatorState = (OperatorStateHandle)CheckpointCoordinatorTest.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, false).get(0);
                OperatorStateHandle subRawOperatorState = (OperatorStateHandle)CheckpointCoordinatorTest.generateChainedPartitionableStateHandle((JobVertexID)id.f0, index, 2, 8, true).get(0);
                KeyGroupsStateHandle subManagedKeyedState = ((JobVertexID)id.f0).equals(id3.f0) ? CheckpointCoordinatorTest.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), false) : null;
                KeyGroupsStateHandle subRawKeyedState = ((JobVertexID)id.f0).equals(id3.f0) ? CheckpointCoordinatorTest.generateKeyGroupState((JobVertexID)id.f0, (KeyGroupRange)keyGroupPartitions2.get(index), true) : null;
                expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subManagedOperatorState));
                expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle((StateObject)subRawOperatorState));
                OperatorSubtaskState subtaskState = new OperatorSubtaskState(subManagedOperatorState, subRawOperatorState, (KeyedStateHandle)subManagedKeyedState, (KeyedStateHandle)subRawKeyedState);
                operatorState.putState(index, subtaskState);
            }
        }
        // New topology: id5 and id6 are operators that were not part of the checkpoint.
        Tuple2<JobVertexID, OperatorID> id5 = CheckpointCoordinatorTest.generateIDPair();
        int newParallelism1 = 10;
        Tuple2<JobVertexID, OperatorID> id6 = CheckpointCoordinatorTest.generateIDPair();
        // scaleType 0 -> 20 subtasks, 1 -> 8 subtasks, anything else -> unchanged.
        int newParallelism2 = parallelism2;
        if (scaleType == 0) {
            newParallelism2 = 20;
        } else if (scaleType == 1) {
            newParallelism2 = 8;
        }
        List newKeyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism2, (int)newParallelism2);
        // Operator id lists as passed to the mocks: vertex 1 = (id2, id1, id5), vertex 2 = (id6, id3).
        ExecutionJobVertex newJobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex((JobVertexID)id5.f0, Arrays.asList((OperatorID)id2.f1, (OperatorID)id1.f1, (OperatorID)id5.f1), newParallelism1, maxParallelism1);
        ExecutionJobVertex newJobVertex2 = CheckpointCoordinatorTest.mockExecutionJobVertex((JobVertexID)id3.f0, Arrays.asList((OperatorID)id6.f1, (OperatorID)id3.f1), newParallelism2, maxParallelism2);
        HashMap<Object, ExecutionJobVertex> tasks = new HashMap<Object, ExecutionJobVertex>();
        tasks.put(id5.f0, newJobVertex1);
        tasks.put(id3.f0, newJobVertex2);
        JobID jobID = new JobID();
        // The store is a spy whose getLatestCheckpoint() is stubbed to return the hand-built
        // checkpoint, so no checkpoint has to be triggered or acknowledged here.
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = (StandaloneCompletedCheckpointStore)Mockito.spy((Object)new StandaloneCompletedCheckpointStore(1));
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(jobID, 2L, System.currentTimeMillis(), System.currentTimeMillis() + 3000L, operatorStates, Collections.emptyList(), CheckpointProperties.forStandardCheckpoint(), null, null);
        Mockito.when((Object)standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn((Object)completedCheckpoint);
        CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), newJobVertex1.getTaskVertices(), newJobVertex1.getTaskVertices(), newJobVertex1.getTaskVertices(), (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), (CompletedCheckpointStore)standaloneCompletedCheckpointStore, null, Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY);
        coord.restoreLatestCheckpointedState(tasks, false, true);
        // Verify the new first vertex, subtask by subtask.
        for (int i = 0; i < newJobVertex1.getParallelism(); ++i) {
            List operatorIds = newJobVertex1.getOperatorIDs();
            TaskStateSnapshot stateSnapshot = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
            // The last operator in the list (id5) must not have received any keyed state.
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIds.size() - 1));
            Assert.assertTrue((boolean)headOpState.getManagedKeyedState().isEmpty());
            Assert.assertTrue((boolean)headOpState.getRawKeyedState().isEmpty());
            // Index 2 = id5: new operator, so its operator state must be empty.
            int operatorIndexInChain = 2;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIndexInChain));
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            // Index 1 = id1: must hold exactly one managed and one raw handle whose content
            // equals the state regenerated for id1 and subtask i.
            operatorIndexInChain = 1;
            opState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIndexInChain));
            OperatorStateHandle expectedManagedOpState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, false);
            OperatorStateHandle expectedRawOpState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id1.f0, i, 2, 8, true);
            Collection managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isSteamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            Collection rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isSteamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
            // Index 0 = id2: same check against the state regenerated for id2.
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIndexInChain));
            expectedManagedOpState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, false);
            expectedRawOpState = CheckpointCoordinatorTest.generatePartitionableStateHandle((JobVertexID)id2.f0, i, 2, 8, true);
            managedOperatorState = opState.getManagedOperatorState();
            Assert.assertEquals((long)1L, (long)managedOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isSteamContentEqual((InputStream)expectedManagedOpState.openInputStream(), (InputStream)((OperatorStateHandle)managedOperatorState.iterator().next()).openInputStream()));
            rawOperatorState = opState.getRawOperatorState();
            Assert.assertEquals((long)1L, (long)rawOperatorState.size());
            Assert.assertTrue((boolean)CommonTestUtils.isSteamContentEqual((InputStream)expectedRawOpState.openInputStream(), (InputStream)((OperatorStateHandle)rawOperatorState.iterator().next()).openInputStream()));
        }
        // Verify the new second vertex, whose parallelism may have changed.
        ArrayList<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        ArrayList<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<List<Collection<OperatorStateHandle>>>(newJobVertex2.getParallelism());
        for (int i = 0; i < newJobVertex2.getParallelism(); ++i) {
            List operatorIds = newJobVertex2.getOperatorIDs();
            TaskStateSnapshot stateSnapshot = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
            // Index 1 = id3: collect its restored operator state for the comparison at the end.
            int operatorIndexInChain = 1;
            OperatorSubtaskState opState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIndexInChain));
            ArrayList<Collection> actualSubManagedOperatorState = new ArrayList<Collection>(1);
            actualSubManagedOperatorState.add(opState.getManagedOperatorState());
            ArrayList<Collection> actualSubRawOperatorState = new ArrayList<Collection>(1);
            actualSubRawOperatorState.add(opState.getRawOperatorState());
            actualManagedOperatorStates.add(actualSubManagedOperatorState);
            actualRawOperatorStates.add(actualSubRawOperatorState);
            // Index 0 = id6: new operator, so its operator state must be empty.
            operatorIndexInChain = 0;
            opState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIndexInChain));
            Assert.assertTrue((boolean)opState.getManagedOperatorState().isEmpty());
            Assert.assertTrue((boolean)opState.getRawOperatorState().isEmpty());
            // Keyed state of the last operator in the list (id3) must match the state
            // regenerated for the NEW key-group range of subtask i.
            KeyGroupsStateHandle originalKeyedStateBackend = CheckpointCoordinatorTest.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), false);
            KeyGroupsStateHandle originalKeyedStateRaw = CheckpointCoordinatorTest.generateKeyGroupState((JobVertexID)id3.f0, (KeyGroupRange)newKeyGroupPartitions2.get(i), true);
            OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID((OperatorID)operatorIds.get(operatorIds.size() - 1));
            Collection keyedStateBackend = headOpState.getManagedKeyedState();
            Collection keyGroupStateRaw = headOpState.getRawKeyedState();
            CheckpointCoordinatorTest.compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
            CheckpointCoordinatorTest.compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
        }
        // Entry 0 of the expected lists belongs to id3, the only checkpointed operator that
        // is present in the new second vertex.
        CheckpointCoordinatorTest.comparePartitionableState((List)expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
        CheckpointCoordinatorTest.comparePartitionableState((List)expectedRawOperatorStates.get(0), actualRawOperatorStates);
    }

    /**
     * Triggers a checkpoint on a coordinator that was created with externalized-checkpoint
     * settings and checks that every pending checkpoint carries the properties returned by
     * {@code CheckpointProperties.forExternalizedCheckpoint(true)}.
     */
    @Test
    public void testExternalizedCheckpoints() throws Exception {
        try {
            JobID jobId = new JobID();
            long triggerTimestamp = System.currentTimeMillis();
            ExecutionAttemptID attemptId = new ExecutionAttemptID();
            ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(attemptId);

            CheckpointCoordinator coordinator = new CheckpointCoordinator(
                    jobId,
                    600000L,
                    600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.externalizeCheckpoints(true),
                    new ExecutionVertex[]{vertex},
                    new ExecutionVertex[]{vertex},
                    new ExecutionVertex[]{vertex},
                    (CheckpointIDCounter)new StandaloneCheckpointIDCounter(),
                    (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1),
                    "fake-directory",
                    Executors.directExecutor(),
                    SharedStateRegistry.DEFAULT_FACTORY);

            // Triggering is expected to succeed.
            boolean triggered = coordinator.triggerCheckpoint(triggerTimestamp, false);
            Assert.assertTrue(triggered);

            for (PendingCheckpoint pending : coordinator.getPendingCheckpoints().values()) {
                CheckpointProperties actualProps = pending.getProps();
                CheckpointProperties expectedProps = CheckpointProperties.forExternalizedCheckpoint(true);
                Assert.assertEquals((Object)expectedProps, (Object)actualProps);
            }

            coordinator.shutdown(JobStatus.FINISHED);
        }
        catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail(unexpected.getMessage());
        }
    }

    /**
     * Repartitions a single operator state handle holding two {@code BROADCAST} states
     * ("t-1", "t-2") and one {@code SPLIT_DISTRIBUTE} state ("t-3"), each with two offsets,
     * onto three subtasks and checks how often each state name shows up and how many offsets
     * each occurrence carries.
     */
    @Test
    public void testReplicateModeStateHandle() {
        HashMap<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<String, OperatorStateHandle.StateMetaInfo>(1);
        metaInfoByName.put("t-1", new OperatorStateHandle.StateMetaInfo(new long[]{0L, 23L}, OperatorStateHandle.Mode.BROADCAST));
        metaInfoByName.put("t-2", new OperatorStateHandle.StateMetaInfo(new long[]{42L, 64L}, OperatorStateHandle.Mode.BROADCAST));
        metaInfoByName.put("t-3", new OperatorStateHandle.StateMetaInfo(new long[]{72L, 83L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        OperatorStateHandle originalHandle = new OperatorStateHandle(metaInfoByName, (StreamStateHandle)new ByteStreamStateHandle("test", new byte[100]));

        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        List<Collection<OperatorStateHandle>> perSubtaskHandles = repartitioner.repartitionState(Collections.singletonList(originalHandle), 3);

        // Number of occurrences of every state name across all repartitioned handles.
        HashMap<String, Integer> occurrences = new HashMap<String, Integer>(3);
        for (Collection<OperatorStateHandle> subtaskHandles : perSubtaskHandles) {
            for (OperatorStateHandle handle : subtaskHandles) {
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : handle.getStateNameToPartitionOffsets().entrySet()) {
                    occurrences.merge(entry.getKey(), 1, Integer::sum);

                    OperatorStateHandle.StateMetaInfo metaInfo = entry.getValue();
                    boolean splitDistribute = OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.equals((Object)metaInfo.getDistributionMode());
                    // Split-distributed occurrences hold a single offset, all others hold both.
                    long expectedOffsetCount = splitDistribute ? 1L : 2L;
                    Assert.assertEquals(expectedOffsetCount, (long)entry.getValue().getOffsets().length);
                }
            }
        }

        Assert.assertEquals(3L, (long)occurrences.size());
        Assert.assertEquals(3L, (long)occurrences.get("t-1").intValue());
        Assert.assertEquals(3L, (long)occurrences.get("t-2").intValue());
        Assert.assertEquals(2L, (long)occurrences.get("t-3").intValue());
    }

    /**
     * Generates a keyed state handle with one pseudo-random integer per key group of the
     * given range. Each value comes from a {@link Random} seeded with the vertex id's hash
     * code and the key-group index, so equal arguments always yield equal values.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param keyGroupPartition key groups to generate one value for each
     * @param rawState selects the seed formula: {@code hash * (31 + keyGroup)} when true,
     *     {@code hash + keyGroup} when false
     * @return handle produced by the {@code (KeyGroupRange, List)} overload for the values
     * @throws IOException declared by the delegated overload
     */
    public static KeyGroupsStateHandle generateKeyGroupState(JobVertexID jobVertexID, KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
        List<Integer> simulatedStates = new ArrayList<Integer>(keyGroupPartition.getNumberOfKeyGroups());
        for (int keyGroup : keyGroupPartition) {
            int hash = jobVertexID.hashCode();
            int seed;
            if (rawState) {
                seed = hash * (31 + keyGroup);
            } else {
                seed = hash + keyGroup;
            }
            simulatedStates.add(new Random(seed).nextInt());
        }
        return CheckpointCoordinatorTest.generateKeyGroupState(keyGroupPartition, simulatedStates);
    }

    /**
     * Serializes the given states into one byte array and wraps it, together with the
     * per-state offsets, into a {@link KeyGroupsStateHandle} for the given range.
     *
     * @param keyGroupRange key-group range the states belong to
     * @param states one state object per key group of the range
     * @return keyed state handle over the serialized states
     * @throws IOException if serialization fails
     * @throws IllegalArgumentException if the number of states differs from the number of
     *     key groups (assuming {@code Preconditions.checkArgument} signals it that way)
     */
    public static KeyGroupsStateHandle generateKeyGroupState(KeyGroupRange keyGroupRange, List<? extends Serializable> states) throws IOException {
        boolean oneStatePerKeyGroup = keyGroupRange.getNumberOfKeyGroups() == states.size();
        Preconditions.checkArgument(oneStatePerKeyGroup);

        Tuple2<byte[], List<long[]>> serialized =
                CheckpointCoordinatorTest.serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
        // Only a single group was serialized, so its offsets are at position 0.
        long[] stateOffsets = serialized.f1.get(0);
        KeyGroupRangeOffsets rangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, stateOffsets);

        String handleName = String.valueOf(UUID.randomUUID());
        TestByteStreamStateHandleDeepCompare payloadHandle = new TestByteStreamStateHandleDeepCompare(handleName, serialized.f0);
        return new KeyGroupsStateHandle(rangeOffsets, (StreamStateHandle)payloadHandle);
    }

    /**
     * Serializes every object of every group and concatenates the results into one byte
     * array, recording for each object the position at which its bytes start.
     *
     * @param serializables groups of objects to serialize, in order
     * @return tuple of (concatenated bytes, per-group arrays of start offsets into those bytes)
     * @throws IOException if serializing an object fails
     */
    public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(List<List<? extends Serializable>> serializables) throws IOException {
        List<long[]> offsetsPerGroup = new ArrayList<long[]>(serializables.size());
        List<byte[]> chunks = new ArrayList<byte[]>();

        // First pass: serialize everything and note where each chunk will start.
        int totalLength = 0;
        for (List<? extends Serializable> group : serializables) {
            long[] groupOffsets = new long[group.size()];
            offsetsPerGroup.add(groupOffsets);
            int position = 0;
            for (Serializable value : group) {
                groupOffsets[position++] = totalLength;
                byte[] chunk = InstantiationUtil.serializeObject(value);
                chunks.add(chunk);
                totalLength += chunk.length;
            }
        }

        // Second pass: copy the chunks back to back into a single array.
        byte[] concatenated = new byte[totalLength];
        int writePosition = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, concatenated, writePosition, chunk.length);
            writePosition += chunk.length;
        }
        return new Tuple2<>(concatenated, offsetsPerGroup);
    }

    /**
     * Generates an operator state handle with {@code namedStates} states named
     * {@code "state-0"}, {@code "state-1"}, ..., each holding {@code partitionsPerState}
     * pseudo-random integers. The values are drawn from a {@link Random} whose seed depends
     * only on the arguments, so equal arguments always yield equal values.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param index subtask index, multiplied into the seed
     * @param namedStates number of named states to generate
     * @param partitionsPerState number of values per named state
     * @param rawState when true the seed is additionally transformed to {@code (seed + 1) * 31}
     * @return operator state handle over the generated values
     * @throws IOException if serializing the values fails
     */
    public static OperatorStateHandle generatePartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        Map<String, List<? extends Serializable>> statesByName = new HashMap<String, List<? extends Serializable>>(namedStates);
        for (int stateIdx = 0; stateIdx < namedStates; ++stateIdx) {
            List<Integer> partitions = new ArrayList<Integer>(partitionsPerState);
            int baseSeed = jobVertexID.hashCode() * index + stateIdx * namedStates;
            int seed = rawState ? (baseSeed + 1) * 31 : baseSeed;
            Random generator = new Random(seed);
            for (int partition = 0; partition < partitionsPerState; ++partition) {
                partitions.add(generator.nextInt());
            }
            statesByName.put("state-" + stateIdx, partitions);
        }
        return CheckpointCoordinatorTest.generatePartitionableStateHandle(statesByName);
    }

    /**
     * Generates the same operator state handle as
     * {@code generatePartitionableStateHandle(jobVertexID, index, namedStates, partitionsPerState, rawState)}
     * and wraps it into a single-element {@link ChainedStateHandle}.
     *
     * @param jobVertexID vertex whose hash code feeds the seed
     * @param index subtask index, multiplied into the seed
     * @param namedStates number of named states to generate
     * @param partitionsPerState number of values per named state
     * @param rawState selects the raw-state seed variant
     * @return chained handle containing exactly the generated operator state handle
     * @throws IOException if serializing the values fails
     */
    public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(JobVertexID jobVertexID, int index, int namedStates, int partitionsPerState, boolean rawState) throws IOException {
        // The sibling overload runs the identical seeded generation loop, so reuse it
        // instead of repeating it here.
        OperatorStateHandle generated = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID, index, namedStates, partitionsPerState, rawState);
        return ChainedStateHandle.wrapSingleHandle(generated);
    }

    /**
     * Serializes all named state lists into one byte array and builds an operator state
     * handle that maps every state name to its offsets, all in {@code SPLIT_DISTRIBUTE} mode.
     *
     * @param states state name to list of state values
     * @return operator state handle over the serialized values
     * @throws IOException if serializing the values fails
     */
    private static OperatorStateHandle generatePartitionableStateHandle(Map<String, List<? extends Serializable>> states) throws IOException {
        // Values in the map's iteration order; the offsets come back in the same order.
        List<List<? extends Serializable>> valuesInMapOrder = new ArrayList<List<? extends Serializable>>(states.values());
        Tuple2<byte[], List<long[]>> serialized = CheckpointCoordinatorTest.serializeTogetherAndTrackOffsets(valuesInMapOrder);

        Map<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<String, OperatorStateHandle.StateMetaInfo>(states.size());
        int position = 0;
        for (String stateName : states.keySet()) {
            long[] stateOffsets = serialized.f1.get(position++);
            metaInfoByName.put(stateName, new OperatorStateHandle.StateMetaInfo(stateOffsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        }

        String handleName = String.valueOf(UUID.randomUUID());
        TestByteStreamStateHandleDeepCompare payloadHandle = new TestByteStreamStateHandleDeepCompare(handleName, serialized.f0);
        return new OperatorStateHandle(metaInfoByName, (StreamStateHandle)payloadHandle);
    }

    /**
     * Convenience overload that mocks a job vertex with exactly one operator, whose id is derived
     * from {@code jobVertexID} via {@code OperatorID.fromJobVertexID}.
     *
     * @param jobVertexID    id reported by the mocked job vertex
     * @param parallelism    number of mocked subtasks
     * @param maxParallelism max parallelism reported by the mocks
     * @return the mocked job vertex
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
        List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID));
        return CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID, singleOperator, parallelism, maxParallelism);
    }

    /**
     * Creates a Mockito mock of an {@link ExecutionJobVertex} with {@code parallelism} mocked subtasks.
     *
     * <p>Every subtask is created through {@code mockExecutionVertex} in state {@code RUNNING}
     * with a fresh attempt id and is stubbed to report its array position as parallel subtask
     * index. The job vertex mock reports the given id, subtasks, parallelism, max parallelism and
     * operator ids, reports max parallelism as configured, and returns a list holding one
     * {@code null} per operator as user-defined operator ids.
     *
     * @param jobVertexID    id reported by the mocked job vertex and its subtasks
     * @param jobVertexIDs   operator ids reported by the mocks
     * @param parallelism    number of mocked subtasks
     * @param maxParallelism max parallelism reported by the mocks
     * @return the mocked job vertex
     */
    static ExecutionJobVertex mockExecutionJobVertex(JobVertexID jobVertexID, List<OperatorID> jobVertexIDs, int parallelism, int maxParallelism) {
        ExecutionVertex[] subtasks = new ExecutionVertex[parallelism];
        for (int subtaskIndex = 0; subtaskIndex < parallelism; ++subtaskIndex) {
            ExecutionVertex subtask = CheckpointCoordinatorTest.mockExecutionVertex(new ExecutionAttemptID(), jobVertexID, jobVertexIDs, parallelism, maxParallelism, ExecutionState.RUNNING, new ExecutionState[0]);
            Mockito.when(subtask.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
            subtasks[subtaskIndex] = subtask;
        }

        ExecutionJobVertex mockedJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(mockedJobVertex.getJobVertexId()).thenReturn(jobVertexID);
        Mockito.when(mockedJobVertex.getTaskVertices()).thenReturn(subtasks);
        Mockito.when(mockedJobVertex.getParallelism()).thenReturn(parallelism);
        Mockito.when(mockedJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
        Mockito.when(mockedJobVertex.isMaxParallelismConfigured()).thenReturn(true);
        Mockito.when(mockedJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
        // One null entry per operator: no user-defined operator id is set for any of them.
        Mockito.when(mockedJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
        return mockedJobVertex;
    }

    /**
     * Mocks a running execution vertex with parallelism and max parallelism of 1 that belongs to a
     * freshly created job vertex id and carries a single operator id derived from it.
     *
     * @param attemptID attempt id reported by the vertex's current execution
     * @return the mocked execution vertex
     */
    static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
        JobVertexID vertexId = new JobVertexID();
        List<OperatorID> singleOperator = Collections.singletonList(OperatorID.fromJobVertexID(vertexId));
        return CheckpointCoordinatorTest.mockExecutionVertex(attemptID, vertexId, singleOperator, 1, 1, ExecutionState.RUNNING, new ExecutionState[0]);
    }

    /**
     * Mocks an execution vertex whose current execution attempt is a Mockito spy.
     *
     * <p>The spied {@link Execution} is constructed around the vertex mock (before any stubbing of
     * the vertex) and is stubbed to return {@code attemptID} and the given sequence of states.
     * The vertex mock reports the job vertex id, the spied execution, parallelism and max
     * parallelism, and exposes a mocked job vertex that returns {@code jobVertexIDs} as operator ids.
     *
     * @param attemptID        attempt id returned by the execution
     * @param jobVertexID      job vertex id returned by the vertex
     * @param jobVertexIDs     operator ids returned by the vertex's mocked job vertex
     * @param parallelism      total number of parallel subtasks reported by the vertex
     * @param maxParallelism   max parallelism reported by the vertex
     * @param state            state returned by the first {@code getState()} call
     * @param successiveStates states returned by the following {@code getState()} calls
     * @return the mocked execution vertex
     */
    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, JobVertexID jobVertexID, List<OperatorID> jobVertexIDs, int parallelism, int maxParallelism, ExecutionState state, ExecutionState ... successiveStates) {
        ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);

        // The execution must be created before the vertex is stubbed, as in the original setup.
        Executor executor = Mockito.mock(Executor.class);
        Execution attempt = Mockito.spy(new Execution(executor, vertex, 1, 1L, 1L, Time.milliseconds(500L)));
        Mockito.when(attempt.getAttemptId()).thenReturn(attemptID);
        Mockito.when(attempt.getState()).thenReturn(state, successiveStates);

        Mockito.when(vertex.getJobvertexId()).thenReturn(jobVertexID);
        Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(attempt);
        Mockito.when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
        Mockito.when(vertex.getMaxParallelism()).thenReturn(maxParallelism);

        ExecutionJobVertex owningJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(owningJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
        Mockito.when(vertex.getJobVertex()).thenReturn(owningJobVertex);
        return vertex;
    }

    /**
     * Creates a spied task state snapshot holding one spied operator subtask state.
     *
     * <p>The subtask state combines a generated partitionable operator state (2 named states with
     * 8 partitions each, non-raw seed) with generated key-group state for {@code keyGroupRange};
     * the two remaining constructor arguments are {@code null}. It is registered under the
     * operator id derived from {@code jobVertexID}.
     *
     * @param jobVertexID   vertex id used for state generation and for deriving the operator id
     * @param index         subtask index used for generating the operator state
     * @param keyGroupRange key groups covered by the generated keyed state
     * @return the spied snapshot
     * @throws IOException if generating the state handles fails
     */
    static TaskStateSnapshot mockSubtaskState(JobVertexID jobVertexID, int index, KeyGroupRange keyGroupRange) throws IOException {
        OperatorStateHandle operatorStateHandle = CheckpointCoordinatorTest.generatePartitionableStateHandle(jobVertexID, index, 2, 8, false);
        KeyGroupsStateHandle keyedStateHandle = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID, keyGroupRange, false);

        TaskStateSnapshot snapshot = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskState operatorSubtaskState = Mockito.spy(new OperatorSubtaskState(operatorStateHandle, null, (KeyedStateHandle)keyedStateHandle, null));
        snapshot.putSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState);
        return snapshot;
    }

    /**
     * Asserts that every subtask of {@code executionJobVertex} was restored with the expected state.
     *
     * <p>For each subtask index the snapshot of the current execution attempt is looked up under
     * the operator id derived from {@code jobVertexID}. Its first managed operator state handle
     * must have the same stream content as the handle regenerated for that index (2 named states,
     * 8 partitions, non-raw seed), and its managed keyed state must match the key-group state
     * regenerated for the index's key-group range.
     *
     * @param jobVertexID        vertex id the expected state is generated for
     * @param executionJobVertex job vertex whose subtasks are inspected
     * @param keyGroupPartitions expected key-group range per subtask index
     * @throws Exception if reading or comparing state fails
     */
    public static void verifyStateRestore(JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex, List<KeyGroupRange> keyGroupPartitions) throws Exception {
        for (int subtask = 0; subtask < executionJobVertex.getParallelism(); ++subtask) {
            TaskStateSnapshot restoredSnapshot = executionJobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getTaskStateSnapshot();
            OperatorSubtaskState restoredState = restoredSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));

            // Operator state: compare the raw stream contents of expected and restored handles.
            ChainedStateHandle<OperatorStateHandle> expectedChain = CheckpointCoordinatorTest.generateChainedPartitionableStateHandle(jobVertexID, subtask, 2, 8, false);
            InputStream expectedContent = expectedChain.get(0).openInputStream();
            InputStream restoredContent = restoredState.getManagedOperatorState().iterator().next().openInputStream();
            Assert.assertTrue(CommonTestUtils.isSteamContentEqual(expectedContent, restoredContent));

            // Keyed state: compare per key group.
            KeyGroupsStateHandle expectedKeyedState = CheckpointCoordinatorTest.generateKeyGroupState(jobVertexID, keyGroupPartitions.get(subtask), false);
            CheckpointCoordinatorTest.compareKeyedState(Collections.singletonList(expectedKeyedState), restoredState.getManagedKeyedState());
        }
    }

    /**
     * Asserts that the actual keyed state handles hold the same per-key-group values as the first
     * expected handle.
     *
     * <p>Only the first element of {@code expectPartitionedKeyGroupState} is used. The check first
     * verifies that every actual handle is a {@link KeyGroupsStateHandle} and that the key-group
     * counts of all actual handles sum up to the expected handle's count. Then, for each key group
     * of the expected handle, the integer stored at that group's offset is read and compared with
     * the integer stored for the same group in every actual handle whose range contains it.
     *
     * <p>Streams are managed with try-with-resources so that a failed assertion or read error
     * propagates to the caller; the previous hand-expanded {@code finally} block left the loop
     * iteration via {@code continue}, which discarded any pending exception.
     *
     * @param expectPartitionedKeyGroupState expected handles; must contain at least one element
     * @param actualPartitionedKeyGroupState handles to verify
     * @throws Exception if a stream cannot be opened, positioned or deserialized
     */
    public static void compareKeyedState(Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState, Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
        KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
        int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        int actualTotalKeyGroups = 0;
        for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
            Assert.assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
            actualTotalKeyGroups += keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
        }
        Assert.assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);

        try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
            for (int groupId : expectedHeadOpKeyGroupStateHandle.getKeyGroupRange()) {
                long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                inputStream.seek(offset);
                int expectedKeyGroupState = (Integer)InstantiationUtil.deserializeObject((InputStream)inputStream, Thread.currentThread().getContextClassLoader());

                for (KeyedStateHandle keyedStateHandle : actualPartitionedKeyGroupState) {
                    Assert.assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
                    KeyGroupsStateHandle oneActualKeyGroupStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
                    if (!oneActualKeyGroupStateHandle.getKeyGroupRange().contains(groupId)) {
                        // This handle does not cover the key group; nothing to compare.
                        continue;
                    }
                    long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                    try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
                        actualInputStream.seek(actualOffset);
                        int actualGroupState = (Integer)InstantiationUtil.deserializeObject((InputStream)actualInputStream, Thread.currentThread().getContextClassLoader());
                        Assert.assertEquals(expectedKeyGroupState, actualGroupState);
                    }
                }
            }
        }
    }

    /**
     * Asserts that expected and actual partitionable operator state contain the same entries.
     *
     * <p>Both sides are flattened by {@code collectResult} into strings of the form
     * {@code "<position> : <state name> : <value>"}, where the position is the index within the
     * chained handle (expected) or within the inner list (actual). The two string lists are sorted
     * and compared for equality, so the order in which handles appear does not matter.
     * {@code null} elements of {@code actual} are skipped; inner collections must not be
     * {@code null}.
     *
     * @param expected expected chained operator state handles
     * @param actual   actual operator state handles, grouped per list position
     * @throws Exception if reading a state handle fails
     */
    public static void comparePartitionableState(List<ChainedStateHandle<OperatorStateHandle>> expected, List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
        List<String> expectedEntries = new ArrayList<>();
        for (ChainedStateHandle<OperatorStateHandle> chain : expected) {
            for (int position = 0; position < chain.getLength(); ++position) {
                CheckpointCoordinatorTest.collectResult(position, chain.get(position), expectedEntries);
            }
        }
        Collections.sort(expectedEntries);

        List<String> actualEntries = new ArrayList<>();
        for (List<Collection<OperatorStateHandle>> perPosition : actual) {
            if (perPosition != null) {
                for (int position = 0; position < perPosition.size(); ++position) {
                    Collection<OperatorStateHandle> handlesAtPosition = perPosition.get(position);
                    Assert.assertNotNull(handlesAtPosition);
                    for (OperatorStateHandle handle : handlesAtPosition) {
                        CheckpointCoordinatorTest.collectResult(position, handle, actualEntries);
                    }
                }
            }
        }
        Collections.sort(actualEntries);

        Assert.assertEquals(expectedEntries, actualEntries);
    }

    /**
     * Reads every partition of the given operator state handle and appends one line per partition
     * to {@code resultCollector}.
     *
     * <p>Each partition is deserialized as an {@link Integer} from the handle's stream at the
     * partition's offset and recorded as {@code "<opIdx> : <state name> : <value>"}.
     *
     * @param opIdx               position prefix written into every line
     * @param operatorStateHandle handle to read
     * @param resultCollector     list receiving the generated lines
     * @throws Exception if the stream cannot be opened, positioned or deserialized
     */
    private static void collectResult(int opIdx, OperatorStateHandle operatorStateHandle, List<String> resultCollector) throws Exception {
        try (FSDataInputStream stateStream = operatorStateHandle.openInputStream()) {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> namedState : operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                String stateName = namedState.getKey();
                for (long partitionOffset : namedState.getValue().getOffsets()) {
                    stateStream.seek(partitionOffset);
                    Integer value = (Integer)InstantiationUtil.deserializeObject((InputStream)stateStream, classLoader);
                    resultCollector.add(opIdx + " : " + stateName + " : " + value);
                }
            }
        }
    }

    /**
     * Checks key-group partitioning for a set of fixed (max parallelism, parallelism) pairs and
     * for 1000 pseudo-random pairs drawn from a fixed seed, with parallelism never exceeding max
     * parallelism.
     */
    @Test
    public void testCreateKeyGroupPartitions() {
        int[][] fixedCases = {
            {1, 1},
            {13, 1},
            {13, 2},
            {Short.MAX_VALUE, 1},
            {Short.MAX_VALUE, 13},
            {Short.MAX_VALUE, Short.MAX_VALUE},
        };
        for (int[] fixedCase : fixedCases) {
            this.testCreateKeyGroupPartitions(fixedCase[0], fixedCase[1]);
        }

        Random random = new Random(1234L);
        for (int round = 0; round < 1000; ++round) {
            int maxParallelism = 1 + random.nextInt(Short.MAX_VALUE - 1);
            int parallelism = 1 + random.nextInt(maxParallelism);
            this.testCreateKeyGroupPartitions(maxParallelism, parallelism);
        }
    }

    /**
     * Triggers two standard checkpoints on a freshly constructed coordinator: with the last
     * argument {@code true} the trigger must fail with {@code PERIODIC_SCHEDULER_SHUTDOWN}, with
     * {@code false} it must not fail.
     *
     * <p>NOTE(review): the last argument presumably marks the trigger as periodic; confirm
     * against {@code CheckpointCoordinator#triggerCheckpoint}.
     */
    @Test
    public void testStopPeriodicScheduler() throws Exception {
        ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(new ExecutionAttemptID());
        CheckpointCoordinator coord = new CheckpointCoordinator(
            new JobID(),
            600000L,
            600000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(1),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);

        CheckpointTriggerResult declined = coord.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forStandardCheckpoint(), null, true);
        Assert.assertTrue(declined.isFailure());
        Assert.assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, declined.getFailureReason());

        CheckpointTriggerResult accepted = coord.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forStandardCheckpoint(), null, false);
        Assert.assertFalse(accepted.isFailure());
    }

    /**
     * Verifies that each key group in {@code [0, maxParallelism)} is contained in the range of the
     * operator index that {@code KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup} assigns
     * to it.
     *
     * @param maxParallelism number of key groups
     * @param parallelism    number of ranges to create
     */
    private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
        List<KeyGroupRange> partitions = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, parallelism);
        for (int keyGroup = 0; keyGroup < maxParallelism; ++keyGroup) {
            int owner = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            KeyGroupRange ownerRange = partitions.get(owner);
            if (!ownerRange.contains(keyGroup)) {
                Assert.fail("Could not find expected key-group " + keyGroup + " in range " + ownerRange);
            }
        }
    }

    /**
     * Runs 10000 repartitioning rounds with parameters drawn from a fixed-seed random generator;
     * old and new parallelism, number of named states and max partitions per state each lie in
     * {@code [1, 9]}.
     */
    @Test
    public void testPartitionableStateRepartitioning() {
        final int rounds = 10000;
        Random random = new Random(42L);
        for (int round = 0; round < rounds; ++round) {
            // The draw order determines the generated scenarios and must stay as is.
            int oldParallelism = random.nextInt(9) + 1;
            int newParallelism = random.nextInt(9) + 1;
            int numNamedStates = random.nextInt(9) + 1;
            int maxPartitionsPerState = random.nextInt(9) + 1;
            this.doTestPartitionableStateRepartitioning(random, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
        }
    }

    /**
     * Runs one randomized repartitioning round through {@link RoundRobinOperatorStateRepartitioner}.
     *
     * <p>Creates {@code oldParallelism} operator state handles, each backed by a distinct fake
     * file path and containing {@code numNamedStates} named states with between 1 and
     * {@code maxPartitionsPerState} partitions. Offsets are consecutive integers per handle, and
     * a state is put into {@code BROADCAST} mode when {@code r.nextInt(10) == 0}, otherwise into
     * {@code SPLIT_DISTRIBUTE}. After repartitioning to {@code newParallelism} the method asserts
     * that
     * <ul>
     *   <li>the partition counts of any two new subtasks differ by at most one,</li>
     *   <li>the total number of partitions matches the expectation, where broadcast partitions
     *       are expected once per new subtask, and</li>
     *   <li>per delegate handle and state name, the sorted offsets equal the expected offsets.</li>
     * </ul>
     *
     * @param r                     source of randomness for partition counts and modes
     * @param oldParallelism        number of handles before repartitioning
     * @param newParallelism        number of subtasks after repartitioning
     * @param numNamedStates        named states per handle
     * @param maxPartitionsPerState upper bound (inclusive) for partitions per named state
     */
    private void doTestPartitionableStateRepartitioning(Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
        List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism);
        for (int i = 0; i < oldParallelism; ++i) {
            Path fakePath = new Path("/fake-" + i);
            Map<String, OperatorStateHandle.StateMetaInfo> namedStatesToOffsets = new HashMap<>();
            int off = 0;
            for (int s = 0; s < numNamedStates; ++s) {
                long[] offs = new long[1 + r.nextInt(maxPartitionsPerState)];
                for (int o = 0; o < offs.length; ++o) {
                    offs[o] = off;
                    ++off;
                }
                OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                namedStatesToOffsets.put("State-" + s, new OperatorStateHandle.StateMetaInfo(offs, mode));
            }
            previousParallelOpInstanceStates.add(new OperatorStateHandle(namedStatesToOffsets, (StreamStateHandle)new FileStateHandle(fakePath, -1L)));
        }

        // Expected result: delegate handle -> state name -> offsets, with each broadcast offset
        // repeated once per new subtask.
        Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
        int expectedTotalPartitions = 0;
        for (OperatorStateHandle psh : previousParallelOpInstanceStates) {
            Map<String, OperatorStateHandle.StateMetaInfo> offsMap = psh.getStateNameToPartitionOffsets();
            Map<String, List<Long>> offsMapWithList = new HashMap<>(offsMap.size());
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> e : offsMap.entrySet()) {
                long[] offs = e.getValue().getOffsets();
                int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ? newParallelism : 1;
                expectedTotalPartitions += replication * offs.length;
                List<Long> offsList = new ArrayList<>(offs.length);
                for (long partitionOffset : offs) {
                    for (int p = 0; p < replication; ++p) {
                        offsList.add(partitionOffset);
                    }
                }
                offsMapWithList.put(e.getKey(), offsList);
            }
            expected.put(psh.getDelegateStateHandle(), offsMapWithList);
        }

        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        List<Collection<OperatorStateHandle>> pshs = repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);

        // Actual result in the same shape, plus load statistics per new subtask.
        Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();
        int minCount = Integer.MAX_VALUE;
        int maxCount = 0;
        int actualTotalPartitions = 0;
        for (int p = 0; p < newParallelism; ++p) {
            int partitionCount = 0;
            Collection<OperatorStateHandle> pshc = pshs.get(p);
            for (OperatorStateHandle sh : pshc) {
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> namedState : sh.getStateNameToPartitionOffsets().entrySet()) {
                    Map<String, List<Long>> stateToOffsets = actual.get(sh.getDelegateStateHandle());
                    if (stateToOffsets == null) {
                        stateToOffsets = new HashMap<>();
                        actual.put(sh.getDelegateStateHandle(), stateToOffsets);
                    }
                    List<Long> actualOffs = stateToOffsets.get(namedState.getKey());
                    if (actualOffs == null) {
                        actualOffs = new ArrayList<>();
                        stateToOffsets.put(namedState.getKey(), actualOffs);
                    }
                    long[] add = namedState.getValue().getOffsets();
                    for (long partitionOffset : add) {
                        actualOffs.add(partitionOffset);
                    }
                    partitionCount += add.length;
                }
            }
            minCount = Math.min(minCount, partitionCount);
            maxCount = Math.max(maxCount, partitionCount);
            actualTotalPartitions += partitionCount;
        }

        // The repartitioner may hand out offsets in any order; expected lists are already ascending.
        for (Map<String, List<Long>> v : actual.values()) {
            for (List<Long> l : v.values()) {
                Collections.sort(l);
            }
        }

        int maxLoadDiff = maxCount - minCount;
        Assert.assertTrue("Difference in partition load is > 1 : " + maxLoadDiff, maxLoadDiff <= 1);
        Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Verifies that triggering a checkpoint reports exactly one pending checkpoint to the stats
     * tracker, with checkpoint id 1, the trigger timestamp and standard checkpoint properties.
     */
    @Test
    public void testCheckpointStatsTrackerPendingCheckpointCallback() {
        long timestamp = System.currentTimeMillis();

        ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(new ExecutionAttemptID());
        CheckpointCoordinator coord = new CheckpointCoordinator(
            new JobID(),
            600000L,
            600000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(1),
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);

        CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        coord.setCheckpointStatsTracker(tracker);
        Mockito.when(tracker.reportPendingCheckpoint(Matchers.anyLong(), Matchers.anyLong(), Matchers.any(CheckpointProperties.class)))
            .thenReturn(Mockito.mock(PendingCheckpointStats.class));

        boolean triggered = coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue(triggered);

        Mockito.verify(tracker, Mockito.times(1))
            .reportPendingCheckpoint(Mockito.eq(1L), Mockito.eq(timestamp), Mockito.eq(CheckpointProperties.forStandardCheckpoint()));
    }

    /**
     * Verifies that restoring the latest checkpoint from a store holding one completed checkpoint
     * succeeds and reports exactly one restored checkpoint to the stats tracker.
     */
    @Test
    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(new ExecutionAttemptID());
        StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
        CheckpointCoordinator coord = new CheckpointCoordinator(
            new JobID(),
            600000L,
            600000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new StandaloneCheckpointIDCounter(),
            store,
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);

        // Seed the store with an empty completed checkpoint that can be restored.
        CompletedCheckpoint emptyCheckpoint = new CompletedCheckpoint(new JobID(), 0L, 0L, 0L, Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forStandardCheckpoint(), null, null);
        store.addCheckpoint(emptyCheckpoint);

        CheckpointStatsTracker tracker = Mockito.mock(CheckpointStatsTracker.class);
        coord.setCheckpointStatsTracker(tracker);

        boolean restored = coord.restoreLatestCheckpointedState(Collections.emptyMap(), false, true);
        Assert.assertTrue(restored);

        Mockito.verify(tracker, Mockito.times(1)).reportRestoredCheckpoint(Matchers.any(RestoredCheckpointStats.class));
    }

    /**
     * Completes a checkpoint, then a savepoint, then another checkpoint, and checks that the
     * completed checkpoint store's latest entry never is the savepoint while the final regular
     * checkpoint does become the latest entry.
     */
    @Test
    public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
        final ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        final long checkpointTimestamp1 = 1L;
        final long savepointTimestamp = 2L;
        final long checkpointTimestamp2 = 3L;
        final String savepointDir = this.tmpFolder.newFolder().getAbsolutePath();
        final StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();

        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
            jobId,
            600000L,
            600000L,
            0L,
            Integer.MAX_VALUE,
            ExternalizedCheckpointSettings.none(),
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            new ExecutionVertex[]{vertex1},
            checkpointIDCounter,
            completedCheckpointStore,
            null,
            Executors.directExecutor(),
            SharedStateRegistry.DEFAULT_FACTORY);

        // First regular checkpoint: nothing retained until it is acknowledged.
        Assert.assertTrue("Triggering of a checkpoint should work.", checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
        Assert.assertTrue(0 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, executionAttemptId, checkpointIDCounter.getLast()));
        Assert.assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());

        // Savepoint: completes, but must not replace the latest entry of the store.
        CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, executionAttemptId, checkpointIDCounter.getLast()));
        CompletedCheckpoint savepoint = savepointFuture.get();
        Assert.assertFalse("The savepoint should not have been added to the completed checkpoint store", savepoint.getCheckpointID() == completedCheckpointStore.getLatestCheckpoint().getCheckpointID());

        // Second regular checkpoint: becomes the latest entry again.
        Assert.assertTrue("Triggering of a checkpoint should work.", checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, executionAttemptId, checkpointIDCounter.getLast()));
        Assert.assertTrue("The latest completed (proper) checkpoint should have been added to the completed checkpoint store.", completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast());
    }

    /**
     * Exercises shared-state registration and discarding across a store shutdown and restore.
     *
     * <p>Three incremental checkpoints are performed for a job vertex with parallelism 2. The test
     * then checks that
     * <ol>
     *   <li>every keyed state handle was registered once with the first created registry and
     *       nothing has been discarded,</li>
     *   <li>removing the oldest checkpoint discards no shared state,</li>
     *   <li>after shutting the store down and restoring, the handles of the later checkpoints are
     *       registered once with the second created registry while those of the first checkpoint
     *       in the list are not,</li>
     *   <li>removing the next checkpoint discards exactly the shared handles whose key ends in
     *       {@code 0}, and</li>
     *   <li>removing the last checkpoint leaves every shared handle discarded exactly once.</li>
     * </ol>
     */
    @Test
    public void testSharedStateRegistrationOnRestore() throws Exception {
        final JobID jid = new JobID();
        final long timestamp = System.currentTimeMillis();
        final JobVertexID jobVertexID1 = new JobVertexID();
        final int parallelism1 = 2;
        final int maxParallelism1 = 4;
        final ExecutionJobVertex jobVertex1 = CheckpointCoordinatorTest.mockExecutionJobVertex(jobVertexID1, parallelism1, maxParallelism1);

        List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1);
        allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
        ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);

        RecoverableCompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(10);

        // Records every registry the coordinator creates: index 0 is the initial one,
        // index 1 the one created during restore.
        final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2);
        CheckpointCoordinator coord = new CheckpointCoordinator(jid, 600000L, 600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, new StandaloneCheckpointIDCounter(), store, null, Executors.directExecutor(), deleteExecutor -> {
            SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor);
            createdSharedStateRegistries.add(instance);
            return instance;
        });

        final int numCheckpoints = 3;
        List<KeyGroupRange> keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
        for (int i = 0; i < numCheckpoints; ++i) {
            this.performIncrementalCheckpoint(jid, coord, jobVertex1, keyGroupPartitions1, timestamp + (long)i, i);
        }

        List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
        Assert.assertEquals(numCheckpoints, completedCheckpoints.size());

        int sharedHandleCount = 0;
        List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints);
        for (int i = 0; i < numCheckpoints; ++i) {
            sharedHandlesByCheckpoint.add(new HashMap<StateHandleID, StreamStateHandle>(2));
        }

        int cp = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        // Registered exactly once with the initial registry.
                        Mockito.verify(keyedStateHandle, Mockito.times(1)).registerSharedStates(createdSharedStateRegistries.get(0));
                        IncrementalKeyedStateHandle incrementalKeyedStateHandle = (IncrementalKeyedStateHandle)keyedStateHandle;
                        sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState());
                        for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getSharedState().values()) {
                            Assert.assertTrue(!(streamStateHandle instanceof PlaceholderStreamStateHandle));
                            Mockito.verify(streamStateHandle, Mockito.never()).discardState();
                            ++sharedHandleCount;
                        }
                        for (StreamStateHandle streamStateHandle : incrementalKeyedStateHandle.getPrivateState().values()) {
                            Mockito.verify(streamStateHandle, Mockito.never()).discardState();
                        }
                        Mockito.verify(incrementalKeyedStateHandle.getMetaStateHandle(), Mockito.never()).discardState();
                    }
                    Mockito.verify(subtaskState, Mockito.never()).discardState();
                }
            }
            ++cp;
        }
        // 2 (parallelism) x (1 (first checkpoint) + 2 (second) + 2 (third)) = 10
        Assert.assertEquals(10L, (long)sharedHandleCount);

        // Drop the oldest checkpoint; no shared state may be discarded yet.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (StreamStateHandle streamStateHandle : sharedHandles.values()) {
                Mockito.verify(streamStateHandle, Mockito.never()).discardState();
            }
        }

        // Shut the store down and restore from it.
        store.shutdown(JobStatus.SUSPENDED);
        Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
        tasks.put(jobVertexID1, jobVertex1);
        coord.restoreLatestCheckpointedState(tasks, true, false);

        // Only checkpoints after the first in the list are registered with the new registry.
        cp = 0;
        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
            for (OperatorState operatorState : completedCheckpoint.getOperatorStates().values()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                        VerificationMode verificationMode = cp > 0 ? Mockito.times(1) : Mockito.never();
                        Mockito.verify(keyedStateHandle, verificationMode).registerSharedStates(createdSharedStateRegistries.get(1));
                    }
                }
            }
            ++cp;
        }

        // Drop the next checkpoint; only shared handles whose key ends in 0 are discarded.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (Map.Entry<StateHandleID, StreamStateHandle> entry : sharedHandles.entrySet()) {
                String key = entry.getKey().getKeyString();
                int belongToCP = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
                if (belongToCP == 0) {
                    Mockito.verify(entry.getValue(), Mockito.times(1)).discardState();
                } else {
                    Mockito.verify(entry.getValue(), Mockito.never()).discardState();
                }
            }
        }

        // Drop the last checkpoint; now every shared handle has been discarded exactly once.
        store.removeOldestCheckpoint();
        for (Map<StateHandleID, StreamStateHandle> sharedHandles : sharedHandlesByCheckpoint) {
            for (StreamStateHandle streamStateHandle : sharedHandles.values()) {
                Mockito.verify(streamStateHandle, Mockito.times(1)).discardState();
            }
        }
    }

    /**
     * Triggers one checkpoint and acknowledges it for every subtask of {@code jobVertex1} with a
     * spied incremental keyed state handle.
     *
     * <p>Per subtask the handle carries one private state entry ({@code "private-1"}), a shared
     * state entry {@code "shared-<cpSequenceNumber>"} with real bytes and, for every checkpoint
     * after the first, a placeholder entry {@code "shared-<cpSequenceNumber - 1>"}. All stream
     * handles are Mockito spies so callers can verify interactions on them.
     *
     * @param jid                 job id used in the acknowledge messages
     * @param coord               coordinator to trigger and acknowledge on
     * @param jobVertex1          job vertex whose subtasks acknowledge the checkpoint
     * @param keyGroupPartitions1 key-group range per subtask index
     * @param timestamp           trigger timestamp
     * @param cpSequenceNumber    zero-based sequence number used to name the shared state
     * @throws Exception if triggering or acknowledging fails
     */
    private void performIncrementalCheckpoint(JobID jid, CheckpointCoordinator coord, ExecutionJobVertex jobVertex1, List<KeyGroupRange> keyGroupPartitions1, long timestamp, int cpSequenceNumber) throws Exception {
        coord.triggerCheckpoint(timestamp, false);
        Assert.assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
        long checkpointId = (Long)Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());

        for (int index = 0; index < jobVertex1.getParallelism(); ++index) {
            KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index);

            Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>();
            privateState.put(new StateHandleID("private-1"), Mockito.spy(new ByteStreamStateHandle("private-1", new byte[]{'p'})));

            Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>();
            if (cpSequenceNumber > 0) {
                // Reference the previous checkpoint's shared state through a placeholder.
                sharedState.put(new StateHandleID("shared-" + (cpSequenceNumber - 1)), Mockito.spy(new PlaceholderStreamStateHandle()));
            }
            sharedState.put(new StateHandleID("shared-" + cpSequenceNumber), Mockito.spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'})));

            StreamStateHandle metaState = Mockito.spy(new ByteStreamStateHandle("meta", new byte[]{'m'}));
            IncrementalKeyedStateHandle managedState = Mockito.spy(new IncrementalKeyedStateHandle(new UUID(42L, 42L), keyGroupRange, checkpointId, sharedState, privateState, metaState));
            OperatorSubtaskState operatorSubtaskState = Mockito.spy(new OperatorSubtaskState(Collections.emptyList(), Collections.emptyList(), Collections.singletonList(managedState), Collections.emptyList()));

            Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
            opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
            TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);

            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), taskStateSnapshot);
            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
        }
    }
}

