package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyServer.class */
public class NettyServer implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
    Map storm_conf;
    int port;
    DisruptorQueue recvQueue;
    volatile ChannelGroup allChannels = new DefaultChannelGroup("jstorm-server");
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private final boolean isSyncMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyServer(Map map, int i, boolean z) {
        this.storm_conf = map;
        this.port = i;
        this.isSyncMode = z;
        int intValue = Utils.getInt(map.get("storm.messaging.netty.buffer_size")).intValue();
        int intValue2 = Utils.getInt(map.get("storm.messaging.netty.server_worker_threads")).intValue();
        NettyRenameThreadFactory nettyRenameThreadFactory = new NettyRenameThreadFactory("server-boss");
        NettyRenameThreadFactory nettyRenameThreadFactory2 = new NettyRenameThreadFactory("server-worker");
        if (intValue2 > 0) {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(nettyRenameThreadFactory), Executors.newCachedThreadPool(nettyRenameThreadFactory2), intValue2);
        } else {
            this.factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(nettyRenameThreadFactory), Executors.newCachedThreadPool(nettyRenameThreadFactory2));
        }
        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("reuserAddress", true);
        this.bootstrap.setOption("child.tcpNoDelay", true);
        this.bootstrap.setOption("child.receiveBufferSize", Integer.valueOf(intValue));
        this.bootstrap.setOption("child.keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
        this.allChannels.add(this.bootstrap.bind(new InetSocketAddress(i)));
        LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", new Object[]{Integer.valueOf(i), Integer.valueOf(intValue), Integer.valueOf(intValue2)});
    }

    public void registerQueue(DisruptorQueue disruptorQueue) {
        this.recvQueue = disruptorQueue;
    }

    public void enqueue(TaskMessage taskMessage) {
        this.recvQueue.publish(taskMessage);
    }

    public TaskMessage recv(int i) {
        try {
            return (i & 1) == 1 ? (TaskMessage) this.recvQueue.poll() : (TaskMessage) this.recvQueue.take();
        } catch (Exception e) {
            LOG.warn("Occur unexception ", e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addChannel(Channel channel) {
        this.allChannels.add(channel);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeChannel(Channel channel) {
        channel.close().awaitUninterruptibly();
        this.allChannels.remove(channel);
    }

    public synchronized void close() {
        LOG.info("Begin to shutdown NettyServer");
        if (this.allChannels != null) {
            new Thread(new Runnable() { // from class: com.alibaba.jstorm.message.netty.NettyServer.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        NettyServer.this.allChannels.close().await(1L, TimeUnit.SECONDS);
                        NettyServer.LOG.info("Successfully close all channel");
                        NettyServer.this.factory.releaseExternalResources();
                    } catch (Exception e) {
                    }
                    NettyServer.this.allChannels = null;
                }
            }).start();
            JStormUtils.sleepMs(1000L);
        }
        LOG.info("Successfully shutdown NettyServer");
    }

    public void send(List<TaskMessage> list) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    public void send(TaskMessage taskMessage) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    public boolean isClosed() {
        return false;
    }

    public boolean isSyncMode() {
        return this.isSyncMode;
    }
}
