package org.apache.flink.streaming.runtime.operators;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.class */
public class ContinuousFileProcessingRescalingTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest$BlockingFileInputFormat.class */
    private static class BlockingFileInputFormat extends FileInputFormat<String> implements CheckpointableInputFormat<FileInputSplit, Integer> {
        private final OneShotLatch triggerLatch;
        private final OneShotLatch waitingLatch;
        private final int elementsBeforeCheckpoint;
        private final int linesPerSplit;
        private FileInputSplit split;
        private int state;

        BlockingFileInputFormat(OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2, Path path, int i, int i2) {
            super(path);
            this.triggerLatch = oneShotLatch;
            this.waitingLatch = oneShotLatch2;
            this.elementsBeforeCheckpoint = i2;
            this.linesPerSplit = i;
            this.state = 0;
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public FileInputSplit[] m52createInputSplits(int i) throws IOException {
            FileInputSplit[] fileInputSplitArr = new FileInputSplit[i];
            for (int i2 = 0; i2 < i; i2++) {
                fileInputSplitArr[i2] = new FileInputSplit(i2, getFilePaths()[0], (i2 * this.linesPerSplit) + 1, this.linesPerSplit, (String[]) null);
            }
            return fileInputSplitArr;
        }

        public void open(FileInputSplit fileInputSplit) throws IOException {
            this.split = fileInputSplit;
            this.state = 0;
        }

        public void reopen(FileInputSplit fileInputSplit, Integer num) throws IOException {
            this.split = fileInputSplit;
            this.state = num.intValue();
        }

        /* renamed from: getCurrentState, reason: merged with bridge method [inline-methods] */
        public Integer m53getCurrentState() throws IOException {
            return Integer.valueOf(this.state);
        }

        public boolean reachedEnd() throws IOException {
            if (this.state == this.elementsBeforeCheckpoint) {
                this.triggerLatch.trigger();
                if (!this.waitingLatch.isTriggered()) {
                    try {
                        this.waitingLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            return this.state == this.linesPerSplit;
        }

        public String nextRecord(String str) throws IOException {
            if (reachedEnd()) {
                return null;
            }
            StringBuilder append = new StringBuilder().append(this.split.getSplitNumber()).append(": test line ");
            int i = this.state;
            this.state = i + 1;
            return append.append(i).toString();
        }
    }

    @Test
    public void testReaderScalingDown() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        BlockingFileInputFormat blockingFileInputFormat = new BlockingFileInputFormat(oneShotLatch2, oneShotLatch, new Path("test"), 20, 5);
        FileInputSplit[] m52createInputSplits = blockingFileInputFormat.m52createInputSplits(2);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = getTestHarness(blockingFileInputFormat, 2, 0);
        testHarness.open();
        testHarness.processElement(new StreamRecord<>(getTimestampedSplit(0L, m52createInputSplits[0])));
        if (!oneShotLatch2.isTriggered()) {
            oneShotLatch2.await();
        }
        OneShotLatch oneShotLatch3 = new OneShotLatch();
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(new BlockingFileInputFormat(oneShotLatch3, oneShotLatch, new Path("test"), 20, 15), 2, 1);
        testHarness2.open();
        testHarness2.processElement(new StreamRecord<>(getTimestampedSplit(0L, m52createInputSplits[1])));
        if (!oneShotLatch3.isTriggered()) {
            oneShotLatch3.await();
        }
        testHarness.getOutput().clear();
        testHarness2.getOutput().clear();
        OperatorSubtaskState repackageState = AbstractStreamOperatorTestHarness.repackageState(testHarness2.snapshot(0L, 0L), testHarness.snapshot(0L, 0L));
        OneShotLatch oneShotLatch4 = new OneShotLatch();
        OneShotLatch oneShotLatch5 = new OneShotLatch();
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness3 = getTestHarness(new BlockingFileInputFormat(oneShotLatch4, oneShotLatch5, new Path("test"), 20, 5), 1, 0);
        testHarness3.initializeState(repackageState);
        testHarness3.open();
        oneShotLatch4.trigger();
        oneShotLatch5.trigger();
        oneShotLatch.trigger();
        synchronized (testHarness.getCheckpointLock()) {
            testHarness.close();
        }
        synchronized (testHarness2.getCheckpointLock()) {
            testHarness2.close();
        }
        synchronized (testHarness3.getCheckpointLock()) {
            testHarness3.close();
        }
        ArrayDeque arrayDeque = new ArrayDeque();
        putElementsInQ(arrayDeque, testHarness.getOutput());
        putElementsInQ(arrayDeque, testHarness2.getOutput());
        ArrayDeque arrayDeque2 = new ArrayDeque();
        putElementsInQ(arrayDeque2, testHarness3.getOutput());
        Assert.assertEquals(20L, arrayDeque2.size());
        Assert.assertArrayEquals(arrayDeque.toArray(), arrayDeque2.toArray());
    }

    @Test
    public void testReaderScalingUp() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        BlockingFileInputFormat blockingFileInputFormat = new BlockingFileInputFormat(oneShotLatch2, oneShotLatch, new Path("test"), 20, 5);
        FileInputSplit[] m52createInputSplits = blockingFileInputFormat.m52createInputSplits(2);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = getTestHarness(blockingFileInputFormat, 1, 0);
        testHarness.open();
        testHarness.processElement(new StreamRecord<>(getTimestampedSplit(0L, m52createInputSplits[0])));
        testHarness.processElement(new StreamRecord<>(getTimestampedSplit(1L, m52createInputSplits[1])));
        if (!oneShotLatch2.isTriggered()) {
            oneShotLatch2.await();
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.getOutput().clear();
        oneShotLatch.trigger();
        OneShotLatch oneShotLatch3 = new OneShotLatch();
        OneShotLatch oneShotLatch4 = new OneShotLatch();
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(new BlockingFileInputFormat(oneShotLatch3, oneShotLatch4, new Path("test"), 20, 15), 2, 0);
        testHarness2.setup();
        testHarness2.initializeState(snapshot);
        testHarness2.open();
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness3 = getTestHarness(new BlockingFileInputFormat(oneShotLatch3, oneShotLatch4, new Path("test"), 20, 15), 2, 1);
        testHarness3.setup();
        testHarness3.initializeState(snapshot);
        testHarness3.open();
        oneShotLatch3.trigger();
        oneShotLatch4.trigger();
        synchronized (testHarness.getCheckpointLock()) {
            testHarness.close();
        }
        synchronized (testHarness2.getCheckpointLock()) {
            testHarness2.close();
        }
        synchronized (testHarness3.getCheckpointLock()) {
            testHarness3.close();
        }
        ArrayDeque arrayDeque = new ArrayDeque();
        putElementsInQ(arrayDeque, testHarness.getOutput());
        ArrayDeque arrayDeque2 = new ArrayDeque();
        putElementsInQ(arrayDeque2, testHarness2.getOutput());
        putElementsInQ(arrayDeque2, testHarness3.getOutput());
        Assert.assertEquals(35L, arrayDeque2.size());
        Assert.assertArrayEquals(arrayDeque.toArray(), arrayDeque2.toArray());
    }

    private void putElementsInQ(Queue<Object> queue, Queue<Object> queue2) {
        for (Object obj : queue2) {
            if (!(obj instanceof Watermark)) {
                queue.add(obj);
            }
        }
    }

    private OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getTestHarness(BlockingFileInputFormat blockingFileInputFormat, int i, int i2) throws Exception {
        ContinuousFileReaderOperator continuousFileReaderOperator = new ContinuousFileReaderOperator(blockingFileInputFormat);
        continuousFileReaderOperator.setOutputType(TypeExtractor.getInputFormatTypes(blockingFileInputFormat), new ExecutionConfig());
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(continuousFileReaderOperator, 10, i, i2);
        oneInputStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        return oneInputStreamOperatorTestHarness;
    }

    private TimestampedFileInputSplit getTimestampedSplit(long j, FileInputSplit fileInputSplit) {
        Preconditions.checkNotNull(fileInputSplit);
        return new TimestampedFileInputSplit(j, fileInputSplit.getSplitNumber(), fileInputSplit.getPath(), fileInputSplit.getStart(), fileInputSplit.getLength(), fileInputSplit.getHostnames());
    }
}
