/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.streaming.runtime.io.TestEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link BufferSpiller}.
 *
 * <p>The tests add randomly generated buffers and events to the spiller, roll the spiller over
 * to obtain a {@link BufferSpiller.SpilledBufferOrEventSequence}, and verify that the sequence
 * returns the same elements in the same order. After every test the spiller is closed and the
 * I/O manager's spilling directories are checked for leftover files.
 */
public class BufferSpillerTest {
    private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class);

    /** Page size handed to the spiller; also the capacity of every generated buffer. */
    private static final int PAGE_SIZE = 4096;

    /** Probability with which a generated element is an event rather than a buffer. */
    private static final double EVENT_PROBABILITY = 0.05;

    private static IOManager ioManager;

    private BufferSpiller spiller;

    // ------------------------------------------------------------------------
    //  Setup / cleanup
    // ------------------------------------------------------------------------

    @BeforeClass
    public static void setupIOManager() {
        ioManager = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdownIOManager() {
        ioManager.shutdown();
    }

    /**
     * Creates a fresh spiller for each test.
     *
     * @throws Exception if the spiller cannot be created; propagated so that the test report
     *     carries the full stack trace
     */
    @Before
    public void createSpiller() throws Exception {
        this.spiller = new BufferSpiller(ioManager, PAGE_SIZE);
    }

    /**
     * Closes the spiller and verifies that its current channel is closed, its current spill
     * file is gone, and no files remain in the spilling directories.
     *
     * @throws Exception if closing the spiller fails
     */
    @After
    public void cleanupSpiller() throws Exception {
        if (this.spiller != null) {
            this.spiller.close();
            Assert.assertFalse(this.spiller.getCurrentChannel().isOpen());
            Assert.assertFalse(this.spiller.getCurrentSpillFile().exists());
        }
        checkNoTempFilesRemain();
    }

    // ------------------------------------------------------------------------
    //  Tests
    // ------------------------------------------------------------------------

    /** Rolling over a spiller to which nothing was added must yield {@code null} each time. */
    @Test
    public void testRollOverEmptySequences() throws Exception {
        Assert.assertNull(this.spiller.rollOver());
        Assert.assertNull(this.spiller.rollOver());
        Assert.assertNull(this.spiller.rollOver());
    }

    /**
     * Repeatedly spills a random number of buffers and events, rolls over, and reads the
     * complete sequence back, validating every element.
     */
    @Test
    public void testSpillAndRollOverSimple() throws Exception {
        final int numRounds = 5;
        final int maxNumEventsAndBuffers = 3000;
        final int maxNumChannels = 1656;

        final Random rnd = new Random();
        final Random bufferRnd = new Random();

        for (int round = 0; round < numRounds; round++) {
            final long bufferSeed = rnd.nextLong();
            bufferRnd.setSeed(bufferSeed);

            final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
            final int numChannels = rnd.nextInt(maxNumChannels) + 1;
            final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);

            for (int i = 0; i < numEventsAndBuffers; i++) {
                spillRandomElement(rnd, bufferRnd, numChannels, events);
            }

            // Rewind the buffer generator so validation replays the same sizes and channels.
            bufferRnd.setSeed(bufferSeed);

            final BufferSpiller.SpilledBufferOrEventSequence seq = this.spiller.rollOver();
            seq.open();
            final SequenceToConsume toConsume =
                    new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);

            int numEvent = 0;
            for (int i = 0; i < numEventsAndBuffers; i++) {
                numEvent = validateNext(toConsume, numEvent);
            }

            // The sequence must be exhausted and must have produced every recorded event.
            Assert.assertNull(seq.getNext());
            Assert.assertEquals(events.size(), numEvent);
            seq.cleanup();
        }
    }

    /**
     * Interleaves spilling of new elements with reading of previously rolled-over sequences.
     * Every second round adds nothing and expects an empty roll-over.
     */
    @Test
    public void testSpillWhileReading() throws Exception {
        LOG.info("Starting SpillWhileReading test");

        final int sequences = 10;
        final int maxNumEventsAndBuffers = 30000;
        final int maxNumChannels = 1656;

        final Random rnd = new Random();
        final ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();

        int sequencesConsumed = 0;
        SequenceToConsume currentSequence = null;
        int currentNumEvents = 0;
        int currentNumRecordAndEvents = 0;

        // Even rounds write a sequence, odd rounds roll over without having added anything.
        for (int round = 0; round < 2 * sequences; round++) {
            if (round % 2 == 1) {
                Assert.assertNull(this.spiller.rollOver());
                continue;
            }

            final long bufferSeed = rnd.nextLong();
            final Random bufferRnd = new Random(bufferSeed);
            final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
            final int numChannels = rnd.nextInt(maxNumChannels) + 1;
            final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);

            int generated = 0;
            while (generated < numEventsAndBuffers) {
                if (currentSequence == null || rnd.nextDouble() < 0.5) {
                    // Write: add one more element to the sequence being built.
                    spillRandomElement(rnd, bufferRnd, numChannels, events);
                    generated++;
                } else {
                    // Read: consume one element of the sequence currently open for reading.
                    currentNumEvents = validateNext(currentSequence, currentNumEvents);
                    if (++currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
                        currentSequence =
                                finishAndOpenNext(currentSequence, currentNumEvents, pendingSequences);
                        sequencesConsumed++;
                        currentNumRecordAndEvents = 0;
                        currentNumEvents = 0;
                    }
                }
            }

            // Rewind the buffer generator so validation replays the same sizes and channels.
            bufferRnd.setSeed(bufferSeed);

            final BufferSpiller.SpilledBufferOrEventSequence seq = this.spiller.rollOver();
            final SequenceToConsume stc =
                    new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);

            if (currentSequence == null) {
                currentSequence = stc;
                stc.sequence.open();
            } else {
                pendingSequences.addLast(stc);
            }
        }

        // Drain whatever has been written but not yet read.
        while (currentSequence != null) {
            currentNumEvents = validateNext(currentSequence, currentNumEvents);
            if (++currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
                currentSequence =
                        finishAndOpenNext(currentSequence, currentNumEvents, pendingSequences);
                sequencesConsumed++;
                currentNumRecordAndEvents = 0;
                currentNumEvents = 0;
            }
        }

        Assert.assertEquals(sequences, sequencesConsumed);
    }

    /**
     * Adds a single 13-byte buffer and expects the spiller to report 9 more bytes written than
     * the payload size.
     */
    @Test
    public void testHeaderSizeStaticField() throws Exception {
        int size = 13;
        BufferOrEvent boe = generateRandomBuffer(size, 0);
        this.spiller.add(boe);
        Assert.assertEquals(
                "Changed the header format, but did not adjust the HEADER_SIZE field",
                9 + size,
                this.spiller.getBytesWritten());
    }

    // ------------------------------------------------------------------------
    //  Utils
    // ------------------------------------------------------------------------

    /**
     * Adds one random element to the spiller: an event with probability
     * {@link #EVENT_PROBABILITY}, otherwise a buffer.
     *
     * @param rnd source of the event/buffer decision and of the event contents
     * @param bufferRnd source of the buffer size and channel, so that buffers can be
     *     re-derived later by re-seeding this generator
     * @param numChannels exclusive upper bound for the channel index
     * @param events list to which a generated event is appended (buffers are not recorded)
     */
    private void spillRandomElement(
            Random rnd, Random bufferRnd, int numChannels, ArrayList<BufferOrEvent> events)
            throws Exception {
        if (rnd.nextDouble() < EVENT_PROBABILITY) {
            BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
            events.add(evt);
            this.spiller.add(evt);
        } else {
            // Size is drawn before the channel; validateNext draws in the same order.
            BufferOrEvent buf =
                    generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
            this.spiller.add(buf);
        }
    }

    /**
     * Reads the next element of the given sequence and validates it against the expected
     * event list or the replayed buffer generator.
     *
     * @param current the sequence being consumed together with its expectations
     * @param eventsSeen number of events already consumed from this sequence
     * @return the updated number of events consumed from this sequence
     */
    private static int validateNext(SequenceToConsume current, int eventsSeen) throws Exception {
        BufferOrEvent next = current.sequence.getNext();
        Assert.assertNotNull(next);

        if (next.isEvent()) {
            BufferOrEvent expected = current.events.get(eventsSeen);
            Assert.assertEquals(expected.getEvent(), next.getEvent());
            Assert.assertEquals(expected.getChannelIndex(), next.getChannelIndex());
            return eventsSeen + 1;
        }

        Random validationRnd = current.bufferRnd;
        validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(current.numChannels));
        return eventsSeen;
    }

    /**
     * Cleans up a fully consumed sequence, checks that all of its events were seen, and opens
     * the next pending sequence if there is one.
     *
     * @param finished the sequence whose last element has just been validated
     * @param eventsSeen number of events consumed from {@code finished}
     * @param pending queue of sequences that were rolled over but not yet opened
     * @return the next sequence, already opened, or {@code null} if none is pending
     */
    private static SequenceToConsume finishAndOpenNext(
            SequenceToConsume finished, int eventsSeen, ArrayDeque<SequenceToConsume> pending)
            throws Exception {
        finished.sequence.cleanup();
        Assert.assertEquals(finished.events.size(), eventsSeen);

        SequenceToConsume next = pending.pollFirst();
        if (next != null) {
            next.sequence.open();
        }
        return next;
    }

    /**
     * Creates an event with a random magic number and between 0 and 999 random payload bytes,
     * assigned to a random channel in {@code [0, numChannels)}.
     */
    private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
        long magicNumber = rnd.nextLong();
        byte[] data = new byte[rnd.nextInt(1000)];
        rnd.nextBytes(data);
        TestEvent evt = new TestEvent(magicNumber, data);

        int channelIndex = rnd.nextInt(numChannels);
        return new BufferOrEvent(evt, channelIndex);
    }

    /**
     * Creates a buffer backed by a {@link #PAGE_SIZE} segment whose first {@code size} bytes
     * hold the pattern {@code (byte) i} at position {@code i}.
     *
     * @param size number of valid bytes; callers pass values in {@code [1, PAGE_SIZE]}
     * @param channelIndex channel the buffer is attributed to
     */
    private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
        MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        for (int i = 0; i < size; i++) {
            seg.put(i, (byte) i);
        }

        Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
        buf.setSize(size);
        return new BufferOrEvent(buf, channelIndex);
    }

    /**
     * Asserts that the element is a buffer on the expected channel, has the expected size, and
     * contains the byte pattern written by {@link #generateRandomBuffer(int, int)}.
     */
    private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
        Assert.assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
        Assert.assertTrue("is not buffer", boe.isBuffer());

        Buffer buf = boe.getBuffer();
        Assert.assertEquals("wrong buffer size", expectedSize, buf.getSize());

        MemorySegment seg = buf.getMemorySegment();
        for (int i = 0; i < expectedSize; i++) {
            byte expected = (byte) i;
            if (expected != seg.get(i)) {
                Assert.fail(String.format(
                        "wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i)));
            }
        }
    }

    /** Fails the test if any file is left in one of the I/O manager's spilling directories. */
    private static void checkNoTempFilesRemain() {
        for (File dir : ioManager.getSpillingDirectories()) {
            String[] remaining = dir.list();
            if (remaining == null) {
                // File.list() returns null when the path cannot be listed as a directory;
                // there is nothing to enumerate, and iterating null would throw an NPE.
                continue;
            }
            for (String file : remaining) {
                if (file != null && !(file.equals(".") || file.equals(".."))) {
                    Assert.fail("barrier buffer did not clean up temp files. remaining file: " + file);
                }
            }
        }
    }

    /**
     * A rolled-over sequence together with everything needed to validate it: the events in
     * the order they were added, the (re-seeded) generator that reproduces the buffers, the
     * total element count, and the channel bound used during generation.
     */
    private static class SequenceToConsume {
        final BufferSpiller.SpilledBufferOrEventSequence sequence;
        final ArrayList<BufferOrEvent> events;
        final Random bufferRnd;
        final int numBuffersAndEvents;
        final int numChannels;

        private SequenceToConsume(
                Random bufferRnd,
                ArrayList<BufferOrEvent> events,
                BufferSpiller.SpilledBufferOrEventSequence sequence,
                int numBuffersAndEvents,
                int numChannels) {
            this.bufferRnd = bufferRnd;
            this.events = events;
            this.sequence = sequence;
            this.numBuffersAndEvents = numBuffersAndEvents;
            this.numChannels = numChannels;
        }
    }
}

