package com.alibaba.jstorm.message.netty;

import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/message/netty/NettyClientAsync.class */
class NettyClientAsync extends NettyClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
    public static final String PREFIX = "Netty-Client-";
    protected long BATCH_THREASHOLD_WARN;
    protected final boolean directlySend;
    protected AtomicBoolean flush_later;
    protected int flushCheckInterval;
    protected final boolean blockSend;

    boolean isDirectSend(Map map) {
        return JStormServerUtils.isOnePending(map) || !ConfigExtension.isNettyTransferAsyncBatch(map);
    }

    boolean isBlockSend(Map map) {
        if (ConfigExtension.isTopologyContainAcker(map)) {
            return ConfigExtension.isNettyASyncBlock(map);
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyClientAsync(Map map, ChannelFactory channelFactory, ScheduledExecutorService scheduledExecutorService, String str, int i, ReconnectRunnable reconnectRunnable) {
        super(map, channelFactory, scheduledExecutorService, str, i, reconnectRunnable);
        this.BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(map);
        this.blockSend = isBlockSend(map);
        this.directlySend = isDirectSend(map);
        this.flush_later = new AtomicBoolean(false);
        this.flushCheckInterval = Utils.getInt(map.get("storm.messaging.netty.flush.check.interval.ms"), 10).intValue();
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { // from class: com.alibaba.jstorm.message.netty.NettyClientAsync.1
            @Override // java.lang.Runnable
            public void run() {
                NettyClientAsync.this.flush();
            }
        }, Math.min(1000, this.max_sleep_ms * this.max_retries), this.flushCheckInterval, TimeUnit.MILLISECONDS);
        this.clientChannelFactory = channelFactory;
        start();
        LOG.info(toString());
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void send(List<TaskMessage> list) {
        if (isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }
        this.sendTimer.start();
        try {
            try {
                pushBatch(list);
                this.sendTimer.stop();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.sendTimer.stop();
            throw th;
        }
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void send(TaskMessage taskMessage) {
        if (isClosed()) {
            LOG.warn("Client is being closed, and does not take requests any more");
            return;
        }
        this.sendTimer.start();
        try {
            try {
                pushBatch(taskMessage);
                this.sendTimer.stop();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.sendTimer.stop();
            throw th;
        }
    }

    void waitChannelReady(long j, long j2) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        IntervalCheck intervalCheck = new IntervalCheck();
        IntervalCheck intervalCheck2 = new IntervalCheck();
        intervalCheck2.setInterval(this.timeoutSecond);
        while (isChannelReady() == null) {
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (intervalCheck.check()) {
                LOG.warn("Target server  {} is unavailable, pending {}, bufferSize {}, block sending {}ms", new Object[]{this.name, Long.valueOf(this.pendings.get()), Long.valueOf(j), Long.valueOf(currentTimeMillis2)});
            }
            if (intervalCheck2.check()) {
                if (this.messageBatchRef.get() != null) {
                    LOG.warn("Target server  {} is unavailable, wait too much time, throw timeout message", this.name);
                    this.messageBatchRef.set(null);
                }
                setChannel(null);
                LOG.warn("Reset channel as null");
            }
            reconnect();
            JStormUtils.sleepMs(j2);
            if (currentTimeMillis2 > 2 * this.timeoutSecond * 1000 && !z && this.channelRef.get() != null && this.BATCH_THREASHOLD_WARN >= 2 * this.messageBatchSize) {
                this.BATCH_THREASHOLD_WARN /= 2;
                LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", Long.valueOf(this.BATCH_THREASHOLD_WARN));
                z = true;
            }
            if (isClosed()) {
                LOG.info("Channel has been closed " + name());
                return;
            }
        }
    }

    long getDelaySec(long j) {
        long pow = (long) (Math.pow(2.0d, j / this.BATCH_THREASHOLD_WARN) * 10.0d);
        if (pow > 1000) {
            pow = 1000;
        }
        return pow;
    }

    void handleFailedChannel(MessageBatch messageBatch) {
        this.messageBatchRef.set(messageBatch);
        this.flush_later.set(true);
        long encoded_length = messageBatch.getEncoded_length();
        if (encoded_length > this.BATCH_THREASHOLD_WARN) {
            long delaySec = getDelaySec(encoded_length);
            if (this.blockSend) {
                waitChannelReady(encoded_length, delaySec);
                return;
            }
            LOG.warn("Target server  {} is unavailable, pending {}, bufferSize {}, block sending {}ms", new Object[]{this.name, Long.valueOf(this.pendings.get()), Long.valueOf(encoded_length), Long.valueOf(delaySec)});
            JStormUtils.sleepMs(delaySec);
            reconnect();
        }
    }

    void pushBatch(List<TaskMessage> list) {
        Channel isChannelReady;
        if (list.isEmpty()) {
            return;
        }
        MessageBatch andSet = this.messageBatchRef.getAndSet(null);
        if (null == andSet) {
            andSet = new MessageBatch(this.messageBatchSize);
        }
        for (TaskMessage taskMessage : list) {
            if (!TaskMessage.isEmpty(taskMessage)) {
                andSet.add(taskMessage);
                if (andSet.isFull() && (isChannelReady = isChannelReady()) != null) {
                    flushRequest(isChannelReady, andSet);
                    andSet = new MessageBatch(this.messageBatchSize);
                }
            }
        }
        Channel isChannelReady2 = isChannelReady();
        if (isChannelReady2 == null) {
            handleFailedChannel(andSet);
        } else {
            if (andSet.isEmpty()) {
                return;
            }
            flushRequest(isChannelReady2, andSet);
        }
    }

    void pushBatch(TaskMessage taskMessage) {
        if (TaskMessage.isEmpty(taskMessage)) {
            return;
        }
        MessageBatch andSet = this.messageBatchRef.getAndSet(null);
        if (null == andSet) {
            andSet = new MessageBatch(this.messageBatchSize);
        }
        andSet.add(taskMessage);
        Channel isChannelReady = isChannelReady();
        if (isChannelReady == null) {
            handleFailedChannel(andSet);
            return;
        }
        if (andSet.isFull()) {
            flushRequest(isChannelReady, andSet);
            return;
        }
        if (this.directlySend) {
            flushRequest(isChannelReady, andSet);
        } else if (this.messageBatchRef.compareAndSet(null, andSet)) {
            this.flush_later.set(true);
        } else {
            LOG.error("MessageBatch will be lost. This should not happen.");
        }
    }

    void flush() {
        Channel isChannelReady;
        if (isClosed() || !this.flush_later.get() || (isChannelReady = isChannelReady()) == null) {
            return;
        }
        this.flush_later.set(false);
        flushRequest(isChannelReady, this.messageBatchRef.getAndSet(null));
    }

    Channel isChannelReady() {
        Channel channel = this.channelRef.get();
        if (channel == null || !channel.isWritable()) {
            return null;
        }
        if (!this.blockSend || this.pendings.get() < this.MAX_SEND_PENDING) {
            return channel;
        }
        return null;
    }

    @Override // com.alibaba.jstorm.message.netty.NettyClient
    public void handleResponse() {
    }

    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
    }
}
