package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.spout.ISpout;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.TimeTick;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.stats.CommonStatsRolling;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
import com.lmax.disruptor.EventHandler;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SpoutExecutors.class */
/**
 * Common executor logic for a spout task: opens the user spout, drives
 * {@link ISpout#nextTuple()} and processes the events delivered through
 * {@link #onEvent(Object, long, boolean)} (acker tuples, timer ticks and
 * runnable callbacks).
 *
 * <p>{@link #run()} is not implemented here and always throws, so this class is
 * expected to be used through a subclass that provides the actual loop.
 *
 * <p>NOTE(review): {@link #pending} and {@link #ackerRunnableThread} are never
 * assigned in this class; presumably subclasses initialise them before
 * {@link #prepare} / {@link #nextTuple()} are used -- verify against the
 * subclasses.
 */
public class SpoutExecutors extends BaseExecutors implements EventHandler {
    private static final Logger LOG = Logger.getLogger(SpoutExecutors.class);

    /** Value of {@code topology.max.spout.pending}; {@code null} disables the pending-size check. */
    protected final Integer max_spout_pending;

    /** The user spout driven by this executor. */
    protected ISpout spout;

    /** Tuples emitted but not yet acked/failed, keyed by the id carried in acker tuples. */
    protected RotatingMap<Long, TupleInfo> pending;

    /** Collector handed to the spout (wrapped in a {@link SpoutOutputCollector}) in {@link #prepare}. */
    protected ISpoutOutputCollector output_collector;

    /** {@code true} until the first {@link #nextTuple()} call has performed the start-up delay. */
    protected boolean firstTime;

    /** Times each {@code spout.nextTuple()} invocation. */
    protected JStormTimer nextTupleTimer;

    /** Times each {@link #onEvent(Object, long, boolean)} invocation. */
    protected JStormTimer ackerTimer;

    /** Started when the pending map is full and stopped when a tuple can be requested. */
    protected TimerRatio emptyCpuCounter;

    /** Exposed through {@link #getAckerRunnableThread()}; not assigned in this class. */
    protected AsyncLoopThread ackerRunnableThread;

    /** Whether to sleep 1 ms when the pending map is full instead of returning immediately. */
    protected boolean isSpoutFullSleep;

    /**
     * Creates the executor, registers its timers/metrics and an acker tick on the
     * execute queue, and reads the spout-related settings from the topology
     * configuration.
     *
     * <p>All arguments except {@code iSpout} and {@code taskSendTargets} are
     * forwarded unchanged to the {@link BaseExecutors} constructor;
     * {@code taskSendTargets} is not used by this constructor.
     *
     * @param iSpout the user spout to drive
     */
    public SpoutExecutors(ISpout iSpout, TaskTransfer taskTransfer, Map<Integer, DisruptorQueue> map, Map map2, DisruptorQueue disruptorQueue, TaskSendTargets taskSendTargets, TaskStatus taskStatus, TopologyContext topologyContext, TopologyContext topologyContext2, CommonStatsRolling commonStatsRolling, ITaskReportErr iTaskReportErr) {
        super(taskTransfer, map2, disruptorQueue, map, topologyContext, topologyContext2, commonStatsRolling, taskStatus, iTaskReportErr);
        this.firstTime = true;
        this.spout = iSpout;
        this.max_spout_pending = JStormUtils.parseInt(this.storm_conf.get("topology.max.spout.pending"));

        String taskIdStr = String.valueOf(this.taskId);
        this.nextTupleTimer = Metrics.registerTimer(this.idStr, "Execute_Time", taskIdStr, Metrics.MetricType.TASK);
        this.ackerTimer = Metrics.registerTimer(this.idStr, "Acker_Time", taskIdStr, Metrics.MetricType.TASK);
        this.emptyCpuCounter = new TimerRatio();
        Metrics.register(this.idStr, "Empty_Cpu_Ratio", this.emptyCpuCounter, taskIdStr, Metrics.MetricType.TASK);

        TimeTick.registerTimer(this.idStr + "-acker-tick", this.exeQueue);

        this.isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(this.storm_conf);
        LOG.info("isSpoutFullSleep:" + this.isSpoutFullSleep);
    }

    /**
     * Builds the output collector and opens the user spout.
     *
     * <p>Any {@link Throwable} raised while opening the spout is stored in
     * {@code error}, logged and reported through {@code report_error}; it is not
     * rethrown. The worker thread context is restored on every path.
     *
     * @param taskSendTargets passed to the {@link SpoutCollector}
     * @param taskTransfer    passed to the {@link SpoutCollector}
     * @param topologyContext passed to the {@link SpoutCollector}; the spout itself
     *                        is opened with {@code userTopologyCtx}
     */
    public void prepare(TaskSendTargets taskSendTargets, TaskTransfer taskTransfer, TopologyContext topologyContext) {
        this.output_collector = new SpoutCollector(Integer.valueOf(this.taskId), this.spout, this.task_stats, taskSendTargets, this.storm_conf, taskTransfer, this.pending, topologyContext, this.exeQueue, this.report_error);
        try {
            WorkerClassLoader.switchThreadContext();
            this.spout.open(this.storm_conf, this.userTopologyCtx, new SpoutOutputCollector(this.output_collector));
        } catch (Throwable th) {
            this.error = th;
            LOG.error("spout open error ", th);
            this.report_error.report(th);
        } finally {
            // Single restore point instead of one copy per exit path.
            WorkerClassLoader.restoreThreadContext();
        }
        // NOTE(review): this line is also reached when spout.open() failed above.
        LOG.info("Successfully create SpoutExecutors " + this.idStr);
    }

    /**
     * Requests one tuple from the spout, unless the task is not running or the
     * pending map has reached {@link #max_spout_pending}.
     *
     * <p>The first call sleeps for the configured spout delay before doing
     * anything else. Errors thrown by the spout are stored in {@code error},
     * logged and reported, never rethrown.
     */
    public void nextTuple() {
        if (this.firstTime) {
            // 1000L forces long arithmetic so a large delay cannot overflow int.
            JStormUtils.sleepMs(ConfigExtension.getSpoutDelayRunSeconds(this.storm_conf) * 1000L);
            this.firstTime = false;
            this.emptyCpuCounter.init();
            LOG.info(this.idStr + " is ready ");
        }

        if (!this.taskStatus.isRun()) {
            JStormUtils.sleepMs(1L);
            return;
        }

        boolean pendingFull = this.max_spout_pending != null
                && this.pending.size() >= this.max_spout_pending.intValue();
        if (pendingFull) {
            if (this.isSpoutFullSleep) {
                JStormUtils.sleepMs(1L);
            }
            this.emptyCpuCounter.start();
            return;
        }

        this.emptyCpuCounter.stop();
        this.nextTupleTimer.start();
        try {
            this.spout.nextTuple();
        } catch (Throwable th) {
            this.error = th;
            LOG.error("spout execute error ", th);
            this.report_error.report(th);
        } finally {
            this.nextTupleTimer.stop();
        }
    }

    /**
     * Not implemented at this level.
     *
     * @throws RuntimeException always
     */
    @Override // com.alibaba.jstorm.task.execute.BaseExecutors
    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    /**
     * Handles one event from the execute queue.
     *
     * <ul>
     *   <li>{@code null}: ignored;</li>
     *   <li>{@link Tuple}: treated as an acker tuple, see {@link #toAckerCallback(Tuple)};</li>
     *   <li>{@link TimeTick.Tick}: rotates the pending map and fails what dropped out;</li>
     *   <li>{@link IAckMsg} / {@link Runnable}: run directly;</li>
     *   <li>anything else: logged as a warning and dropped.</li>
     * </ul>
     *
     * <p>Failures are logged and reported unless the task is shutting down; they
     * are not rethrown. {@link #ackerTimer} is stopped on every path.
     *
     * @param obj the event payload
     * @param j   sequence number supplied by the queue (unused)
     * @param z   end-of-batch flag supplied by the queue (unused)
     */
    public void onEvent(Object obj, long j, boolean z) throws Exception {
        this.ackerTimer.start();
        try {
            if (obj == null) {
                return;
            }

            Runnable runnable = null;
            if (obj instanceof Tuple) {
                runnable = toAckerCallback((Tuple) obj);
            } else if (obj instanceof TimeTick.Tick) {
                failRotatedPending();
            } else if (obj instanceof IAckMsg || obj instanceof Runnable) {
                runnable = (Runnable) obj;
            } else {
                LOG.warn("Receive one unknow event " + this.idStr);
            }

            if (runnable != null) {
                runnable.run();
            }
        } catch (Throwable th) {
            if (!this.taskStatus.isShutdown()) {
                // Logged at ERROR, matching prepare() and nextTuple(); an exception here is not informational.
                LOG.error("Unknow excpetion ", th);
                this.report_error.report(th);
            }
        } finally {
            this.ackerTimer.stop();
        }
    }

    /**
     * Turns an acker tuple into the ack/fail callback for the matching pending entry.
     *
     * <p>The entry is removed from {@link #pending} before the stream id is
     * inspected, so a tuple from an unrecognised stream still consumes its entry.
     *
     * @return the callback to run, or {@code null} when there is no pending entry
     *         for the tuple's id or the source stream is neither
     *         {@code __ack_ack} nor {@code __ack_fail}
     */
    private Runnable toAckerCallback(Tuple tuple) {
        Object value = tuple.getValue(0);
        Object remove = this.pending.remove((Long) value);
        if (remove == null) {
            if (this.isDebug) {
                LOG.info("Pending map no entry:" + value);
            }
            return null;
        }

        TupleInfo tupleInfo = (TupleInfo) remove;
        String sourceStreamId = tuple.getSourceStreamId();
        Runnable runnable;
        if (sourceStreamId.equals("__ack_ack")) {
            runnable = new AckSpoutMsg(this.spout, tupleInfo, this.task_stats, this.isDebug);
        } else if (sourceStreamId.equals("__ack_fail")) {
            runnable = new FailSpoutMsg(value, this.spout, tupleInfo, this.task_stats, this.isDebug);
        } else {
            LOG.warn("Receive one unknow source Tuple " + this.idStr);
            return null;
        }

        this.task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
        return runnable;
    }

    /** Rotates {@link #pending} and runs a fail callback for every entry returned by the rotation. */
    private void failRotatedPending() {
        for (Map.Entry entry : this.pending.rotate().entrySet()) {
            new FailSpoutMsg(entry.getKey(), this.spout, (TupleInfo) entry.getValue(), this.task_stats, this.isDebug).run();
        }
    }

    /**
     * @return the value of {@link #ackerRunnableThread}; this class never assigns
     *         it, so it may be {@code null}
     */
    public AsyncLoopThread getAckerRunnableThread() {
        return this.ackerRunnableThread;
    }
}
