/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.AwaitableBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class SpillableSubpartitionTest
extends SubpartitionTestBase {
    private static final int BUFFER_DATA_SIZE = 4096;
    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    private static IOManager ioManager;

    @BeforeClass
    public static void setup() {
        ioManager = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdown() {
        executorService.shutdownNow();
        ioManager.shutdown();
    }

    SpillableSubpartition createSubpartition() {
        return SpillableSubpartitionTest.createSubpartition(ioManager);
    }

    private static SpillableSubpartition createSubpartition(IOManager ioManager) {
        ResultPartition parent = (ResultPartition)Mockito.mock(ResultPartition.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)parent.getBufferProvider()).thenReturn((Object)bufferProvider);
        Mockito.when((Object)bufferProvider.getMemorySegmentSize()).thenReturn((Object)32768);
        return new SpillableSubpartition(0, parent, ioManager);
    }

    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final CountDownLatch blockLatch = new CountDownLatch(1);
        AsynchronousBufferFileWriter spillWriter = (AsynchronousBufferFileWriter)Mockito.mock(AsynchronousBufferFileWriter.class);
        ((AsynchronousBufferFileWriter)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                blockLatch.countDown();
                doneLatch.await();
                return null;
            }
        }).when((Object)spillWriter)).close();
        IOManager ioManager = (IOManager)Mockito.mock(IOManager.class);
        Mockito.when((Object)ioManager.createBufferFileWriter((FileIOChannel.ID)ArgumentMatchers.nullable(FileIOChannel.ID.class))).thenReturn((Object)spillWriter);
        final SpillableSubpartition partition = new SpillableSubpartition(0, (ResultPartition)Mockito.mock(ResultPartition.class), ioManager);
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> blockingFinish = executor.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                partition.finish();
                return null;
            }
        });
        blockLatch.await();
        partition.releaseMemory();
        doneLatch.countDown();
        blockingFinish.get();
    }

    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        partition.finish();
        ResultSubpartitionView readView = (ResultSubpartitionView)Mockito.spy((Object)partition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener()));
        ((ResultSubpartitionView)Mockito.doNothing().when((Object)readView)).releaseAllResources();
        partition.release();
        Assert.assertNull((Object)readView.getNextBuffer());
    }

    @Test
    public void testConsumeSpilledPartition() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer((AbstractEvent)new CancelCheckpointMarker(1L));
        int eventSize = eventBufferConsumer.getWrittenBytes();
        partition.add(bufferConsumer.copy());
        partition.add(bufferConsumer.copy());
        partition.add(eventBufferConsumer);
        partition.add(bufferConsumer);
        Assert.assertEquals((long)4L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
        Assert.assertFalse((boolean)bufferConsumer.isRecycled());
        Assert.assertEquals((long)4L, (long)partition.releaseMemory());
        Assert.assertEquals((long)4L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)(12288 + eventSize), (long)partition.getTotalNumberOfBytes());
        partition.finish();
        Assert.assertEquals((long)5L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)(12288 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
        SpilledSubpartitionView reader = (SpilledSubpartitionView)partition.createReadView((BufferAvailabilityListener)listener);
        Assert.assertEquals((long)1L, (long)listener.getNumNotifications());
        Assert.assertFalse((boolean)reader.nextBufferIsEvent());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 2, false, true);
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 1, true, true);
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 0, true, true);
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        long deadline = System.currentTimeMillis() + 30000L;
        while (!bufferConsumer.isRecycled() && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
        Assert.assertTrue((boolean)bufferConsumer.isRecycled());
    }

    @Test
    public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        BufferBuilder[] bufferBuilders = new BufferBuilder[]{BufferBuilderTestUtils.createBufferBuilder(4096), BufferBuilderTestUtils.createBufferBuilder(4096), BufferBuilderTestUtils.createBufferBuilder(4096), BufferBuilderTestUtils.createBufferBuilder(4096)};
        BufferConsumer[] bufferConsumers = (BufferConsumer[])Arrays.stream(bufferBuilders).map(BufferBuilder::createBufferConsumer).toArray(BufferConsumer[]::new);
        BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer((AbstractEvent)new CancelCheckpointMarker(1L));
        int eventSize = eventBufferConsumer.getWrittenBytes();
        partition.add(bufferConsumers[0]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilders[0], 4096).finish();
        partition.add(bufferConsumers[1]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilders[1], 4096).finish();
        partition.add(eventBufferConsumer);
        partition.add(bufferConsumers[2]);
        bufferBuilders[2].finish();
        partition.add(bufferConsumers[3]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilders[3], 2048);
        int expectedSize = 8192 + eventSize;
        Assert.assertEquals((long)5L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)expectedSize, (long)partition.getTotalNumberOfBytes());
        partition.finish();
        expectedSize += 2048;
        Assert.assertEquals((long)6L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)(expectedSize += 4), (long)partition.getTotalNumberOfBytes());
        Arrays.stream(bufferConsumers).forEach(bufferConsumer -> Assert.assertTrue((boolean)bufferConsumer.isRecycled()));
        AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
        SpilledSubpartitionView reader = (SpilledSubpartitionView)partition.createReadView((BufferAvailabilityListener)listener);
        Assert.assertEquals((long)1L, (long)listener.getNumNotifications());
        Assert.assertFalse((boolean)reader.nextBufferIsEvent());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 2, false, true);
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 1, true, true);
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 2048, true, 0, true, true);
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        Arrays.stream(bufferConsumers).forEach(bufferConsumer -> bufferConsumer.close());
    }

    @Test
    public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer((AbstractEvent)new CancelCheckpointMarker(1L));
        int eventSize = eventBufferConsumer.getWrittenBytes();
        partition.add(bufferConsumer.copy());
        partition.add(bufferConsumer.copy());
        partition.add(eventBufferConsumer);
        partition.add(bufferConsumer);
        partition.finish();
        Assert.assertEquals((long)5L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)3L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
        AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
        SpillableSubpartitionView reader = (SpillableSubpartitionView)partition.createReadView((BufferAvailabilityListener)listener);
        Assert.assertEquals((long)1L, (long)listener.getNumNotifications());
        Assert.assertFalse((boolean)bufferConsumer.isRecycled());
        Assert.assertFalse((boolean)reader.nextBufferIsEvent());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 2, false, false);
        Assert.assertEquals((long)4096L, (long)partition.getTotalNumberOfBytes());
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)1L, (long)listener.getNumNotifications());
        Assert.assertFalse((boolean)bufferConsumer.isRecycled());
        Assert.assertEquals((long)3L, (long)partition.releaseMemory());
        Assert.assertFalse((boolean)bufferConsumer.isRecycled());
        Assert.assertEquals((long)5L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
        Assert.assertEquals((long)(8192 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        listener.awaitNotifications(2L, 30000L);
        Assert.assertEquals((long)2L, (long)listener.getNumNotifications());
        Buffer buffer = bufferConsumer.build();
        buffer.retainBuffer();
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 1, true, false);
        Assert.assertEquals((long)(12288 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        bufferConsumer.close();
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals((long)(12288 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        SpillableSubpartitionTest.assertNextBuffer((ResultSubpartitionView)reader, 4096, true, 0, true, true);
        Assert.assertEquals((long)(12288 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        buffer.recycleBuffer();
        Assert.assertTrue((boolean)buffer.isRecycled());
        SpillableSubpartitionTest.assertNextEvent((ResultSubpartitionView)reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals((long)(12288 + eventSize + 4), (long)partition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)partition.getBuffersInBacklog());
        long deadline = System.currentTimeMillis() + 30000L;
        while (!bufferConsumer.isRecycled() && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
        Assert.assertTrue((boolean)bufferConsumer.isRecycled());
    }

    @Test
    public void testAddOnFinishedSpillablePartition() throws Exception {
        this.testAddOnFinishedPartition(false);
    }

    @Test
    public void testAddOnFinishedSpilledPartition() throws Exception {
        this.testAddOnFinishedPartition(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testAddOnFinishedPartition(boolean spilled) throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        if (spilled) {
            Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        }
        partition.finish();
        Assert.assertEquals((long)1L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)(spilled ? 4L : 0L), (long)partition.getTotalNumberOfBytes());
        BufferConsumer buffer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer);
        }
        finally {
            if (!buffer.isRecycled()) {
                buffer.close();
                Assert.fail((String)"buffer not recycled");
            }
        }
        Assert.assertEquals((long)1L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)(spilled ? 4L : 0L), (long)partition.getTotalNumberOfBytes());
    }

    @Test
    public void testAddOnReleasedSpillablePartition() throws Exception {
        this.testAddOnReleasedPartition(false);
    }

    @Test
    public void testAddOnReleasedSpilledPartition() throws Exception {
        this.testAddOnReleasedPartition(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testAddOnReleasedPartition(boolean spilled) throws Exception {
        boolean bufferRecycled;
        SpillableSubpartition partition = this.createSubpartition();
        partition.release();
        if (spilled) {
            Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        }
        BufferConsumer buffer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer);
        }
        finally {
            bufferRecycled = buffer.isRecycled();
            if (!bufferRecycled) {
                buffer.close();
            }
        }
        if (!bufferRecycled) {
            Assert.fail((String)"buffer not recycled");
        }
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
        boolean bufferRecycled;
        IOManagerAsyncWithNoOpBufferFileWriter ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition partition = SpillableSubpartitionTest.createSubpartition((IOManager)ioManager);
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        BufferConsumer buffer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer);
        }
        finally {
            ioManager.shutdown();
            bufferRecycled = buffer.isRecycled();
            if (!bufferRecycled) {
                buffer.close();
            }
        }
        if (bufferRecycled) {
            Assert.fail((String)"buffer recycled before the write operation completed");
        }
        Assert.assertEquals((long)1L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)4096L, (long)partition.getTotalNumberOfBytes());
    }

    @Test
    public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
        this.testReleaseOnSpillablePartitionWithSlowWriter(false);
    }

    @Test
    public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
        this.testReleaseOnSpillablePartitionWithSlowWriter(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testReleaseOnSpillablePartitionWithSlowWriter(boolean createView) throws Exception {
        IOManagerAsyncWithNoOpBufferFileWriter ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition partition = SpillableSubpartitionTest.createSubpartition((IOManager)ioManager);
        BufferConsumer buffer1 = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        BufferConsumer buffer2 = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer1);
            partition.add(buffer2);
            Assert.assertFalse((String)"buffer1 should not be recycled (still in the queue)", (boolean)buffer1.isRecycled());
            Assert.assertFalse((String)"buffer2 should not be recycled (still in the queue)", (boolean)buffer2.isRecycled());
            Assert.assertEquals((long)2L, (long)partition.getTotalNumberOfBuffers());
            Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
            if (createView) {
                partition.finish();
                partition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener());
                Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
            }
            Assert.assertEquals((long)2L, (long)partition.releaseMemory());
            Assert.assertFalse((String)"buffer1 should not be recycled (advertised as nextBuffer)", (boolean)buffer1.isRecycled());
            Assert.assertFalse((String)"buffer2 should not be recycled (not written yet)", (boolean)buffer2.isRecycled());
        }
        finally {
            ioManager.shutdown();
            if (!buffer1.isRecycled()) {
                buffer1.close();
            }
            if (!buffer2.isRecycled()) {
                buffer2.close();
            }
        }
        Assert.assertEquals((long)(2 + (createView ? 1 : 0)), (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)(4096 + (createView ? 4 : 4096)), (long)partition.getTotalNumberOfBytes());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
        boolean bufferRecycled;
        IOManagerAsyncWithClosedBufferFileWriter ioManager = new IOManagerAsyncWithClosedBufferFileWriter();
        SpillableSubpartition partition = SpillableSubpartitionTest.createSubpartition((IOManager)ioManager);
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        this.exception.expect(IOException.class);
        BufferConsumer buffer = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer);
        }
        finally {
            ioManager.shutdown();
            bufferRecycled = buffer.isRecycled();
            if (!bufferRecycled) {
                buffer.close();
            }
        }
        if (!bufferRecycled) {
            Assert.fail((String)"buffer not recycled");
        }
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
    }

    @Test
    public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
        this.testCleanupReleasedPartition(false, false);
    }

    @Test
    public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
        this.testCleanupReleasedPartition(false, true);
    }

    @Test
    public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
        this.testCleanupReleasedPartition(true, false);
    }

    @Test
    public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
        this.testCleanupReleasedPartition(true, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
        boolean buffer2Recycled;
        boolean buffer1Recycled;
        SpillableSubpartition partition = this.createSubpartition();
        BufferConsumer buffer1 = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        BufferConsumer buffer2 = BufferBuilderTestUtils.createFilledBufferConsumer(4096, 4096);
        try {
            partition.add(buffer1);
            partition.add(buffer2);
            ResultSubpartitionView view = null;
            if (createView) {
                partition.finish();
                view = partition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            }
            if (spilled) {
                Assert.assertEquals((long)2L, (long)partition.releaseMemory());
            }
            partition.release();
            Assert.assertTrue((boolean)partition.isReleased());
            if (createView) {
                Assert.assertTrue((boolean)view.isReleased());
            }
            Assert.assertTrue((boolean)buffer1.isRecycled());
        }
        finally {
            buffer1Recycled = buffer1.isRecycled();
            if (!buffer1Recycled) {
                buffer1.close();
            }
            if (!(buffer2Recycled = buffer2.isRecycled())) {
                buffer2.close();
            }
        }
        if (!buffer1Recycled) {
            Assert.fail((String)"buffer 1 not recycled");
        }
        if (!buffer2Recycled) {
            Assert.fail((String)"buffer 2 not recycled");
        }
        Assert.assertEquals((long)(createView ? 3L : 2L), (long)partition.getTotalNumberOfBuffers());
        if (spilled) {
            Assert.assertEquals((long)(4096 + (createView ? 4 : 4096)), (long)partition.getTotalNumberOfBytes());
        } else {
            Assert.assertEquals((long)0L, (long)partition.getTotalNumberOfBytes());
        }
    }

    @Test
    public void testSpillFinishedBufferConsumersFull() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder(4096);
        partition.add(bufferBuilder.createBufferConsumer());
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 4096).finish();
        Assert.assertEquals((long)4096L, (long)partition.spillFinishedBufferConsumers(false));
        Assert.assertEquals((long)1L, (long)partition.getBuffersInBacklog());
    }

    @Test
    public void testSpillFinishedBufferConsumersPartial() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder(8192);
        partition.add(bufferBuilder.createBufferConsumer());
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 4096);
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 4096).finish();
        Assert.assertEquals((long)4096L, (long)partition.spillFinishedBufferConsumers(false));
        Assert.assertEquals((long)2L, (long)partition.getBuffersInBacklog());
    }

    private static class IOManagerAsyncWithClosedBufferFileWriter
    extends IOManagerAsync {
        private IOManagerAsyncWithClosedBufferFileWriter() {
        }

        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
            bufferFileWriter.close();
            return bufferFileWriter;
        }
    }
}

