/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();
    private CheckpointStorageLocation checkpointStorageLocation;

    protected abstract B getStateBackend() throws Exception;

    protected abstract boolean isSerializerPresenceRequiredOnRestore();

    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStorageLocation == null) {
            this.checkpointStorageLocation = this.getStateBackend().createCheckpointStorage(new JobID()).initializeLocationForCheckpoint(1L);
        }
        return this.checkpointStorageLocation;
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, new DummyEnvironment());
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT);
        backend.restore(null);
        return backend;
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, new DummyEnvironment());
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT);
        backend.restore((Object)new StateObjectCollection(state));
        return backend;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            ValueState keyedState1 = (ValueState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.update((Object)(key * 2));
            }
            String ns2 = "ns2";
            ValueState keyedState2 = (ValueState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.update((Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            var10_12 = null;
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            catch (Throwable throwable) {
                var10_12 = throwable;
                throw throwable;
            }
            finally {
                if (keysStream != null) {
                    if (var10_12 != null) {
                        try {
                            keysStream.close();
                        }
                        catch (Throwable throwable) {
                            var10_12.addSuppressed(throwable);
                        }
                    } else {
                        keysStream.close();
                    }
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        pojoType.createSerializer(env.getExecutionConfig());
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        Assert.assertTrue((boolean)(state instanceof InternalValueState));
        ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        Assert.assertTrue((boolean)(state instanceof InternalValueState));
        ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 1));
        backend.setCurrentKey((Object)2);
        state.update((Object)new TestPojo("u2", 2));
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();
        env.getExecutionConfig().registerKryoType(TestPojo.class);
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u1", 1));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u2", 2));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment();
        AbstractKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(ExpectedKryoTestException.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
            snapshot2.discardState();
            backend.dispose();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment();
        AbstractKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(ExpectedKryoTestException.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
            backend.dispose();
        }
        finally {
            if (backend != null) {
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            InternalKvState internalKvState = (InternalKvState)state;
            KryoSerializer kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            int mainPojoClassRegistrationId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
            int nestedPojoClassARegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
            int nestedPojoClassBRegistrationId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env = new DummyEnvironment();
            env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
            env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            internalKvState = (InternalKvState)state;
            kryoSerializer = (KryoSerializer)internalKvState.getValueSerializer();
            Assert.assertEquals((long)mainPojoClassRegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestPojo.class).getId());
            Assert.assertEquals((long)nestedPojoClassARegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId());
            Assert.assertEquals((long)nestedPojoClassBRegistrationId, (long)kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId());
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        try {
            TypeInformation pojoType = TypeExtractor.getForClass(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env = new DummyEnvironment();
            env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
            env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateSerializerReconfiguration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)new TestReconfigurableCustomTypeSerializer());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
            InternalKvState internal = (InternalKvState)state;
            Assert.assertTrue((boolean)(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer));
            Assert.assertFalse((boolean)((TestReconfigurableCustomTypeSerializer)internal.getValueSerializer()).isReconfigured());
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env = new DummyEnvironment();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1, env);
            kvId = new ValueStateDescriptor("id", (TypeSerializer)new TestReconfigurableCustomTypeSerializer());
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            internal = (InternalKvState)state;
            Assert.assertTrue((boolean)(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer));
            Assert.assertTrue((boolean)((TestReconfigurableCustomTypeSerializer)internal.getValueSerializer()).isReconfigured());
            backend.setCurrentKey((Object)1);
            TestCustomStateClass restoredState1 = (TestCustomStateClass)state.value();
            Assert.assertEquals((Object)"test-message-1", (Object)restoredState1.getMessage());
            Assert.assertNull((Object)restoredState1.getExtraMessage());
            state.update((Object)new TestCustomStateClass("new-test-message-1", "extra-message-1"));
            backend.setCurrentKey((Object)2);
            TestCustomStateClass restoredState2 = (TestCustomStateClass)state.value();
            Assert.assertEquals((Object)"test-message-2", (Object)restoredState2.getMessage());
            Assert.assertNull((Object)restoredState1.getExtraMessage());
            state.update((Object)new TestCustomStateClass("new-test-message-2", "extra-message-2"));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot1.discardState();
            backend.dispose();
            env = new DummyEnvironment();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, env);
            snapshot2.discardState();
            kvId = new ValueStateDescriptor("id", (TypeSerializer)new TestReconfigurableCustomTypeSerializer());
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            internal = (InternalKvState)state;
            Assert.assertTrue((boolean)(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer));
            Assert.assertTrue((boolean)((TestReconfigurableCustomTypeSerializer)internal.getValueSerializer()).isReconfigured());
            backend.setCurrentKey((Object)1);
            restoredState1 = (TestCustomStateClass)state.value();
            Assert.assertEquals((Object)"new-test-message-1", (Object)restoredState1.getMessage());
            Assert.assertEquals((Object)"extra-message-1", (Object)restoredState1.getExtraMessage());
            backend.setCurrentKey((Object)2);
            restoredState2 = (TestCustomStateClass)state.value();
            Assert.assertEquals((Object)"new-test-message-2", (Object)restoredState2.getMessage());
            Assert.assertEquals((Object)"extra-message-2", (Object)restoredState2.getExtraMessage());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSerializerPresenceOnRestore() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)new TestReconfigurableCustomTypeSerializerPreUpgrade());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            env = new DummyEnvironment((ClassLoader)new ArtificialCNFExceptionThrowingClassLoader(((Object)((Object)this)).getClass().getClassLoader(), Collections.singleton(TestReconfigurableCustomTypeSerializerPreUpgrade.class.getName())));
            try {
                backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1, env);
            }
            catch (IOException e) {
                if (!this.isSerializerPresenceRequiredOnRestore()) {
                    Assert.fail((String)"Presence of old serializer should not have been required.");
                }
                backend.dispose();
                return;
            }
            kvId = new ValueStateDescriptor("id", (TypeSerializer)new TestReconfigurableCustomTypeSerializerUpgraded());
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestCustomStateClass("new-test-message-1", "extra-message-1"));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestCustomStateClass("new-test-message-2", "extra-message-2"));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot1.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPriorityQueueSerializerUpdates() throws Exception {
        String stateName = "test";
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            InternalPriorityQueueTestBase.TestElementSerializer serializer = InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
            KeyGroupedInternalPriorityQueue priorityQueue = keyedBackend.create("test", (TypeSerializer)serializer);
            priorityQueue.add((Object)new InternalPriorityQueueTestBase.TestElement(42L, 0L));
            RunnableFuture snapshot = keyedBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            KeyedStateHandle keyedStateHandle = this.runSnapshot(snapshot, sharedStateRegistry);
            keyedBackend.dispose();
            keyedBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle);
            serializer = new ModifiedTestElementSerializer();
            priorityQueue = keyedBackend.create("test", (TypeSerializer)serializer);
            InternalPriorityQueueTestBase.TestElement checkElement = new InternalPriorityQueueTestBase.TestElement(4711L, 1L);
            priorityQueue.add((Object)checkElement);
            snapshot = keyedBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            keyedStateHandle = this.runSnapshot(snapshot, sharedStateRegistry);
            keyedBackend.dispose();
            keyedBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle);
            priorityQueue = keyedBackend.create("test", (TypeSerializer)serializer);
            priorityQueue.poll();
            ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
            DataOutputViewStreamWrapper outWrapper = new DataOutputViewStreamWrapper((OutputStream)out);
            serializer.serialize(checkElement, (DataOutputView)outWrapper);
            InternalPriorityQueueTestBase.TestElement expected = (InternalPriorityQueueTestBase.TestElement)serializer.deserialize((DataInputView)new DataInputViewStreamWrapper((InputStream)new ByteArrayInputStreamWithPos(out.toByteArray())));
            Assert.assertEquals((Object)expected, (Object)priorityQueue.poll());
            Assert.assertTrue((boolean)priorityQueue.isEmpty());
            keyedBackend.dispose();
            serializer = InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
            keyedBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, keyedStateHandle);
            try {
                keyedBackend.create("test", (TypeSerializer)serializer);
                Assert.fail((String)"Expected exception from incompatible serializer.");
            }
            catch (Exception e) {
                Assert.assertTrue((String)("Exception was not caused by state migration: " + e), (boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
            }
        }
        finally {
            keyedBackend.dispose();
        }
    }

    @Test
    public void testValueState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update((Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)state.value());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        state.update((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.update((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.update((Object)"u3");
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)state.value());
        Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)state.value());
        Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)state.value());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ValueState restored1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1.value());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1.value());
        Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ValueState restored2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)restored2.value());
        Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)restored2.value());
        Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)restored2.value());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateWorkWithTtl() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", MutableLong.class);
            kvId.enableTimeToLive(StateTtlConfig.newBuilder((Time)Time.seconds((long)1L)).build());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new MutableLong());
            state.value();
        }
        finally {
            backend.close();
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRace() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        Integer namespace = 1;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        IntSerializer namespaceSerializer = IntSerializer.INSTANCE;
        final ValueState state = (ValueState)backend.getPartitionedState((Object)namespace, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)kvId);
        TypeSerializer valueSerializer = kvId.getSerializer();
        final InternalKvState kvState = (InternalKvState)state;
        boolean key1 = true;
        backend.setCurrentKey((Object)1);
        kvState.setCurrentNamespace((Object)2);
        state.update((Object)"2");
        Assert.assertEquals((Object)"2", (Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, namespace, IntSerializer.INSTANCE, valueSerializer));
        Assert.assertEquals((Object)"2", (Object)state.value());
        kvState.setCurrentNamespace((Object)namespace);
        int key2 = 10;
        backend.setCurrentKey((Object)10);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 10, keySerializer, namespace, namespaceSerializer, valueSerializer));
        state.update((Object)"1");
        final CheckedThread getter = new CheckedThread("State getter"){

            public void go() throws Exception {
                while (!this.isInterrupted()) {
                    Assert.assertEquals((Object)"1", (Object)state.value());
                }
            }
        };
        final CheckedThread serializedGetter = new CheckedThread("Serialized state getter", (TypeSerializer)keySerializer, namespace, (TypeSerializer)namespaceSerializer, valueSerializer){
            final /* synthetic */ TypeSerializer val$keySerializer;
            final /* synthetic */ Integer val$namespace;
            final /* synthetic */ TypeSerializer val$namespaceSerializer;
            final /* synthetic */ TypeSerializer val$valueSerializer;
            {
                this.val$keySerializer = typeSerializer;
                this.val$namespace = n;
                this.val$namespaceSerializer = typeSerializer2;
                this.val$valueSerializer = typeSerializer3;
                super(x0);
            }

            public void go() throws Exception {
                while (!this.isInterrupted() && getter.isAlive()) {
                    String serializedValue = (String)StateBackendTestBase.getSerializedValue(kvState, 10, this.val$keySerializer, this.val$namespace, this.val$namespaceSerializer, this.val$valueSerializer);
                    Assert.assertEquals((Object)"1", (Object)serializedValue);
                }
            }
        };
        getter.start();
        serializedGetter.start();
        Timer t = new Timer("stopper");
        t.schedule(new TimerTask(){

            @Override
            public void run() {
                getter.interrupt();
                serializedGetter.interrupt();
                this.cancel();
            }
        }, 100L);
        try {
            serializedGetter.sync();
            getter.interrupt();
            getter.sync();
            t.cancel();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment());
        ValueStateDescriptor desc1 = new ValueStateDescriptor("a-string", (TypeSerializer)StringSerializer.INSTANCE);
        ValueStateDescriptor desc2 = new ValueStateDescriptor("an-integer", (TypeSerializer)IntSerializer.INSTANCE);
        ValueState state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
        ValueState state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state1.value());
        Assert.assertNull((Object)state2.value());
        state1.update((Object)"1");
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertNull((Object)state2.value());
        state2.update((Object)13);
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), new DummyEnvironment());
        snapshot1.discardState();
        backend.setCurrentKey((Object)1);
        state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
        state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
        backend.dispose();
    }

    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize(null, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStream()));
            Assert.fail((String)"Should fail with NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)LongSerializer.INSTANCE, (Object)42L);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)1L);
        Assert.assertEquals((long)1L, (long)((Long)state.value()));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        backend.setCurrentKey((Object)1);
        state.clear();
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)17L);
        Assert.assertEquals((long)17L, (long)((Long)state.value()));
        state.update(null);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.dispose();
    }

    @Test
    public void testListState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getElementSerializer();
        Joiner joiner = Joiner.on((String)",");
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update(Arrays.asList("2"));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        state.add((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.add((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.add((Object)"u3");
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ListState restored1 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)restored1.get()));
        Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)joiner.join((Iterable)restored1.get()));
        Assert.assertEquals((Object)"2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ListState restored2 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddNull() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.add(null);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddAllNullEntries() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.addAll(adding);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddAllNull() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.addAll(null);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateUpdateNullEntries() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            ArrayList<Long> adding = new ArrayList<Long>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.update(adding);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateUpdateNull() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            this.expectedException.expect(NullPointerException.class);
            state.update(null);
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAPIs() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{17L, 11L}));
            state.update(Collections.emptyList());
            Assert.assertNull((Object)state.get());
            state.update(Arrays.asList(10L, 16L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{16L, 10L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{16L, 10L}));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            Assert.assertNull((Object)state.get());
            state.addAll(Collections.emptyList());
            Assert.assertNull((Object)state.get());
            state.addAll(Arrays.asList(3L, 4L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            state.addAll(new ArrayList());
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L}));
            state.addAll(Arrays.asList(5L, 6L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            state.addAll(new ArrayList());
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{3L, 4L, 5L, 6L}));
            state.update(Arrays.asList(1L, 2L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{1L, 2L}));
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{10L, 16L}));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{1L, 2L, 3L, 2L, 1L}));
            state.update(Arrays.asList(5L, 6L));
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{5L, 6L}));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateMerging() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try {
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    @Test
    public void testReducingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)state.get());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        state.add((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.add((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.add((Object)"u3");
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)state.get());
        Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)state.get());
        Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)state.get());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ReducingState restored1 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1.get());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1.get());
        Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ReducingState restored2 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)restored2.get());
        Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)restored2.get());
        Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)restored2.get());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateMerging() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numKeyValueStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    @Test
    public void testFoldingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object)"Fold-Initial:", (FoldFunction)new AppendingFold(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        FoldingState state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)1);
        backend.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)2);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,1", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        state.clear();
        state.add((Object)101);
        backend.setCurrentKey((Object)2);
        state.add((Object)102);
        backend.setCurrentKey((Object)3);
        state.add((Object)103);
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,101", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,101", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,2,102", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"Fold-Initial:,103", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,103", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        FoldingState restored1 = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,1", (Object)restored1.get());
        Assert.assertEquals((Object)"Fold-Initial:,1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2", (Object)restored1.get());
        Assert.assertEquals((Object)"Fold-Initial:,2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot1.discardState();
        FoldingState restored2 = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,101", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,101", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,2,102", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"Fold-Initial:,103", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,103", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    @Test
    public void testMapState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, String.class);
        StringSerializer keySerializer = StringSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer userKeySerializer = kvId.getKeySerializer();
        TypeSerializer userValueSerializer = kvId.getValueSerializer();
        backend.setCurrentKey((Object)"1");
        Assert.assertNull((Object)state.get((Object)1));
        Assert.assertNull(StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put((Object)1, (Object)"1");
        backend.setCurrentKey((Object)"2");
        Assert.assertNull((Object)state.get((Object)2));
        Assert.assertNull(StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put((Object)2, (Object)"2");
        backend.setCurrentKey((Object)"11");
        state.put((Object)11, (Object)"11");
        backend.setCurrentKey((Object)"1");
        Assert.assertTrue((boolean)state.contains((Object)1));
        Assert.assertEquals((Object)"1", (Object)state.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "1");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(11, "11");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)"1");
        state.put((Object)1, (Object)"101");
        backend.setCurrentKey((Object)"2");
        state.put((Object)102, (Object)"102");
        backend.setCurrentKey((Object)"3");
        state.put((Object)103, (Object)"103");
        state.putAll((Map)new HashMap<Integer, String>(){
            {
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        });
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.setCurrentKey((Object)"1");
        Assert.assertEquals((Object)"101", (Object)state.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "101");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)"2");
        Assert.assertEquals((Object)"102", (Object)state.get((Object)102));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
                this.put(102, "102");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)"3");
        Assert.assertTrue((boolean)state.contains((Object)103));
        Assert.assertEquals((Object)"103", (Object)state.get((Object)103));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(103, "103");
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        ArrayList<Integer> keys = new ArrayList<Integer>();
        for (Integer key : state.keys()) {
            keys.add(key);
        }
        List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
        Assert.assertEquals((long)keys.size(), (long)expectedKeys.size());
        keys.removeAll(expectedKeys);
        Assert.assertTrue((boolean)keys.isEmpty());
        ArrayList<String> values = new ArrayList<String>();
        for (String value : state.values()) {
            values.add(value);
        }
        List<String> expectedValues = Arrays.asList("103", "1031", "1032");
        Assert.assertEquals((long)values.size(), (long)expectedValues.size());
        values.removeAll(expectedValues);
        Assert.assertTrue((boolean)values.isEmpty());
        backend.setCurrentKey((Object)"1");
        state.clear();
        backend.setCurrentKey((Object)"2");
        state.remove((Object)102);
        backend.setCurrentKey((Object)"3");
        String updateSuffix = "_updated";
        Iterator iterator = state.iterator();
        while (iterator.hasNext()) {
            Map.Entry entry = (Map.Entry)iterator.next();
            if (((String)entry.getValue()).length() != 4) {
                iterator.remove();
                continue;
            }
            entry.setValue((String)entry.getValue() + "_updated");
        }
        backend.setCurrentKey((Object)"1");
        backend.setCurrentKey((Object)"2");
        Assert.assertFalse((boolean)state.contains((Object)102));
        backend.setCurrentKey((Object)"3");
        for (Map.Entry entry : state.entries()) {
            Assert.assertEquals((long)(4 + "_updated".length()), (long)((String)entry.getValue()).length());
            Assert.assertTrue((boolean)((String)entry.getValue()).endsWith("_updated"));
        }
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        MapState restored1 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)"1");
        Assert.assertEquals((Object)"1", (Object)restored1.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "1");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)"2");
        Assert.assertEquals((Object)"2", (Object)restored1.get((Object)2));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        MapState restored2 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)"1");
        Assert.assertEquals((Object)"101", (Object)restored2.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "101");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)"2");
        Assert.assertEquals((Object)"102", (Object)restored2.get((Object)102));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
                this.put(102, "102");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)"3");
        Assert.assertEquals((Object)"103", (Object)restored2.get((Object)103));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(103, "103");
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateIteratorArbitraryAccess() throws Exception {
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, Long.class);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            int stateSize = 4096;
            for (int i = 0; i < stateSize; ++i) {
                state.put((Object)i, (Object)((long)i * 2L));
            }
            Iterator iterator = state.iterator();
            int iteratorCount = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry)iterator.next();
                Assert.assertEquals((long)iteratorCount, (long)((Integer)entry.getKey()).intValue());
                switch (ThreadLocalRandom.current().nextInt() % 3) {
                    case 0: {
                        iterator.remove();
                        try {
                            iterator.remove();
                            Assert.fail();
                        }
                        catch (IllegalStateException illegalStateException) {}
                        break;
                    }
                    case 1: {
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    }
                }
                ++iteratorCount;
            }
            Assert.assertEquals((long)stateSize, (long)iteratorCount);
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.value());
        state.update((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.value());
        state.clear();
        Assert.assertNull((Object)state.value());
        backend.dispose();
    }

    @Test
    public void testValueStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Hello", (Object)state.value());
        state.update((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.value());
        state.clear();
        Assert.assertEquals((Object)"Hello", (Object)state.value());
        backend.dispose();
    }

    @Test
    public void testReducingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.add((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.get());
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testFoldingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object)"Fold-Initial:", (FoldFunction)new AppendingFold(), String.class);
        FoldingState state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.add((Object)1);
        state.add((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,1,2", (Object)state.get());
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testListStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.update(Arrays.asList("Ciao", "Bello"));
        Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new String[]{"Ciao", "Bello"}));
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testMapStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
        MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.entries());
        state.put((Object)"Ciao", (Object)"Hello");
        state.put((Object)"Bello", (Object)"Nice");
        Assert.assertNotNull((Object)state.entries());
        Assert.assertEquals((Object)state.get((Object)"Ciao"), (Object)"Hello");
        Assert.assertEquals((Object)state.get((Object)"Bello"), (Object)"Nice");
        state.clear();
        Assert.assertNull((Object)state.entries());
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSnapshotNonAccessedState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        String stateName = "test-name";
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("test-name", Integer.class, String.class);
            MapState mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            mapState.put((Object)11, (Object)"foo");
            backend.setCurrentKey((Object)"2");
            mapState.put((Object)8, (Object)"bar");
            backend.setCurrentKey((Object)"3");
            mapState.put((Object)91, (Object)"hello world");
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)StringSerializer.INSTANCE, snapshot);
            mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)"1");
            Assert.assertEquals((Object)"foo", (Object)mapState.get((Object)11));
            backend.setCurrentKey((Object)"2");
            Assert.assertEquals((Object)"bar", (Object)mapState.get((Object)8));
            backend.setCurrentKey((Object)"3");
            Assert.assertEquals((Object)"hello world", (Object)mapState.get((Object)91));
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testKeyGroupSnapshotRestore() throws Exception {
        int MAX_PARALLELISM = 10;
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(0, 9), new DummyEnvironment());
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int keyInFirstHalf = 17;
        int keyInSecondHalf = 42;
        Random rand = new Random(0L);
        int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInFirstHalf, (int)10, (int)2);
        int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInFirstHalf, (int)10, (int)2);
        while (firstKeyHalf == secondKeyHalf) {
            keyInSecondHalf = rand.nextInt();
            secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInSecondHalf, (int)10, (int)2);
        }
        backend.setCurrentKey((Object)keyInFirstHalf);
        state.update((Object)"ShouldBeInFirstHalf");
        backend.setCurrentKey((Object)keyInSecondHalf);
        state.update((Object)"ShouldBeInSecondHalf");
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles(Collections.singletonList(snapshot), (KeyGroupRange)KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)10, (int)2, (int)0));
        List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles(Collections.singletonList(snapshot), (KeyGroupRange)KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)10, (int)2, (int)1));
        backend.dispose();
        AbstractKeyedStateBackend firstHalfBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(0, 4), firstHalfKeyGroupStates, new DummyEnvironment());
        AbstractKeyedStateBackend secondHalfBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(5, 9), secondHalfKeyGroupStates, new DummyEnvironment());
        ValueState firstHalfState = (ValueState)firstHalfBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        firstHalfBackend.setCurrentKey((Object)keyInFirstHalf);
        Assert.assertTrue((boolean)((String)firstHalfState.value()).equals("ShouldBeInFirstHalf"));
        firstHalfBackend.setCurrentKey((Object)keyInSecondHalf);
        Assert.assertTrue((firstHalfState.value() == null ? 1 : 0) != 0);
        ValueState secondHalfState = (ValueState)secondHalfBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        secondHalfBackend.setCurrentKey((Object)keyInFirstHalf);
        Assert.assertTrue((secondHalfState.value() == null ? 1 : 0) != 0);
        secondHalfBackend.setCurrentKey((Object)keyInSecondHalf);
        Assert.assertTrue((boolean)((String)secondHalfState.value()).equals("ShouldBeInSecondHalf"));
        firstHalfBackend.dispose();
        secondHalfBackend.dispose();
    }

    @Test
    public void testRestoreWithWrongKeySerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)"1");
        backend.setCurrentKey((Object)2);
        state.update((Object)"2");
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();
        try {
            this.restoreKeyedBackend((TypeSerializer)DoubleSerializer.INSTANCE, snapshot1);
            Assert.fail((String)"should recognize wrong key serializer");
        }
        catch (StateMigrationException stateMigrationException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ValueStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.value();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ListStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)StringSerializer.INSTANCE);
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)fakeStringSerializer);
                state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("id", (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE);
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.put((Object)"1", (Object)"First");
            backend.setCurrentKey((Object)2);
            state.put((Object)"2", (Object)"Second");
            KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new MapStateDescriptor("id", (TypeSerializer)fakeStringSerializer, (TypeSerializer)StringSerializer.INSTANCE);
                state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.entries();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
            backend.dispose();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testCopyDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        IntValue default1 = (IntValue)state.value();
        backend.setCurrentKey((Object)2);
        IntValue default2 = (IntValue)state.value();
        Assert.assertNotNull((Object)default1);
        Assert.assertNotNull((Object)default2);
        Assert.assertEquals((Object)default1, (Object)default2);
        Assert.assertFalse((default1 == default2 ? 1 : 0) != 0);
        backend.dispose();
    }

    @Test
    public void testRequireNonNullNamespace() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        try {
            backend.getPartitionedState(null, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, null, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            backend.getPartitionedState(null, null, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        backend.dispose();
    }

    protected void testConcurrentMapIfQueryable() throws Exception {
        boolean numberOfKeyGroups = true;
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment());
        ValueStateDescriptor desc = new ValueStateDescriptor("value-state", Integer.class, (Object)-1);
        desc.setQueryable("my-query");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        InternalKvState kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.update((Object)121818273);
        StateTable stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new ListStateDescriptor("list-state", Integer.class);
        desc.setQueryable("my-query");
        state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new ReducingStateDescriptor("reducing-state", (ReduceFunction)new ReduceFunction<Integer>(){

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        }, Integer.class);
        desc.setQueryable("my-query");
        state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new FoldingStateDescriptor("folding-state", (Object)0, (FoldFunction)new FoldFunction<Integer, Integer>(){

            public Integer fold(Integer accumulator, Integer value) throws Exception {
                return accumulator + value;
            }
        }, Integer.class);
        desc.setQueryable("my-query");
        state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new MapStateDescriptor("map-state", Integer.class, String.class);
        desc.setQueryable("my-query");
        state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.put((Object)121818273, (Object)"121818273");
        int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)1);
        StateTable stateTable2 = ((AbstractHeapState)kvState).getStateTable();
        Assert.assertNotNull((String)"State not set", (Object)stateTable2.get((Object)keyGroupIndex));
        this.checkConcurrentStateTable(stateTable2, 1);
        backend.dispose();
    }

    private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int numberOfKeyGroups) {
        Assert.assertNotNull((String)"State not set", stateTable);
        if (stateTable instanceof NestedMapsStateTable) {
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)numberOfKeyGroups);
            NestedMapsStateTable nestedMapsStateTable = (NestedMapsStateTable)stateTable;
            Assert.assertTrue((boolean)(nestedMapsStateTable.getState()[keyGroupIndex] instanceof ConcurrentHashMap));
            Assert.assertTrue((boolean)(nestedMapsStateTable.getState()[keyGroupIndex].get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap));
        }
    }

    @Test
    public void testQueryableStateRegistration() throws Exception {
        DummyEnvironment env = new DummyEnvironment();
        KvStateRegistry registry = env.getKvStateRegistry();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
        KvStateRegistryListener listener = (KvStateRegistryListener)Mockito.mock(KvStateRegistryListener.class);
        registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener);
        ValueStateDescriptor desc = new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("banana");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateRegistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateUnregistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)2))).notifyKvStateRegistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
        backend.dispose();
    }

    @Test
    public void testEmptyStateCheckpointing() {
        try {
            CheckpointStreamFactory streamFactory = this.createStreamFactory();
            SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
            AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462379L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            Assert.assertNull((Object)snapshot);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            backend.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNumStateEntries() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        Assert.assertEquals((long)0L, (long)backend.numKeyValueStateEntries());
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)0);
        state.update((Object)"hello");
        state.update((Object)"ciao");
        Assert.assertEquals((long)1L, (long)backend.numKeyValueStateEntries());
        backend.setCurrentKey((Object)42);
        state.update((Object)"foo");
        Assert.assertEquals((long)2L, (long)backend.numKeyValueStateEntries());
        backend.setCurrentKey((Object)0);
        state.clear();
        Assert.assertEquals((long)1L, (long)backend.numKeyValueStateEntries());
        backend.setCurrentKey((Object)42);
        state.clear();
        Assert.assertEquals((long)0L, (long)backend.numKeyValueStateEntries());
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testParallelAsyncSnapshots() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot1 = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner1 = new Thread((Runnable)snapshot1, "snapshot-1-runner");
            runner1.start();
            waiter.await();
            for (int i = 5; i < 15; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
            }
            streamFactory.setWaiterLatch(null);
            streamFactory.setBlockerLatch(null);
            RunnableFuture snapshot2 = backend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner2 = new Thread((Runnable)snapshot2, "snapshot-2-runner");
            runner2.start();
            snapshot2.get();
            blocker.trigger();
            snapshot1.get();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshot() throws Exception {
        InternalValueState valueState;
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        AbstractKeyedStateBackend backend = null;
        KeyedStateHandle stateHandle = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
                if (10 != i) continue;
                waiter.await();
            }
            runner.join();
            SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
            stateHandle = (KeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            for (int i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)(i + 1), (long)((Integer)valueState.value()).intValue());
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        Assert.assertNotNull((Object)stateHandle);
        backend = null;
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, stateHandle);
            valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)i, (long)((Integer)valueState.value()).intValue());
            }
            backend.setCurrentKey((Object)11);
            Assert.assertNull((Object)valueState.value());
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentModificationWithApplyToAllKeys() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertEquals((Object)("Hello" + key), ((Iterable)state.get()).iterator().next());
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.clear();
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertFalse((boolean)((Iterable)state.get()).iterator().hasNext());
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    state.add((Object)("Hello" + key));
                    state.clear();
                    state.add((Object)("Hello_" + key));
                }
            });
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (KeyedStateFunction)new KeyedStateFunction<Integer, ListState<String>>(){

                public void process(Integer key, ListState<String> state) throws Exception {
                    Iterator it = ((Iterable)state.get()).iterator();
                    Assert.assertEquals((Object)("Hello_" + key), it.next());
                    Assert.assertFalse((boolean)it.hasNext());
                }
            });
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testApplyToAllKeysLambdaFunction() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("foo", (TypeSerializer)StringSerializer.INSTANCE);
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            for (int i = 0; i < 100; ++i) {
                backend.setCurrentKey((Object)i);
                listState.add((Object)("Hello" + i));
            }
            backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor, (key, state) -> Assert.assertEquals((Object)("Hello" + key), ((Iterable)state.get()).iterator().next()));
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshotCancellation() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = (InternalValueState)backend.createInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();
            waiter.await();
            IOUtils.closeQuietly(backend);
            blocker.trigger();
            runner.join();
            try {
                snapshot.get();
                Assert.fail((String)"Close was not propagated.");
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateGetKeys() throws Exception {
        int namespace1ElementsNum = 1000;
        int namespace2ElementsNum = 1000;
        String fieldName = "get-keys-test";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            int expectedKey;
            PrimitiveIterator.OfInt actualIterator;
            String ns1 = "ns1";
            MapState keyedState1 = (MapState)backend.getPartitionedState((Object)"ns1", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState1.put((Object)"he", (Object)(key * 2));
                keyedState1.put((Object)"ho", (Object)(key * 2));
            }
            String ns2 = "ns2";
            MapState keyedState2 = (MapState)backend.getPartitionedState((Object)"ns2", (TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor(fieldName, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE));
            for (int key = 1000; key < 2000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState2.put((Object)"he", (Object)(key * 2));
                keyedState2.put((Object)"ho", (Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)"ns1").sorted();){
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            keysStream = backend.getKeys(fieldName, (Object)"ns2").sorted();
            var10_12 = null;
            try {
                actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (expectedKey = 1000; expectedKey < 2000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
            catch (Throwable throwable) {
                var10_12 = throwable;
                throw throwable;
            }
            finally {
                if (keysStream != null) {
                    if (var10_12 != null) {
                        try {
                            keysStream.close();
                        }
                        catch (Throwable throwable) {
                            var10_12.addSuppressed(throwable);
                        }
                    } else {
                        keysStream.close();
                    }
                }
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        try {
            long checkpointID = 0L;
            ArrayList<Future<SnapshotResult<KeyedStateHandle>>> futureList = new ArrayList<Future<SnapshotResult<KeyedStateHandle>>>();
            for (int i = 0; i < 10; ++i) {
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id" + i, (TypeSerializer)IntSerializer.INSTANCE);
                ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)valueStateDescriptor);
                ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
                backend.setCurrentKey((Object)i);
                state.update((Object)i);
                futureList.add(this.runSnapshotAsync(executorService, backend.snapshot(checkpointID++, System.currentTimeMillis(), streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
            }
            for (Future future : futureList) {
                future.get(20L, TimeUnit.SECONDS);
            }
        }
        catch (Exception e) {
            Assert.fail();
        }
        finally {
            backend.dispose();
            executorService.shutdown();
        }
    }

    protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(ExecutorService executorService, RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            CompletableFuture<SnapshotResult<KeyedStateHandle>> completableFuture = new CompletableFuture<SnapshotResult<KeyedStateHandle>>();
            executorService.submit(() -> {
                try {
                    snapshotRunnableFuture.run();
                    completableFuture.complete((SnapshotResult<KeyedStateHandle>)snapshotRunnableFuture.get());
                }
                catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            });
            return completableFuture;
        }
        return CompletableFuture.completedFuture(snapshotRunnableFuture.get());
    }

    protected static <V, K, N> V getSerializedValue(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return (V)KvStateSerializer.deserializeValue((byte[])serializedValue, valueSerializer);
    }

    private static <V, K, N> List<V> getSerializedList(InternalKvState<K, N, V> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeList((byte[])serializedValue, valueSerializer);
    }

    private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(InternalKvState<K, N, Map<UK, UV>> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<UK> userKeySerializer, TypeSerializer<UV> userValueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace, kvState.getKeySerializer(), kvState.getNamespaceSerializer(), kvState.getValueSerializer());
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeMap((byte[])serializedValue, userKeySerializer, userValueSerializer);
    }

    protected KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    private static final class MutableLong {
        long value;

        private MutableLong() {
        }
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    private static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        private MutableAggregatingAddingFunction() {
        }

        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    public static class CustomKryoTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            super.write(kryo, output, object);
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    public static class ExceptionThrowingTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            throw new ExpectedKryoTestException();
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    private static class ExpectedKryoTestException
    extends RuntimeException {
        private ExpectedKryoTestException() {
        }
    }

    public static class TestReconfigurableCustomTypeSerializerUpgraded
    extends TestReconfigurableCustomTypeSerializer {
    }

    public static class TestReconfigurableCustomTypeSerializerPreUpgrade
    extends TestReconfigurableCustomTypeSerializer {
    }

    public static class TestReconfigurableCustomTypeSerializer
    extends TypeSerializer<TestCustomStateClass> {
        private boolean reconfigured = false;

        public TestReconfigurableCustomTypeSerializer() {
        }

        private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
            this.reconfigured = reconfigured;
        }

        public TypeSerializer<TestCustomStateClass> duplicate() {
            return new TestReconfigurableCustomTypeSerializer(this.reconfigured);
        }

        public TestCustomStateClass createInstance() {
            return new TestCustomStateClass(null, null);
        }

        public void serialize(TestCustomStateClass record, DataOutputView target) throws IOException {
            target.writeBoolean(this.reconfigured);
            target.writeUTF(record.getMessage());
            if (this.reconfigured) {
                target.writeUTF(record.getExtraMessage());
            }
        }

        public TestCustomStateClass deserialize(DataInputView source) throws IOException {
            boolean isNewSchema = source.readBoolean();
            String message = source.readUTF();
            if (isNewSchema) {
                return new TestCustomStateClass(message, source.readUTF());
            }
            return new TestCustomStateClass(message, null);
        }

        public TestCustomStateClass deserialize(TestCustomStateClass reuse, DataInputView source) throws IOException {
            boolean isNewSchema = source.readBoolean();
            String message = source.readUTF();
            if (isNewSchema) {
                reuse.setMessage(message);
                reuse.setExtraMessage(source.readUTF());
                return reuse;
            }
            reuse.setMessage(message);
            reuse.setExtraMessage(null);
            return reuse;
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            boolean reconfigured = source.readBoolean();
            target.writeUTF(source.readUTF());
            if (reconfigured) {
                target.writeUTF(source.readUTF());
            }
        }

        public TestCustomStateClass copy(TestCustomStateClass from) {
            return new TestCustomStateClass(from.getMessage(), from.getExtraMessage());
        }

        public TestCustomStateClass copy(TestCustomStateClass from, TestCustomStateClass reuse) {
            reuse.setMessage(from.getMessage());
            reuse.setExtraMessage(from.getExtraMessage());
            return reuse;
        }

        public int getLength() {
            return 0;
        }

        public boolean isImmutableType() {
            return false;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof TestReconfigurableCustomTypeSerializer;
        }

        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof TestReconfigurableCustomTypeSerializer)) {
                return false;
            }
            if (obj == this) {
                return true;
            }
            TestReconfigurableCustomTypeSerializer other = (TestReconfigurableCustomTypeSerializer)((Object)obj);
            return other.reconfigured == this.reconfigured;
        }

        public int hashCode() {
            return Objects.hash(((Object)((Object)this)).getClass().getName(), this.reconfigured);
        }

        public boolean isReconfigured() {
            return this.reconfigured;
        }

        public TypeSerializerConfigSnapshot<TestCustomStateClass> snapshotConfiguration() {
            return new ParameterlessTypeSerializerConfig(((Object)((Object)this)).getClass().getName());
        }

        public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
            if (configSnapshot instanceof ParameterlessTypeSerializerConfig && ((ParameterlessTypeSerializerConfig)configSnapshot).getSerializationFormatIdentifier().equals(((Object)((Object)this)).getClass().getName())) {
                this.reconfigured = true;
                return CompatibilityResult.compatible();
            }
            return CompatibilityResult.requiresMigration();
        }
    }

    public static class TestCustomStateClass {
        private String message;
        private String extraMessage;

        public TestCustomStateClass(String message, String extraMessage) {
            this.message = message;
            this.extraMessage = extraMessage;
        }

        public String getMessage() {
            return this.message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public String getExtraMessage() {
            return this.extraMessage;
        }

        public void setExtraMessage(String extraMessage) {
            this.extraMessage = extraMessage;
        }
    }

    public static class TestNestedPojoClassB
    implements Serializable {
        private Double doubleField;
        private String strField;

        public TestNestedPojoClassB() {
        }

        public TestNestedPojoClassB(Double doubleField, String strField) {
            this.doubleField = doubleField;
            this.strField = strField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public String toString() {
            return "TestNestedPojoClassB{doubleField='" + this.doubleField + '\'' + ", strField=" + this.strField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassB testNestedPojoClassB = (TestNestedPojoClassB)o;
            if (!this.doubleField.equals(testNestedPojoClassB.doubleField)) {
                return false;
            }
            return this.strField.equals(testNestedPojoClassB.strField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.strField.hashCode();
            return result;
        }
    }

    public static class TestNestedPojoClassA
    implements Serializable {
        private Double doubleField;
        private Integer intField;

        public TestNestedPojoClassA() {
        }

        public TestNestedPojoClassA(Double doubleField, Integer intField) {
            this.doubleField = doubleField;
            this.intField = intField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public String toString() {
            return "TestNestedPojoClassA{doubleField='" + this.doubleField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassA testNestedPojoClassA = (TestNestedPojoClassA)o;
            if (!this.doubleField.equals(testNestedPojoClassA.doubleField)) {
                return false;
            }
            return this.intField.equals(testNestedPojoClassA.intField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.intField.hashCode();
            return result;
        }
    }

    public static class TestPojo
    implements Serializable {
        private String strField;
        private Integer intField;
        private TestNestedPojoClassA kryoClassAField;
        private TestNestedPojoClassB kryoClassBField;

        public TestPojo() {
        }

        public TestPojo(String strField, Integer intField) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = null;
            this.kryoClassBField = null;
        }

        public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = classAField;
            this.kryoClassBField = classBfield;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public TestNestedPojoClassA getKryoClassAField() {
            return this.kryoClassAField;
        }

        public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) {
            this.kryoClassAField = kryoClassAField;
        }

        public TestNestedPojoClassB getKryoClassBField() {
            return this.kryoClassBField;
        }

        public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) {
            this.kryoClassBField = kryoClassBField;
        }

        public String toString() {
            return "TestPojo{strField='" + this.strField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestPojo testPojo = (TestPojo)o;
            return this.strField.equals(testPojo.strField) && this.intField.equals(testPojo.intField) && (this.kryoClassAField == null && testPojo.kryoClassAField == null || this.kryoClassAField.equals(testPojo.kryoClassAField)) && (this.kryoClassBField == null && testPojo.kryoClassBField == null || this.kryoClassBField.equals(testPojo.kryoClassBField));
        }

        public int hashCode() {
            int result = this.strField.hashCode();
            result = 31 * result + this.intField.hashCode();
            if (this.kryoClassAField != null) {
                result = 31 * result + this.kryoClassAField.hashCode();
            }
            if (this.kryoClassBField != null) {
                result = 31 * result + this.kryoClassBField.hashCode();
            }
            return result;
        }
    }

    private static class AppendingFold
    implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        private AppendingFold() {
        }

        public String fold(String acc, Integer value) throws Exception {
            return acc + "," + value;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }

    public static class ModifiedTestElementSerializer
    extends InternalPriorityQueueTestBase.TestElementSerializer {
        @Override
        public void serialize(InternalPriorityQueueTestBase.TestElement record, DataOutputView target) throws IOException {
            super.serialize(new InternalPriorityQueueTestBase.TestElement(record.getKey() + 1L, record.getPriority() + 1L), target);
        }

        @Override
        protected int getRevision() {
            return super.getRevision() + 1;
        }
    }
}

