/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class SpillableSubpartitionTest
extends SubpartitionTestBase {
    /** JUnit rule used by tests in this class that expect a particular exception type. */
    @Rule
    public ExpectedException exception = ExpectedException.none();
    /** Cached thread pool shared by the whole test class; terminated in {@link #shutdown()}. */
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    /** Asynchronous I/O manager used by {@link #createSubpartition()}; terminated in {@link #shutdown()}. */
    private static final IOManager ioManager = new IOManagerAsync();

    /**
     * Class-level teardown: stops the shared executor first and then the shared I/O manager.
     */
    @AfterClass
    public static void shutdown() {
        // Order matters only insofar as a failure in the first call skips the second.
        SpillableSubpartitionTest.executorService.shutdownNow();
        SpillableSubpartitionTest.ioManager.shutdown();
    }

    /**
     * Creates a fresh subpartition that spills through the class-wide shared I/O manager.
     *
     * @return a new {@link SpillableSubpartition} with index 0 and a mocked parent partition
     */
    SpillableSubpartition createSubpartition() {
        return createSubpartition(ioManager);
    }

    /**
     * Creates a subpartition with index 0 whose parent {@link ResultPartition} is a mock.
     *
     * <p>The mocked parent hands out a mocked {@link BufferProvider} that reports a memory
     * segment size of 32768 bytes.
     *
     * @param ioManager the I/O manager the subpartition uses for spilling
     * @return the newly created subpartition
     */
    private static SpillableSubpartition createSubpartition(IOManager ioManager) {
        BufferProvider provider = Mockito.mock(BufferProvider.class);
        Mockito.when(provider.getMemorySegmentSize()).thenReturn(32768);

        ResultPartition owner = Mockito.mock(ResultPartition.class);
        Mockito.when(owner.getBufferProvider()).thenReturn(provider);

        return new SpillableSubpartition(0, owner, ioManager);
    }

    /**
     * Verifies that {@code releaseMemory()} can complete while a concurrent {@code finish()}
     * call is blocked inside the spill writer's {@code close()}.
     *
     * <p>The spill writer is a mock whose {@code close()} signals {@code blockLatch} and then
     * waits on {@code doneLatch}. The test thread waits for that signal, calls
     * {@code releaseMemory()}, and only afterwards lets the blocked call continue.
     *
     * <p>The worker executor is always shut down and the blocked writer is always released,
     * even when an assertion or call on the test thread fails, so that a failing run does not
     * leave a stuck non-daemon thread behind.
     *
     * @throws Exception if the subpartition calls or the asynchronous {@code finish()} fail
     */
    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final CountDownLatch blockLatch = new CountDownLatch(1);

        // Spill writer whose close() announces itself and then blocks until released.
        AsynchronousBufferFileWriter spillWriter = Mockito.mock(AsynchronousBufferFileWriter.class);
        Mockito.doAnswer(new Answer<Void>() {

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                blockLatch.countDown();
                doneLatch.await();
                return null;
            }
        }).when(spillWriter).close();

        // I/O manager that always returns the blocking writer.
        IOManager ioManager = Mockito.mock(IOManager.class);
        Mockito.when(ioManager.createBufferFileWriter(Matchers.any(FileIOChannel.ID.class))).thenReturn(spillWriter);

        final SpillableSubpartition partition = new SpillableSubpartition(0, Mockito.mock(ResultPartition.class), ioManager);

        // Nothing has been added yet, so the initial release reports zero buffers.
        Assert.assertEquals(0L, (long) partition.releaseMemory());

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Void> blockingFinish = executor.submit(new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    partition.finish();
                    return null;
                }
            });

            // Wait until the worker is inside the blocking close() call.
            blockLatch.await();

            // This call must return even though finish() is still blocked.
            partition.releaseMemory();

            // Unblock finish() and surface any failure it produced.
            doneLatch.countDown();
            blockingFinish.get();
        } finally {
            // countDown() is a no-op once the latch is at zero; this only matters on failure paths.
            doneLatch.countDown();
            executor.shutdownNow();
        }
    }

    /**
     * Checks that a read view created on a finished subpartition returns {@code null} from
     * {@code getNextBuffer()} once the subpartition has been released.
     *
     * <p>The view is wrapped in a Mockito spy whose {@code releaseAllResources()} is stubbed
     * to do nothing.
     */
    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition subpartition = createSubpartition();
        subpartition.finish();

        // Listener that ignores every availability notification.
        BufferAvailabilityListener ignoringListener = numBuffers -> {};
        ResultSubpartitionView spiedView = Mockito.spy(subpartition.createReadView(ignoringListener));
        Mockito.doNothing().when(spiedView).releaseAllResources();

        subpartition.release();

        Assert.assertNull(spiedView.getNextBuffer());
    }

    /**
     * Adds the same 4096-byte buffer three times, spills the subpartition, finishes it and then
     * consumes everything through a {@link SpilledSubpartitionView}.
     *
     * <p>Expectations checked along the way: buffer and byte statistics survive the spill,
     * {@code finish()} adds one more buffer of 4 bytes, the listener is notified once about
     * four buffers, the three data buffers read back are different instances from the one that
     * was added, the fourth buffer deserializes to an {@link EndOfPartitionEvent}, and the
     * original buffer is recycled within 30 seconds.
     */
    @Test
    public void testConsumeSpilledPartition() throws Exception {
        SpillableSubpartition subpartition = createSubpartition();

        // Two extra retains so that the one buffer can be handed over three times.
        Buffer shared = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        shared.retain();
        shared.retain();
        for (int added = 0; added < 3; added++) {
            subpartition.add(shared);
        }
        Assert.assertEquals(3L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(12288L, (long) subpartition.getTotalNumberOfBytes());
        Assert.assertFalse(shared.isRecycled());

        // Spilling reports three buffers and leaves the statistics untouched.
        Assert.assertEquals(3L, (long) subpartition.releaseMemory());
        Assert.assertEquals(3L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(12288L, (long) subpartition.getTotalNumberOfBytes());

        subpartition.finish();
        Assert.assertEquals(4L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(12292L, (long) subpartition.getTotalNumberOfBytes());

        BufferAvailabilityListener listener = Mockito.spy(new AwaitableBufferAvailablityListener());
        SpilledSubpartitionView reader = (SpilledSubpartitionView) subpartition.createReadView(listener);
        Mockito.verify(listener, Mockito.times(1)).notifyBuffersAvailable(Matchers.eq(4L));

        // The three data buffers come back as fresh instances.
        for (int consumed = 0; consumed < 3; consumed++) {
            Buffer data = reader.getNextBuffer();
            Assert.assertNotNull(data);
            Assert.assertNotSame(shared, data);
            Assert.assertFalse(data.isRecycled());
            data.recycle();
            Assert.assertTrue(data.isRecycled());
        }

        // The last buffer carries the end-of-partition event.
        Buffer event = reader.getNextBuffer();
        Assert.assertNotNull(event);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(event, ClassLoader.getSystemClassLoader()).getClass());
        Assert.assertFalse(event.isRecycled());
        event.recycle();
        Assert.assertTrue(event.isRecycled());

        // Poll for up to 30 seconds until the original buffer has been recycled.
        final long deadline = System.currentTimeMillis() + 30000L;
        while (!(shared.isRecycled() || System.currentTimeMillis() >= deadline)) {
            Thread.sleep(1L);
        }
        Assert.assertTrue(shared.isRecycled());
    }

    /**
     * Starts consuming a finished in-memory subpartition through a
     * {@link SpillableSubpartitionView} and spills it after the first buffer has been read.
     *
     * <p>The first two buffers read are the very instance that was added; the next data buffer
     * is a different instance, followed by the {@link EndOfPartitionEvent}. The listener is
     * expected to have counted four buffers once the spill has been announced, and the
     * original buffer must be recycled within 30 seconds.
     */
    @Test
    public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
        SpillableSubpartition subpartition = createSubpartition();

        // Two extra retains so that the one buffer can be handed over three times.
        Buffer shared = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        shared.retain();
        shared.retain();
        for (int added = 0; added < 3; added++) {
            subpartition.add(shared);
        }
        subpartition.finish();
        Assert.assertEquals(4L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(12292L, (long) subpartition.getTotalNumberOfBytes());

        AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
        SpillableSubpartitionView reader = (SpillableSubpartitionView) subpartition.createReadView(listener);

        // Creating the view announces a single buffer.
        Assert.assertEquals(1L, listener.getNumNotifiedBuffers());
        Assert.assertFalse(shared.isRecycled());

        // First read: still served from memory, i.e. the identical instance.
        Buffer current = reader.getNextBuffer();
        Assert.assertSame(shared, current);
        current.recycle();
        Assert.assertEquals(2L, listener.getNumNotifiedBuffers());
        Assert.assertFalse(shared.isRecycled());

        // Spill the remainder while the view is open; statistics must not change.
        Assert.assertEquals(2L, (long) subpartition.releaseMemory());
        Assert.assertFalse(shared.isRecycled());
        Assert.assertEquals(4L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(12292L, (long) subpartition.getTotalNumberOfBytes());

        listener.awaitNotifications(4L, 30000L);
        Assert.assertEquals(4L, listener.getNumNotifiedBuffers());

        // Second read: again the identical instance.
        current = reader.getNextBuffer();
        Assert.assertSame(shared, current);
        current.recycle();

        // Third read: a different instance.
        current = reader.getNextBuffer();
        Assert.assertNotNull(current);
        Assert.assertNotSame(shared, current);
        Assert.assertFalse(current.isRecycled());
        current.recycle();
        Assert.assertTrue(current.isRecycled());

        // Fourth read: the end-of-partition event.
        current = reader.getNextBuffer();
        Assert.assertNotNull(current);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(current, ClassLoader.getSystemClassLoader()).getClass());
        Assert.assertFalse(current.isRecycled());
        current.recycle();
        Assert.assertTrue(current.isRecycled());

        // Poll for up to 30 seconds until the original buffer has been recycled.
        final long deadline = System.currentTimeMillis() + 30000L;
        while (!(shared.isRecycled() || System.currentTimeMillis() >= deadline)) {
            Thread.sleep(1L);
        }
        Assert.assertTrue(shared.isRecycled());
    }

    /** Runs the add-after-finish scenario on a subpartition that was not spilled beforehand. */
    @Test
    public void testAddOnFinishedSpillablePartition() throws Exception {
        final boolean spilled = false;
        testAddOnFinishedPartition(spilled);
    }

    /** Runs the add-after-finish scenario on a subpartition that was spilled beforehand. */
    @Test
    public void testAddOnFinishedSpilledPartition() throws Exception {
        final boolean spilled = true;
        testAddOnFinishedPartition(spilled);
    }

    /**
     * Adds a buffer to an already finished subpartition and checks that the buffer ends up
     * recycled and that the statistics still show only the single 4-byte buffer that
     * {@code finish()} produced.
     *
     * @param spilled whether {@code releaseMemory()} is called (and expected to return 0)
     *     before the subpartition is finished
     */
    private void testAddOnFinishedPartition(boolean spilled) throws Exception {
        SpillableSubpartition subpartition = createSubpartition();
        if (spilled) {
            Assert.assertEquals(0L, (long) subpartition.releaseMemory());
        }
        subpartition.finish();

        Assert.assertEquals(1L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(4L, (long) subpartition.getTotalNumberOfBytes());

        Buffer lateBuffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            subpartition.add(lateBuffer);
        } finally {
            boolean leaked = !lateBuffer.isRecycled();
            if (leaked) {
                // Clean up before reporting the failure.
                lateBuffer.recycle();
                Assert.fail("buffer not recycled");
            }
            // The rejected buffer must not show up in the statistics.
            Assert.assertEquals(1L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(4L, (long) subpartition.getTotalNumberOfBytes());
        }
    }

    /** Runs the add-after-release scenario without the extra {@code releaseMemory()} call. */
    @Test
    public void testAddOnReleasedSpillablePartition() throws Exception {
        final boolean spilled = false;
        testAddOnReleasedPartition(spilled);
    }

    /** Runs the add-after-release scenario with the extra {@code releaseMemory()} call. */
    @Test
    public void testAddOnReleasedSpilledPartition() throws Exception {
        final boolean spilled = true;
        testAddOnReleasedPartition(spilled);
    }

    /**
     * Adds a buffer to an already released subpartition and checks that the buffer ends up
     * recycled and that both statistics stay at zero.
     *
     * @param spilled whether {@code releaseMemory()} is called (and expected to return 0)
     *     after the subpartition has been released
     */
    private void testAddOnReleasedPartition(boolean spilled) throws Exception {
        SpillableSubpartition subpartition = createSubpartition();
        subpartition.release();
        if (spilled) {
            Assert.assertEquals(0L, (long) subpartition.releaseMemory());
        }

        Buffer lateBuffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            subpartition.add(lateBuffer);
        } finally {
            boolean leaked = !lateBuffer.isRecycled();
            if (leaked) {
                // Clean up before reporting the failure.
                lateBuffer.recycle();
                Assert.fail("buffer not recycled");
            }
            // Nothing may have been counted for a released subpartition.
            Assert.assertEquals(0L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, (long) subpartition.getTotalNumberOfBytes());
        }
    }

    /**
     * Adds a buffer to a spilled subpartition backed by
     * {@link IOManagerAsyncWithNoOpBufferFileWriter} and checks that the buffer has not been
     * recycled by the time the I/O manager is shut down, while the statistics already account
     * for it.
     *
     * <p>NOTE(review): the writer is presumably one that never completes its write requests;
     * confirm against {@code IOManagerAsyncWithNoOpBufferFileWriter}.
     */
    @Test
    public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
        IOManagerAsyncWithNoOpBufferFileWriter slowIoManager = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition subpartition = createSubpartition(slowIoManager);

        // Switch to spilled mode before anything has been added.
        Assert.assertEquals(0L, (long) subpartition.releaseMemory());

        Buffer pending = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            subpartition.add(pending);
        } finally {
            slowIoManager.shutdown();
            if (pending.isRecycled()) {
                Assert.fail("buffer recycled before the write operation completed");
            }
            pending.recycle();
            Assert.assertEquals(1L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(4096L, (long) subpartition.getTotalNumberOfBytes());
        }
    }

    /** Runs the slow-writer release scenario without creating a read view. */
    @Test
    public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
        final boolean createView = false;
        testReleaseOnSpillablePartitionWithSlowWriter(createView);
    }

    /** Runs the slow-writer release scenario with a read view created before spilling. */
    @Test
    public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
        final boolean createView = true;
        testReleaseOnSpillablePartitionWithSlowWriter(createView);
    }

    /**
     * Adds two buffers to an in-memory subpartition backed by
     * {@link IOManagerAsyncWithNoOpBufferFileWriter}, optionally finishes it and opens a read
     * view, then spills it and checks that neither buffer has been recycled.
     *
     * <p>Cleanup (I/O manager shutdown, recycling of any buffer still alive) and the final
     * statistics checks run whether or not the main part of the test succeeded.
     *
     * @param createView whether the subpartition is finished and a read view is created
     *     before {@code releaseMemory()} is called; this adds one buffer of 4 bytes to the
     *     expected statistics
     */
    private void testReleaseOnSpillablePartitionWithSlowWriter(boolean createView) throws Exception {
        IOManagerAsyncWithNoOpBufferFileWriter slowIoManager = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition subpartition = createSubpartition(slowIoManager);

        Buffer first = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        Buffer second = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            subpartition.add(first);
            subpartition.add(second);
            Assert.assertFalse("buffer1 should not be recycled (still in the queue)", first.isRecycled());
            Assert.assertFalse("buffer2 should not be recycled (still in the queue)", second.isRecycled());
            Assert.assertEquals(2L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(8192L, (long) subpartition.getTotalNumberOfBytes());

            if (createView) {
                subpartition.finish();
                subpartition.createReadView(numBuffers -> {});
            }

            // Spilling reports both data buffers.
            Assert.assertEquals(2L, (long) subpartition.releaseMemory());
            Assert.assertFalse("buffer1 should not be recycled (advertised as nextBuffer)", first.isRecycled());
            Assert.assertFalse("buffer2 should not be recycled (not written yet)", second.isRecycled());
        } finally {
            slowIoManager.shutdown();
            if (!first.isRecycled()) {
                first.recycle();
            }
            if (!second.isRecycled()) {
                second.recycle();
            }
            // With a view, finish() has contributed one extra buffer of 4 bytes.
            Assert.assertEquals(createView ? 3L : 2L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(createView ? 8196L : 8192L, (long) subpartition.getTotalNumberOfBytes());
        }
    }

    /**
     * Adds a buffer to a spilled subpartition whose buffer file writer has already been closed
     * and expects an {@link IOException}; the buffer must end up recycled and the statistics
     * must stay at zero.
     */
    @Test
    public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
        IOManagerAsyncWithClosedBufferFileWriter failingIoManager = new IOManagerAsyncWithClosedBufferFileWriter();
        SpillableSubpartition subpartition = createSubpartition(failingIoManager);

        // Switch to spilled mode before anything has been added.
        Assert.assertEquals(0L, (long) subpartition.releaseMemory());

        exception.expect(IOException.class);

        Buffer rejected = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            subpartition.add(rejected);
        } finally {
            failingIoManager.shutdown();
            boolean leaked = !rejected.isRecycled();
            if (leaked) {
                // Clean up before reporting the failure.
                rejected.recycle();
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals(0L, (long) subpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, (long) subpartition.getTotalNumberOfBytes());
        }
    }

    /** Cleanup-on-release scenario: not spilled, no read view. */
    @Test
    public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
        final boolean spilled = false;
        final boolean createView = false;
        testCleanupReleasedPartition(spilled, createView);
    }

    /** Cleanup-on-release scenario: not spilled, with a read view. */
    @Test
    public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
        final boolean spilled = false;
        final boolean createView = true;
        testCleanupReleasedPartition(spilled, createView);
    }

    /** Cleanup-on-release scenario: spilled, no read view. */
    @Test
    public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
        final boolean spilled = true;
        final boolean createView = false;
        testCleanupReleasedPartition(spilled, createView);
    }

    /** Cleanup-on-release scenario: spilled, with a read view. */
    @Test
    public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
        final boolean spilled = true;
        final boolean createView = true;
        testCleanupReleasedPartition(spilled, createView);
    }

    /**
     * Adds two buffers, optionally finishes the subpartition and opens a read view, optionally
     * spills it, and then releases it. Afterwards the subpartition (and the view, if any) must
     * report being released, both buffers must have been recycled, and the statistics must
     * still reflect everything that was added.
     *
     * @param spilled whether {@code releaseMemory()} is called (and expected to return 2)
     *     before the release
     * @param createView whether the subpartition is finished and a read view is created
     *     before the release; this adds one buffer of 4 bytes to the expected statistics
     */
    private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
        SpillableSubpartition subpartition = createSubpartition();

        Buffer first = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        Buffer second = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);

        final boolean firstRecycled;
        final boolean secondRecycled;
        try {
            subpartition.add(first);
            subpartition.add(second);

            ResultSubpartitionView view = null;
            if (createView) {
                subpartition.finish();
                view = subpartition.createReadView(numBuffers -> {});
            }
            if (spilled) {
                Assert.assertEquals(2L, (long) subpartition.releaseMemory());
            }

            subpartition.release();

            Assert.assertTrue(subpartition.isReleased());
            if (createView) {
                Assert.assertTrue(view.isReleased());
            }
            Assert.assertTrue(first.isRecycled());
        } finally {
            // Record the state first, then free whatever is still alive.
            firstRecycled = first.isRecycled();
            if (!firstRecycled) {
                first.recycle();
            }
            secondRecycled = second.isRecycled();
            if (!secondRecycled) {
                second.recycle();
            }
        }

        Assert.assertTrue("buffer 1 not recycled", firstRecycled);
        Assert.assertTrue("buffer 2 not recycled", secondRecycled);

        // With a view, finish() has contributed one extra buffer of 4 bytes.
        Assert.assertEquals(createView ? 3L : 2L, (long) subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(createView ? 8196L : 8192L, (long) subpartition.getTotalNumberOfBytes());
    }

    private static class IOManagerAsyncWithClosedBufferFileWriter
    extends IOManagerAsync {
        private IOManagerAsyncWithClosedBufferFileWriter() {
        }

        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
            bufferFileWriter.close();
            return bufferFileWriter;
        }
    }

    private static class AwaitableBufferAvailablityListener
    implements BufferAvailabilityListener {
        private long numNotifiedBuffers;

        private AwaitableBufferAvailablityListener() {
        }

        public void notifyBuffersAvailable(long numBuffers) {
            this.numNotifiedBuffers += numBuffers;
        }

        long getNumNotifiedBuffers() {
            return this.numNotifiedBuffers;
        }

        void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (this.numNotifiedBuffers < awaitedNumNotifiedBuffers && System.currentTimeMillis() < deadline) {
                Thread.sleep(1L);
            }
        }
    }
}

