/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ClientTransportErrorHandlingTest {
    @Test
    public void testExceptionOnWrite() throws Exception {
        NettyProtocol protocol = new NettyProtocol((ResultPartitionProvider)Mockito.mock(ResultPartitionProvider.class), (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class), true){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[0];
            }
        };
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, NettyTestUtil.createConfig());
        Channel ch = NettyTestUtil.connect(serverAndClient);
        NetworkClientHandler handler = this.getClientHandler(ch);
        ch.pipeline().addFirst(new ChannelHandler[]{new ChannelOutboundHandlerAdapter(){
            int writeNum = 0;

            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                if (this.writeNum >= 1) {
                    throw new RuntimeException("Expected test exception.");
                }
                ++this.writeNum;
                ctx.write(msg, promise);
            }
        }});
        PartitionRequestClient requestClient = new PartitionRequestClient(ch, handler, (ConnectionID)Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory)Mockito.mock(PartitionRequestClientFactory.class));
        RemoteInputChannel[] rich = new RemoteInputChannel[]{this.createRemoteInputChannel(), this.createRemoteInputChannel()};
        final CountDownLatch sync = new CountDownLatch(1);
        ((RemoteInputChannel)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                sync.countDown();
                return null;
            }
        }).when((Object)rich[1])).onError((Throwable)Matchers.isA(LocalTransportException.class));
        ChannelFuture f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0);
        Assert.assertTrue((boolean)f.await().isSuccess());
        f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
        Assert.assertFalse((boolean)f.await().isSuccess());
        ((RemoteInputChannel)Mockito.verify((Object)rich[0], (VerificationMode)Mockito.times((int)0))).onError((Throwable)Matchers.any(LocalTransportException.class));
        if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
            Assert.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about the channel error."));
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testWrappingOfRemoteErrorMessage() throws Exception {
        RemoteInputChannel[] rich;
        EmbeddedChannel ch = this.createEmbeddedChannel();
        NetworkClientHandler handler = this.getClientHandler((Channel)ch);
        for (RemoteInputChannel r : rich = new RemoteInputChannel[]{this.createRemoteInputChannel(), this.createRemoteInputChannel()}) {
            Mockito.when((Object)r.getInputChannelId()).thenReturn((Object)new InputChannelID());
            handler.addInputChannel(r);
        }
        ch.pipeline().fireChannelRead((Object)new NettyMessage.ErrorResponse((Throwable)new RuntimeException("Expected test exception"), rich[0].getInputChannelId()));
        try {
            ch.checkException();
        }
        catch (Exception e) {
            Assert.fail((String)"The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        ((RemoteInputChannel)Mockito.verify((Object)rich[0], (VerificationMode)Mockito.times((int)1))).onError((Throwable)Matchers.isA(RemoteTransportException.class));
        ((RemoteInputChannel)Mockito.verify((Object)rich[1], (VerificationMode)Mockito.never())).onError((Throwable)Matchers.any(Throwable.class));
        ch.pipeline().fireChannelRead((Object)new NettyMessage.ErrorResponse((Throwable)new RuntimeException("Expected test exception")));
        try {
            ch.checkException();
        }
        catch (Exception e) {
            Assert.fail((String)"The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        ((RemoteInputChannel)Mockito.verify((Object)rich[0], (VerificationMode)Mockito.times((int)2))).onError((Throwable)Matchers.isA(RemoteTransportException.class));
        ((RemoteInputChannel)Mockito.verify((Object)rich[1], (VerificationMode)Mockito.times((int)1))).onError((Throwable)Matchers.isA(RemoteTransportException.class));
    }

    @Test
    public void testExceptionOnRemoteClose() throws Exception {
        NettyProtocol protocol = new NettyProtocol((ResultPartitionProvider)Mockito.mock(ResultPartitionProvider.class), (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class), true){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ctx.channel().close();
                    }
                }};
            }
        };
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, NettyTestUtil.createConfig());
        Channel ch = NettyTestUtil.connect(serverAndClient);
        NetworkClientHandler handler = this.getClientHandler(ch);
        RemoteInputChannel[] rich = new RemoteInputChannel[]{this.createRemoteInputChannel(), this.createRemoteInputChannel()};
        final CountDownLatch sync = new CountDownLatch(rich.length);
        Answer<Void> countDownLatch = new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                sync.countDown();
                return null;
            }
        };
        for (RemoteInputChannel r : rich) {
            ((RemoteInputChannel)Mockito.doAnswer((Answer)countDownLatch).when((Object)r)).onError((Throwable)Matchers.any(Throwable.class));
            handler.addInputChannel(r);
        }
        ch.writeAndFlush((Object)Unpooled.buffer().writerIndex(16));
        if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
            Assert.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about remote connection close."));
        }
        for (RemoteInputChannel r : rich) {
            ((RemoteInputChannel)Mockito.verify((Object)r)).onError((Throwable)Matchers.isA(RemoteTransportException.class));
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testExceptionCaught() throws Exception {
        RemoteInputChannel[] rich;
        EmbeddedChannel ch = this.createEmbeddedChannel();
        NetworkClientHandler handler = this.getClientHandler((Channel)ch);
        for (RemoteInputChannel r : rich = new RemoteInputChannel[]{this.createRemoteInputChannel(), this.createRemoteInputChannel()}) {
            Mockito.when((Object)r.getInputChannelId()).thenReturn((Object)new InputChannelID());
            handler.addInputChannel(r);
        }
        ch.pipeline().fireExceptionCaught((Throwable)new Exception());
        try {
            ch.checkException();
        }
        catch (Exception e) {
            Assert.fail((String)"The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        for (RemoteInputChannel r : rich) {
            ((RemoteInputChannel)Mockito.verify((Object)r)).onError((Throwable)Matchers.isA(LocalTransportException.class));
        }
    }

    @Test
    public void testConnectionResetByPeer() throws Throwable {
        EmbeddedChannel ch = this.createEmbeddedChannel();
        NetworkClientHandler handler = this.getClientHandler((Channel)ch);
        RemoteInputChannel rich = this.addInputChannel(handler);
        final Throwable[] error = new Throwable[1];
        ((RemoteInputChannel)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                Throwable cause = (Throwable)invocation.getArguments()[0];
                try {
                    Assert.assertEquals(RemoteTransportException.class, cause.getClass());
                    Assert.assertNotEquals((Object)"Connection reset by peer", (Object)cause.getMessage());
                    Assert.assertEquals(IOException.class, cause.getCause().getClass());
                    Assert.assertEquals((Object)"Connection reset by peer", (Object)cause.getCause().getMessage());
                }
                catch (Throwable t) {
                    error[0] = t;
                }
                return null;
            }
        }).when((Object)rich)).onError((Throwable)Matchers.any(Throwable.class));
        ch.pipeline().fireExceptionCaught((Throwable)new IOException("Connection reset by peer"));
        Assert.assertNull((Object)error[0]);
    }

    @Test
    public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
        EmbeddedChannel ch = this.createEmbeddedChannel();
        NetworkClientHandler handler = this.getClientHandler((Channel)ch);
        RemoteInputChannel rich = this.addInputChannel(handler);
        ((RemoteInputChannel)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Expected test exception")}).when((Object)rich)).onError((Throwable)Matchers.any(Throwable.class));
        ch.pipeline().fireExceptionCaught((Throwable)new Exception());
        Assert.assertFalse((boolean)ch.isActive());
    }

    private EmbeddedChannel createEmbeddedChannel() {
        NettyProtocol protocol = new NettyProtocol((ResultPartitionProvider)Mockito.mock(ResultPartitionProvider.class), (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class), true);
        return new EmbeddedChannel(protocol.getClientChannelHandlers());
    }

    private RemoteInputChannel addInputChannel(NetworkClientHandler clientHandler) throws IOException {
        RemoteInputChannel rich = this.createRemoteInputChannel();
        clientHandler.addInputChannel(rich);
        return rich;
    }

    private NetworkClientHandler getClientHandler(Channel ch) {
        return (NetworkClientHandler)ch.pipeline().get(NetworkClientHandler.class);
    }

    private RemoteInputChannel createRemoteInputChannel() {
        return (RemoteInputChannel)Mockito.when((Object)((RemoteInputChannel)Mockito.mock(RemoteInputChannel.class)).getInputChannelId()).thenReturn((Object)new InputChannelID()).getMock();
    }
}

