package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.utils.DisruptorRunable;
import com.alibaba.jstorm.utils.Pair;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/BatchTupleRunable.class */
public class BatchTupleRunable extends DisruptorRunable {
    private DisruptorQueue transferQueue;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    private Map<IConnection, List<TaskMessage>> dispatchMap;
    private DisruptorQueue sendingQueue;
    private final boolean isDirectSend = true;
    private DisruptorQueue queue;
    private static final Logger LOG = Logger.getLogger(BatchTupleRunable.class);
    private static JStormTimer timer = Metrics.registerTimer((String) null, "Batch_Tuple_Time", (String) null, Metrics.MetricType.WORKER);

    public BatchTupleRunable(WorkerData workerData) {
        super(workerData.getTransferQueue(), timer, BatchTupleRunable.class.getSimpleName(), workerData.getActive());
        this.isDirectSend = true;
        this.sendingQueue = workerData.getSendingQueue();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.taskNodeport = workerData.getTaskNodeport();
        this.dispatchMap = new HashMap();
        this.queue = workerData.getTransferQueue();
        Metrics.registerQueue((String) null, "Batch_Tuple_Queue", this.queue, (String) null, Metrics.MetricType.WORKER);
        this.queue.consumerStarted();
    }

    public void handleOneEvent(TaskMessage taskMessage) {
        int task = taskMessage.task();
        taskMessage.message();
        WorkerSlot workerSlot = this.taskNodeport.get(Integer.valueOf(task));
        if (workerSlot == null) {
            LOG.warn("DrainerRunable warn", new Exception("can`t not found IConnection to " + task));
            return;
        }
        IConnection iConnection = this.nodeportSocket.get(workerSlot);
        if (iConnection == null) {
            LOG.warn("DrainerRunable warn", new Exception("can`t not found nodePort " + workerSlot));
        } else {
            if (iConnection.isClosed()) {
                return;
            }
            iConnection.send(taskMessage);
        }
    }

    public void handleFinish() {
        for (Map.Entry<IConnection, List<TaskMessage>> entry : this.dispatchMap.entrySet()) {
            this.sendingQueue.publish(new Pair(entry.getKey(), entry.getValue()));
        }
        this.dispatchMap.clear();
    }

    @Override // com.alibaba.jstorm.utils.DisruptorRunable
    public void handleEvent(Object obj, boolean z) throws Exception {
        handleOneEvent((TaskMessage) obj);
        if (z) {
        }
    }
}
