package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.utils.DisruptorRunable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.class */
public class VirtualPortDispatch extends DisruptorRunable {
    private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
    private IConnection recvConnection;
    private static final Logger LOG = Logger.getLogger(VirtualPortDispatch.class);
    private static JStormTimer timer = Metrics.registerTimer((String) null, "Virtual_Port_Dispatch_Time", (String) null, Metrics.MetricType.WORKER);

    public VirtualPortDispatch(WorkerData workerData, IConnection iConnection, DisruptorQueue disruptorQueue) {
        super(disruptorQueue, timer, VirtualPortDispatch.class.getSimpleName(), workerData.getActive());
        this.recvConnection = iConnection;
        this.deserializeQueues = workerData.getDeserializeQueues();
        Metrics.registerQueue((String) null, "Virtual_Port_Dispatch_Queue", this.queue, (String) null, Metrics.MetricType.WORKER);
    }

    public void cleanup() {
        LOG.info("Begin to shutdown VirtualPortDispatch");
        try {
            this.recvConnection.close();
        } catch (Exception e) {
        }
        this.recvConnection = null;
        LOG.info("Successfully shudown VirtualPortDispatch");
    }

    @Override // com.alibaba.jstorm.utils.DisruptorRunable
    public void handleEvent(Object obj, boolean z) throws Exception {
        TaskMessage taskMessage = (TaskMessage) obj;
        int task = taskMessage.task();
        DisruptorQueue disruptorQueue = this.deserializeQueues.get(Integer.valueOf(task));
        if (disruptorQueue == null) {
            LOG.warn("Received invalid message directed at port " + task + ". Dropping...");
        } else {
            disruptorQueue.publish(taskMessage.message());
        }
    }
}
