/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Stream;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.HeapAggregatingState;
import org.apache.flink.runtime.state.heap.HeapFoldingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapMapState;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.StateTableByKeyGroupReader;
import org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders;
import org.apache.flink.runtime.state.heap.StateTableSnapshot;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap();
    private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
    private final boolean asynchronousSnapshots;

    public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, boolean asynchronousSnapshots, ExecutionConfig executionConfig) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
        this.asynchronousSnapshots = asynchronousSnapshots;
        LOG.info("Initializing heap keyed state backend with stream factory.");
        this.restoredKvStateMetaInfos = new HashMap();
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
        return this.tryRegisterStateTable(stateDesc.getName(), stateDesc.getType(), namespaceSerializer, stateDesc.getSerializer());
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(String stateName, StateDescriptor.Type stateType, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws StateMigrationException {
        RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<N, V>(stateType, stateName, namespaceSerializer, valueSerializer);
        StateTable<K, Object, Object> stateTable = this.stateTables.get(stateName);
        if (stateTable == null) {
            stateTable = this.newStateTable(newMetaInfo);
            this.stateTables.put(stateName, stateTable);
        } else {
            Preconditions.checkState((boolean)stateName.equals(stateTable.getMetaInfo().getName()), (Object)("Incompatible state names. Was [" + stateTable.getMetaInfo().getName() + "], registered with [" + newMetaInfo.getName() + "]."));
            if (!newMetaInfo.getStateType().equals((Object)StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals((Object)StateDescriptor.Type.UNKNOWN)) {
                Preconditions.checkState((boolean)newMetaInfo.getStateType().equals((Object)stateTable.getMetaInfo().getStateType()), (Object)("Incompatible state types. Was [" + stateTable.getMetaInfo().getStateType() + "], registered with [" + newMetaInfo.getStateType() + "]."));
            }
            RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo = this.restoredKvStateMetaInfos.get(stateName);
            CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getNamespaceSerializer(), null, (TypeSerializerConfigSnapshot)restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), newMetaInfo.getNamespaceSerializer());
            CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getStateSerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)restoredMetaInfo.getStateSerializerConfigSnapshot(), newMetaInfo.getStateSerializer());
            if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
                stateTable.setMetaInfo(newMetaInfo);
            } else {
                throw new StateMigrationException("State migration isn't supported, yet.");
            }
        }
        return stateTable;
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        if (!this.stateTables.containsKey(state)) {
            return Stream.empty();
        }
        StateTable<K, ?, ?> table = this.stateTables.get(state);
        return table.getKeys(namespace);
    }

    private boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override
    public <N, V> InternalValueState<N, V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
        StateTable<K, N, V> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor<?, V>)stateDesc);
        return new HeapValueState<K, N, V>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T> InternalListState<N, T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(stateDesc.getName(), stateDesc.getType(), namespaceSerializer, new ArrayListSerializer(stateDesc.getElementSerializer()));
        return new HeapListState<K, N, T>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T> InternalReducingState<N, T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapReducingState<K, N, T>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapAggregatingState<K, N, T, ACC, R>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        StateTable<K, N, ACC> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapFoldingState<K, N, T, ACC>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK, UV> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(stateDesc.getName(), stateDesc.getType(), namespaceSerializer, new HashMapSerializer(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
        return new HeapMapState<K, N, UK, UV>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public RunnableFuture<KeyedStateHandle> snapshot(final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        if (!this.hasRegisteredState()) {
            return DoneFuture.nullValue();
        }
        long syncStartTime = System.currentTimeMillis();
        Preconditions.checkState((this.stateTables.size() <= Short.MAX_VALUE ? 1 : 0) != 0, (Object)("Too many KV-States: " + this.stateTables.size() + ". Currently at most " + Short.MAX_VALUE + " states are supported"));
        ArrayList metaInfoSnapshots = new ArrayList(this.stateTables.size());
        HashMap<String, Integer> kVStateToId = new HashMap<String, Integer>(this.stateTables.size());
        HashedMap cowStateStableSnapshots = new HashedMap(this.stateTables.size());
        for (Map.Entry<String, StateTable<K, ?, ?>> kvState : this.stateTables.entrySet()) {
            kVStateToId.put(kvState.getKey(), kVStateToId.size());
            StateTable<K, ?, ?> stateTable = kvState.getValue();
            if (null == stateTable) continue;
            metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
            cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
        }
        KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, metaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.keyGroupCompressionDecorator));
        AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = new AbstractAsyncCallableWithResources<KeyedStateHandle>((Map)cowStateStableSnapshots, serializationProxy, kVStateToId){
            CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
            final /* synthetic */ Map val$cowStateStableSnapshots;
            final /* synthetic */ KeyedBackendSerializationProxy val$serializationProxy;
            final /* synthetic */ Map val$kVStateToId;
            {
                this.val$cowStateStableSnapshots = map;
                this.val$serializationProxy = keyedBackendSerializationProxy;
                this.val$kVStateToId = map2;
            }

            @Override
            protected void acquireResources() throws Exception {
                this.stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
                HeapKeyedStateBackend.this.cancelStreamRegistry.registerCloseable((Closeable)((Object)this.stream));
            }

            @Override
            protected void releaseResources() throws Exception {
                if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)((Object)this.stream))) {
                    IOUtils.closeQuietly((OutputStream)((Object)this.stream));
                    this.stream = null;
                }
                for (StateTableSnapshot tableSnapshot : this.val$cowStateStableSnapshots.values()) {
                    tableSnapshot.release();
                }
            }

            @Override
            protected void stopOperation() throws Exception {
                if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)((Object)this.stream))) {
                    IOUtils.closeQuietly((OutputStream)((Object)this.stream));
                    this.stream = null;
                }
            }

            @Override
            public KeyGroupsStateHandle performOperation() throws Exception {
                long asyncStartTime = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
                DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)localStream));
                this.val$serializationProxy.write((DataOutputView)outView);
                long[] keyGroupRangeOffsets = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                for (int keyGroupPos = 0; keyGroupPos < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
                    int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(keyGroupPos);
                    keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                    outView.writeInt(keyGroupId);
                    for (Map.Entry kvState : HeapKeyedStateBackend.this.stateTables.entrySet()) {
                        OutputStream kgCompressionOut = HeapKeyedStateBackend.this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream)((Object)localStream));
                        DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
                        kgCompressionView.writeShort(((Integer)this.val$kVStateToId.get(kvState.getKey())).intValue());
                        ((StateTableSnapshot)this.val$cowStateStableSnapshots.get(kvState.getValue())).writeMappingsInKeyGroup((DataOutputView)kgCompressionView, keyGroupId);
                        kgCompressionOut.close();
                    }
                }
                if (HeapKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)((Object)this.stream))) {
                    StreamStateHandle streamStateHandle = this.stream.closeAndGetHandle();
                    this.stream = null;
                    if (HeapKeyedStateBackend.this.asynchronousSnapshots) {
                        LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - asyncStartTime});
                    }
                    if (streamStateHandle != null) {
                        KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, keyGroupRangeOffsets);
                        KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
                        return keyGroupsStateHandle;
                    }
                }
                return null;
            }
        };
        AsyncStoppableTaskWithCallback<KeyedStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        if (!this.asynchronousSnapshots) {
            task.run();
        }
        LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
        return task;
    }

    @Override
    public void restore(Collection<KeyedStateHandle> restoredState) throws Exception {
        if (restoredState == null || restoredState.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
        }
        this.restorePartitionedState(restoredState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restorePartitionedState(Collection<KeyedStateHandle> state) throws Exception {
        HashMap<Integer, String> kvStatesById = new HashMap<Integer, String>();
        int numRegisteredKvStates = 0;
        this.stateTables.clear();
        boolean keySerializerRestored = false;
        for (KeyedStateHandle keyedStateHandle : state) {
            if (keyedStateHandle == null) continue;
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
            }
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)fsDataInputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                serializationProxy.read((DataInputView)inView);
                if (!keySerializerRestored) {
                    if (CompatibilityUtil.resolveCompatibilityResult(serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)serializationProxy.getKeySerializerConfigSnapshot(), (TypeSerializer)this.keySerializer).isRequiresMigration()) {
                        throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                    }
                    keySerializerRestored = true;
                }
                List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
                for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
                    if (restoredMetaInfo.getStateSerializer() == null || restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore keyed state [" + restoredMetaInfo.getName() + "]. For memory-backed keyed state, the previous serializer of the keyed state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    this.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    StateTable<K, ?, ?> stateTable = this.stateTables.get(restoredMetaInfo.getName());
                    if (null != stateTable) continue;
                    RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(restoredMetaInfo.getStateType(), restoredMetaInfo.getName(), restoredMetaInfo.getNamespaceSerializer(), restoredMetaInfo.getStateSerializer());
                    stateTable = this.newStateTable(registeredKeyedBackendStateMetaInfo);
                    this.stateTables.put(restoredMetaInfo.getName(), stateTable);
                    kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
                    ++numRegisteredKvStates;
                }
                StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
                for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
                    int keyGroupIndex = (Integer)groupOffset.f0;
                    long offset = (Long)groupOffset.f1;
                    Preconditions.checkState((boolean)this.keyGroupRange.contains(keyGroupIndex), (Object)"The key group must belong to the backend.");
                    fsDataInputStream.seek(offset);
                    int writtenKeyGroupIndex = inView.readInt();
                    InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression((InputStream)fsDataInputStream);
                    Throwable throwable = null;
                    try {
                        DataInputViewStreamWrapper kgCompressionInView = new DataInputViewStreamWrapper(kgCompressionInStream);
                        Preconditions.checkState((writtenKeyGroupIndex == keyGroupIndex ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
                        for (int i = 0; i < restoredMetaInfos.size(); ++i) {
                            short kvStateId = kgCompressionInView.readShort();
                            StateTable<K, ?, ?> stateTable = this.stateTables.get(kvStatesById.get(kvStateId));
                            StateTableByKeyGroupReader keyGroupReader = StateTableByKeyGroupReaders.readerForVersion(stateTable, serializationProxy.getReadVersion());
                            keyGroupReader.readMappingsInKeyGroup((DataInputView)kgCompressionInView, keyGroupIndex);
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (kgCompressionInStream == null) continue;
                        if (throwable != null) {
                            try {
                                kgCompressionInStream.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        kgCompressionInStream.close();
                    }
                }
            }
            finally {
                if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)fsDataInputStream)) continue;
                IOUtils.closeQuietly((InputStream)fsDataInputStream);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Override
    @VisibleForTesting
    public int numStateEntries() {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.size();
        }
        return sum;
    }

    @VisibleForTesting
    public int numStateEntries(Object namespace) {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.sizeOfNamespace(namespace);
        }
        return sum;
    }

    public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
        return this.asynchronousSnapshots ? new CopyOnWriteStateTable(this, newMetaInfo) : new NestedMapsStateTable(this, newMetaInfo);
    }

    @Override
    public boolean supportsAsynchronousSnapshots() {
        return this.asynchronousSnapshots;
    }
}

