/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class CopyOnWriteStateTableTest
extends TestLogger {
    /**
     * Walks a single {@link CopyOnWriteStateTable} through put, putAndGetOld, get, containsKey,
     * remove, removeAndGetOld and transform, checking the returned values and the reported
     * size after each step.
     */
    @Test
    public void testPutGetRemoveContainsTransform() throws Exception {
        RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                StateDescriptor.Type.UNKNOWN,
                "test",
                IntSerializer.INSTANCE,
                new ArrayListSerializer<>(IntSerializer.INSTANCE));

        MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);

        CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
            new CopyOnWriteStateTable<>(keyContext, metaInfo);

        // Naming: stateK<key>N<namespace>.
        ArrayList<Integer> stateK1N1 = new ArrayList<>();
        stateK1N1.add(41);
        ArrayList<Integer> stateK2N1 = new ArrayList<>();
        stateK2N1.add(42);
        ArrayList<Integer> stateK1N2 = new ArrayList<>();
        stateK1N2.add(43);

        // Fresh inserts report no previous value and grow the table by one each.
        Assert.assertNull(stateTable.putAndGetOld(1, 1, stateK1N1));
        Assert.assertEquals(stateK1N1, stateTable.get(1, 1));
        Assert.assertEquals(1, stateTable.size());

        Assert.assertNull(stateTable.putAndGetOld(2, 1, stateK2N1));
        Assert.assertEquals(stateK2N1, stateTable.get(2, 1));
        Assert.assertEquals(2, stateTable.size());

        Assert.assertNull(stateTable.putAndGetOld(1, 2, stateK1N2));
        Assert.assertEquals(stateK1N2, stateTable.get(1, 2));
        Assert.assertEquals(3, stateTable.size());

        Assert.assertTrue(stateTable.containsKey(2, 1));
        Assert.assertFalse(stateTable.containsKey(3, 1));
        Assert.assertFalse(stateTable.containsKey(2, 3));

        // Overwriting with null keeps the mapping present; only the value becomes null.
        stateTable.put(2, 1, null);
        Assert.assertTrue(stateTable.containsKey(2, 1));
        Assert.assertEquals(3, stateTable.size());
        Assert.assertNull(stateTable.get(2, 1));

        // Overwriting an existing mapping must not change the size.
        stateTable.put(2, 1, stateK2N1);
        Assert.assertEquals(3, stateTable.size());

        Assert.assertEquals(stateK2N1, stateTable.removeAndGetOld(2, 1));
        Assert.assertFalse(stateTable.containsKey(2, 1));
        Assert.assertEquals(2, stateTable.size());

        stateTable.remove(1, 2);
        Assert.assertFalse(stateTable.containsKey(1, 2));
        Assert.assertEquals(1, stateTable.size());

        // Removing a mapping that was never inserted yields null and leaves the size alone.
        Assert.assertNull(stateTable.removeAndGetOld(4, 2));
        Assert.assertEquals(1, stateTable.size());

        StateTransformationFunction<ArrayList<Integer>, Integer> function =
            new StateTransformationFunction<ArrayList<Integer>, Integer>() {
                @Override
                public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
                    previousState.add(value);
                    return previousState;
                }
            };

        // Apply the same transformation through the table and directly to the local list;
        // both paths must end up with equal contents.
        final int value = 4711;
        stateTable.transform(1, 1, value, function);
        stateK1N1 = function.apply(stateK1N1, value);
        Assert.assertEquals(stateK1N1, stateTable.get(1, 1));
    }

    /**
     * Inserts keys 0, 1, 2, ... (removing the oldest remaining key after every 8th insert)
     * until the table reports that it is rehashing, keeps doing the same until the table
     * reports that rehashing has finished, and then verifies size and membership of every key.
     */
    @Test
    public void testIncrementalRehash() {
        RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                StateDescriptor.Type.UNKNOWN,
                "test",
                IntSerializer.INSTANCE,
                new ArrayListSerializer<>(IntSerializer.INSTANCE));

        MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);

        CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
            new CopyOnWriteStateTable<>(keyContext, metaInfo);

        // 'insert' is the next key to add, 'remove' the next key to delete; keys below
        // 'remove' are gone, keys in [remove, insert) must still be present.
        int insert = 0;
        int remove = 0;

        // Phase 1: fill until a rehash starts.
        while (!stateTable.isRehashing()) {
            stateTable.put(insert++, 0, new ArrayList<Integer>());
            if (insert % 8 == 0) {
                stateTable.remove(remove++, 0);
            }
        }
        Assert.assertEquals(insert - remove, stateTable.size());

        // Phase 2: keep modifying while the rehash is in progress.
        while (stateTable.isRehashing()) {
            stateTable.put(insert++, 0, new ArrayList<Integer>());
            if (insert % 8 == 0) {
                stateTable.remove(remove++, 0);
            }
        }
        Assert.assertEquals(insert - remove, stateTable.size());

        for (int i = 0; i < insert; ++i) {
            if (i < remove) {
                Assert.assertFalse(stateTable.containsKey(i, 0));
            } else {
                Assert.assertTrue(stateTable.containsKey(i, 0));
            }
        }
    }

    /**
     * Applies ten million pseudo-random operations (fixed seed 42) to a
     * {@link CopyOnWriteStateTable} and, in lock-step, to a plain {@link HashMap} that serves
     * as the reference model. After every operation the sizes and the touched state objects
     * are compared.
     *
     * <p>Every 500 iterations the test either draws a snapshot of the table together with a
     * deep copy of the reference map, or compares the snapshot it currently holds against that
     * deep copy. Because the live table keeps being modified in between, the comparison fails
     * if later modifications leak into a previously drawn snapshot.
     */
    @Test
    public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
        RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                StateDescriptor.Type.UNKNOWN,
                "test",
                IntSerializer.INSTANCE,
                new ArrayListSerializer<>(IntSerializer.INSTANCE));

        MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);

        CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
            new CopyOnWriteStateTable<>(keyContext, metaInfo);

        // Reference model, keyed by (key, namespace).
        HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>();

        // Fixed seed keeps the operation sequence reproducible.
        Random random = new Random(42L);

        // Currently held table snapshot (null when none is held) and the matching deep copy
        // of the reference map taken at the same moment.
        CopyOnWriteStateTable.StateTableEntry<Integer, Integer, ArrayList<Integer>>[] snapshot = null;
        int snapshotSize = 0;
        Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null;

        // Monotonically increasing value appended to state lists when mutating them.
        int val = 0;

        int snapshotCounter = 0;
        int referencedSnapshotId = 0;

        StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction =
            new StateTransformationFunction<ArrayList<Integer>, Integer>() {
                @Override
                public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
                    if (previousState == null) {
                        previousState = new ArrayList<Integer>();
                    }
                    previousState.add(value);
                    return previousState;
                }
            };

        for (int i = 0; i < 10000000; ++i) {
            // NOTE: the order of the random draws below determines the whole test sequence.
            int key = random.nextInt(20);
            int namespace = random.nextInt(4);
            Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);

            int op = random.nextInt(7);

            ArrayList<Integer> state = null;
            ArrayList<Integer> referenceState = null;

            switch (op) {
                case 0:
                case 1: {
                    // get, inserting a fresh list on a miss
                    state = stateTable.get(key, namespace);
                    referenceState = referenceMap.get(compositeKey);
                    if (state == null) {
                        state = new ArrayList<Integer>();
                        stateTable.put(key, namespace, state);
                        referenceState = new ArrayList<Integer>();
                        referenceMap.put(compositeKey, referenceState);
                    }
                    break;
                }
                case 2: {
                    // blind overwrite
                    stateTable.put(key, namespace, new ArrayList<Integer>());
                    referenceMap.put(compositeKey, new ArrayList<Integer>());
                    break;
                }
                case 3: {
                    // overwrite, keeping hold of the previous value
                    state = stateTable.putAndGetOld(key, namespace, new ArrayList<Integer>());
                    referenceState = referenceMap.put(compositeKey, new ArrayList<Integer>());
                    break;
                }
                case 4: {
                    // blind remove
                    stateTable.remove(key, namespace);
                    referenceMap.remove(compositeKey);
                    break;
                }
                case 5: {
                    // remove, keeping hold of the previous value
                    state = stateTable.removeAndGetOld(key, namespace);
                    referenceState = referenceMap.remove(compositeKey);
                    break;
                }
                case 6: {
                    // in-place transformation
                    final int updateValue = random.nextInt(1000);
                    stateTable.transform(key, namespace, updateValue, transformationFunction);
                    referenceMap.put(
                        compositeKey,
                        transformationFunction.apply(referenceMap.remove(compositeKey), updateValue));
                    break;
                }
                default: {
                    Assert.fail("Unknown op-code " + op);
                }
            }

            Assert.assertEquals(referenceMap.size(), stateTable.size());

            if (state != null) {
                // Mutate the state object we were handed (and its reference twin) so that
                // a held snapshot would be corrupted if it shared this object.
                if (random.nextBoolean() && !state.isEmpty()) {
                    state.remove(state.size() - 1);
                    referenceState.remove(referenceState.size() - 1);
                } else {
                    state.add(val);
                    referenceState.add(val);
                    ++val;
                }
            }

            Assert.assertEquals(referenceState, state);

            // Snapshot drawing / checking / releasing happens every 500th iteration only.
            if (i > 0 && i % 500 == 0) {
                if (snapshot != null) {
                    // The held snapshot must still match the state captured when it was drawn.
                    deepCheck(reference, convert(snapshot, snapshotSize));

                    if (i % 1000 == 0) {
                        // Draw and immediately release another snapshot while still holding
                        // on to the older one.
                        stateTable.snapshotTableArrays();
                        stateTable.releaseSnapshot(++snapshotCounter);
                    }

                    if (i % 5000 == 0) {
                        // Let go of the held snapshot after a while.
                        snapshot = null;
                        reference = null;
                        snapshotSize = 0;
                        stateTable.releaseSnapshot(referencedSnapshotId);
                    }
                } else {
                    // No snapshot is held at the moment: draw a new one.
                    referencedSnapshotId = ++snapshotCounter;
                    snapshot = stateTable.snapshotTableArrays();
                    snapshotSize = stateTable.size();
                    reference = manualDeepDump(referenceMap);
                }
            }
        }
    }

    /**
     * Checks, via object identity, when {@link CopyOnWriteStateTable#get} is expected to hand
     * out the originally inserted state object and when it is expected to hand out a distinct
     * but equal object, depending on which snapshots have been created and released.
     */
    @Test
    public void testCopyOnWriteContracts() {
        RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                StateDescriptor.Type.UNKNOWN,
                "test",
                IntSerializer.INSTANCE,
                new ArrayListSerializer<>(IntSerializer.INSTANCE));

        MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);

        CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
            new CopyOnWriteStateTable<>(keyContext, metaInfo);

        ArrayList<Integer> originalState1 = new ArrayList<>(1);
        ArrayList<Integer> originalState2 = new ArrayList<>(1);
        ArrayList<Integer> originalState3 = new ArrayList<>(1);
        ArrayList<Integer> originalState4 = new ArrayList<>(1);
        ArrayList<Integer> originalState5 = new ArrayList<>(1);

        originalState1.add(1);
        originalState2.add(2);
        originalState3.add(3);
        originalState4.add(4);
        originalState5.add(5);

        // State 3 is deliberately held back; it is inserted after the first snapshot.
        stateTable.put(1, 1, originalState1);
        stateTable.put(2, 1, originalState2);
        stateTable.put(4, 1, originalState4);
        stateTable.put(5, 1, originalState5);

        // No snapshot exists yet: the very same object comes back.
        Assert.assertSame(originalState1, stateTable.get(1, 1));

        CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 =
            stateTable.createSnapshot();

        // With snapshot1 outstanding, a different but equal object is expected.
        final ArrayList<Integer> copyState = stateTable.get(1, 1);
        Assert.assertNotSame(originalState1, copyState);
        Assert.assertEquals(originalState1, copyState);

        // Insert made after snapshot1 was created.
        stateTable.put(3, 1, originalState3);

        // Repeated lookups without a new snapshot return the same copy again.
        Assert.assertSame(copyState, stateTable.get(1, 1));

        CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 =
            stateTable.createSnapshot();

        // A second snapshot makes the next lookup produce yet another equal copy.
        Assert.assertNotSame(copyState, stateTable.get(1, 1));
        Assert.assertEquals(copyState, stateTable.get(1, 1));

        stateTable.releaseSnapshot(snapshot2);

        // Entry 3 was inserted after snapshot1 and not read during snapshot2: original object.
        Assert.assertSame(originalState3, stateTable.get(3, 1));
        // Entry 4 predates the still-outstanding snapshot1: a copy is expected.
        Assert.assertNotSame(originalState4, stateTable.get(4, 1));

        stateTable.releaseSnapshot(snapshot1);

        // No snapshot is outstanding any more: the original object comes back.
        Assert.assertSame(originalState5, stateTable.get(5, 1));
    }

    /**
     * Verifies that writing a snapshot does not go through the serializer instances that were
     * handed to the table.
     *
     * <p>The key, namespace and state serializers are {@link TestDuplicateSerializer}s, whose
     * {@code serialize} fails an assertion once the instance has been disabled and whose
     * {@code duplicate()} returns a new, enabled instance. The test disables all three
     * original instances after the snapshot has been created and then writes the snapshot;
     * the write only succeeds if the snapshot uses instances other than the disabled ones.
     */
    @Test
    public void testSerializerDuplicationInSnapshot() throws IOException {
        final TestDuplicateSerializer namespaceSerializer = new TestDuplicateSerializer();
        final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();
        final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();

        RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                StateDescriptor.Type.VALUE,
                "test",
                namespaceSerializer,
                stateSerializer);

        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);

        // Minimal key context: a single key group (index 0) and the test key serializer.
        InternalKeyContext<Integer> mockKeyContext = new InternalKeyContext<Integer>() {
            @Override
            public Integer getCurrentKey() {
                return 0;
            }

            @Override
            public int getCurrentKeyGroupIndex() {
                return 0;
            }

            @Override
            public int getNumberOfKeyGroups() {
                return 1;
            }

            @Override
            public KeyGroupRange getKeyGroupRange() {
                return keyGroupRange;
            }

            @Override
            public TypeSerializer<Integer> getKeySerializer() {
                return keySerializer;
            }
        };

        CopyOnWriteStateTable<Integer, Integer, Integer> table =
            new CopyOnWriteStateTable<>(mockKeyContext, metaInfo);

        // Arguments: key, key group, namespace, state.
        table.put(0, 0, 0, 0);
        table.put(1, 0, 0, 1);
        table.put(2, 0, 1, 2);

        CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.createSnapshot();

        try {
            // From here on any use of the original instances trips an assertion.
            namespaceSerializer.disable();
            keySerializer.disable();
            stateSerializer.disable();

            snapshot.writeMappingsInKeyGroup(
                new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(1024)), 0);
        } finally {
            // Always release, even if writing failed.
            table.releaseSnapshot(snapshot);
        }
    }

    /*
     * WARNING - void declaration
     */
    private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) {
        Tuple3[] result = new Tuple3[mapSize];
        int pos = 0;
        for (CopyOnWriteStateTable.StateTableEntry<K, N, S> stateTableEntry : snapshot) {
            void var7_7;
            while (null != var7_7) {
                result[pos++] = new Tuple3(var7_7.getKey(), var7_7.getNamespace(), var7_7.getState());
                CopyOnWriteStateTable.StateTableEntry stateTableEntry2 = var7_7.next;
            }
        }
        Assert.assertEquals((long)mapSize, (long)pos);
        return result;
    }

    /**
     * Dumps the given map into an array of (key, namespace, state) tuples.
     *
     * <p>Each state list is copied into a new {@link ArrayList}, so later changes to the
     * lists held by {@code map} do not show up in the returned tuples. The tuples appear in
     * the iteration order of {@code map.entrySet()}.
     *
     * @param map the map to dump, keyed by (key, namespace)
     * @return an array of length {@code map.size()} with one tuple per map entry
     */
    @SuppressWarnings("unchecked")
    private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> map) {
        // Generic array creation is not possible; only correctly typed tuples are stored.
        Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()];
        int pos = 0;
        for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : map.entrySet()) {
            Tuple2<Integer, Integer> compositeKey = entry.getKey();
            Integer key = compositeKey.f0;
            Integer namespace = compositeKey.f1;
            ArrayList<Integer> stateCopy = new ArrayList<Integer>(entry.getValue());
            result[pos++] = new Tuple3<>(key, namespace, stateCopy);
        }
        return result;
    }

    /**
     * Asserts that two tuple arrays contain equal (key, namespace, state) tuples, ignoring
     * their initial order.
     *
     * <p>Both arrays are sorted <em>in place</em> by namespace and then by key before being
     * compared element by element, so callers must not rely on the arrays' order afterwards.
     *
     * @param a first array; must not be {@code null} unless it is the same reference as {@code b}
     * @param b second array; must not be {@code null} unless it is the same reference as {@code a}
     * @throws AssertionError if the lengths differ or any pair of tuples differs in a field
     */
    private void deepCheck(Tuple3<Integer, Integer, ArrayList<Integer>>[] a, Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
        if (a == b) {
            return;
        }

        Assert.assertEquals(a.length, b.length);

        Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator =
            new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
                @Override
                public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
                    // Integer.compare instead of subtraction: subtraction can overflow and
                    // yield the wrong sign for operands far apart.
                    int namespaceOrder = Integer.compare(o1.f1, o2.f1);
                    return namespaceOrder != 0 ? namespaceOrder : Integer.compare(o1.f0, o2.f0);
                }
            };

        Arrays.sort(a, comparator);
        Arrays.sort(b, comparator);

        for (int i = 0; i < a.length; ++i) {
            Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i];
            Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i];

            Assert.assertEquals(av.f0, bv.f0);
            Assert.assertEquals(av.f1, bv.f1);
            Assert.assertEquals(av.f2, bv.f2);
        }
    }

    static class TestDuplicateSerializer
    extends TypeSerializer<Integer> {
        private static final long serialVersionUID = 1L;
        private static final Integer ZERO = 0;
        private boolean disabled = false;

        public boolean isImmutableType() {
            return true;
        }

        public TypeSerializer<Integer> duplicate() {
            return new TestDuplicateSerializer();
        }

        public Integer createInstance() {
            return ZERO;
        }

        public Integer copy(Integer from) {
            return from;
        }

        public Integer copy(Integer from, Integer reuse) {
            return from;
        }

        public int getLength() {
            return 4;
        }

        public void serialize(Integer record, DataOutputView target) throws IOException {
            Assert.assertFalse((boolean)this.disabled);
            target.writeInt(record.intValue());
        }

        public Integer deserialize(DataInputView source) throws IOException {
            Assert.assertFalse((boolean)this.disabled);
            return source.readInt();
        }

        public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
            Assert.assertFalse((boolean)this.disabled);
            return this.deserialize(source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            Assert.assertFalse((boolean)this.disabled);
            target.writeInt(source.readInt());
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDuplicateSerializer;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof TestDuplicateSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public void disable() {
            this.disabled = true;
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }

        public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
            throw new UnsupportedOperationException();
        }
    }

    static class MockInternalKeyContext<T>
    implements InternalKeyContext<T> {
        private T key;
        private final TypeSerializer<T> serializer;
        private final KeyGroupRange keyGroupRange;

        public MockInternalKeyContext(TypeSerializer<T> serializer) {
            this.serializer = serializer;
            this.keyGroupRange = new KeyGroupRange(0, 0);
        }

        public void setKey(T key) {
            this.key = key;
        }

        public T getCurrentKey() {
            return this.key;
        }

        public int getCurrentKeyGroupIndex() {
            return 0;
        }

        public int getNumberOfKeyGroups() {
            return 1;
        }

        public KeyGroupRange getKeyGroupRange() {
            return this.keyGroupRange;
        }

        public TypeSerializer<T> getKeySerializer() {
            return this.serializer;
        }
    }
}

