/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class StateInitializationContextImplTest {
    static final int NUM_HANDLES = 10;
    private StateInitializationContextImpl initializationContext;
    private CloseableRegistry closableRegistry;
    private int writtenKeyGroups;
    private Set<Integer> writtenOperatorStates;

    @Before
    public void setUp() throws Exception {
        KeyGroupRangeOffsets offsets;
        DataOutputViewStreamWrapper dov;
        this.writtenKeyGroups = 0;
        this.writtenOperatorStates = new HashSet<Integer>();
        this.closableRegistry = new CloseableRegistry();
        OperatorStateStore stateStore = (OperatorStateStore)Mockito.mock(OperatorStateStore.class);
        ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);
        ArrayList<KeyGroupsStateHandle> keyedStateHandles = new ArrayList<KeyGroupsStateHandle>(10);
        int prev = 0;
        for (int i = 0; i < 10; ++i) {
            out.reset();
            int size = i % 4;
            int end = prev + size;
            dov = new DataOutputViewStreamWrapper((OutputStream)out);
            offsets = new KeyGroupRangeOffsets(i == 9 ? KeyGroupRange.EMPTY_KEY_GROUP_RANGE : new KeyGroupRange(prev, end));
            prev = end + 1;
            Iterator iterator = offsets.getKeyGroupRange().iterator();
            while (iterator.hasNext()) {
                int kg = (Integer)iterator.next();
                offsets.setKeyGroupOffset(kg, (long)out.getPosition());
                dov.writeInt(kg);
                ++this.writtenKeyGroups;
            }
            KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, (StreamStateHandle)new ByteStateHandleCloseChecking("kg-" + i, out.toByteArray()));
            keyedStateHandles.add(handle);
        }
        ArrayList<OperatorStateHandle> operatorStateHandles = new ArrayList<OperatorStateHandle>(10);
        for (int i = 0; i < 10; ++i) {
            int size = i % 4;
            out.reset();
            dov = new DataOutputViewStreamWrapper((OutputStream)out);
            offsets = new LongArrayList(size);
            for (int s = 0; s < size; ++s) {
                offsets.add((long)out.getPosition());
                int val = i * 10 + s;
                dov.writeInt(val);
                this.writtenOperatorStates.add(val);
            }
            HashMap<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<String, OperatorStateHandle.StateMetaInfo>();
            offsetsMap.put("_default_", new OperatorStateHandle.StateMetaInfo(offsets.toArray(), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            OperatorStateHandle operatorStateHandle = new OperatorStateHandle(offsetsMap, (StreamStateHandle)new ByteStateHandleCloseChecking("os-" + i, out.toByteArray()));
            operatorStateHandles.add(operatorStateHandle);
        }
        this.initializationContext = new StateInitializationContextImpl(true, stateStore, (KeyedStateStore)Mockito.mock(KeyedStateStore.class), keyedStateHandles, operatorStateHandles, this.closableRegistry);
    }

    @Test
    public void getOperatorStateStreams() throws Exception {
        int i = 0;
        int s = 0;
        for (StatePartitionStreamProvider streamProvider : this.initializationContext.getRawOperatorStateInputs()) {
            if (0 == i % 4) {
                ++i;
            }
            Assert.assertNotNull((Object)streamProvider);
            try (InputStream is = streamProvider.getStream();){
                DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
                int val = div.readInt();
                Assert.assertEquals((long)(i * 10 + s), (long)val);
            }
            if (++s != i % 4) continue;
            s = 0;
            ++i;
        }
    }

    @Test
    public void getKeyedStateStreams() throws Exception {
        int readKeyGroupCount = 0;
        for (KeyGroupStatePartitionStreamProvider stateStreamProvider : this.initializationContext.getRawKeyedStateInputs()) {
            Assert.assertNotNull((Object)stateStreamProvider);
            InputStream is = stateStreamProvider.getStream();
            Throwable throwable = null;
            try {
                DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
                int val = div.readInt();
                ++readKeyGroupCount;
                Assert.assertEquals((long)stateStreamProvider.getKeyGroupId(), (long)val);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (is == null) continue;
                if (throwable != null) {
                    try {
                        is.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                is.close();
            }
        }
        Assert.assertEquals((long)this.writtenKeyGroups, (long)readKeyGroupCount);
    }

    @Test
    public void getOperatorStateStore() throws Exception {
        HashSet<Integer> readStatesCount = new HashSet<Integer>();
        for (StatePartitionStreamProvider statePartitionStreamProvider : this.initializationContext.getRawOperatorStateInputs()) {
            Assert.assertNotNull((Object)statePartitionStreamProvider);
            InputStream is = statePartitionStreamProvider.getStream();
            Throwable throwable = null;
            try {
                DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
                Assert.assertTrue((boolean)readStatesCount.add(div.readInt()));
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (is == null) continue;
                if (throwable != null) {
                    try {
                        is.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                is.close();
            }
        }
        Assert.assertEquals(this.writtenOperatorStates, readStatesCount);
    }

    @Test
    public void close() throws Exception {
        int count = 0;
        int stopCount = 5;
        boolean isClosed = false;
        try {
            for (KeyGroupStatePartitionStreamProvider stateStreamProvider : this.initializationContext.getRawKeyedStateInputs()) {
                Assert.assertNotNull((Object)stateStreamProvider);
                if (count == stopCount) {
                    this.initializationContext.close();
                    isClosed = true;
                }
                InputStream is = stateStreamProvider.getStream();
                Throwable throwable = null;
                try {
                    DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
                    try {
                        int val = div.readInt();
                        Assert.assertEquals((long)stateStreamProvider.getKeyGroupId(), (long)val);
                        if (isClosed) {
                            Assert.fail((String)"Close was ignored: stream");
                        }
                        ++count;
                    }
                    catch (IOException ioex) {
                        if (isClosed) continue;
                        throw ioex;
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (is == null) continue;
                    if (throwable != null) {
                        try {
                            is.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    is.close();
                }
            }
            Assert.fail((String)"Close was ignored: registry");
        }
        catch (IOException iex) {
            Assert.assertTrue((boolean)isClosed);
            Assert.assertEquals((long)stopCount, (long)count);
        }
    }

    static final class ByteStateHandleCloseChecking
    extends ByteStreamStateHandle {
        private static final long serialVersionUID = -6201941296931334140L;

        public ByteStateHandleCloseChecking(String handleName, byte[] data) {
            super(handleName, data);
        }

        public FSDataInputStream openInputStream() throws IOException {
            return new FSDataInputStream(){
                private int index = 0;
                private boolean closed = false;

                public void seek(long desired) throws IOException {
                    Preconditions.checkArgument((desired >= 0L && desired < Integer.MAX_VALUE ? 1 : 0) != 0);
                    this.index = (int)desired;
                }

                public long getPos() throws IOException {
                    return this.index;
                }

                public int read() throws IOException {
                    if (this.closed) {
                        throw new IOException("Stream closed");
                    }
                    return this.index < data.length ? data[this.index++] & 0xFF : -1;
                }

                public void close() throws IOException {
                    super.close();
                    this.closed = true;
                }
            };
        }
    }
}

