package com.alibaba.jstorm.task.execute;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.daemon.worker.TimeTick;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.stats.CommonStatsRolling;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.alibaba.jstorm.utils.TimeUtils;
import com.lmax.disruptor.EventHandler;
import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BoltExecutors.class */
public class BoltExecutors extends BaseExecutors implements EventHandler {
    private static Logger LOG = Logger.getLogger(BoltExecutors.class);
    protected IBolt bolt;
    protected RotatingMap<Tuple, Long> tuple_start_times;
    private int ackerNum;
    private OutputCollector outputCollector;
    private JStormTimer boltExeTimer;

    public BoltExecutors(IBolt iBolt, TaskTransfer taskTransfer, Map<Integer, DisruptorQueue> map, Map map2, DisruptorQueue disruptorQueue, TaskSendTargets taskSendTargets, TaskStatus taskStatus, TopologyContext topologyContext, TopologyContext topologyContext2, CommonStatsRolling commonStatsRolling, ITaskReportErr iTaskReportErr) {
        super(taskTransfer, map2, disruptorQueue, map, topologyContext, topologyContext2, commonStatsRolling, taskStatus, iTaskReportErr);
        this.ackerNum = 0;
        this.bolt = iBolt;
        this.tuple_start_times = new RotatingMap<>(3);
        this.ackerNum = JStormUtils.parseInt(map2.get("topology.acker.executors")).intValue();
        this.outputCollector = new OutputCollector(new BoltCollector(this.message_timeout_secs, iTaskReportErr, taskSendTargets, map2, taskTransfer, topologyContext, Integer.valueOf(this.taskId), this.tuple_start_times, commonStatsRolling));
        this.boltExeTimer = Metrics.registerTimer(this.idStr, "Execute_Time", String.valueOf(this.taskId), Metrics.MetricType.TASK);
        TimeTick.registerTimer(this.idStr + "-sampling-tick", this.exeQueue);
        try {
            try {
                WorkerClassLoader.switchThreadContext();
                this.bolt.prepare(map2, topologyContext2, this.outputCollector);
                WorkerClassLoader.restoreThreadContext();
            } catch (Throwable th) {
                this.error = th;
                LOG.error("bolt prepare error ", th);
                this.report_error.report(th);
                WorkerClassLoader.restoreThreadContext();
            }
            LOG.info("Successfully create BoltExecutors " + this.idStr);
        } catch (Throwable th2) {
            WorkerClassLoader.restoreThreadContext();
            throw th2;
        }
    }

    public String getThreadName() {
        return this.idStr + "-" + BoltExecutors.class.getSimpleName();
    }

    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public void run() {
        WorkerClassLoader.switchThreadContext();
        while (!this.taskStatus.isShutdown()) {
            try {
                this.exeQueue.consumeBatchWhenAvailable(this);
            } catch (Throwable th) {
                if (!this.taskStatus.isShutdown()) {
                    LOG.error(this.idStr + " bolt exeutor  error", th);
                }
            }
        }
        WorkerClassLoader.restoreThreadContext();
    }

    public void onEvent(Object obj, long j, boolean z) throws Exception {
        Long l;
        if (obj == null) {
            return;
        }
        this.boltExeTimer.start();
        try {
            if (obj instanceof TimeTick.Tick) {
                Map rotate = this.tuple_start_times.rotate();
                if (this.ackerNum > 0) {
                    Iterator it = rotate.entrySet().iterator();
                    while (it.hasNext()) {
                        Tuple tuple = (Tuple) ((Map.Entry) it.next()).getKey();
                        this.task_stats.bolt_failed_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
                    }
                }
                return;
            }
            Tuple tuple2 = (Tuple) obj;
            this.task_stats.recv_tuple(tuple2.getSourceComponent(), tuple2.getSourceStreamId());
            this.tuple_start_times.put(tuple2, Long.valueOf(System.currentTimeMillis()));
            try {
                this.bolt.execute(tuple2);
            } catch (Throwable th) {
                this.error = th;
                LOG.error("bolt execute error ", th);
                this.report_error.report(th);
            }
            if (this.ackerNum == 0 && (l = (Long) this.tuple_start_times.remove(tuple2)) != null) {
                this.task_stats.bolt_acked_tuple(tuple2.getSourceComponent(), tuple2.getSourceStreamId(), Long.valueOf(TimeUtils.time_delta_ms(l.longValue())));
            }
            this.boltExeTimer.stop();
        } finally {
            this.boltExeTimer.stop();
        }
    }
}
