package com.alibaba.jstorm.task;

import backtype.storm.messaging.TaskMessage;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.TupleExt;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskTransfer.class */
public class TaskTransfer {
    private static Logger LOG = Logger.getLogger(TaskTransfer.class);
    private Map storm_conf;
    private DisruptorQueue transferQueue;
    private KryoTupleSerializer serializer;
    private Map<Integer, DisruptorQueue> innerTaskTransfer;
    private DisruptorQueue serializeQueue;
    private final AsyncLoopThread serializeThread;
    private volatile TaskStatus taskStatus;
    private String taskName;
    private JStormTimer timer;

    /* loaded from: input_file:com/alibaba/jstorm/task/TaskTransfer$TransferRunnable.class */
    class TransferRunnable extends RunnableCallback implements EventHandler {
        TransferRunnable() {
        }

        public String getThreadName() {
            return TaskTransfer.this.taskName + "-" + TransferRunnable.class.getSimpleName();
        }

        public void run() {
            WorkerClassLoader.switchThreadContext();
            while (!TaskTransfer.this.taskStatus.isShutdown()) {
                TaskTransfer.this.serializeQueue.consumeBatchWhenAvailable(this);
            }
            WorkerClassLoader.restoreThreadContext();
        }

        public Object getResult() {
            return !TaskTransfer.this.taskStatus.isShutdown() ? 0 : -1;
        }

        public void onEvent(Object obj, long j, boolean z) throws Exception {
            if (obj == null) {
                return;
            }
            TaskTransfer.this.timer.start();
            try {
                TupleExt tupleExt = (TupleExt) obj;
                TaskTransfer.this.transferQueue.publish(new TaskMessage(tupleExt.getTargetTaskId(), TaskTransfer.this.serializer.serialize(tupleExt)));
                TaskTransfer.this.timer.stop();
            } catch (Throwable th) {
                TaskTransfer.this.timer.stop();
                throw th;
            }
        }
    }

    public TaskTransfer(String str, KryoTupleSerializer kryoTupleSerializer, TaskStatus taskStatus, WorkerData workerData) {
        this.taskName = str;
        this.serializer = kryoTupleSerializer;
        this.taskStatus = taskStatus;
        this.storm_conf = workerData.getConf();
        this.transferQueue = workerData.getTransferQueue();
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.serializeQueue = DisruptorQueue.mkInstance(str, ProducerType.MULTI, Utils.getInt(this.storm_conf.get("topology.executor.send.buffer.size")).intValue(), (WaitStrategy) Utils.newInstance((String) this.storm_conf.get("topology.disruptor.wait.strategy")));
        this.serializeQueue.consumerStarted();
        String substring = str.substring(str.indexOf(":") + 1);
        Metrics.registerQueue(str, "Serialize_Queue", this.serializeQueue, substring, Metrics.MetricType.TASK);
        this.timer = Metrics.registerTimer(str, "Serialize_Time", substring, Metrics.MetricType.TASK);
        this.serializeThread = new AsyncLoopThread(new TransferRunnable());
        LOG.info("Successfully start TaskTransfer thread");
    }

    public void transfer(TupleExt tupleExt) {
        DisruptorQueue disruptorQueue = this.innerTaskTransfer.get(Integer.valueOf(tupleExt.getTargetTaskId()));
        if (disruptorQueue != null) {
            disruptorQueue.publish(tupleExt);
        } else {
            this.serializeQueue.publish(tupleExt);
        }
    }

    public AsyncLoopThread getSerializeThread() {
        return this.serializeThread;
    }
}
