/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public abstract class StateBackendTestBase<B extends AbstractStateBackend>
extends TestLogger {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    protected abstract B getStateBackend() throws Exception;

    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        return this.getStateBackend().createStreamFactory(new JobID(), "test_op");
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, new DummyEnvironment("test", 1, 0));
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry());
        backend.restore(null);
        return backend;
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, new DummyEnvironment("test", 1, 0));
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry());
        backend.restore(state);
        return backend;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetKeys() throws Exception {
        int elementsToTest = 1000;
        String fieldName = "get-keys-test";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState keyedState = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor(fieldName, (TypeSerializer)IntSerializer.INSTANCE));
            ((InternalValueState)keyedState).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int key = 0; key < 1000; ++key) {
                backend.setCurrentKey((Object)key);
                keyedState.update((Object)(key * 2));
            }
            try (Stream<Integer> keysStream = backend.getKeys(fieldName, (Object)VoidNamespace.INSTANCE).sorted();){
                PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value).iterator();
                for (int expectedKey = 0; expectedKey < 1000; ++expectedKey) {
                    Assert.assertTrue((boolean)actualIterator.hasNext());
                    Assert.assertEquals((long)expectedKey, (long)actualIterator.nextInt());
                }
                Assert.assertFalse((boolean)actualIterator.hasNext());
            }
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        pojoType.createSerializer(env.getExecutionConfig());
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        Assert.assertTrue((boolean)(state instanceof InternalValueState));
        ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getOrCreateKeyedState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        Assert.assertTrue((boolean)(state instanceof InternalValueState));
        ((InternalValueState)state).setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numExceptions = 0;
        backend.setCurrentKey((Object)1);
        try {
            state.update((Object)new TestPojo("u1", 1));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        try {
            this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        }
        catch (ExpectedKryoTestException e) {
            ++numExceptions;
        }
        catch (Exception e) {
            if (e.getCause() instanceof ExpectedKryoTestException) {
                ++numExceptions;
            }
            throw e;
        }
        Assert.assertEquals((String)"Didn't see the expected Kryo exception.", (long)1L, (long)numExceptions);
        backend.dispose();
    }

    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 1));
        backend.setCurrentKey((Object)2);
        state.update((Object)new TestPojo("u2", 2));
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        env.getExecutionConfig().registerKryoType(TestPojo.class);
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u1", 1));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)state.value(), (Object)new TestPojo("u2", 2));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            snapshot.registerSharedStates(sharedStateRegistry);
            backend.dispose();
            env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            snapshot2.registerSharedStates(sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(ExpectedKryoTestException.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
            snapshot2.discardState();
            backend.dispose();
        }
        finally {
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend backend = null;
        try {
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 1));
            backend.setCurrentKey((Object)2);
            state.update((Object)new TestPojo("u2", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            snapshot.registerSharedStates(sharedStateRegistry);
            backend.dispose();
            env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
            kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)new TestPojo("u1", 11));
            KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            snapshot2.registerSharedStates(sharedStateRegistry);
            snapshot.discardState();
            backend.dispose();
            env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(ExpectedKryoTestException.class);
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2, env);
            state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.value();
            backend.dispose();
        }
        finally {
            if (backend != null) {
                backend.dispose();
            }
        }
    }

    @Test
    public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
        backend.setCurrentKey((Object)2);
        state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        env = new DummyEnvironment("test", 1, 0);
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
        env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        kvId = new ValueStateDescriptor("id", (TypeInformation)pojoType);
        state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
        this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
    }

    @Test
    public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        TypeInformation pojoType = TypeExtractor.getForClass(TestPojo.class);
        Assert.assertTrue((boolean)(pojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", pojoType);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
        backend.setCurrentKey((Object)2);
        state.update((Object)new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
        KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        env = new DummyEnvironment("test", 1, 0);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        kvId = new ValueStateDescriptor("id", pojoType);
        state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
        this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
    }

    @Test
    public void testValueState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update((Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)state.value());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        state.update((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.update((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.update((Object)"u3");
        KeyedStateHandle snapshot2 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)state.value());
        Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)state.value());
        Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)state.value());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ValueState restored1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1.value());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1.value());
        Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ValueState restored2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"u1", (Object)restored2.value());
        Assert.assertEquals((Object)"u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"u2", (Object)restored2.value());
        Assert.assertEquals((Object)"u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)restored2.value());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRace() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        Integer namespace = 1;
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        IntSerializer namespaceSerializer = IntSerializer.INSTANCE;
        final ValueState state = (ValueState)backend.getPartitionedState((Object)namespace, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)kvId);
        TypeSerializer valueSerializer = kvId.getSerializer();
        final InternalKvState kvState = (InternalKvState)state;
        boolean key1 = true;
        backend.setCurrentKey((Object)1);
        kvState.setCurrentNamespace((Object)2);
        state.update((Object)"2");
        Assert.assertEquals((Object)"2", (Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, namespace, IntSerializer.INSTANCE, valueSerializer));
        Assert.assertEquals((Object)"2", (Object)state.value());
        kvState.setCurrentNamespace((Object)namespace);
        int key2 = 10;
        backend.setCurrentKey((Object)10);
        Assert.assertNull((Object)state.value());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 10, keySerializer, namespace, namespaceSerializer, valueSerializer));
        state.update((Object)"1");
        final CheckedThread getter = new CheckedThread("State getter"){

            public void go() throws Exception {
                while (!this.isInterrupted()) {
                    Assert.assertEquals((Object)"1", (Object)state.value());
                }
            }
        };
        final CheckedThread serializedGetter = new CheckedThread("Serialized state getter", (TypeSerializer)keySerializer, namespace, (TypeSerializer)namespaceSerializer, valueSerializer){
            final /* synthetic */ TypeSerializer val$keySerializer;
            final /* synthetic */ Integer val$namespace;
            final /* synthetic */ TypeSerializer val$namespaceSerializer;
            final /* synthetic */ TypeSerializer val$valueSerializer;
            {
                this.val$keySerializer = typeSerializer;
                this.val$namespace = n;
                this.val$namespaceSerializer = typeSerializer2;
                this.val$valueSerializer = typeSerializer3;
                super(x0);
            }

            public void go() throws Exception {
                while (!this.isInterrupted() && getter.isAlive()) {
                    String serializedValue = (String)StateBackendTestBase.getSerializedValue(kvState, 10, this.val$keySerializer, this.val$namespace, this.val$namespaceSerializer, this.val$valueSerializer);
                    Assert.assertEquals((Object)"1", (Object)serializedValue);
                }
            }
        };
        getter.start();
        serializedGetter.start();
        Timer t = new Timer("stopper");
        t.schedule(new TimerTask(){

            @Override
            public void run() {
                getter.interrupt();
                serializedGetter.interrupt();
                this.cancel();
            }
        }, 100L);
        try {
            serializedGetter.sync();
            getter.interrupt();
            getter.sync();
            t.cancel();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0));
        ValueStateDescriptor desc1 = new ValueStateDescriptor("a-string", (TypeSerializer)StringSerializer.INSTANCE);
        ValueStateDescriptor desc2 = new ValueStateDescriptor("an-integer", (TypeSerializer)IntSerializer.INSTANCE);
        ValueState state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
        ValueState state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state1.value());
        Assert.assertNull((Object)state2.value());
        state1.update((Object)"1");
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertNull((Object)state2.value());
        state2.update((Object)13);
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
        KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), new DummyEnvironment("test_op", 1, 0));
        snapshot1.discardState();
        backend.setCurrentKey((Object)1);
        state1 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc1);
        state2 = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc2);
        Assert.assertEquals((Object)"1", (Object)state1.value());
        Assert.assertEquals((long)13L, (long)((Integer)state2.value()).intValue());
        backend.dispose();
    }

    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize(null, (DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStream()));
            Assert.fail((String)"Should fail with NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", (TypeSerializer)LongSerializer.INSTANCE, (Object)42L);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)1L);
        Assert.assertEquals((long)1L, (long)((Long)state.value()));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        backend.setCurrentKey((Object)1);
        state.clear();
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        state.update((Object)17L);
        Assert.assertEquals((long)17L, (long)((Long)state.value()));
        state.update(null);
        Assert.assertEquals((long)42L, (long)((Long)state.value()));
        KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.dispose();
    }

    @Test
    public void testListState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getElementSerializer();
        Joiner joiner = Joiner.on((String)",");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertEquals(null, StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertEquals(null, StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        state.add((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.add((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.add((Object)"u3");
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)state.get()));
        Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ListState restored1 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)joiner.join((Iterable)restored1.get()));
        Assert.assertEquals((Object)"1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)joiner.join((Iterable)restored1.get()));
        Assert.assertEquals((Object)"2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ListState restored2 = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"1,u1", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"2,u2", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)joiner.join((Iterable)restored2.get()));
        Assert.assertEquals((Object)"u3", (Object)joiner.join(StateBackendTestBase.getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateAddAndGet() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        try {
            ListState state = (ListState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{17L, 11L}));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 17L}));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{1L, 2L, 3L, 2L, 1L}));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateMerging() throws Exception {
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        ListStateDescriptor stateDescr = new ListStateDescriptor("my-state", Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        try {
            InternalListState state = (InternalListState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 22L, 33L, 44L, 55L}));
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    @Test
    public void testReducingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertNull(StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)state.get());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        state.add((Object)"u1");
        backend.setCurrentKey((Object)2);
        state.add((Object)"u2");
        backend.setCurrentKey((Object)3);
        state.add((Object)"u3");
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)state.get());
        Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)state.get());
        Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)state.get());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        ReducingState restored1 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1.get());
        Assert.assertEquals((Object)"1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1.get());
        Assert.assertEquals((Object)"2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        ReducingState restored2 = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1,u1", (Object)restored2.get());
        Assert.assertEquals((Object)"1,u1", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2,u2", (Object)restored2.get());
        Assert.assertEquals((Object)"2,u2", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"u3", (Object)restored2.get());
        Assert.assertEquals((Object)"u3", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateMerging() throws Exception {
        ReducingStateDescriptor stateDescr = new ReducingStateDescriptor("my-state", (ReduceFunction & Serializable)(a, b) -> a + b, Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalReducingState state = (InternalReducingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new MutableAggregatingAddingFunction(), MutableLong.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            AggregatingState state = (AggregatingState)keyedBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            state.add((Object)17L);
            state.add((Object)11L);
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            keyedBackend.setCurrentKey((Object)"abc");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertNull((Object)state.get());
            state.add((Object)1L);
            state.add((Object)2L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertEquals((long)28L, (long)((Long)state.get()));
            state.clear();
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            state.add((Object)3L);
            state.add((Object)2L);
            state.add((Object)1L);
            keyedBackend.setCurrentKey((Object)"def");
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"g");
            Assert.assertEquals((long)9L, (long)((Long)state.get()));
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor stateDescr = new AggregatingStateDescriptor("my-state", (AggregateFunction)new ImmutableAggregatingAddingFunction(), Long.class);
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Long expectedResult = 165L;
        AbstractKeyedStateBackend keyedBackend = this.createKeyedBackend((TypeSerializer)StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState)keyedBackend.getPartitionedState((Object)0, (TypeSerializer)IntSerializer.INSTANCE, (StateDescriptor)stateDescr);
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)33L);
            state.add((Object)55L);
            state.setCurrentNamespace((Object)namespace2);
            state.add((Object)22L);
            state.add((Object)11L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)44L);
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)44L);
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)22L);
            state.add((Object)55L);
            state.add((Object)33L);
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace3);
            state.add((Object)11L);
            state.add((Object)22L);
            state.add((Object)33L);
            state.add((Object)44L);
            state.add((Object)55L);
            keyedBackend.setCurrentKey((Object)"abc");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"def");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"ghi");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertNull((Object)state.get());
            keyedBackend.setCurrentKey((Object)"jkl");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"mno");
            state.mergeNamespaces((Object)namespace1, Arrays.asList(namespace2, namespace3));
            state.setCurrentNamespace((Object)namespace1);
            Assert.assertEquals((Object)expectedResult, (Object)state.get());
            keyedBackend.setCurrentKey((Object)"abc");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"def");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"ghi");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"jkl");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            keyedBackend.setCurrentKey((Object)"mno");
            state.setCurrentNamespace((Object)namespace1);
            state.clear();
            Assert.assertThat((String)"State backend is not empty.", (Object)keyedBackend.numStateEntries(), (Matcher)Is.is((Object)0));
        }
        finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    @Test
    public void testFoldingState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object)"Fold-Initial:", (FoldFunction)new AppendingFold(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        FoldingState state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer valueSerializer = kvId.getSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertEquals(null, StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)1);
        backend.setCurrentKey((Object)2);
        Assert.assertEquals(null, (Object)state.get());
        Assert.assertEquals(null, StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add((Object)2);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,1", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,1", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        state.clear();
        state.add((Object)101);
        backend.setCurrentKey((Object)2);
        state.add((Object)102);
        backend.setCurrentKey((Object)3);
        state.add((Object)103);
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,101", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,101", StateBackendTestBase.getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,2,102", StateBackendTestBase.getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"Fold-Initial:,103", (Object)state.get());
        Assert.assertEquals((Object)"Fold-Initial:,103", StateBackendTestBase.getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        FoldingState restored1 = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,1", (Object)restored1.get());
        Assert.assertEquals((Object)"Fold-Initial:,1", StateBackendTestBase.getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2", (Object)restored1.get());
        Assert.assertEquals((Object)"Fold-Initial:,2", StateBackendTestBase.getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot1.discardState();
        FoldingState restored2 = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Fold-Initial:,101", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,101", StateBackendTestBase.getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,2,102", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,2,102", StateBackendTestBase.getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"Fold-Initial:,103", (Object)restored2.get());
        Assert.assertEquals((Object)"Fold-Initial:,103", StateBackendTestBase.getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();
    }

    @Test
    public void testMapState() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState kvState = (InternalKvState)state;
        TypeSerializer userKeySerializer = kvId.getKeySerializer();
        TypeSerializer userValueSerializer = kvId.getValueSerializer();
        backend.setCurrentKey((Object)1);
        Assert.assertEquals(null, (Object)state.get((Object)1));
        Assert.assertEquals(null, StateBackendTestBase.getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put((Object)1, (Object)"1");
        backend.setCurrentKey((Object)2);
        Assert.assertEquals(null, (Object)state.get((Object)2));
        Assert.assertEquals(null, StateBackendTestBase.getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put((Object)2, (Object)"2");
        backend.setCurrentKey((Object)1);
        Assert.assertTrue((boolean)state.contains((Object)1));
        Assert.assertEquals((Object)"1", (Object)state.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "1");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        KeyedStateHandle snapshot1 = this.runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        state.put((Object)1, (Object)"101");
        backend.setCurrentKey((Object)2);
        state.put((Object)102, (Object)"102");
        backend.setCurrentKey((Object)3);
        state.put((Object)103, (Object)"103");
        state.putAll((Map)new HashMap<Integer, String>(){
            {
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        });
        KeyedStateHandle snapshot2 = this.runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"101", (Object)state.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "101");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"102", (Object)state.get((Object)102));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
                this.put(102, "102");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertTrue((boolean)state.contains((Object)103));
        Assert.assertEquals((Object)"103", (Object)state.get((Object)103));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(103, "103");
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        }, StateBackendTestBase.getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        ArrayList<Integer> keys = new ArrayList<Integer>();
        for (Integer key : state.keys()) {
            keys.add(key);
        }
        ArrayList<Integer> expectedKeys = new ArrayList<Integer>(){
            {
                this.add(103);
                this.add(1031);
                this.add(1032);
            }
        };
        Assert.assertEquals((long)keys.size(), (long)expectedKeys.size());
        keys.removeAll(expectedKeys);
        Assert.assertTrue((boolean)keys.isEmpty());
        ArrayList<String> values = new ArrayList<String>();
        for (String value : state.values()) {
            values.add(value);
        }
        ArrayList<String> expectedValues = new ArrayList<String>(){
            {
                this.add("103");
                this.add("1031");
                this.add("1032");
            }
        };
        Assert.assertEquals((long)values.size(), (long)expectedValues.size());
        values.removeAll(expectedValues);
        Assert.assertTrue((boolean)values.isEmpty());
        backend.setCurrentKey((Object)1);
        state.clear();
        backend.setCurrentKey((Object)2);
        state.remove((Object)102);
        backend.setCurrentKey((Object)3);
        String updateSuffix = "_updated";
        Iterator iterator = state.iterator();
        while (iterator.hasNext()) {
            Map.Entry entry = (Map.Entry)iterator.next();
            if (((String)entry.getValue()).length() != 4) {
                iterator.remove();
                continue;
            }
            entry.setValue((String)entry.getValue() + "_updated");
        }
        backend.setCurrentKey((Object)1);
        backend.setCurrentKey((Object)2);
        Assert.assertFalse((boolean)state.contains((Object)102));
        backend.setCurrentKey((Object)3);
        for (Map.Entry entry : state.entries()) {
            Assert.assertEquals((long)(4 + "_updated".length()), (long)((String)entry.getValue()).length());
            Assert.assertTrue((boolean)((String)entry.getValue()).endsWith("_updated"));
        }
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();
        MapState restored1 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState1 = (InternalKvState)restored1;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"1", (Object)restored1.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "1");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"2", (Object)restored1.get((Object)2));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();
        MapState restored2 = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        InternalKvState restoredKvState2 = (InternalKvState)restored2;
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"101", (Object)restored2.get((Object)1));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(1, "101");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)2);
        Assert.assertEquals((Object)"102", (Object)restored2.get((Object)102));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(2, "2");
                this.put(102, "102");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey((Object)3);
        Assert.assertEquals((Object)"103", (Object)restored2.get((Object)103));
        Assert.assertEquals((Object)new HashMap<Integer, String>(){
            {
                this.put(103, "103");
                this.put(1031, "1031");
                this.put(1032, "1032");
            }
        }, StateBackendTestBase.getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.dispose();
    }

    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals(null, (Object)state.value());
        state.update((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.value());
        state.clear();
        Assert.assertEquals(null, (Object)state.value());
        backend.dispose();
    }

    @Test
    public void testValueStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, (Object)"Hello");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertEquals((Object)"Hello", (Object)state.value());
        state.update((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.value());
        state.clear();
        Assert.assertEquals((Object)"Hello", (Object)state.value());
        backend.dispose();
    }

    @Test
    public void testReducingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), String.class);
        ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.add((Object)"Ciao");
        Assert.assertEquals((Object)"Ciao", (Object)state.get());
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testFoldingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        FoldingStateDescriptor kvId = new FoldingStateDescriptor("id", (Object)"Fold-Initial:", (FoldFunction)new AppendingFold(), String.class);
        FoldingState state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.add((Object)1);
        state.add((Object)2);
        Assert.assertEquals((Object)"Fold-Initial:,1,2", (Object)state.get());
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testListStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
        ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.get());
        state.add((Object)"Ciao");
        state.add((Object)"Bello");
        Assert.assertThat((Object)state.get(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new String[]{"Ciao", "Bello"}));
        state.clear();
        Assert.assertNull((Object)state.get());
        backend.dispose();
    }

    @Test
    public void testMapStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        MapStateDescriptor kvId = new MapStateDescriptor("id", String.class, String.class);
        MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        Assert.assertNull((Object)state.entries());
        state.put((Object)"Ciao", (Object)"Hello");
        state.put((Object)"Bello", (Object)"Nice");
        Assert.assertNotNull((Object)state.entries());
        Assert.assertEquals((Object)state.get((Object)"Ciao"), (Object)"Hello");
        Assert.assertEquals((Object)state.get((Object)"Bello"), (Object)"Nice");
        state.clear();
        Assert.assertNull((Object)state.entries());
        backend.dispose();
    }

    @Test
    public void testKeyGroupSnapshotRestore() throws Exception {
        int MAX_PARALLELISM = 10;
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(0, 9), new DummyEnvironment("test", 1, 0));
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        int keyInFirstHalf = 17;
        int keyInSecondHalf = 42;
        Random rand = new Random(0L);
        int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInFirstHalf, (int)10, (int)2);
        int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInFirstHalf, (int)10, (int)2);
        while (firstKeyHalf == secondKeyHalf) {
            keyInSecondHalf = rand.nextInt();
            secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)keyInSecondHalf, (int)10, (int)2);
        }
        backend.setCurrentKey((Object)keyInFirstHalf);
        state.update((Object)"ShouldBeInFirstHalf");
        backend.setCurrentKey((Object)keyInSecondHalf);
        state.update((Object)"ShouldBeInSecondHalf");
        KeyedStateHandle snapshot = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint()));
        List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles(Collections.singletonList(snapshot), (KeyGroupRange)KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)10, (int)2, (int)0));
        List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles(Collections.singletonList(snapshot), (KeyGroupRange)KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)10, (int)2, (int)1));
        backend.dispose();
        AbstractKeyedStateBackend firstHalfBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(0, 4), firstHalfKeyGroupStates, new DummyEnvironment("test", 1, 0));
        AbstractKeyedStateBackend secondHalfBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 10, new KeyGroupRange(5, 9), secondHalfKeyGroupStates, new DummyEnvironment("test", 1, 0));
        ValueState firstHalfState = (ValueState)firstHalfBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        firstHalfBackend.setCurrentKey((Object)keyInFirstHalf);
        Assert.assertTrue((boolean)((String)firstHalfState.value()).equals("ShouldBeInFirstHalf"));
        firstHalfBackend.setCurrentKey((Object)keyInSecondHalf);
        Assert.assertTrue((firstHalfState.value() == null ? 1 : 0) != 0);
        ValueState secondHalfState = (ValueState)secondHalfBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        secondHalfBackend.setCurrentKey((Object)keyInFirstHalf);
        Assert.assertTrue((secondHalfState.value() == null ? 1 : 0) != 0);
        secondHalfBackend.setCurrentKey((Object)keyInSecondHalf);
        Assert.assertTrue((boolean)((String)secondHalfState.value()).equals("ShouldBeInSecondHalf"));
        firstHalfBackend.dispose();
        secondHalfBackend.dispose();
    }

    @Test
    public void testRestoreWithWrongKeySerializer() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        state.update((Object)"1");
        backend.setCurrentKey((Object)2);
        state.update((Object)"2");
        KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        try {
            this.restoreKeyedBackend((TypeSerializer)DoubleSerializer.INSTANCE, snapshot1);
            Assert.fail((String)"should recognize wrong key serializer");
        }
        catch (StateMigrationException stateMigrationException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"1");
            backend.setCurrentKey((Object)2);
            state.update((Object)"2");
            KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ValueStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.value();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            ListState state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ListStateDescriptor("id", (TypeSerializer)fakeStringSerializer);
                state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ReducingStateDescriptor kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)StringSerializer.INSTANCE);
            ReducingState state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.add((Object)"1");
            backend.setCurrentKey((Object)2);
            state.add((Object)"2");
            KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new ReducingStateDescriptor("id", (ReduceFunction)new AppendingReduce(), (TypeSerializer)fakeStringSerializer);
                state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.get();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMapStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapStateDescriptor kvId = new MapStateDescriptor("id", (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE);
            MapState state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.put((Object)"1", (Object)"First");
            backend.setCurrentKey((Object)2);
            state.put((Object)"2", (Object)"Second");
            KeyedStateHandle snapshot1 = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpoint()));
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot1);
            snapshot1.discardState();
            FloatSerializer fakeStringSerializer = FloatSerializer.INSTANCE;
            try {
                kvId = new MapStateDescriptor("id", (TypeSerializer)fakeStringSerializer, (TypeSerializer)StringSerializer.INSTANCE);
                state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                state.entries();
                Assert.fail((String)"should recognize wrong serializers");
            }
            catch (StateMigrationException stateMigrationException) {
                // empty catch block
            }
            backend.dispose();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testCopyDefaultValue() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)1);
        IntValue default1 = (IntValue)state.value();
        backend.setCurrentKey((Object)2);
        IntValue default2 = (IntValue)state.value();
        Assert.assertNotNull((Object)default1);
        Assert.assertNotNull((Object)default2);
        Assert.assertEquals((Object)default1, (Object)default2);
        Assert.assertFalse((default1 == default2 ? 1 : 0) != 0);
        backend.dispose();
    }

    @Test
    public void testRequireNonNullNamespace() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", IntValue.class, (Object)new IntValue(-1));
        try {
            backend.getPartitionedState(null, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, null, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        try {
            backend.getPartitionedState(null, null, (StateDescriptor)kvId);
            Assert.fail((String)"Did not throw expected NullPointerException");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        backend.dispose();
    }

    protected void testConcurrentMapIfQueryable() throws Exception {
        boolean numberOfKeyGroups = true;
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0));
        ValueStateDescriptor desc = new ValueStateDescriptor("value-state", Integer.class, (Object)-1);
        desc.setQueryable("my-query");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        InternalKvState kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.update((Object)121818273);
        StateTable stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new ListStateDescriptor("list-state", Integer.class);
        desc.setQueryable("my-query");
        state = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new ReducingStateDescriptor("reducing-state", (ReduceFunction)new ReduceFunction<Integer>(){

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        }, Integer.class);
        desc.setQueryable("my-query");
        state = (ReducingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new FoldingStateDescriptor("folding-state", (Object)0, (FoldFunction)new FoldFunction<Integer, Integer>(){

            public Integer fold(Integer accumulator, Integer value) throws Exception {
                return accumulator + value;
            }
        }, Integer.class);
        desc.setQueryable("my-query");
        state = (FoldingState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.add((Object)121818273);
        stateTable = ((AbstractHeapState)kvState).getStateTable();
        this.checkConcurrentStateTable(stateTable, 1);
        desc = new MapStateDescriptor("map-state", Integer.class, String.class);
        desc.setQueryable("my-query");
        state = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        kvState = (InternalKvState)state;
        Assert.assertTrue((boolean)(kvState instanceof AbstractHeapState));
        kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        backend.setCurrentKey((Object)1);
        state.put((Object)121818273, (Object)"121818273");
        int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)1);
        StateTable stateTable2 = ((AbstractHeapState)kvState).getStateTable();
        Assert.assertNotNull((String)"State not set", (Object)stateTable2.get((Object)keyGroupIndex));
        this.checkConcurrentStateTable(stateTable2, 1);
        backend.dispose();
    }

    private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int numberOfKeyGroups) {
        Assert.assertNotNull((String)"State not set", stateTable);
        if (stateTable instanceof NestedMapsStateTable) {
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup((Object)1, (int)numberOfKeyGroups);
            NestedMapsStateTable nestedMapsStateTable = (NestedMapsStateTable)stateTable;
            Assert.assertTrue((boolean)(nestedMapsStateTable.getState()[keyGroupIndex] instanceof ConcurrentHashMap));
            Assert.assertTrue((boolean)(nestedMapsStateTable.getState()[keyGroupIndex].get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap));
        }
    }

    @Test
    public void testQueryableStateRegistration() throws Exception {
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        KvStateRegistry registry = env.getKvStateRegistry();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, env);
        KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
        KvStateRegistryListener listener = (KvStateRegistryListener)Mockito.mock(KvStateRegistryListener.class);
        registry.registerListener(listener);
        ValueStateDescriptor desc = new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("banana");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateRegistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
        KeyedStateHandle snapshot = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpoint()));
        backend.dispose();
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyKvStateUnregistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"));
        backend.dispose();
        backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        ((KvStateRegistryListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)2))).notifyKvStateRegistered((JobID)Matchers.eq((Object)env.getJobID()), (JobVertexID)Matchers.eq((Object)env.getJobVertexId()), (KeyGroupRange)Matchers.eq((Object)expectedKeyGroupRange), (String)Matchers.eq((Object)"banana"), (KvStateID)Matchers.any(KvStateID.class));
        backend.dispose();
    }

    @Test
    public void testEmptyStateCheckpointing() {
        try {
            CheckpointStreamFactory streamFactory = this.createStreamFactory();
            AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            ListStateDescriptor kvId = new ListStateDescriptor("id", String.class);
            KeyedStateHandle snapshot = (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)backend.snapshot(682375462379L, 1L, streamFactory, CheckpointOptions.forCheckpoint()));
            Assert.assertNull((Object)snapshot);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            backend.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNumStateEntries() throws Exception {
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        Assert.assertEquals((long)0L, (long)backend.numStateEntries());
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        backend.setCurrentKey((Object)0);
        state.update((Object)"hello");
        state.update((Object)"ciao");
        Assert.assertEquals((long)1L, (long)backend.numStateEntries());
        backend.setCurrentKey((Object)42);
        state.update((Object)"foo");
        Assert.assertEquals((long)2L, (long)backend.numStateEntries());
        backend.setCurrentKey((Object)0);
        state.clear();
        Assert.assertEquals((long)1L, (long)backend.numStateEntries());
        backend.setCurrentKey((Object)42);
        state.clear();
        Assert.assertEquals((long)0L, (long)backend.numStateEntries());
        backend.dispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testParallelAsyncSnapshots() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = backend.createValueState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot1 = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpoint());
            Thread runner1 = new Thread((Runnable)snapshot1, "snapshot-1-runner");
            runner1.start();
            waiter.await();
            for (int i = 5; i < 15; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
            }
            streamFactory.setWaiterLatch(null);
            streamFactory.setBlockerLatch(null);
            RunnableFuture snapshot2 = backend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpoint());
            Thread runner2 = new Thread((Runnable)snapshot2, "snapshot-2-runner");
            runner2.start();
            snapshot2.get();
            blocker.trigger();
            snapshot1.get();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshot() throws Exception {
        InternalValueState valueState;
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        AbstractKeyedStateBackend backend = null;
        KeyedStateHandle stateHandle = null;
        try {
            int i;
            backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            valueState = backend.createValueState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i2 = 0; i2 < 10; ++i2) {
                backend.setCurrentKey((Object)i2);
                valueState.update((Object)i2);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpoint());
            Thread runner = new Thread(snapshot);
            runner.start();
            for (i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)(i + 1));
                if (10 != i) continue;
                waiter.await();
            }
            runner.join();
            stateHandle = (KeyedStateHandle)snapshot.get();
            for (i = 0; i < 20; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)(i + 1), (long)((Integer)valueState.value()).intValue());
            }
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
        Assert.assertNotNull((Object)stateHandle);
        backend = null;
        try {
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, stateHandle);
            valueState = backend.createValueState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                Assert.assertEquals((long)i, (long)((Integer)valueState.value()).intValue());
            }
            backend.setCurrentKey((Object)11);
            Assert.assertEquals(null, (Object)valueState.value());
        }
        finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncSnapshotCancellation() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }
            InternalValueState valueState = backend.createValueState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", (TypeSerializer)IntSerializer.INSTANCE));
            valueState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; ++i) {
                backend.setCurrentKey((Object)i);
                valueState.update((Object)i);
            }
            RunnableFuture snapshot = backend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpoint());
            Thread runner = new Thread(snapshot);
            runner.start();
            waiter.await();
            IOUtils.closeQuietly(backend);
            blocker.trigger();
            runner.join();
            try {
                snapshot.get();
                Assert.fail((String)"Close was not propagated.");
            }
            catch (ExecutionException executionException) {
                // empty catch block
            }
        }
        finally {
            backend.dispose();
        }
    }

    protected static <V, K, N> V getSerializedValue(InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
        if (serializedValue == null) {
            return null;
        }
        return (V)KvStateSerializer.deserializeValue((byte[])serializedValue, valueSerializer);
    }

    private static <V, K, N> List<V> getSerializedList(InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeList((byte[])serializedValue, valueSerializer);
    }

    private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, N namespace, TypeSerializer<N> namespaceSerializer, TypeSerializer<UK> userKeySerializer, TypeSerializer<UV> userValueSerializer) throws Exception {
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace);
        if (serializedValue == null) {
            return null;
        }
        return KvStateSerializer.deserializeMap((byte[])serializedValue, userKeySerializer, userValueSerializer);
    }

    protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            Thread runner = new Thread(snapshotRunnableFuture);
            runner.start();
        }
        return (KeyedStateHandle)snapshotRunnableFuture.get();
    }

    private static final class MutableLong {
        long value;

        private MutableLong() {
        }
    }

    private static class ImmutableAggregatingAddingFunction
    implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        public Long createAccumulator() {
            return 0L;
        }

        public Long add(Long value, Long accumulator) {
            accumulator = accumulator + value;
            return accumulator;
        }

        public Long getResult(Long accumulator) {
            return accumulator;
        }

        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    private static class MutableAggregatingAddingFunction
    implements AggregateFunction<Long, MutableLong, Long> {
        private MutableAggregatingAddingFunction() {
        }

        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        public Long getResult(MutableLong accumulator) {
            return accumulator.value;
        }

        public MutableLong merge(MutableLong a, MutableLong b) {
            a.value += b.value;
            return a;
        }
    }

    public static class CustomKryoTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            super.write(kryo, output, object);
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    public static class ExceptionThrowingTestSerializer
    extends JavaSerializer {
        public void write(Kryo kryo, Output output, Object object) {
            throw new ExpectedKryoTestException();
        }

        public Object read(Kryo kryo, Input input, Class type) {
            throw new ExpectedKryoTestException();
        }
    }

    private static class ExpectedKryoTestException
    extends RuntimeException {
        private ExpectedKryoTestException() {
        }
    }

    public static class TestNestedPojoClassB
    implements Serializable {
        private Double doubleField;
        private String strField;

        public TestNestedPojoClassB() {
        }

        public TestNestedPojoClassB(Double doubleField, String strField) {
            this.doubleField = doubleField;
            this.strField = strField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public String toString() {
            return "TestNestedPojoClassB{doubleField='" + this.doubleField + '\'' + ", strField=" + this.strField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassB testNestedPojoClassB = (TestNestedPojoClassB)o;
            if (!this.doubleField.equals(testNestedPojoClassB.doubleField)) {
                return false;
            }
            return this.strField.equals(testNestedPojoClassB.strField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.strField.hashCode();
            return result;
        }
    }

    public static class TestNestedPojoClassA
    implements Serializable {
        private Double doubleField;
        private Integer intField;

        public TestNestedPojoClassA() {
        }

        public TestNestedPojoClassA(Double doubleField, Integer intField) {
            this.doubleField = doubleField;
            this.intField = intField;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double doubleField) {
            this.doubleField = doubleField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public String toString() {
            return "TestNestedPojoClassA{doubleField='" + this.doubleField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestNestedPojoClassA testNestedPojoClassA = (TestNestedPojoClassA)o;
            if (!this.doubleField.equals(testNestedPojoClassA.doubleField)) {
                return false;
            }
            return this.intField.equals(testNestedPojoClassA.intField);
        }

        public int hashCode() {
            int result = this.doubleField.hashCode();
            result = 31 * result + this.intField.hashCode();
            return result;
        }
    }

    public static class TestPojo
    implements Serializable {
        private String strField;
        private Integer intField;
        private TestNestedPojoClassA kryoClassAField;
        private TestNestedPojoClassB kryoClassBField;

        public TestPojo() {
        }

        public TestPojo(String strField, Integer intField) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = null;
            this.kryoClassBField = null;
        }

        public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
            this.strField = strField;
            this.intField = intField;
            this.kryoClassAField = classAField;
            this.kryoClassBField = classBfield;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String strField) {
            this.strField = strField;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer intField) {
            this.intField = intField;
        }

        public TestNestedPojoClassA getKryoClassAField() {
            return this.kryoClassAField;
        }

        public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) {
            this.kryoClassAField = kryoClassAField;
        }

        public TestNestedPojoClassB getKryoClassBField() {
            return this.kryoClassBField;
        }

        public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) {
            this.kryoClassBField = kryoClassBField;
        }

        public String toString() {
            return "TestPojo{strField='" + this.strField + '\'' + ", intField=" + this.intField + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestPojo testPojo = (TestPojo)o;
            return this.strField.equals(testPojo.strField) && this.intField.equals(testPojo.intField) && (this.kryoClassAField == null && testPojo.kryoClassAField == null || this.kryoClassAField.equals(testPojo.kryoClassAField)) && (this.kryoClassBField == null && testPojo.kryoClassBField == null || this.kryoClassBField.equals(testPojo.kryoClassBField));
        }

        public int hashCode() {
            int result = this.strField.hashCode();
            result = 31 * result + this.intField.hashCode();
            if (this.kryoClassAField != null) {
                result = 31 * result + this.kryoClassAField.hashCode();
            }
            if (this.kryoClassBField != null) {
                result = 31 * result + this.kryoClassBField.hashCode();
            }
            return result;
        }
    }

    private static class AppendingFold
    implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1L;

        private AppendingFold() {
        }

        public String fold(String acc, Integer value) throws Exception {
            return acc + "," + value;
        }
    }

    private static class AppendingReduce
    implements ReduceFunction<String> {
        private AppendingReduce() {
        }

        public String reduce(String value1, String value2) throws Exception {
            return value1 + "," + value2;
        }
    }
}

