/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link StreamSource} and {@link StoppableStreamSource}.
 *
 * <p>The tests cover three areas: whether the final {@link Watermark#MAX_WATERMARK} is emitted
 * (only when the source finishes on its own, never after cancel/stop), the periodic emission of
 * latency markers, and the periodic watermarks produced for ingestion time.
 */
public class StreamSourceOperatorTest {

    /** A source whose {@code run} returns by itself must produce exactly one element: the max watermark. */
    @Test
    public void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamSource<String, FiniteSource<String>> operator =
                new StreamSource<String, FiniteSource<String>>(new FiniteSource<String>());
        List<StreamElement> output = new ArrayList<StreamElement>();
        setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, 0L);

        operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));

        Assert.assertEquals(1, output.size());
        Assert.assertEquals(Watermark.MAX_WATERMARK, output.get(0));
    }

    /** Cancelling the operator before it is run must leave the output empty. */
    @Test
    public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        List<StreamElement> output = new ArrayList<StreamElement>();
        StreamSource<String, InfiniteSource<String>> operator =
                new StreamSource<String, InfiniteSource<String>>(new InfiniteSource<String>());
        setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, 0L);

        operator.cancel();
        operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));

        Assert.assertTrue(output.isEmpty());
    }

    /**
     * Cancelling the operator from another thread while it is running must leave the output empty.
     *
     * <p>The canceler also interrupts the test thread. That interrupt can arrive after
     * {@code run} has already returned, so the test waits for the canceler and clears the
     * interrupt flag before finishing; otherwise the flag would leak into whatever runs next on
     * this thread.
     */
    @Test
    public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        final List<StreamElement> output = new ArrayList<StreamElement>();
        final Thread runner = Thread.currentThread();
        final StreamSource<String, InfiniteSource<String>> operator =
                new StreamSource<String, InfiniteSource<String>>(new InfiniteSource<String>());
        setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, 0L);

        // Trigger the asynchronous cancel a little later, while the source is running.
        Thread canceler = new Thread("canceler") {
            @Override
            public void run() {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException ignored) {
                    // No code in this test interrupts the canceler; if it happens, cancel early.
                }
                operator.cancel();
                runner.interrupt();
            }
        };
        canceler.start();

        try {
            operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
        } catch (InterruptedException expected) {
            // The canceler interrupts this thread; the source may be sleeping at that moment.
        } finally {
            joinAndClearInterrupt(canceler);
        }

        Assert.assertTrue(output.isEmpty());
    }

    /** Stopping the operator before it is run must leave the output empty. */
    @Test
    public void testNoMaxWatermarkOnImmediateStop() throws Exception {
        List<StreamElement> output = new ArrayList<StreamElement>();
        StoppableStreamSource<String, InfiniteSource<String>> operator =
                new StoppableStreamSource<String, InfiniteSource<String>>(new InfiniteSource<String>());
        setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, 0L);

        operator.stop();
        operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));

        Assert.assertTrue(output.isEmpty());
    }

    /** Stopping the operator from another thread while it is running must leave the output empty. */
    @Test
    public void testNoMaxWatermarkOnAsyncStop() throws Exception {
        List<StreamElement> output = new ArrayList<StreamElement>();
        final StoppableStreamSource<String, InfiniteSource<String>> operator =
                new StoppableStreamSource<String, InfiniteSource<String>>(new InfiniteSource<String>());
        setupSourceOperator(operator, TimeCharacteristic.EventTime, 0L, 0L);

        // Trigger the asynchronous stop a little later, while the source is running.
        new Thread("canceler") {
            @Override
            public void run() {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException ignored) {
                    // No code in this test interrupts the canceler; if it happens, stop early.
                }
                operator.stop();
            }
        }.start();

        operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));

        Assert.assertTrue(output.isEmpty());
    }

    /**
     * Runs a source that only advances the test clock and checks the latency markers produced.
     *
     * <p>With a latency interval of 10 and the clock advanced up to 100, the test expects markers
     * stamped 0, 10, ..., 100 (all with vertex id -1 and subtask index 0), followed by one final
     * watermark.
     */
    @Test
    public void testLatencyMarkEmission() throws Exception {
        List<StreamElement> output = new ArrayList<StreamElement>();
        long maxProcessingTime = 100L;
        long latencyMarkInterval = 10L;

        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);

        StreamSource<Long, ProcessingTimeServiceSource> operator =
                new StreamSource<Long, ProcessingTimeServiceSource>(
                        new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
        setupSourceOperator(
                operator, TimeCharacteristic.EventTime, 0L, latencyMarkInterval, testProcessingTimeService);

        operator.run(new Object(), Mockito.mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));

        // One marker at time 0 plus one per full interval up to the maximum processing time.
        int expectedLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
        // The extra element is the watermark checked at the end.
        Assert.assertEquals(expectedLatencyMarkers + 1, output.size());

        long expectedMarkedTime = 0L;
        int lastIndex = output.size() - 1;
        for (int i = 0; i < lastIndex; i++) {
            StreamElement element = output.get(i);
            Assert.assertTrue(element.isLatencyMarker());
            Assert.assertEquals(-1L, element.asLatencyMarker().getVertexID());
            Assert.assertEquals(0L, element.asLatencyMarker().getSubtaskIndex());
            Assert.assertEquals(expectedMarkedTime, element.asLatencyMarker().getMarkedTime());
            expectedMarkedTime += latencyMarkInterval;
        }

        Assert.assertTrue(output.get(lastIndex).isWatermark());
    }

    /**
     * Creates an ingestion-time source context with a watermark interval of 10 and advances the
     * test clock from 1 to 91 in steps of 10; expects the watermarks 10, 20, ..., 90.
     */
    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        StoppableStreamSource<String, InfiniteSource<String>> operator =
                new StoppableStreamSource<String, InfiniteSource<String>>(new InfiniteSource<String>());
        long watermarkInterval = 10L;

        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(0L);
        setupSourceOperator(
                operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0L, processingTimeService);

        List<StreamElement> output = new ArrayList<StreamElement>();

        // The returned context is intentionally unused: the operator is never run and the
        // watermarks asserted below appear only through advancing the clock.
        // NOTE(review): presumably creating the context registers the periodic watermark timer
        // with the processing time service - confirm against StreamSourceContexts.
        StreamSourceContexts.getSourceContext(
                TimeCharacteristic.IngestionTime,
                operator.getContainingTask().getProcessingTimeService(),
                operator.getContainingTask().getCheckpointLock(),
                operator.getContainingTask().getStreamStatusMaintainer(),
                new CollectorOutput<String>(output),
                operator.getExecutionConfig().getAutoWatermarkInterval(),
                -1L);

        for (long time = 1L; time < 100L; time += watermarkInterval) {
            processingTimeService.setCurrentTime(time);
        }

        Assert.assertEquals(9, output.size());

        long expectedWatermark = 0L;
        for (StreamElement element : output) {
            expectedWatermark += watermarkInterval;
            Assert.assertEquals(expectedWatermark, ((Watermark) element).getTimestamp());
        }
    }

    /**
     * Waits for {@code thread} to terminate and then clears the calling thread's interrupt flag.
     *
     * <p>An interrupt delivered while waiting is swallowed on purpose and the wait is resumed:
     * callers use this exactly when the joined thread is the one that sends the interrupt.
     *
     * @param thread the helper thread to wait for
     */
    private static void joinAndClearInterrupt(Thread thread) {
        boolean joined = false;
        while (!joined) {
            try {
                thread.join();
                joined = true;
            } catch (InterruptedException ignored) {
                // The pending interrupt surfaced here; keep waiting for the thread to finish.
            }
        }
        // Drop an interrupt that arrived between the end of join() and now.
        Thread.interrupted();
    }

    /**
     * Sets up the operator with a fresh {@link TestProcessingTimeService}.
     *
     * @param operator the source operator to set up
     * @param timeChar the time characteristic written into the stream config
     * @param watermarkInterval the auto-watermark interval written into the execution config
     * @param latencyMarkInterval the latency tracking interval written into the execution config
     */
    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, long latencyMarkInterval) {
        setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService());
    }

    /**
     * Sets up the operator against a mocked {@link StreamTask} that hands out the given
     * configuration values and time service.
     *
     * @param operator the source operator to set up
     * @param timeChar the time characteristic written into the stream config
     * @param watermarkInterval the auto-watermark interval written into the execution config
     * @param latencyMarkInterval the latency tracking interval written into the execution config
     * @param timeProvider the time service returned by the mocked task; if {@code null}, the mock
     *     throws a {@link RuntimeException} when the time service is requested
     */
    @SuppressWarnings("unchecked") // the mocked Output is raw; its element type is irrelevant to these tests
    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, long latencyMarkInterval, final ProcessingTimeService timeProvider) {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(watermarkInterval);
        executionConfig.setLatencyTrackingInterval(latencyMarkInterval);

        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateBackend(new MemoryStateBackend());
        cfg.setTimeCharacteristic(timeChar);

        DummyEnvironment env = new DummyEnvironment("MockTwoInputTask", 1, 0);

        StreamStatusMaintainer streamStatusMaintainer = Mockito.mock(StreamStatusMaintainer.class);
        Mockito.when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);

        StreamTask<?, ?> mockTask = Mockito.mock(StreamTask.class);
        Mockito.when(mockTask.getName()).thenReturn("Mock Task");
        Mockito.when(mockTask.getCheckpointLock()).thenReturn(new Object());
        Mockito.when(mockTask.getConfiguration()).thenReturn(cfg);
        Mockito.when(mockTask.getEnvironment()).thenReturn(env);
        Mockito.when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
        // The cast to Object keeps this stubbing independent of the map's type arguments.
        Mockito.when((Object) mockTask.getAccumulatorMap()).thenReturn(Collections.emptyMap());
        Mockito.when(mockTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);

        // Answered lazily so that a missing time provider only fails when it is actually requested.
        Mockito.doAnswer(new Answer<ProcessingTimeService>() {
            @Override
            public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
                if (timeProvider == null) {
                    throw new RuntimeException("The time provider is null.");
                }
                return timeProvider;
            }
        }).when(mockTask).getProcessingTimeService();

        operator.setup(mockTask, cfg, Mockito.mock(Output.class));
    }

    /**
     * Source that emits no records; it only moves the given test time service through a fixed
     * list of processing times, stopping early once cancelled.
     */
    private static final class ProcessingTimeServiceSource
    implements SourceFunction<Long> {
        private final TestProcessingTimeService processingTimeService;
        private final List<Long> processingTimes;
        // volatile, like InfiniteSource.running, so a cancel() from another thread is seen by run().
        private volatile boolean cancelled = false;

        private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
            this.processingTimeService = processingTimeService;
            this.processingTimes = processingTimes;
        }

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            for (Long processingTime : this.processingTimes) {
                if (this.cancelled) {
                    break;
                }
                this.processingTimeService.setCurrentTime(processingTime.longValue());
            }
        }

        @Override
        public void cancel() {
            this.cancelled = true;
        }
    }

    /** Source that emits nothing and sleeps in 20 ms steps until it is cancelled or stopped. */
    private static final class InfiniteSource<T>
    implements SourceFunction<T>,
    StoppableFunction {
        private volatile boolean running = true;

        private InfiniteSource() {
        }

        @Override
        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }

        @Override
        public void stop() {
            this.running = false;
        }
    }

    /** Source that emits nothing and returns from {@code run} immediately. */
    private static final class FiniteSource<T>
    implements SourceFunction<T>,
    StoppableFunction {
        private FiniteSource() {
        }

        @Override
        public void run(SourceFunction.SourceContext<T> ctx) {
        }

        @Override
        public void cancel() {
        }

        @Override
        public void stop() {
        }
    }
}

