/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileSegmentReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.FileSegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.QueuingCallback;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.testutils.DiscardingRecycler;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BufferFileWriterFileSegmentReaderTest {
    private static final int BUFFER_SIZE = 32768;
    private static final BufferRecycler BUFFER_RECYCLER = new DiscardingRecycler();
    private static final Random random = new Random();
    private static final IOManager ioManager = new IOManagerAsync();
    private BufferFileWriter writer;
    private AsynchronousBufferFileSegmentReader reader;
    private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue();

    @AfterClass
    public static void shutdown() {
        ioManager.shutdown();
    }

    @Before
    public void setUpWriterAndReader() {
        FileIOChannel.ID channel = ioManager.createChannel();
        try {
            this.writer = ioManager.createBufferFileWriter(channel);
            this.reader = (AsynchronousBufferFileSegmentReader)ioManager.createBufferFileSegmentReader(channel, (RequestDoneCallback)new QueuingCallback(this.returnedFileSegments));
        }
        catch (IOException e) {
            if (this.writer != null) {
                this.writer.deleteChannel();
            }
            if (this.reader != null) {
                this.reader.deleteChannel();
            }
            Assert.fail((String)"Failed to setup writer and reader.");
        }
    }

    @After
    public void tearDownWriterAndReader() {
        if (this.writer != null) {
            this.writer.deleteChannel();
        }
        if (this.reader != null) {
            this.reader.deleteChannel();
        }
        this.returnedFileSegments.clear();
    }

    @Test
    public void testWriteRead() throws IOException, InterruptedException {
        FileSegment fileSegment;
        int i;
        int numBuffers = 1024;
        int currentNumber = 0;
        int minBufferSize = 8192;
        for (i = 0; i < numBuffers; ++i) {
            Buffer buffer = this.createBuffer();
            int size = this.getNextMultipleOf(this.getRandomNumberInRange(8192, 32768), 4);
            buffer.setSize(size);
            currentNumber = BufferFileWriterFileSegmentReaderTest.fillBufferWithAscendingNumbers(buffer, currentNumber);
            this.writer.writeBlock((Object)buffer);
        }
        this.writer.close();
        for (i = 0; i < numBuffers; ++i) {
            Assert.assertFalse((boolean)this.reader.hasReachedEndOfFile());
            this.reader.read();
        }
        final CountDownLatch sync = new CountDownLatch(1);
        NotificationListener listener = new NotificationListener(){

            public void onNotification() {
                sync.countDown();
            }
        };
        if (this.reader.registerAllRequestsProcessedListener(listener)) {
            sync.await();
        }
        Assert.assertTrue((boolean)this.reader.hasReachedEndOfFile());
        Assert.assertEquals((String)"Read less buffers than written.", (long)numBuffers, (long)this.returnedFileSegments.size());
        currentNumber = 0;
        ByteBuffer buffer = ByteBuffer.allocate(32768);
        while ((fileSegment = this.returnedFileSegments.poll()) != null) {
            buffer.position(0);
            buffer.limit(fileSegment.getLength());
            fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
            currentNumber = this.verifyBufferFilledWithAscendingNumbers(new Buffer(MemorySegmentFactory.wrap((byte[])buffer.array()), BUFFER_RECYCLER), currentNumber, fileSegment.getLength());
        }
        this.reader.close();
    }

    private int getRandomNumberInRange(int min, int max) {
        return random.nextInt(max - min + 1) + min;
    }

    private int getNextMultipleOf(int number, int multiple) {
        int mod = number % multiple;
        if (mod == 0) {
            return number;
        }
        return number + multiple - mod;
    }

    private Buffer createBuffer() {
        return new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)32768), BUFFER_RECYCLER);
    }

    public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
        MemorySegment segment = buffer.getMemorySegment();
        int size = buffer.getSize();
        for (int i = 0; i < size; i += 4) {
            segment.putInt(i, currentNumber++);
        }
        return currentNumber;
    }

    private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
        MemorySegment segment = buffer.getMemorySegment();
        for (int i = 0; i < size; i += 4) {
            if (segment.getInt(i) == currentNumber++) continue;
            throw new IllegalStateException("Read unexpected number from buffer.");
        }
        return currentNumber;
    }
}

