/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.QueuingCallback;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.Preconditions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Round-trip tests for {@link BufferFileWriter} and {@link BufferFileReader}: buffers filled with
 * ascending integers are written to a channel, read back, and checked for the same sequence.
 *
 * <p>A fresh channel with one writer and one reader is created before every test and deleted
 * afterwards. The {@link IOManager} is shared by all tests and shut down once at the end.
 */
public class BufferFileWriterReaderTest {
    /** Size in bytes of every memory segment allocated by {@link #createBuffer()}. */
    private static final int BUFFER_SIZE = 32768;

    /**
     * Extra bytes added per buffer when computing a seek position that skips whole buffers.
     * NOTE(review): presumably the size of the header the writer puts in front of each buffer;
     * confirm against the writer implementation.
     */
    private static final int BLOCK_HEADER_SIZE = 8;

    /** Size in bytes of one {@code int} as stored by {@link MemorySegment#putInt}. */
    private static final int INT_SIZE = 4;

    private static final BufferRecycler BUFFER_RECYCLER = FreeingBufferRecycler.INSTANCE;
    private static final Random random = new Random();
    private static final IOManager ioManager = new IOManagerAsync();

    private BufferFileWriter writer;
    private BufferFileReader reader;

    /** Buffers handed back by the reader through its {@link QueuingCallback}. */
    private final LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<>();

    /** Shuts down the shared I/O manager once all tests of this class have run. */
    @AfterClass
    public static void shutdown() {
        ioManager.shutdown();
    }

    /**
     * Creates a new channel plus a writer and a reader on it. The reader is wired to push every
     * buffer it completes into {@link #returnedBuffers}.
     *
     * @throws AssertionError if the writer or reader cannot be created; the triggering
     *     {@link IOException} is attached as the cause
     */
    @Before
    public void setUpWriterAndReader() {
        FileIOChannel.ID channel = ioManager.createChannel();
        try {
            writer = ioManager.createBufferFileWriter(channel);
            reader = ioManager.createBufferFileReader(channel, new QueuingCallback<>(returnedBuffers));
        } catch (IOException e) {
            // Best-effort cleanup of whatever was created before the failure.
            if (writer != null) {
                writer.deleteChannel();
            }
            if (reader != null) {
                reader.deleteChannel();
            }
            // Same message as before, but keep the cause so the failure is diagnosable.
            throw new AssertionError("Failed to setup writer and reader.", e);
        }
    }

    /** Deletes the channel behind the writer and reader and drops any buffers still queued. */
    @After
    public void tearDownWriterAndReader() {
        if (writer != null) {
            writer.deleteChannel();
        }
        if (reader != null) {
            reader.deleteChannel();
        }
        returnedBuffers.clear();
    }

    /**
     * Writes 1024 buffers of random size (a multiple of four bytes, between a quarter of
     * {@link #BUFFER_SIZE} and {@link #BUFFER_SIZE}), reads the same number back and verifies
     * that the ascending number sequence is intact.
     */
    @Test
    public void testWriteRead() throws IOException {
        final int numBuffers = 1024;
        final int minBufferSize = BUFFER_SIZE / 4;
        int currentNumber = 0;

        // Write buffers filled with ascending numbers.
        for (int i = 0; i < numBuffers; i++) {
            Buffer buffer = createBuffer();
            int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), INT_SIZE);
            currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber, size);
            writer.writeBlock(buffer);
        }

        // Close the writer before reading the data back.
        writer.close();

        // Issue one read request per written buffer.
        for (int i = 0; i < numBuffers; i++) {
            Assert.assertFalse(reader.hasReachedEndOfFile());
            reader.readInto(createBuffer());
        }
        reader.close();

        Assert.assertTrue(reader.hasReachedEndOfFile());
        Assert.assertEquals("Read less buffers than written.", numBuffers, returnedBuffers.size());

        // The returned buffers must continue the sequence from zero without gaps.
        currentNumber = 0;
        Buffer buffer;
        while ((buffer = returnedBuffers.poll()) != null) {
            currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
        }
    }

    /**
     * Writes 1024 completely filled buffers, seeks past the first 32 of them and verifies that the
     * remaining buffers are read back with the number sequence starting at the expected offset.
     */
    @Test
    public void testWriteSkipRead() throws IOException {
        final int toSkip = 32;
        int numBuffers = 1024;
        int currentNumber = 0;

        // Write full buffers filled with ascending numbers.
        for (int i = 0; i < numBuffers; i++) {
            Buffer buffer = createBuffer();
            currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber, buffer.getMaxCapacity());
            writer.writeBlock(buffer);
        }

        // Close the writer before reading the data back.
        writer.close();

        // Skip the first toSkip buffers: 32 * (8 + 32768) == 0x100100 bytes.
        reader.seekToPosition((long) (BLOCK_HEADER_SIZE + BUFFER_SIZE) * toSkip);
        numBuffers -= toSkip;

        // Issue one read request per remaining buffer.
        for (int i = 0; i < numBuffers; i++) {
            Assert.assertFalse(reader.hasReachedEndOfFile());
            reader.readInto(createBuffer());
        }
        reader.close();

        Assert.assertTrue(reader.hasReachedEndOfFile());
        Assert.assertEquals("Read less buffers than written.", numBuffers, returnedBuffers.size());

        // The first number after the skipped buffers: 32 * (32768 / 4) == 262144.
        currentNumber = (BUFFER_SIZE / INT_SIZE) * toSkip;
        Buffer buffer;
        while ((buffer = returnedBuffers.poll()) != null) {
            currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
        }
    }

    /** Returns a random number in the closed interval {@code [min, max]}. */
    private int getRandomNumberInRange(int min, int max) {
        return random.nextInt(max - min + 1) + min;
    }

    /**
     * Rounds {@code number} up to the nearest multiple of {@code multiple}; a value that already
     * is a multiple is returned unchanged.
     */
    private int getNextMultipleOf(int number, int multiple) {
        int mod = number % multiple;
        if (mod == 0) {
            return number;
        }
        return number + multiple - mod;
    }

    /** Creates a buffer backed by a newly allocated unpooled segment of {@link #BUFFER_SIZE} bytes. */
    private Buffer createBuffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
    }

    /**
     * Fills the first {@code size} bytes of the buffer's memory segment with consecutive integers
     * and sets the buffer size to {@code size}.
     *
     * @param buffer buffer to fill
     * @param currentNumber first number to write
     * @param size number of bytes to fill; must be a multiple of four
     * @return the number following the last one written
     * @throws IllegalArgumentException if {@code size} is not a multiple of four
     */
    static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
        Preconditions.checkArgument(size % INT_SIZE == 0);
        MemorySegment segment = buffer.getMemorySegment();
        for (int i = 0; i < size; i += INT_SIZE) {
            segment.putInt(i, currentNumber++);
        }
        buffer.setSize(size);
        return currentNumber;
    }

    /**
     * Checks that the buffer contains consecutive integers starting at {@code currentNumber},
     * reading {@code buffer.getSize()} bytes from its memory segment.
     *
     * @param buffer buffer to verify
     * @param currentNumber number expected at the start of the buffer
     * @return the number expected to follow the last one read
     * @throws IllegalStateException if any stored number differs from the expected one
     */
    static int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
        MemorySegment segment = buffer.getMemorySegment();
        int size = buffer.getSize();
        for (int i = 0; i < size; i += INT_SIZE) {
            if (segment.getInt(i) != currentNumber++) {
                throw new IllegalStateException("Read unexpected number from buffer.");
            }
        }
        return currentNumber;
    }
}

