/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link StreamingRuntimeContext}.
 *
 * <p>The "instantiation" tests verify that the {@link ExecutionConfig} of the operator is applied
 * to a state descriptor before it is handed to the keyed state backend: a Kryo type registered on
 * the config must show up in the serializer of the descriptor that reaches the backend.
 *
 * <p>The "returns empty ... by default" tests verify that list and map state handed out by the
 * context expose an empty (non-null) view when nothing has been written yet.
 */
public class StreamingRuntimeContextTest {

    // ------------------------------------------------------------------------
    //  descriptor / serializer initialization
    // ------------------------------------------------------------------------

    /** The descriptor passed to {@code getState} must carry a Kryo serializer built from the config. */
    @Test
    public void testValueStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<TaskInfo>("name", TaskInfo.class);
        context.getState(descr);

        StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) capturedDescriptor(descriptorCapture);
        assertKryoSerializerWithPathRegistered(descrIntercepted.getSerializer());
    }

    /** Same check as for value state, going through {@code getReducingState}. */
    @Test
    public void testReducingStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        @SuppressWarnings("unchecked")
        ReduceFunction<TaskInfo> reducer = Mockito.mock(ReduceFunction.class);
        ReducingStateDescriptor<TaskInfo> descr =
                new ReducingStateDescriptor<TaskInfo>("name", reducer, TaskInfo.class);
        context.getReducingState(descr);

        StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) capturedDescriptor(descriptorCapture);
        assertKryoSerializerWithPathRegistered(descrIntercepted.getSerializer());
    }

    /** Same check as for value state, going through {@code getAggregatingState}. */
    @Test
    public void testAggregatingStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        @SuppressWarnings("unchecked")
        AggregateFunction<String, TaskInfo, String> aggregate = Mockito.mock(AggregateFunction.class);
        AggregatingStateDescriptor<String, TaskInfo, String> descr =
                new AggregatingStateDescriptor<String, TaskInfo, String>("name", aggregate, TaskInfo.class);
        context.getAggregatingState(descr);

        AggregatingStateDescriptor<?, ?, ?> descrIntercepted =
                (AggregatingStateDescriptor<?, ?, ?>) capturedDescriptor(descriptorCapture);
        assertKryoSerializerWithPathRegistered(descrIntercepted.getSerializer());
    }

    /** Same check as for value state, going through {@code getFoldingState}. */
    @Test
    public void testFoldingStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        @SuppressWarnings("unchecked")
        FoldFunction<String, TaskInfo> folder = Mockito.mock(FoldFunction.class);
        // The initial value is irrelevant here; only the serializer setup is inspected.
        FoldingStateDescriptor<String, TaskInfo> descr =
                new FoldingStateDescriptor<String, TaskInfo>("name", null, folder, TaskInfo.class);
        context.getFoldingState(descr);

        FoldingStateDescriptor<?, ?> descrIntercepted =
                (FoldingStateDescriptor<?, ?>) capturedDescriptor(descriptorCapture);
        assertKryoSerializerWithPathRegistered(descrIntercepted.getSerializer());
    }

    /**
     * For list state the descriptor's own serializer must be a {@link ListSerializer}, and the
     * element serializer must be the Kryo serializer built from the config.
     */
    @Test
    public void testListStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<TaskInfo>("name", TaskInfo.class);
        context.getListState(descr);

        ListStateDescriptor<?> descrIntercepted = (ListStateDescriptor<?>) capturedDescriptor(descriptorCapture);
        TypeSerializer<?> serializer = descrIntercepted.getSerializer();
        Assert.assertTrue(serializer instanceof ListSerializer);

        assertKryoSerializerWithPathRegistered(descrIntercepted.getElementSerializer());
    }

    /** For map state the value serializer must be the Kryo serializer built from the config. */
    @Test
    public void testMapStateInstantiation() throws Exception {
        AtomicReference<Object> descriptorCapture = new AtomicReference<Object>();
        StreamingRuntimeContext context = createCapturingContext(descriptorCapture);

        MapStateDescriptor<String, TaskInfo> descr =
                new MapStateDescriptor<String, TaskInfo>("name", String.class, TaskInfo.class);
        context.getMapState(descr);

        MapStateDescriptor<?, ?> descrIntercepted = (MapStateDescriptor<?, ?>) capturedDescriptor(descriptorCapture);
        assertKryoSerializerWithPathRegistered(descrIntercepted.getValueSerializer());
    }

    // ------------------------------------------------------------------------
    //  default values of user-facing state
    // ------------------------------------------------------------------------

    /** Untouched list state must yield a non-null, empty {@link Iterable}. */
    @Test
    public void testListStateReturnsEmptyListByDefault() throws Exception {
        StreamingRuntimeContext context = new StreamingRuntimeContext(
                createListPlainMockOp(), createMockEnvironment(), Collections.emptyMap());

        ListStateDescriptor<String> descr = new ListStateDescriptor<String>("name", String.class);
        ListState<String> state = context.getListState(descr);

        Iterable<String> value = state.get();
        Assert.assertNotNull(value);
        Assert.assertFalse(value.iterator().hasNext());
    }

    /** Untouched map state must yield a non-null, empty entry {@link Iterable}. */
    @Test
    public void testMapStateReturnsEmptyMapByDefault() throws Exception {
        StreamingRuntimeContext context = new StreamingRuntimeContext(
                createMapPlainMockOp(), createMockEnvironment(), Collections.emptyMap());

        MapStateDescriptor<Integer, String> descr =
                new MapStateDescriptor<Integer, String>("name", Integer.class, String.class);
        MapState<Integer, String> state = context.getMapState(descr);

        Iterable<?> value = state.entries();
        Assert.assertNotNull(value);
        Assert.assertFalse(value.iterator().hasNext());
    }

    // ------------------------------------------------------------------------
    //  assertion helpers
    // ------------------------------------------------------------------------

    /**
     * Builds a runtime context whose operator records, in {@code descriptorCapture}, the state
     * descriptor that reaches the keyed state backend. The operator's execution config has
     * {@link Path} registered as a Kryo type, which the instantiation tests look for afterwards.
     */
    private static StreamingRuntimeContext createCapturingContext(AtomicReference<Object> descriptorCapture)
            throws Exception {
        ExecutionConfig config = new ExecutionConfig();
        config.registerKryoType(Path.class);

        return new StreamingRuntimeContext(
                createDescriptorCapturingMockOp(descriptorCapture, config),
                createMockEnvironment(),
                Collections.emptyMap());
    }

    /**
     * Returns the captured descriptor, failing with a readable message (instead of a later
     * {@link NullPointerException}) when the backend was never asked for state.
     */
    private static Object capturedDescriptor(AtomicReference<Object> descriptorCapture) {
        Object descriptor = descriptorCapture.get();
        Assert.assertNotNull("no state descriptor was passed to the keyed state backend", descriptor);
        return descriptor;
    }

    /**
     * Asserts that {@code serializer} is a {@link KryoSerializer} in which {@link Path} is
     * registered with a positive id, i.e. that the execution config was applied to it.
     */
    private static void assertKryoSerializerWithPathRegistered(TypeSerializer<?> serializer) {
        Assert.assertTrue(serializer instanceof KryoSerializer);
        Assert.assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
    }

    // ------------------------------------------------------------------------
    //  mock setup
    // ------------------------------------------------------------------------

    /**
     * Creates a mocked operator whose keyed state store is backed by a mocked state backend.
     * Every {@code getPartitionedState} call on that backend stores its third argument (the state
     * descriptor) in {@code ref} and returns {@code null}.
     *
     * @param ref receives the descriptor passed to the backend
     * @param config execution config returned by the operator and used by the keyed state store
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static AbstractStreamOperator<?> createDescriptorCapturingMockOp(final AtomicReference<Object> ref, ExecutionConfig config) throws Exception {
        AbstractStreamOperator<?> operatorMock = Mockito.mock(AbstractStreamOperator.class);
        // Raw on purpose: stubbing a generic method with matchers is not expressible type-safely.
        KeyedStateBackend keyedStateBackend = Mockito.mock(KeyedStateBackend.class);
        DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);

        Mockito.when(operatorMock.getExecutionConfig()).thenReturn(config);

        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                // Argument order: namespace, namespace serializer, state descriptor.
                ref.set(invocationOnMock.getArguments()[2]);
                return null;
            }
        }).when(keyedStateBackend).getPartitionedState(
                Matchers.any(),
                (TypeSerializer) Mockito.any(TypeSerializer.class),
                (StateDescriptor) Mockito.any(StateDescriptor.class));

        Mockito.when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
        return operatorMock;
    }

    /** Creates a mocked operator that serves {@link ListStateDescriptor} requests from a real in-memory backend. */
    private static AbstractStreamOperator<?> createListPlainMockOp() throws Exception {
        return createInMemoryBackedMockOp(ListStateDescriptor.class);
    }

    /** Creates a mocked operator that serves {@link MapStateDescriptor} requests from a real in-memory backend. */
    private static AbstractStreamOperator<?> createMapPlainMockOp() throws Exception {
        return createInMemoryBackedMockOp(MapStateDescriptor.class);
    }

    /**
     * Creates a mocked operator whose mocked keyed state backend answers {@code getPartitionedState}
     * by delegating to a freshly created {@link MemoryStateBackend} keyed backend (integer keys,
     * current key {@code 0}, {@link VoidNamespace}).
     *
     * @param descriptorType descriptor class used for the argument matcher; the descriptor received
     *     at call time is cast to this class before being forwarded
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static AbstractStreamOperator<?> createInMemoryBackedMockOp(
            final Class<? extends StateDescriptor> descriptorType) throws Exception {
        AbstractStreamOperator<?> operatorMock = Mockito.mock(AbstractStreamOperator.class);
        ExecutionConfig config = new ExecutionConfig();
        // Raw on purpose: stubbing a generic method with matchers is not expressible type-safely.
        KeyedStateBackend keyedStateBackend = Mockito.mock(KeyedStateBackend.class);
        DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);

        Mockito.when(operatorMock.getExecutionConfig()).thenReturn(config);

        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                // Argument order: namespace, namespace serializer, state descriptor.
                StateDescriptor descr = descriptorType.cast(invocationOnMock.getArguments()[2]);

                AbstractKeyedStateBackend<Integer> backend = new MemoryStateBackend().createKeyedStateBackend(
                        new DummyEnvironment("test_task", 1, 0),
                        new JobID(),
                        "test_op",
                        IntSerializer.INSTANCE,
                        1,
                        new KeyGroupRange(0, 0),
                        new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
                // Keyed state can only be accessed once a current key is set.
                backend.setCurrentKey(0);

                return backend.getPartitionedState(
                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descr);
            }
        }).when(keyedStateBackend).getPartitionedState(
                Matchers.any(),
                (TypeSerializer) Mockito.any(TypeSerializer.class),
                (StateDescriptor) Mockito.any(descriptorType));

        Mockito.when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
        return operatorMock;
    }

    /**
     * Creates a mocked {@link Environment} that returns this test's class loader, an empty
     * distributed cache and a fixed {@link TaskInfo}.
     */
    private static Environment createMockEnvironment() {
        Environment env = Mockito.mock(Environment.class);
        Mockito.when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader());
        Mockito.when(env.getDistributedCacheEntries()).thenReturn(Collections.emptyMap());
        Mockito.when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 1, 0, 1, 1));
        return env;
    }
}

