/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class PartitionRequestClientFactoryTest {
    private static final int SERVER_PORT = NetUtils.getAvailablePort();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResourceReleaseAfterInterruptedConnect() throws Exception {
        final CountDownLatch syncOnConnect = new CountDownLatch(1);
        Tuple2<NettyServer, NettyClient> netty = PartitionRequestClientFactoryTest.createNettyServerAndClient(new NettyProtocol(null, null, true){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[0];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{new CountDownLatchOnConnectHandler(syncOnConnect)};
            }
        });
        NettyServer server = (NettyServer)netty.f0;
        NettyClient client = (NettyClient)netty.f1;
        UncaughtTestExceptionHandler exceptionHandler = new UncaughtTestExceptionHandler();
        try {
            final PartitionRequestClientFactory factory = new PartitionRequestClientFactory(client);
            Thread connect = new Thread(new Runnable(){

                @Override
                public void run() {
                    ConnectionID serverAddress = null;
                    try {
                        serverAddress = PartitionRequestClientFactoryTest.createServerConnectionID(0);
                        factory.createPartitionRequestClient(serverAddress);
                    }
                    catch (Throwable t) {
                        if (serverAddress != null) {
                            factory.closeOpenChannelConnections(serverAddress);
                            Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
                        }
                        t.printStackTrace();
                        Assert.fail((String)"Could not create RemoteAddress for server.");
                    }
                }
            });
            connect.setUncaughtExceptionHandler(exceptionHandler);
            connect.start();
            syncOnConnect.await();
            connect.interrupt();
            connect.join();
            Assert.assertEquals((long)0L, (long)factory.getNumberOfActiveClients());
            Assert.assertTrue((exceptionHandler.getErrors().size() > 0 ? 1 : 0) != 0);
        }
        finally {
            if (server != null) {
                server.shutdown();
            }
            if (client != null) {
                client.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol protocol) throws IOException {
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32768, 1, new Configuration());
        NettyServer server = new NettyServer(config);
        NettyClient client = new NettyClient(config);
        boolean success = false;
        try {
            NettyBufferPool bufferPool = new NettyBufferPool(1);
            server.init(protocol, bufferPool);
            client.init(protocol, bufferPool);
            success = true;
        }
        finally {
            if (!success) {
                server.shutdown();
                client.shutdown();
            }
        }
        return new Tuple2((Object)server, (Object)client);
    }

    private static ConnectionID createServerConnectionID(int connectionIndex) throws UnknownHostException {
        return new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), SERVER_PORT), connectionIndex);
    }

    private static class UncaughtTestExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private final List<Throwable> errors = new ArrayList<Throwable>(1);

        private UncaughtTestExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.errors.add(e);
        }

        private List<Throwable> getErrors() {
            return this.errors;
        }
    }

    private static class CountDownLatchOnConnectHandler
    extends ChannelOutboundHandlerAdapter {
        private final CountDownLatch syncOnConnect;

        public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
            this.syncOnConnect = syncOnConnect;
        }

        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
            this.syncOnConnect.countDown();
        }
    }
}

