/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.junit.Assert;
import org.junit.Test;

public class NettyServerLowAndHighWatermarkTest {
    @Test
    public void testLargeLowAndHighWatermarks() throws Throwable {
        this.testLowAndHighWatermarks(65536);
    }

    @Test
    public void testSmallLowAndHighWatermarks() throws Throwable {
        this.testLowAndHighWatermarks(1024);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testLowAndHighWatermarks(final int pageSize) throws Throwable {
        final int expectedLowWatermark = pageSize + 1;
        final int expectedHighWatermark = 2 * pageSize;
        final AtomicReference error = new AtomicReference();
        NettyProtocol protocol = new NettyProtocol(null, null, true){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[]{new TestLowAndHighWatermarkHandler(pageSize, expectedLowWatermark, expectedHighWatermark, error)};
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[0];
            }
        };
        NettyConfig conf = NettyTestUtil.createConfig(pageSize);
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, conf);
        try {
            Channel ch = NettyTestUtil.connect(serverAndClient);
            NettyTestUtil.awaitClose(ch);
            Throwable t = (Throwable)error.get();
            if (t != null) {
                throw t;
            }
        }
        finally {
            NettyTestUtil.shutdown(serverAndClient);
        }
    }

    private static ByteBuf buffer(int size) {
        return Unpooled.buffer((int)size).writerIndex(size);
    }

    private static class TestLowAndHighWatermarkHandler
    extends ChannelInboundHandlerAdapter {
        private final int pageSize;
        private final int expectedLowWatermark;
        private final int expectedHighWatermark;
        private final AtomicReference<Throwable> error;
        private boolean hasFlushed;

        public TestLowAndHighWatermarkHandler(int pageSize, int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
            this.pageSize = pageSize;
            this.expectedLowWatermark = expectedLowWatermark;
            this.expectedHighWatermark = expectedHighWatermark;
            this.error = error;
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel ch = ctx.channel();
            Assert.assertEquals((String)"Low watermark", (long)this.expectedLowWatermark, (long)ch.config().getWriteBufferLowWaterMark());
            Assert.assertEquals((String)"High watermark", (long)this.expectedHighWatermark, (long)ch.config().getWriteBufferHighWaterMark());
            Assert.assertTrue((boolean)ch.isWritable());
            ch.write((Object)this.buffer());
            Assert.assertTrue((boolean)ch.isWritable());
            ch.write((Object)this.buffer());
            Assert.assertFalse((boolean)ch.isWritable());
            this.hasFlushed = true;
            ch.flush();
        }

        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            if (this.hasFlushed) {
                Assert.assertTrue((boolean)ctx.channel().isWritable());
                ctx.close();
            }
            super.channelWritabilityChanged(ctx);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (this.error.get() == null) {
                this.error.set(cause);
            }
            ctx.close();
            super.exceptionCaught(ctx, cause);
        }

        private ByteBuf buffer() {
            return NettyServerLowAndHighWatermarkTest.buffer(this.pageSize);
        }
    }
}

