/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={TypeSerializerSerializationUtil.class, IntSerializer.class})
public class OperatorStateBackendTest {
    private final ClassLoader classLoader = this.getClass().getClassLoader();

    /**
     * Checks that a {@link MemoryStateBackend} creates a non-null operator state backend and that
     * the freshly created backend reports no registered state names.
     */
    @Test
    public void testCreateOnAbstractStateBackend() throws Exception {
        Environment mockEnvironment = OperatorStateBackendTest.createMockEnvironment();
        OperatorStateBackend backend =
                new MemoryStateBackend().createOperatorStateBackend(mockEnvironment, "test-operator");

        Assert.assertNotNull(backend);
        Assert.assertTrue(backend.getRegisteredStateNames().isEmpty());
    }

    /**
     * Registers list states through descriptors that carry only a class (no explicit serializer)
     * and checks that the serializer created for the state is a {@link KryoSerializer} that
     * honours the Kryo serializer registered in the {@link ExecutionConfig}.
     */
    @Test
    public void testRegisterStatesWithoutTypeSerializer() throws Exception {
        Class<?> registeredType = FutureTask.class;

        // Sanity check: with a default config, Kryo does not pick its JavaSerializer for the type.
        KryoSerializer<File> defaultConfigSerializer = new KryoSerializer<>(File.class, new ExecutionConfig());
        Assert.assertFalse(
                defaultConfigSerializer.getKryo().getDefaultSerializer(registeredType)
                        instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);

        // Register the custom Kryo serializer and build the backend with that config.
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.registerTypeWithKryoSerializer(
                registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
        DefaultOperatorStateBackend backend =
                new DefaultOperatorStateBackend(this.classLoader, executionConfig, false);

        ListStateDescriptor<File> fileDescriptor = new ListStateDescriptor<>("test", File.class);
        ListStateDescriptor<String> stringDescriptor = new ListStateDescriptor<>("test2", String.class);

        ListState<File> fileState = backend.getListState(fileDescriptor);
        Assert.assertNotNull(fileState);
        ListState<String> stringState = backend.getListState(stringDescriptor);
        Assert.assertNotNull(stringState);
        Assert.assertEquals(2L, backend.getRegisteredStateNames().size());

        // The serializer derived for the File state must be Kryo-based and see the registration.
        TypeSerializer<?> fileSerializer =
                ((DefaultOperatorStateBackend.PartitionableListState<?>) fileState)
                        .getStateMetaInfo()
                        .getPartitionStateSerializer();
        Assert.assertTrue(fileSerializer instanceof KryoSerializer);
        Assert.assertTrue(
                ((KryoSerializer<?>) fileSerializer).getKryo().getSerializer(registeredType)
                        instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);

        // The String state starts empty and returns elements in insertion order.
        Iterator<String> names = stringState.get().iterator();
        Assert.assertFalse(names.hasNext());
        stringState.add("kevin");
        stringState.add("sunny");
        names = stringState.get().iterator();
        Assert.assertEquals("kevin", names.next());
        Assert.assertEquals("sunny", names.next());
        Assert.assertFalse(names.hasNext());
    }

    /**
     * Registers two list states via {@code getListState} and one via {@code getUnionListState},
     * and checks that:
     * <ul>
     *   <li>each registration increases the number of registered state names by one,</li>
     *   <li>each newly registered state is empty and returns added elements in insertion order,</li>
     *   <li>requesting an already registered descriptor again yields a view on the same contents,</li>
     *   <li>requesting a registered name through the other accessor (list vs. union) is rejected
     *       with an {@link IllegalStateException}.</li>
     * </ul>
     */
    @Test
    public void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend =
                new DefaultOperatorStateBackend(this.classLoader, new ExecutionConfig(), false);
        ListStateDescriptor<Serializable> stateDescriptor1 =
                new ListStateDescriptor<>("test1", new JavaSerializer<Serializable>());
        ListStateDescriptor<Serializable> stateDescriptor2 =
                new ListStateDescriptor<>("test2", new JavaSerializer<Serializable>());
        ListStateDescriptor<Serializable> stateDescriptor3 =
                new ListStateDescriptor<>("test3", new JavaSerializer<Serializable>());

        // --- first list state ---
        ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
        Assert.assertNotNull(listState1);
        Assert.assertEquals(1L, operatorStateBackend.getRegisteredStateNames().size());
        Iterator<Serializable> it = listState1.get().iterator();
        Assert.assertFalse(it.hasNext());
        listState1.add(42);
        listState1.add(4711);
        it = listState1.get().iterator();
        Assert.assertEquals(42, it.next());
        Assert.assertEquals(4711, it.next());
        Assert.assertFalse(it.hasNext());

        // --- second list state ---
        ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
        Assert.assertNotNull(listState2);
        Assert.assertEquals(2L, operatorStateBackend.getRegisteredStateNames().size());
        // Take a fresh iterator from the new state; reusing the exhausted iterator of
        // listState1 would make this emptiness check pass no matter what listState2 holds.
        it = listState2.get().iterator();
        Assert.assertFalse(it.hasNext());
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        it = listState2.get().iterator();
        Assert.assertEquals(7, it.next());
        Assert.assertEquals(13, it.next());
        Assert.assertEquals(23, it.next());
        Assert.assertFalse(it.hasNext());

        // --- union list state ---
        ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        Assert.assertNotNull(listState3);
        Assert.assertEquals(3L, operatorStateBackend.getRegisteredStateNames().size());
        // Same as above: verify emptiness on the new state itself, not on a stale iterator.
        it = listState3.get().iterator();
        Assert.assertFalse(it.hasNext());
        listState3.add(17);
        listState3.add(3);
        listState3.add(123);
        it = listState3.get().iterator();
        Assert.assertEquals(17, it.next());
        Assert.assertEquals(3, it.next());
        Assert.assertEquals(123, it.next());
        Assert.assertFalse(it.hasNext());

        // --- re-requesting the first descriptor must expose the existing contents ---
        ListState<Serializable> listState1b = operatorStateBackend.getListState(stateDescriptor1);
        Assert.assertNotNull(listState1b);
        listState1b.add(123);
        it = listState1b.get().iterator();
        Assert.assertEquals(42, it.next());
        Assert.assertEquals(4711, it.next());
        Assert.assertEquals(123, it.next());
        Assert.assertFalse(it.hasNext());

        // The element added through the second handle is visible through the first one too.
        it = listState1.get().iterator();
        Assert.assertEquals(42, it.next());
        Assert.assertEquals(4711, it.next());
        Assert.assertEquals(123, it.next());
        Assert.assertFalse(it.hasNext());

        it = listState1b.get().iterator();
        Assert.assertEquals(42, it.next());
        Assert.assertEquals(4711, it.next());
        Assert.assertEquals(123, it.next());
        Assert.assertFalse(it.hasNext());

        // --- switching the accessor for an already registered name must fail ---
        try {
            operatorStateBackend.getUnionListState(stateDescriptor2);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // expected: "test2" was registered through getListState
        }
        try {
            operatorStateBackend.getListState(stateDescriptor3);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // expected: "test3" was registered through getUnionListState
        }
    }

    /**
     * Takes a snapshot of a list state whose serializer is a {@code VerifyingIntSerializer}.
     * That serializer asserts inside its copy methods that the thread's context class loader
     * equals the environment's user class loader, and counts every copy call; the test then
     * requires that at least one copy happened during the snapshot.
     */
    @Test
    public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        Environment mockEnvironment = OperatorStateBackendTest.createMockEnvironment();
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(mockEnvironment, "test-op-name");

        AtomicInteger copyInvocations = new AtomicInteger(0);
        TypeSerializer<Integer> verifyingSerializer =
                new VerifyingIntSerializer(mockEnvironment.getUserClassLoader(), copyInvocations);

        ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("test", verifyingSerializer);
        ListState<Integer> state = backend.getListState(descriptor);
        state.add(42);

        CheckpointStreamFactory checkpointStreams = stateBackend.createStreamFactory(new JobID(), "testOperator");
        RunnableFuture<OperatorStateHandle> snapshotFuture =
                backend.snapshot(1L, 1L, checkpointStreams, CheckpointOptions.forCheckpoint());
        FutureUtil.runIfNotDoneAndGet(snapshotFuture);

        // The serializer's copy path must have been exercised at least once.
        Assert.assertTrue(copyInvocations.get() > 0);
    }

    /**
     * Snapshots a backend on which no state was ever registered and expects the resulting
     * state handle to be {@code null}.
     */
    @Test
    public void testSnapshotEmpty() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend backend = stateBackend.createOperatorStateBackend(
                OperatorStateBackendTest.createMockEnvironment(), "testOperator");
        CheckpointStreamFactory checkpointStreams = stateBackend.createStreamFactory(new JobID(), "testOperator");

        RunnableFuture<OperatorStateHandle> snapshotFuture =
                backend.snapshot(0L, 0L, checkpointStreams, CheckpointOptions.forCheckpoint());
        OperatorStateHandle resultHandle = FutureUtil.runIfNotDoneAndGet(snapshotFuture);

        Assert.assertNull(resultHandle);
    }

    /**
     * Fills two list states and one union list state, snapshots them by running the snapshot
     * future to completion, restores the snapshot into a newly created backend and checks that
     * all three states come back with the same elements in the same order.
     *
     * <p>NOTE: this source was recovered with the CFR decompiler, which flagged that it removed a
     * self-catching {@code try} in this method; the control flow may differ slightly from the
     * original source.
     */
    @Test
    public void testSnapshotRestoreSync() throws Exception {
        MemoryStateBackend stateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend sourceBackend = stateBackend.createOperatorStateBackend(
                OperatorStateBackendTest.createMockEnvironment(), "test-op-name");

        ListStateDescriptor<Serializable> stateDescriptor1 =
                new ListStateDescriptor<>("test1", new JavaSerializer<Serializable>());
        ListStateDescriptor<Serializable> stateDescriptor2 =
                new ListStateDescriptor<>("test2", new JavaSerializer<Serializable>());
        ListStateDescriptor<Serializable> stateDescriptor3 =
                new ListStateDescriptor<>("test3", new JavaSerializer<Serializable>());

        ListState<Serializable> sourceState1 = sourceBackend.getListState(stateDescriptor1);
        ListState<Serializable> sourceState2 = sourceBackend.getListState(stateDescriptor2);
        ListState<Serializable> sourceState3 = sourceBackend.getUnionListState(stateDescriptor3);

        sourceState1.add(42);
        sourceState1.add(4711);

        sourceState2.add(7);
        sourceState2.add(13);
        sourceState2.add(23);

        sourceState3.add(17);
        sourceState3.add(18);
        sourceState3.add(19);
        sourceState3.add(20);

        CheckpointStreamFactory checkpointStreams = stateBackend.createStreamFactory(new JobID(), "testOperator");
        RunnableFuture<OperatorStateHandle> snapshotFuture =
                sourceBackend.snapshot(1L, 1L, checkpointStreams, CheckpointOptions.forCheckpoint());
        OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshotFuture);

        try {
            sourceBackend.close();
            sourceBackend.dispose();

            OperatorStateBackend restoredBackend = stateBackend.createOperatorStateBackend(
                    OperatorStateBackendTest.createMockEnvironment(), "testOperator");
            restoredBackend.restore(Collections.singletonList(stateHandle));

            // All three names are known right after restore and re-registering adds none.
            Assert.assertEquals(3L, restoredBackend.getRegisteredStateNames().size());
            ListState<Serializable> restoredState1 = restoredBackend.getListState(stateDescriptor1);
            ListState<Serializable> restoredState2 = restoredBackend.getListState(stateDescriptor2);
            ListState<Serializable> restoredState3 = restoredBackend.getUnionListState(stateDescriptor3);
            Assert.assertEquals(3L, restoredBackend.getRegisteredStateNames().size());

            Iterator<Serializable> it = restoredState1.get().iterator();
            Assert.assertEquals(42, it.next());
            Assert.assertEquals(4711, it.next());
            Assert.assertFalse(it.hasNext());

            it = restoredState2.get().iterator();
            Assert.assertEquals(7, it.next());
            Assert.assertEquals(13, it.next());
            Assert.assertEquals(23, it.next());
            Assert.assertFalse(it.hasNext());

            it = restoredState3.get().iterator();
            Assert.assertEquals(17, it.next());
            Assert.assertEquals(18, it.next());
            Assert.assertEquals(19, it.next());
            Assert.assertEquals(20, it.next());
            Assert.assertFalse(it.hasNext());

            restoredBackend.close();
            restoredBackend.dispose();
        } finally {
            stateHandle.discardState();
        }
    }

    /**
     * Verifies that a snapshot running on another thread is isolated from modifications made to
     * the state after the snapshot was triggered.
     *
     * <p>The snapshot future is submitted to an executor and the test waits on the waiter latch
     * before touching the state again (presumably the {@link BlockerCheckpointStreamFactory}
     * parks the writer until the blocker latch is triggered - confirm against that class).
     * While the snapshot is in flight the test appends an element, mutates existing elements in
     * place, clears a state and registers a new state. The restored backend must nevertheless
     * contain exactly the three states and the values that existed when the snapshot was taken.
     *
     * <p>The executor is shut down in a {@code finally} block so that a failing assertion does
     * not leak the worker thread.
     *
     * <p>NOTE: this source was recovered with the CFR decompiler, which flagged that it removed a
     * self-catching {@code try} in this method; the control flow may differ slightly from the
     * original source.
     */
    @Test
    public void testSnapshotRestoreAsync() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(
                OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);

        ListStateDescriptor<MutableType> stateDescriptor1 =
                new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
        ListStateDescriptor<MutableType> stateDescriptor2 =
                new ListStateDescriptor<>("test2", new JavaSerializer<MutableType>());
        ListStateDescriptor<MutableType> stateDescriptor3 =
                new ListStateDescriptor<>("test3", new JavaSerializer<MutableType>());

        ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState<MutableType> listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState<MutableType> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);

        listState1.add(MutableType.of(42));
        listState1.add(MutableType.of(4711));

        listState2.add(MutableType.of(7));
        listState2.add(MutableType.of(13));
        listState2.add(MutableType.of(23));

        listState3.add(MutableType.of(17));
        listState3.add(MutableType.of(18));
        listState3.add(MutableType.of(19));
        listState3.add(MutableType.of(20));

        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture<OperatorStateHandle> runnableFuture =
                operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpoint());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            executorService.submit(runnableFuture);

            // Wait until the snapshot thread signals the waiter latch.
            waiterLatch.await();

            // Modify the live state while the snapshot is still in flight.
            listState1.add(MutableType.of(77));

            int visited = 0;
            for (MutableType element : listState2.get()) {
                // Release the snapshot thread halfway through the in-place mutation.
                if (++visited == 2) {
                    blockerLatch.trigger();
                }
                element.setValue(element.getValue() + 10);
            }

            listState3.clear();

            // Registering a new state must not show up in the in-flight snapshot either.
            operatorStateBackend.getListState(
                    new ListStateDescriptor<>("test4", new JavaSerializer<MutableType>()));

            OperatorStateHandle stateHandle = runnableFuture.get();
            try {
                operatorStateBackend.close();
                operatorStateBackend.dispose();

                MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
                // Separate variable: the factory method is only known to return the interface type.
                OperatorStateBackend restoredBackend = abstractStateBackend.createOperatorStateBackend(
                        OperatorStateBackendTest.createMockEnvironment(), "testOperator");
                restoredBackend.restore(Collections.singletonList(stateHandle));

                Assert.assertEquals(3L, restoredBackend.getRegisteredStateNames().size());
                listState1 = restoredBackend.getListState(stateDescriptor1);
                listState2 = restoredBackend.getListState(stateDescriptor2);
                listState3 = restoredBackend.getUnionListState(stateDescriptor3);
                Assert.assertEquals(3L, restoredBackend.getRegisteredStateNames().size());

                // Restored values are those from before the concurrent modifications.
                Iterator<MutableType> it = listState1.get().iterator();
                Assert.assertEquals(42L, it.next().getValue());
                Assert.assertEquals(4711L, it.next().getValue());
                Assert.assertFalse(it.hasNext());

                it = listState2.get().iterator();
                Assert.assertEquals(7L, it.next().getValue());
                Assert.assertEquals(13L, it.next().getValue());
                Assert.assertEquals(23L, it.next().getValue());
                Assert.assertFalse(it.hasNext());

                it = listState3.get().iterator();
                Assert.assertEquals(17L, it.next().getValue());
                Assert.assertEquals(18L, it.next().getValue());
                Assert.assertEquals(19L, it.next().getValue());
                Assert.assertEquals(20L, it.next().getValue());
                Assert.assertFalse(it.hasNext());

                restoredBackend.close();
                restoredBackend.dispose();
            } finally {
                stateHandle.discardState();
            }
        } finally {
            // Always release the worker thread; shutdownNow also interrupts a snapshot thread
            // that is still parked because the test failed before releasing the blocker latch.
            executorService.shutdownNow();
        }
    }

    /**
     * Closes the backend while a snapshot is in flight on another thread and expects the
     * snapshot future to fail with an {@link ExecutionException} whose cause is an
     * {@link IOException}.
     *
     * <p>The executor is shut down in a {@code finally} block so the worker thread is released
     * regardless of the test outcome.
     */
    @Test
    public void testSnapshotAsyncClose() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(
                OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);

        ListStateDescriptor<MutableType> stateDescriptor1 =
                new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
        ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
        listState1.add(MutableType.of(42));
        listState1.add(MutableType.of(4711));

        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture<OperatorStateHandle> runnableFuture =
                operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpoint());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            executorService.submit(runnableFuture);

            // Wait until the snapshot thread signals the waiter latch, then close underneath it.
            waiterLatch.await();
            operatorStateBackend.close();
            blockerLatch.trigger();

            try {
                runnableFuture.get(60L, TimeUnit.SECONDS);
                Assert.fail("Snapshot should have failed because the backend was closed while it was running");
            } catch (ExecutionException eex) {
                Assert.assertTrue(eex.getCause() instanceof IOException);
            }
        } finally {
            // Release the worker thread even if an assertion above failed.
            executorService.shutdownNow();
        }
    }

    /**
     * Cancels a snapshot future while the snapshot is in flight on another thread. The test
     * expects the last stream created by the stream factory to be closed right after the
     * cancellation, and {@code get} on the future to throw a {@link CancellationException}.
     *
     * <p>The executor is shut down in a {@code finally} block so the worker thread is released
     * regardless of the test outcome.
     */
    @Test
    public void testSnapshotAsyncCancel() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(
                OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true);

        ListStateDescriptor<MutableType> stateDescriptor1 =
                new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());
        ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
        listState1.add(MutableType.of(42));
        listState1.add(MutableType.of(4711));

        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);

        RunnableFuture<OperatorStateHandle> runnableFuture =
                operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpoint());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        try {
            executorService.submit(runnableFuture);

            // Wait until the snapshot thread signals the waiter latch, then cancel it.
            waiterLatch.await();
            runnableFuture.cancel(true);

            // Cancellation must have closed the stream the snapshot was writing to.
            Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());

            blockerLatch.trigger();

            try {
                runnableFuture.get(60L, TimeUnit.SECONDS);
                Assert.fail("Getting the result of a cancelled snapshot should have thrown a CancellationException");
            } catch (CancellationException expected) {
                // expected: the future was cancelled above
            }
        } finally {
            // Release the worker thread even if an assertion above failed.
            executorService.shutdownNow();
        }
    }

    /**
     * Checks that restoring operator state fails with an {@link IOException} whose message
     * contains "Unable to restore operator state" when reading a serializer proxy fails.
     *
     * <p>The failure is simulated by making construction of
     * {@code TypeSerializerSerializationUtil.TypeSerializerSerializationProxy} return a mock whose
     * {@code read} throws an {@link IOException}. This presumably relies on the class-level
     * {@code @PrepareForTest} / {@code PowerMockRunner} setup for the constructor interception.
     *
     * <p>NOTE: this source was recovered with the CFR decompiler, which flagged that it removed a
     * self-catching {@code try} in this method; the control flow may differ slightly from the
     * original source.
     */
    @Test
    public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "test-op-name");
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        // Populate all three states so the snapshot below has something to write.
        listState1.add((Object)42);
        listState1.add((Object)4711);
        listState2.add((Object)7);
        listState2.add((Object)13);
        listState2.add((Object)23);
        listState3.add((Object)17);
        listState3.add((Object)18);
        listState3.add((Object)19);
        listState3.add((Object)20);
        // Take a regular snapshot; the mocking is only installed afterwards, before restore.
        CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpoint());
        OperatorStateHandle stateHandle = (OperatorStateHandle)FutureUtil.runIfNotDoneAndGet((RunnableFuture)runnableFuture);
        try {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator");
            // Mock proxy whose read(...) always throws, standing in for a serializer that cannot be loaded.
            TypeSerializerSerializationUtil.TypeSerializerSerializationProxy mockProxy = (TypeSerializerSerializationUtil.TypeSerializerSerializationProxy)Mockito.mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
            ((TypeSerializerSerializationUtil.TypeSerializerSerializationProxy)Mockito.doThrow((Throwable)new IOException()).when((Object)mockProxy)).read((DataInputView)Matchers.any(DataInputViewStreamWrapper.class));
            // Intercept every construction of the proxy class and hand out the failing mock instead.
            PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn((Object)mockProxy);
            operatorStateBackend.restore(Collections.singletonList(stateHandle));
            Assert.fail((String)"The operator state restore should have failed if the previous state serializer could not be loaded.");
        }
        catch (IOException expected) {
            // The restore must surface the failure with a descriptive message.
            Assert.assertTrue((boolean)expected.getMessage().contains("Unable to restore operator state"));
        }
        finally {
            // Always release the snapshot's state, whether the test passed or failed.
            stateHandle.discardState();
        }
    }

    /**
     * Builds a Mockito mock of {@link Environment} that returns a new default
     * {@link ExecutionConfig} and this test class's class loader as the user class loader.
     *
     * @return the stubbed environment mock
     */
    private static Environment createMockEnvironment() {
        Environment mockEnvironment = Mockito.mock(Environment.class);
        Mockito.when(mockEnvironment.getExecutionConfig()).thenReturn(new ExecutionConfig());
        Mockito.when(mockEnvironment.getUserClassLoader())
                .thenReturn(OperatorStateBackendTest.class.getClassLoader());
        return mockEnvironment;
    }

    /**
     * Serializable holder of a single mutable {@code int}. The tests use it to modify state
     * elements in place; equality and hash code are based solely on the wrapped value.
     */
    static final class MutableType
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /** The wrapped value; changeable through {@link #setValue(int)}. */
        private int value;

        /** Creates an instance holding {@code 0}. */
        public MutableType() {
            this(0);
        }

        /**
         * Creates an instance holding the given value.
         *
         * @param value the initial value
         */
        public MutableType(int value) {
            this.value = value;
        }

        /** Factory shorthand for {@code new MutableType(value)}. */
        static MutableType of(int value) {
            return new MutableType(value);
        }

        /** @return the current value */
        public int getValue() {
            return value;
        }

        /**
         * Replaces the current value.
         *
         * @param value the new value
         */
        public void setValue(int value) {
            this.value = value;
        }

        /** Two instances are equal when they are of exactly this class and hold the same value. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            boolean sameClass = o != null && getClass() == o.getClass();
            if (!sameClass) {
                return false;
            }
            MutableType other = (MutableType) o;
            return other.value == value;
        }

        /** The hash code is the wrapped value itself. */
        @Override
        public int hashCode() {
            return value;
        }
    }

    private static final class VerifyingIntSerializer
    extends TypeSerializer<Integer> {
        private static final long serialVersionUID = -5344563614550163898L;
        private transient ClassLoader classLoader;
        private transient AtomicInteger atomicInteger;

        private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
            this.classLoader = (ClassLoader)Preconditions.checkNotNull((Object)classLoader);
            this.atomicInteger = (AtomicInteger)Preconditions.checkNotNull((Object)atomicInteger);
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return this;
        }

        public Integer createInstance() {
            return 0;
        }

        public Integer copy(Integer from) {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from);
        }

        public Integer copy(Integer from, Integer reuse) {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from, reuse);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer record, DataOutputView target) throws IOException {
            IntSerializer.INSTANCE.serialize(record, target);
        }

        public Integer deserialize(DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(source);
        }

        public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(reuse, source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            IntSerializer.INSTANCE.copy(source, target);
        }

        public boolean equals(Object obj) {
            if (obj instanceof VerifyingIntSerializer) {
                return ((VerifyingIntSerializer)((Object)obj)).canEqual((Object)this);
            }
            return false;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof VerifyingIntSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            return IntSerializer.INSTANCE.snapshotConfiguration();
        }

        public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
            return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
        }
    }
}

