/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;

public class SpilledSubpartitionViewTest {
    private static final IOManager IO_MANAGER = new IOManagerAsync();
    private static final TestInfiniteBufferProvider writerBufferPool = new TestInfiniteBufferProvider();

    @AfterClass
    public static void shutdown() {
        IO_MANAGER.shutdown();
    }

    @Test
    public void testWriteConsume() throws Exception {
        int numberOfBuffersToWrite = 512;
        BufferFileWriter writer = SpilledSubpartitionViewTest.createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512);
        writer.close();
        TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
        TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
        SpilledSubpartitionView view = new SpilledSubpartitionView((ResultSubpartition)Mockito.mock(ResultSubpartition.class), viewBufferPool.getMemorySegmentSize(), writer, 513L, (BufferAvailabilityListener)consumer);
        consumer.setSubpartitionView((ResultSubpartitionView)view);
        consumer.call();
    }

    @Test
    public void testConsumeWithFewBuffers() throws Exception {
        int numberOfBuffersToWrite = 512;
        BufferFileWriter writer = SpilledSubpartitionViewTest.createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512);
        writer.close();
        TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
        SpilledSubpartitionView view = new SpilledSubpartitionView((ResultSubpartition)Mockito.mock(ResultSubpartition.class), 32768, writer, 513L, (BufferAvailabilityListener)consumer);
        consumer.setSubpartitionView((ResultSubpartitionView)view);
        consumer.call();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
        BufferFileWriter[] readers;
        ExecutorService executor;
        block17: {
            executor = null;
            BufferFileWriter[] writers = null;
            readers = null;
            try {
                executor = Executors.newCachedThreadPool();
                writers = new BufferFileWriter[]{SpilledSubpartitionViewTest.createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512), SpilledSubpartitionViewTest.createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)};
                readers = new ResultSubpartitionView[writers.length];
                TestSubpartitionConsumer[] consumers = new TestSubpartitionConsumer[writers.length];
                TestPooledBufferProvider inputBuffers = new TestPooledBufferProvider(2);
                ResultSubpartition parent = (ResultSubpartition)Mockito.mock(ResultSubpartition.class);
                for (BufferFileWriter writer : writers) {
                    writer.close();
                }
                for (int i = 0; i < readers.length; ++i) {
                    consumers[i] = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
                    readers[i] = new SpilledSubpartitionView(parent, inputBuffers.getMemorySegmentSize(), writers[i], 513L, (BufferAvailabilityListener)consumers[i]);
                    consumers[i].setSubpartitionView((ResultSubpartitionView)readers[i]);
                }
                ArrayList results = Lists.newArrayList();
                for (TestSubpartitionConsumer consumer : consumers) {
                    results.add(executor.submit(consumer));
                }
                for (Future res : results) {
                    try {
                        res.get(2L, TimeUnit.MINUTES);
                    }
                    catch (TimeoutException e) {
                        throw new TimeoutException("There has been a timeout in the test. This indicates that there is a bug/deadlock in the tested subpartition view.");
                    }
                }
                if (writers == null) break block17;
            }
            catch (Throwable throwable) {
                if (writers != null) {
                    for (void var16_26 : writers) {
                        if (var16_26 == null) continue;
                        var16_26.deleteChannel();
                    }
                }
                if (readers != null) {
                    for (BufferFileWriter bufferFileWriter : readers) {
                        if (bufferFileWriter == null) continue;
                        bufferFileWriter.releaseAllResources();
                    }
                }
                if (executor != null) {
                    executor.shutdown();
                }
                throw throwable;
            }
            for (BufferFileWriter writer : writers) {
                if (writer == null) continue;
                writer.deleteChannel();
            }
        }
        if (readers != null) {
            for (BufferFileWriter reader : readers) {
                if (reader == null) continue;
                reader.releaseAllResources();
            }
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    static BufferFileWriter createWriterAndWriteBuffers(IOManager ioManager, BufferProvider bufferProvider, int numberOfBuffers) throws IOException {
        BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel());
        for (int i = 0; i < numberOfBuffers; ++i) {
            writer.writeBlock((Object)bufferProvider.requestBuffer());
        }
        writer.writeBlock((Object)EventSerializer.toBuffer((AbstractEvent)EndOfPartitionEvent.INSTANCE));
        return writer;
    }
}

