/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.io.EOFException;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Round-trip tests for {@link ChannelWriterOutputView} and {@link ChannelReaderInputView}.
 *
 * <p>Each test serializes generated {@code (Integer, String)} tuples into a file channel
 * through a writer view, reads them back through a reader view, and compares every read
 * record with the record re-generated from the same seed.
 */
public class ChannelViewsTest {
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_SHORT_LENGTH = 114;
    private static final int VALUE_LONG_LENGTH = 112 * 1024;
    private static final int NUM_PAIRS_SHORT = 1000000;
    private static final int NUM_PAIRS_LONG = 3000;
    private static final int MEMORY_SIZE = 1024 * 1024;
    private static final int MEMORY_PAGE_SIZE = 64 * 1024;
    private static final int NUM_MEMORY_SEGMENTS = 3;

    /** Owner passed to the memory manager for every page allocation in this class. */
    private final AbstractInvokable parentTask = new DummyInvokable();

    /** Serializer shared by the write and the read side of each test. */
    private final TypeSerializer<Tuple2<Integer, String>> serializer =
            TestData.getIntStringTupleSerializer();

    private IOManager ioManager;
    private MemoryManager memoryManager;

    /** Creates a fresh memory manager and I/O manager for each test. */
    @Before
    public void beforeTest() {
        this.memoryManager =
                new MemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE, MemoryType.HEAP, true);
        this.ioManager = new IOManagerAsync();
    }

    /**
     * Shuts down both managers and then reports problems.
     *
     * <p>Both managers are shut down before any assertion runs, so a failing check on one
     * of them cannot prevent the other from being cleaned up.
     */
    @After
    public void afterTest() {
        boolean ioProperlyShutDown = true;
        if (this.ioManager != null) {
            this.ioManager.shutdown();
            ioProperlyShutDown = this.ioManager.isProperlyShutDown();
        }

        boolean allSegmentsReturned = true;
        if (this.memoryManager != null) {
            allSegmentsReturned = this.memoryManager.verifyEmpty();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }

        Assert.assertTrue("I/O Manager was not properly shut down.", ioProperlyShutDown);
        Assert.assertTrue(
                "Memory leak: not all segments have been returned to the memory manager.",
                allSegmentsReturned);
    }

    /** Writes and reads back {@link #NUM_PAIRS_SHORT} records with short values. */
    @Test
    public void testWriteReadSmallRecords() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        int blockCount = writeRecords(channel, generator, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, blockCount, true);

        verifyRecords(generator, inView, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Writes and reads back {@link #NUM_PAIRS_LONG} records with long values. */
    @Test
    public void testWriteAndReadLongRecords() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_LONG_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        int blockCount = writeRecords(channel, generator, NUM_PAIRS_LONG, NUM_MEMORY_SEGMENTS);

        List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, blockCount, true);

        verifyRecords(generator, inView, NUM_PAIRS_LONG);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /**
     * Attempts to read one record more than was written and expects an
     * {@link EOFException}.
     *
     * <p>Only {@link EOFException} is caught. Any other exception or assertion failure
     * propagates unchanged so that its original message and stack trace are reported.
     */
    @Test
    public void testReadTooMany() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        int blockCount = writeRecords(channel, generator, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, blockCount, true);

        boolean eofReached = false;
        try {
            verifyRecords(generator, inView, NUM_PAIRS_SHORT + 1);
        } catch (EOFException expected) {
            // Reading past the last written record is the condition under test.
            eofReached = true;
        }

        // Clean up before asserting so that a missing EOF does not also leak segments.
        this.memoryManager.release(inView.close());
        reader.deleteChannel();

        Assert.assertTrue("Expected an EOFException which did not occur.", eofReached);
    }

    /**
     * Reads all records back through the reader-view constructor that takes no block
     * count.
     */
    @Test
    public void testReadWithoutKnownBlockCount() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        writeRecords(channel, generator, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, true);

        verifyRecords(generator, inView, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Runs the round trip with a single memory segment on both the write and read side. */
    @Test
    public void testWriteReadOneBufferOnly() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        int blockCount = writeRecords(channel, generator, NUM_PAIRS_SHORT, 1);

        List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, blockCount, true);

        verifyRecords(generator, inView, NUM_PAIRS_SHORT);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Reads back only the first half of the written records, then closes the reader view. */
    @Test
    public void testWriteReadNotAll() throws Exception {
        TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        FileIOChannel.ID channel = this.ioManager.createChannel();
        int blockCount = writeRecords(channel, generator, NUM_PAIRS_SHORT, NUM_MEMORY_SEGMENTS);

        List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel);
        ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, blockCount, true);

        verifyRecords(generator, inView, NUM_PAIRS_SHORT / 2);

        this.memoryManager.release(inView.close());
        reader.deleteChannel();
    }

    /** Creates a seeded generator with random keys and random-length values. */
    private static TestData.TupleGenerator newGenerator(int maxValueLength) {
        return new TestData.TupleGenerator(
                SEED,
                KEY_MAX,
                maxValueLength,
                TestData.TupleGenerator.KeyMode.RANDOM,
                TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
    }

    /**
     * Serializes {@code numRecords} generated records into {@code channel} through a
     * {@link ChannelWriterOutputView}, closes the view and releases its segments.
     *
     * @param channel channel to write to
     * @param generator source of the records
     * @param numRecords number of records to write
     * @param numSegments number of memory pages to allocate for the writer view
     * @return the block count reported by the writer view after it was closed
     */
    private int writeRecords(
            FileIOChannel.ID channel,
            TestData.TupleGenerator generator,
            int numRecords,
            int numSegments) throws Exception {
        List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, numSegments);
        BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
        ChannelWriterOutputView outView =
                new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

        Tuple2<Integer, String> rec = new Tuple2<Integer, String>();
        for (int i = 0; i < numRecords; ++i) {
            generator.next(rec);
            this.serializer.serialize(rec, outView);
        }

        this.memoryManager.release(outView.close());
        return outView.getBlockCount();
    }

    /**
     * Resets {@code generator}, then deserializes {@code numRecords} records from
     * {@code source} and asserts that each equals the corresponding re-generated record.
     *
     * @param generator generator that produced the written records
     * @param source view to read the serialized records from
     * @param numRecords number of records to read and compare
     */
    private void verifyRecords(
            TestData.TupleGenerator generator,
            DataInputView source,
            int numRecords) throws Exception {
        generator.reset();

        Tuple2<Integer, String> expected = new Tuple2<Integer, String>();
        Tuple2<Integer, String> actual = new Tuple2<Integer, String>();
        for (int i = 0; i < numRecords; ++i) {
            generator.next(expected);
            this.serializer.deserialize(actual, source);

            int expectedKey = expected.f0;
            int actualKey = actual.f0;
            String expectedValue = expected.f1;
            String actualValue = actual.f1;
            Assert.assertTrue(
                    "The re-generated and the read record do not match.",
                    expectedKey == actualKey && expectedValue.equals(actualValue));
        }
    }
}

