/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public abstract class StateBackendMigrationTestBase<B extends AbstractStateBackend>
extends TestLogger {
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    private CheckpointStorageLocation checkpointStorageLocation;

    protected abstract B getStateBackend() throws Exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyedValueStateMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        String stateName = "test-name";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)new TestType("foo", 1456));
            backend.setCurrentKey((Object)2);
            valueState.update((Object)new TestType("bar", 478));
            backend.setCurrentKey((Object)3);
            valueState.update((Object)new TestType("hello", 189));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            kvId = new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer());
            valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            snapshot.discardState();
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)new TestType("foo", 1456), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue1", 751));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)new TestType("bar", 478), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue2", 167));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)new TestType("hello", 189), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue3", 444));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyedListStateMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        String stateName = "test-name";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            listState.add((Object)new TestType("key-1", 1));
            listState.add((Object)new TestType("key-1", 2));
            listState.add((Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            listState.add((Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            listState.add((Object)new TestType("key-3", 1));
            listState.add((Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            kvId = new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer());
            listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            snapshot.discardState();
            backend.setCurrentKey((Object)1);
            Iterator iterable1 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-1", 1), iterable1.next());
            Assert.assertEquals((Object)new TestType("key-1", 2), iterable1.next());
            Assert.assertEquals((Object)new TestType("key-1", 3), iterable1.next());
            Assert.assertFalse((boolean)iterable1.hasNext());
            listState.add((Object)new TestType("new-key-1", 123));
            backend.setCurrentKey((Object)2);
            Iterator iterable2 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-2", 1), iterable2.next());
            Assert.assertFalse((boolean)iterable2.hasNext());
            listState.add((Object)new TestType("new-key-2", 456));
            backend.setCurrentKey((Object)3);
            Iterator iterable3 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-3", 1), iterable3.next());
            Assert.assertEquals((Object)new TestType("key-3", 2), iterable3.next());
            Assert.assertFalse((boolean)iterable3.hasNext());
            listState.add((Object)new TestType("new-key-3", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        String stateName = "test-name";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)new TestType("foo", 1456));
            backend.setCurrentKey((Object)2);
            valueState.update((Object)new TestType("bar", 478));
            backend.setCurrentKey((Object)3);
            valueState.update((Object)new TestType("hello", 189));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            kvId = new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        String stateName = "test-name";
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListStateDescriptor kvId = new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            listState.add((Object)new TestType("key-1", 1));
            listState.add((Object)new TestType("key-1", 2));
            listState.add((Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            listState.add((Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            listState.add((Object)new TestType("key-3", 1));
            listState.add((Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            kvId = new ListStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            KeyGroupedInternalPriorityQueue internalPriorityQueue = backend.create("testPriorityQueue", (TypeSerializer)new TestType.V1TestTypeSerializer());
            internalPriorityQueue.add((Object)new TestType("key-1", 123));
            internalPriorityQueue.add((Object)new TestType("key-2", 346));
            internalPriorityQueue.add((Object)new TestType("key-1", 777));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            backend.create("testPriorityQueue", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<TestType> backend = this.createKeyedBackend(new TestType.V1TestTypeSerializer());
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)new TestType("foo", 123));
            valueState.update((Object)1);
            backend.setCurrentKey((Object)new TestType("bar", 456));
            valueState.update((Object)5);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            try {
                this.restoreKeyedBackend(new TestType.IncompatibleTestTypeSerializer(), snapshot);
                Assert.fail((String)"should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
            }
            try {
                this.restoreKeyedBackend(new TestType.V2TestTypeSerializer(), snapshot);
                Assert.fail((String)"should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)new TestType("namespace", 123), (TypeSerializer)new TestType.V1TestTypeSerializer(), (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)10);
            backend.setCurrentKey((Object)5);
            valueState.update((Object)50);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            try {
                backend.getPartitionedState((Object)new TestType("namespace", 123), (TypeSerializer)new TestType.IncompatibleTestTypeSerializer(), (StateDescriptor)kvId);
                Assert.fail((String)"should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
            }
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            try {
                backend.getPartitionedState((Object)new TestType("namespace", 123), (TypeSerializer)new TestType.V2TestTypeSerializer(), (StateDescriptor)kvId);
                Assert.fail((String)"should have failed");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
            }
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOperatorParitionableListStateMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "partitionable-list-state";
        try {
            ListStateDescriptor descriptor = new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState state = backend.getListState(descriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer());
            state = backend.getListState(descriptor);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assert.assertEquals((Object)new TestType("foo", 13), iterator.next());
            Assert.assertEquals((Object)new TestType("bar", 278), iterator.next());
            Assert.assertFalse((boolean)iterator.hasNext());
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnionListStateMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "union-list-state";
        try {
            ListStateDescriptor descriptor = new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState state = backend.getUnionListState(descriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer());
            state = backend.getUnionListState(descriptor);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assert.assertEquals((Object)new TestType("foo", 13), iterator.next());
            Assert.assertEquals((Object)new TestType("bar", 278), iterator.next());
            Assert.assertFalse((boolean)iterator.hasNext());
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBroadcastStateValueMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "broadcast-state";
        try {
            MapStateDescriptor descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer());
            BroadcastState state = backend.getBroadcastState(descriptor);
            state.put((Object)3, (Object)new TestType("foo", 13));
            state.put((Object)5, (Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V2TestTypeSerializer());
            state = backend.getBroadcastState(descriptor);
            Assert.assertEquals((Object)new TestType("foo", 13), (Object)state.get((Object)3));
            Assert.assertEquals((Object)new TestType("bar", 278), (Object)state.get((Object)5));
            state.put((Object)17, (Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBroadcastStateKeyMigration() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "broadcast-state";
        try {
            MapStateDescriptor descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE);
            BroadcastState state = backend.getBroadcastState(descriptor);
            state.put((Object)new TestType("foo", 13), (Object)3);
            state.put((Object)new TestType("bar", 278), (Object)5);
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V2TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE);
            state = backend.getBroadcastState(descriptor);
            Assert.assertEquals((Object)3, (Object)state.get((Object)new TestType("foo", 13)));
            Assert.assertEquals((Object)5, (Object)state.get((Object)new TestType("bar", 278)));
            state.put((Object)new TestType("new-entry", 777), (Object)17);
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "partitionable-list-state";
        try {
            ListStateDescriptor descriptor = new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState state = backend.getListState(descriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            backend.getListState(descriptor);
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "union-list-state";
        try {
            ListStateDescriptor descriptor = new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer());
            ListState state = backend.getUnionListState(descriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            backend.getUnionListState(descriptor);
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "broadcast-state";
        try {
            MapStateDescriptor descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer());
            BroadcastState state = backend.getBroadcastState(descriptor);
            state.put((Object)3, (Object)new TestType("foo", 13));
            state.put((Object)5, (Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            backend.getBroadcastState(descriptor);
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        String stateName = "broadcast-state";
        try {
            MapStateDescriptor descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE);
            BroadcastState state = backend.getBroadcastState(descriptor);
            state.put((Object)new TestType("foo", 13), (Object)3);
            state.put((Object)new TestType("bar", 278), (Object)5);
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            descriptor = new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE);
            backend.getBroadcastState(descriptor);
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    private CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStorageLocation == null) {
            this.checkpointStorageLocation = this.getStateBackend().createCheckpointStorage(new JobID()).initializeLocationForCheckpoint(1L);
        }
        return this.checkpointStorageLocation;
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, new DummyEnvironment());
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry());
        backend.restore(null);
        return backend;
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, new DummyEnvironment());
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry());
        backend.restore((Object)new StateObjectCollection(state));
        return backend;
    }

    private KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    private OperatorStateBackend createOperatorStateBackend() throws Exception {
        return this.getStateBackend().createOperatorStateBackend((Environment)new DummyEnvironment(), "test_op");
    }

    private OperatorStateBackend restoreOperatorStateBackend(OperatorStateHandle state) throws Exception {
        OperatorStateBackend operatorStateBackend = this.createOperatorStateBackend();
        operatorStateBackend.restore((Object)StateObjectCollection.singleton((StateObject)state));
        return operatorStateBackend;
    }

    private OperatorStateHandle runSnapshot(RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        return (OperatorStateHandle)((SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot();
    }

    public static class CustomVoidNamespaceSerializerSnapshot
    implements TypeSerializerSnapshot<VoidNamespace> {
        public TypeSerializer<VoidNamespace> restoreSerializer() {
            return new CustomVoidNamespaceSerializer();
        }

        public TypeSerializerSchemaCompatibility<VoidNamespace> resolveSchemaCompatibility(TypeSerializer<VoidNamespace> newSerializer) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        public void writeSnapshot(DataOutputView out) throws IOException {
        }

        public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializerSnapshot;
        }

        public int hashCode() {
            return 0;
        }

        public int getCurrentVersion() {
            return 0;
        }
    }

    public static class CustomVoidNamespaceSerializer
    extends TypeSerializer<VoidNamespace> {
        private static final long serialVersionUID = 1L;
        public static final CustomVoidNamespaceSerializer INSTANCE = new CustomVoidNamespaceSerializer();

        public boolean isImmutableType() {
            return true;
        }

        public VoidNamespace createInstance() {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from) {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) {
            return VoidNamespace.get();
        }

        public int getLength() {
            return 0;
        }

        public void serialize(VoidNamespace record, DataOutputView target) throws IOException {
            target.write(0);
        }

        public VoidNamespace deserialize(DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.write((int)source.readByte());
        }

        public TypeSerializer<VoidNamespace> duplicate() {
            return this;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializer;
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
            return new CustomVoidNamespaceSerializerSnapshot();
        }
    }
}

