package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClientSync.class */
public class NettyClientSync extends NettyClient implements EventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientSync.class);
    private ConcurrentLinkedQueue<MessageBatch> batchQueue;
    private DisruptorQueue disruptorQueue;
    private ExecutorService bossExecutor;
    private ExecutorService workerExecutor;
    private AtomicLong emitTs;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyClientSync(Map map, ChannelFactory channelFactory, ScheduledExecutorService scheduledExecutorService, String str, int i, ReconnectRunnable reconnectRunnable) {
        super(map, channelFactory, scheduledExecutorService, str, i, reconnectRunnable);
        this.emitTs = new AtomicLong(0L);
        this.batchQueue = new ConcurrentLinkedQueue<>();
        Metrics.register(this.address, "Netty_Client_Sync_BatchQueue", new Gauge<Integer>() { // from class: com.alibaba.jstorm.message.netty.NettyClientSync.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m66getValue() {
                return Integer.valueOf(NettyClientSync.this.batchQueue.size());
            }
        }, (String) null, Metrics.MetricType.WORKER);
        this.disruptorQueue = DisruptorQueue.mkInstance(this.name, ProducerType.MULTI, this.MAX_SEND_PENDING * 8, (WaitStrategy) Utils.newInstance((String) map.get("topology.disruptor.wait.strategy")));
        this.disruptorQueue.consumerStarted();
        Metrics.registerQueue(this.address, "Netty_Client_Sync_DisrQueue", this.disruptorQueue, (String) null, Metrics.MetricType.WORKER);
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // from class: com.alibaba.jstorm.message.netty.NettyClientSync.2
            @Override // java.lang.Runnable
            public void run() {
                NettyClientSync.this.trigger();
            }
        }, 10L, 1L, TimeUnit.SECONDS);
        this.bossExecutor = Executors.newCachedThreadPool(new NettyRenameThreadFactory("Netty-Client-" + JStormServerUtils.getName(str, i) + "-boss"));
        this.workerExecutor = Executors.newCachedThreadPool(new NettyRenameThreadFactory("Netty-Client-" + JStormServerUtils.getName(str, i) + "-worker"));
        this.clientChannelFactory = new NioClientSocketChannelFactory(this.bossExecutor, this.workerExecutor, 1);
        start();
        LOG.info(toString());
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void send(List<TaskMessage> list) {
        Iterator<TaskMessage> it = list.iterator();
        while (it.hasNext()) {
            this.disruptorQueue.publish(it.next());
        }
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void send(TaskMessage taskMessage) {
        this.disruptorQueue.publish(taskMessage);
    }

    public void flushBatch(MessageBatch messageBatch, Channel channel) {
        this.emitTs.set(System.currentTimeMillis());
        if (messageBatch == null) {
            LOG.warn("Handle no data to {}, this shouldn't occur", this.name);
        } else if (channel != null && channel.isWritable()) {
            flushRequest(channel, messageBatch);
        } else {
            LOG.warn("Channel occur exception, during batch messages {}", this.name);
            this.batchQueue.offer(messageBatch);
        }
    }

    public void sendData() {
        this.sendTimer.start();
        try {
            try {
                MessageBatch poll = this.batchQueue.poll();
                if (poll == null) {
                    this.disruptorQueue.consumeBatchWhenAvailable(this);
                    poll = this.batchQueue.poll();
                }
                flushBatch(poll, this.channelRef.get());
                this.sendTimer.stop();
            } catch (Throwable th) {
                LOG.error("Occur e", th);
                JStormUtils.halt_process(-1, this.name + " nettyclient occur unknow exception");
                this.sendTimer.stop();
            }
        } catch (Throwable th2) {
            this.sendTimer.stop();
            throw th2;
        }
    }

    public void sendAllData() {
        this.sendTimer.start();
        try {
            try {
                this.disruptorQueue.consumeBatch(this);
                MessageBatch poll = this.batchQueue.poll();
                while (poll != null) {
                    Channel channel = this.channelRef.get();
                    if (channel == null) {
                        LOG.info("No channel {} to flush all data", this.name);
                        this.sendTimer.stop();
                        return;
                    } else if (!channel.isWritable()) {
                        LOG.info("Channel {} is no writable", this.name);
                        this.sendTimer.stop();
                        return;
                    } else {
                        flushBatch(poll, channel);
                        poll = this.batchQueue.poll();
                    }
                }
                this.sendTimer.stop();
            } catch (Throwable th) {
                LOG.error("Occur e", th);
                JStormUtils.halt_process(-1, this.name + " nettyclient occur unknow exception");
                this.sendTimer.stop();
            }
        } catch (Throwable th2) {
            this.sendTimer.stop();
            throw th2;
        }
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void handleResponse() {
        this.emitTs.set(0L);
        sendData();
    }

    public void onEvent(Object obj, long j, boolean z) throws Exception {
        if (obj == null) {
            return;
        }
        TaskMessage taskMessage = (TaskMessage) obj;
        MessageBatch andSet = this.messageBatchRef.getAndSet(null);
        if (null == andSet) {
            andSet = new MessageBatch(this.messageBatchSize);
        }
        andSet.add(taskMessage);
        if (andSet.isFull()) {
            this.batchQueue.offer(andSet);
        } else if (z) {
            this.batchQueue.offer(andSet);
        } else {
            this.messageBatchRef.set(andSet);
        }
    }

    void trigger() {
        Channel channel;
        if (isClosed()) {
            return;
        }
        long j = this.emitTs.get();
        if (j == 0) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis() - j;
        if (currentTimeMillis >= this.timeoutSecond * 100 && (channel = this.channelRef.get()) != null) {
            LOG.info("Long time no response of {}, {}s", this.name, Long.valueOf(currentTimeMillis / 1000));
            channel.write(ControlMessage.EOB_MESSAGE);
        }
    }

    protected void shutdownPool() {
        this.bossExecutor.shutdownNow();
        this.workerExecutor.shutdownNow();
        try {
            this.bossExecutor.awaitTermination(1L, TimeUnit.SECONDS);
            this.workerExecutor.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.error("Error when shutting down client scheduler", e);
        }
        this.clientChannelFactory.releaseExternalResources();
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void close() {
        LOG.info("Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", new Object[]{this.name, Integer.valueOf(this.batchQueue.size()), Long.valueOf(this.disruptorQueue.population())});
        sendAllData();
        this.disruptorQueue.haltWithInterrupt();
        Metrics.unregister(this.address, "Netty_Client_Sync_BatchQueue", (String) null, Metrics.MetricType.WORKER);
        Metrics.unregister(this.address, "Netty_Client_Sync_DisrQueue", (String) null, Metrics.MetricType.WORKER);
        super.close();
        shutdownPool();
    }

    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }
}
