/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.AwaitableBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class PipelinedSubpartitionWithReadViewTest {
    private PipelinedSubpartition subpartition;
    private AwaitableBufferAvailablityListener availablityListener;
    private PipelinedSubpartitionView readView;

    @Before
    public void setup() throws IOException {
        ResultPartition parent = (ResultPartition)Mockito.mock(ResultPartition.class);
        this.subpartition = new PipelinedSubpartition(0, parent);
        this.availablityListener = new AwaitableBufferAvailablityListener();
        this.readView = this.subpartition.createReadView((BufferAvailabilityListener)this.availablityListener);
    }

    @After
    public void tearDown() {
        this.readView.releaseAllResources();
        this.subpartition.release();
    }

    @Test(expected=IllegalStateException.class)
    public void testAddTwoNonFinishedBuffer() {
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        this.subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        Assert.assertNull((Object)this.readView.getNextBuffer());
    }

    @Test
    public void testAddEmptyNonFinishedBuffer() {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        this.subpartition.add(bufferBuilder.createBufferConsumer());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        Assert.assertNull((Object)this.readView.getNextBuffer());
        bufferBuilder.finish();
        bufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        this.subpartition.add(bufferBuilder.createBufferConsumer());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        Assert.assertNull((Object)this.readView.getNextBuffer());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
    }

    @Test
    public void testAddNonEmptyNotFinishedBuffer() throws Exception {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
        bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
        this.subpartition.add(bufferBuilder.createBufferConsumer());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 1, false, false);
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
    }

    @Test
    public void testUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        MatcherAssert.assertThat((Object)this.availablityListener.getNumNotifications(), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, false, 1, false, true);
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        long oldNumNotifications = this.availablityListener.getNumNotifications();
        this.subpartition.flush();
        MatcherAssert.assertThat((Object)oldNumNotifications, (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        Assert.assertEquals((long)oldNumNotifications, (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, true, 1, false, true);
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
        this.subpartition.flush();
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1025, false, 1, false, true);
        long oldNumNotifications = this.availablityListener.getNumNotifications();
        this.subpartition.flush();
        Assert.assertEquals((long)(oldNumNotifications + 1L), (long)this.availablityListener.getNumNotifications());
        this.subpartition.flush();
        Assert.assertEquals((long)(oldNumNotifications + 1L), (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 1, false, false);
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
    }

    @Test
    public void testMultipleEmptyBuffers() throws Exception {
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        Assert.assertEquals((long)3L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1024));
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 1024, false, 0, false, true);
    }

    @Test
    public void testEmptyFlush() {
        this.subpartition.flush();
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
    }

    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        Assert.assertEquals((long)0L, (long)this.availablityListener.getNumNotifications());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        Assert.assertEquals((long)1L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)0L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals((long)32768L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        Assert.assertEquals((long)2L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)32768L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)2L, (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals((long)65536L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        this.subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        this.subpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse((boolean)this.readView.nextBufferIsEvent());
        Assert.assertEquals((long)5L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)2L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)65536L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)4L, (long)this.availablityListener.getNumNotifications());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, true, this.subpartition.getBuffersInBacklog() - 1, true, true);
        Assert.assertEquals((long)98304L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNextEvent((ResultSubpartitionView)this.readView, 32768, null, true, this.subpartition.getBuffersInBacklog(), false, true);
        Assert.assertEquals((long)131072L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)1L, (long)this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNextBuffer((ResultSubpartitionView)this.readView, 32768, false, this.subpartition.getBuffersInBacklog() - 1, false, true);
        Assert.assertEquals((long)163840L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        SubpartitionTestBase.assertNoNextBuffer((ResultSubpartitionView)this.readView);
        Assert.assertEquals((long)0L, (long)this.subpartition.getBuffersInBacklog());
        Assert.assertEquals((long)5L, (long)this.subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals((long)163840L, (long)this.subpartition.getTotalNumberOfBytes());
        Assert.assertEquals((long)4L, (long)this.availablityListener.getNumNotifications());
    }
}

