/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for snapshot compression in {@link HeapKeyedStateBackend}.
 *
 * <p>Covers two aspects: that {@link ExecutionConfig#setUseSnapshotCompression(boolean)}
 * determines the key-group compression decorator reported by the backend, and that keyed
 * value state written to a snapshot (with compression switched on or off) can be read back
 * from a newly created backend.
 */
public class StateSnapshotCompressionTest
extends TestLogger {
    /** Number of key groups every test backend is created with. */
    private static final int NUM_KEY_GROUPS = 16;

    /** Keys written before the snapshot and looked up again after the restore. */
    private static final String[] KEYS = {"A", "B", "C", "D"};

    /** Values stored under {@link #KEYS}; {@code VALUES[i]} belongs to {@code KEYS[i]}. */
    private static final String[] VALUES = {"42", "43", "44", "45"};

    /**
     * Checks that the backend reports the Snappy decorator when snapshot compression is
     * enabled in the {@link ExecutionConfig} and the uncompressed decorator when it is
     * disabled.
     */
    @Test
    public void testCompressionConfiguration() {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(true);
        HeapKeyedStateBackend<String> stateBackend = createBackend(executionConfig);
        try {
            Assert.assertEquals(
                SnappyStreamCompressionDecorator.INSTANCE,
                stateBackend.getKeyGroupCompressionDecorator());
        }
        finally {
            closeAndDispose(stateBackend);
        }

        executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(false);
        stateBackend = createBackend(executionConfig);
        try {
            Assert.assertEquals(
                UncompressedStreamCompressionDecorator.INSTANCE,
                stateBackend.getKeyGroupCompressionDecorator());
        }
        finally {
            closeAndDispose(stateBackend);
        }
    }

    /** Runs the snapshot/restore round trip with snapshot compression enabled. */
    @Test
    public void snapshotRestoreRoundtripWithCompression() throws Exception {
        this.snapshotRestoreRoundtrip(true);
    }

    /** Runs the snapshot/restore round trip with snapshot compression disabled. */
    @Test
    public void snapshotRestoreRoundtripUncompressed() throws Exception {
        this.snapshotRestoreRoundtrip(false);
    }

    /**
     * Writes {@link #VALUES} under {@link #KEYS} into a value state, snapshots the backend,
     * restores the snapshot into a second backend and asserts that every key yields the
     * value that was written.
     *
     * @param useCompression value passed to
     *     {@link ExecutionConfig#setUseSnapshotCompression(boolean)} for the backend that
     *     takes the snapshot
     * @throws Exception if creating state, snapshotting or restoring fails
     */
    private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(useCompression);

        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("test", String.class);
        stateDescriptor.initializeSerializerUnlessSet(executionConfig);

        KeyedStateHandle stateHandle;
        HeapKeyedStateBackend<String> stateBackend = createBackend(executionConfig);
        try {
            InternalValueState<String, VoidNamespace, String> state =
                stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);
            for (int i = 0; i < KEYS.length; i++) {
                stateBackend.setCurrentKey(KEYS[i]);
                state.setCurrentNamespace(VoidNamespace.INSTANCE);
                state.update(VALUES[i]);
            }

            // 4 MiB (0x400000) is the size limit handed to the in-memory stream factory.
            MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = stateBackend.snapshot(
                0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            // The future is executed on the test thread before its result is fetched.
            snapshot.run();
            SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get();
            stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
        }
        finally {
            closeAndDispose(stateBackend);
        }

        // The restoring backend is built from a fresh, default-constructed ExecutionConfig
        // rather than the one used for the snapshot.
        // NOTE(review): presumably this is intentional so that restore does not rely on the
        // restoring side's compression flag - confirm before changing.
        executionConfig = new ExecutionConfig();
        stateBackend = createBackend(executionConfig);
        try {
            stateBackend.restore(StateObjectCollection.singleton(stateHandle));
            InternalValueState<String, VoidNamespace, String> state =
                stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);
            for (int i = 0; i < KEYS.length; i++) {
                stateBackend.setCurrentKey(KEYS[i]);
                state.setCurrentNamespace(VoidNamespace.INSTANCE);
                Assert.assertEquals(VALUES[i], state.value());
            }
        }
        finally {
            closeAndDispose(stateBackend);
        }
    }

    /**
     * Creates a heap keyed state backend with {@code String} keys covering all
     * {@link #NUM_KEY_GROUPS} key groups, using mocked registry and priority-queue factory,
     * asynchronous snapshots enabled and local recovery disabled.
     *
     * @param executionConfig configuration passed to the backend constructor
     * @return a new backend; the caller is responsible for closing and disposing it
     */
    private static HeapKeyedStateBackend<String> createBackend(ExecutionConfig executionConfig) {
        return new HeapKeyedStateBackend<>(
            Mockito.mock(TaskKvStateRegistry.class),
            StringSerializer.INSTANCE,
            StateSnapshotCompressionTest.class.getClassLoader(),
            NUM_KEY_GROUPS,
            new KeyGroupRange(0, NUM_KEY_GROUPS - 1),
            true,
            executionConfig,
            TestLocalRecoveryConfig.disabled(),
            Mockito.mock(HeapPriorityQueueSetFactory.class),
            TtlTimeProvider.DEFAULT);
    }

    /**
     * Closes the backend quietly and then disposes it, in that order.
     *
     * @param stateBackend backend to release
     */
    private static void closeAndDispose(HeapKeyedStateBackend<String> stateBackend) {
        IOUtils.closeQuietly(stateBackend);
        stateBackend.dispose();
    }
}

