/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ZooKeeperCompletedCheckpointStoreITCase
extends CompletedCheckpointStoreTest {
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
    private static final String CHECKPOINT_PATH = "/checkpoints";

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZOOKEEPER != null) {
            ZOOKEEPER.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception {
        return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, ZOOKEEPER.getClient(), CHECKPOINT_PATH, (RetrievableStateStorageHelper)new HeapStateStorageHelper(), Executors.directExecutor());
    }

    @Test
    public void testRecover() throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore checkpoints = this.createCompletedCheckpoints(3);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] expected = new CompletedCheckpointStoreTest.TestCompletedCheckpoint[]{ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0, sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1, sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2, sharedStateRegistry)};
        checkpoints.addCheckpoint((CompletedCheckpoint)expected[0]);
        checkpoints.addCheckpoint((CompletedCheckpoint)expected[1]);
        checkpoints.addCheckpoint((CompletedCheckpoint)expected[2]);
        this.verifyCheckpointRegistered(expected[0].getOperatorStates().values(), sharedStateRegistry);
        this.verifyCheckpointRegistered(expected[1].getOperatorStates().values(), sharedStateRegistry);
        this.verifyCheckpointRegistered(expected[2].getOperatorStates().values(), sharedStateRegistry);
        Assert.assertEquals((long)3L, (long)((List)ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH)).size());
        Assert.assertEquals((long)3L, (long)checkpoints.getNumberOfRetainedCheckpoints());
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistry();
        checkpoints.recover();
        Assert.assertEquals((long)3L, (long)((List)ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH)).size());
        Assert.assertEquals((long)3L, (long)checkpoints.getNumberOfRetainedCheckpoints());
        Assert.assertEquals((Object)((Object)expected[2]), (Object)checkpoints.getLatestCheckpoint());
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> expectedCheckpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        expectedCheckpoints.add(expected[1]);
        expectedCheckpoints.add(expected[2]);
        expectedCheckpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3, sharedStateRegistry));
        checkpoints.addCheckpoint((CompletedCheckpoint)expectedCheckpoints.get(2));
        List actualCheckpoints = checkpoints.getAllCheckpoints();
        Assert.assertEquals(expectedCheckpoints, (Object)actualCheckpoints);
        for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
            this.verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), sharedStateRegistry);
        }
    }

    @Test
    public void testShutdownDiscardsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = this.createCompletedCheckpoints(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0, sharedStateRegistry);
        store.addCheckpoint((CompletedCheckpoint)checkpoint);
        Assert.assertEquals((long)1L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath((long)checkpoint.getCheckpointID())));
        store.shutdown(JobStatus.FINISHED);
        Assert.assertEquals((long)0L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath((long)checkpoint.getCheckpointID())));
        sharedStateRegistry.close();
        store.recover();
        Assert.assertEquals((long)0L, (long)store.getNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = this.createCompletedCheckpoints(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0, sharedStateRegistry);
        store.addCheckpoint((CompletedCheckpoint)checkpoint);
        Assert.assertEquals((long)1L, (long)store.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull((Object)client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath((long)checkpoint.getCheckpointID())));
        store.shutdown(JobStatus.SUSPENDED);
        Assert.assertEquals((long)0L, (long)store.getNumberOfRetainedCheckpoints());
        String checkpointPath = CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath((long)checkpoint.getCheckpointID());
        Stat stat = (Stat)client.checkExists().forPath(checkpointPath);
        Assert.assertNotNull((String)"The checkpoint node should exist.", (Object)stat);
        Assert.assertEquals((String)"The checkpoint node should not be locked.", (long)0L, (long)stat.getNumChildren());
        sharedStateRegistry.close();
        store.recover();
        CompletedCheckpoint recovered = store.getLatestCheckpoint();
        Assert.assertEquals((Object)((Object)checkpoint), (Object)recovered);
    }

    @Test
    public void testLatestCheckpointRecovery() throws Exception {
        int numCheckpoints = 3;
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore checkpointStore = this.createCompletedCheckpoints(3);
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> checkpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(9, sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(10, sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(11, sharedStateRegistry));
        for (CompletedCheckpoint completedCheckpoint : checkpoints) {
            checkpointStore.addCheckpoint(completedCheckpoint);
        }
        sharedStateRegistry.close();
        checkpointStore.recover();
        CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
        Assert.assertEquals(checkpoints.get(checkpoints.size() - 1), (Object)latestCheckpoint);
    }

    @Test
    public void testConcurrentCheckpointOperations() throws Exception {
        boolean numberOfCheckpoints = true;
        long waitingTimeout = 50L;
        ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = this.createCompletedCheckpoints(1);
        ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = this.createCompletedCheckpoints(1);
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1, sharedStateRegistry);
        zkCheckpointStore1.addCheckpoint((CompletedCheckpoint)completedCheckpoint);
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistry();
        zkCheckpointStore2.recover();
        CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
        Assert.assertTrue((boolean)(recoveredCheckpoint instanceof CompletedCheckpointStoreTest.TestCompletedCheckpoint));
        CompletedCheckpointStoreTest.TestCompletedCheckpoint recoveredTestCheckpoint = (CompletedCheckpointStoreTest.TestCompletedCheckpoint)recoveredCheckpoint;
        Assert.assertFalse((boolean)recoveredTestCheckpoint.isDiscarded());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint2 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2, sharedStateRegistry);
        zkCheckpointStore1.addCheckpoint((CompletedCheckpoint)completedCheckpoint2);
        List allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
        Assert.assertEquals(Collections.singletonList(completedCheckpoint2), (Object)allCheckpoints);
        Assert.assertFalse((String)"The checkpoint should not have been discarded.", (boolean)recoveredTestCheckpoint.awaitDiscard(50L));
        Assert.assertFalse((boolean)recoveredTestCheckpoint.isDiscarded());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint3 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3, sharedStateRegistry);
        zkCheckpointStore2.addCheckpoint((CompletedCheckpoint)completedCheckpoint3);
        recoveredTestCheckpoint.awaitDiscard();
    }

    static class HeapRetrievableStateHandle<T extends Serializable>
    implements RetrievableStateHandle<T> {
        private static final long serialVersionUID = -268548467968932L;
        private static AtomicInteger nextKey = new AtomicInteger(0);
        private static HashMap<Integer, Object> stateMap = new HashMap();
        private final int key = nextKey.getAndIncrement();

        public HeapRetrievableStateHandle(T state) {
            stateMap.put(this.key, state);
        }

        public T retrieveState() {
            return (T)((Serializable)stateMap.get(this.key));
        }

        public void discardState() throws Exception {
            stateMap.remove(this.key);
        }

        public long getStateSize() {
            return 0L;
        }
    }

    static class HeapStateStorageHelper
    implements RetrievableStateStorageHelper<CompletedCheckpoint> {
        HeapStateStorageHelper() {
        }

        public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint state) throws Exception {
            return new HeapRetrievableStateHandle<CompletedCheckpoint>(state);
        }
    }
}

