package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.spout.ISpout;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.TupleExt;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.stats.CommonStatsRolling;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeOutMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/spout/SpoutCollector.class */
/**
 * {@link ISpoutOutputCollector} implementation that routes tuples emitted by a spout
 * to their target tasks and performs the acker bookkeeping for tracked tuples.
 *
 * <p>A tuple is treated as "tracked" when it was emitted with a non-null message id and
 * the {@code topology.acker.executors} setting is greater than zero. Tracked tuples are
 * recorded in the {@code pending} map under a freshly generated root id and announced on
 * the {@code __ack_init} stream. Tuples emitted with a message id while no ackers are
 * configured are acknowledged back to the spout immediately via {@code AckSpoutMsg}.
 */
public class SpoutCollector implements ISpoutOutputCollector {
    private static final Logger LOG = Logger.getLogger(SpoutCollector.class);
    private final TaskSendTargets sendTargets;
    private final Map storm_conf;
    private final TaskTransfer transfer_fn;
    private final TimeOutMap<Long, TupleInfo> pending;
    private final TopologyContext topology_context;
    private final DisruptorQueue disruptorAckerQueue;
    private final CommonStatsRolling task_stats;
    private final ISpout spout;
    private final ITaskReportErr report_error;
    private final Integer task_id;
    private final Integer ackerNum;
    private final boolean isDebug;
    private final JStormTimer emitTotalTimer;
    Random random = new Random();

    /**
     * Creates a collector bound to one spout task.
     *
     * @param num                id of the task this collector emits for
     * @param iSpout             spout instance passed to {@code AckSpoutMsg} for immediate acks
     * @param commonStatsRolling task statistics passed to {@code AckSpoutMsg}
     * @param taskSendTargets    resolves the target task ids for an emitted tuple
     * @param map                topology configuration; {@code topology.acker.executors} and
     *                           {@code topology.debug} are read from it
     * @param taskTransfer       used to hand each outgoing tuple over for delivery
     * @param timeOutMap         map of tracked tuples keyed by root id
     * @param topologyContext    topology context of the task
     * @param disruptorQueue     acker queue; stored but not used by the methods of this class
     * @param iTaskReportErr     sink for errors passed to {@link #reportError(Throwable)}
     */
    public SpoutCollector(Integer num, ISpout iSpout, CommonStatsRolling commonStatsRolling, TaskSendTargets taskSendTargets, Map map, TaskTransfer taskTransfer, TimeOutMap<Long, TupleInfo> timeOutMap, TopologyContext topologyContext, DisruptorQueue disruptorQueue, ITaskReportErr iTaskReportErr) {
        this.sendTargets = taskSendTargets;
        this.storm_conf = map;
        this.transfer_fn = taskTransfer;
        this.pending = timeOutMap;
        this.topology_context = topologyContext;
        this.disruptorAckerQueue = disruptorQueue;
        this.task_stats = commonStatsRolling;
        this.spout = iSpout;
        this.task_id = num;
        this.report_error = iTaskReportErr;
        this.ackerNum = JStormUtils.parseInt(this.storm_conf.get("topology.acker.executors"));
        this.isDebug = JStormUtils.parseBoolean(this.storm_conf.get("topology.debug"), false);
        this.random.setSeed(System.currentTimeMillis());
        this.emitTotalTimer = Metrics.registerTimer(JStormServerUtils.getName(topologyContext.getThisComponentId(), num.intValue()), "Emit_Time", String.valueOf(num), Metrics.MetricType.TASK);
    }

    /**
     * Emits a tuple on the given stream to whatever tasks the send targets select.
     *
     * @param streamId  stream to emit on
     * @param tuple     tuple values
     * @param messageId spout-supplied message id, or {@code null} for an untracked tuple
     * @return the task ids the tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    }

    /**
     * Emits a tuple on the given stream, passing an explicit task id to the send targets.
     *
     * @param taskId    task id handed to the send targets
     * @param streamId  stream to emit on
     * @param tuple     tuple values
     * @param messageId spout-supplied message id, or {@code null} for an untracked tuple
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        sendSpoutMsg(streamId, tuple, messageId, Integer.valueOf(taskId));
    }

    /**
     * Shared implementation of {@link #emit} and {@link #emitDirect}.
     *
     * <p>The emit timer is started on entry and stopped exactly once, in the
     * {@code finally} block, regardless of how the method exits.
     *
     * @param streamId   stream to emit on
     * @param values     tuple values
     * @param messageId  spout-supplied message id, may be {@code null}
     * @param directTask explicit task id for a direct emit, or {@code null}
     * @return the task ids the tuple was sent to (possibly empty)
     */
    private List<Integer> sendSpoutMsg(String streamId, List<Object> values, Object messageId, Integer directTask) {
        this.emitTotalTimer.start();
        try {
            List<Integer> targets = directTask != null
                    ? this.sendTargets.get(directTask, streamId, values)
                    : this.sendTargets.get(streamId, values);
            if (targets.isEmpty()) {
                // Nothing to send: return before any pending/ack bookkeeping is done.
                return targets;
            }

            boolean tracked = messageId != null && this.ackerNum.intValue() > 0;

            Long rootId = Long.valueOf(MessageId.generateId(this.random));
            if (tracked) {
                // The root id is the key in the pending map, so regenerate on collision.
                while (this.pending.containsKey(rootId)) {
                    rootId = Long.valueOf(MessageId.generateId(this.random));
                }
            }

            // One generated id per target task; they are XOR-ed together for __ack_init.
            List<Long> edgeIds = new ArrayList<Long>();
            for (Integer target : targets) {
                MessageId tupleMessageId;
                if (tracked) {
                    Long edgeId = Long.valueOf(MessageId.generateId(this.random));
                    tupleMessageId = MessageId.makeRootId(rootId.longValue(), edgeId.longValue());
                    edgeIds.add(edgeId);
                } else {
                    tupleMessageId = MessageId.makeUnanchored();
                }
                TupleExt outgoing = new TupleImplExt(this.topology_context, values, this.task_id.intValue(), streamId, tupleMessageId);
                outgoing.setTargetTaskId(target.intValue());
                this.transfer_fn.transfer(outgoing);
            }

            if (tracked) {
                TupleInfo info = newTupleInfo(streamId, values, messageId, System.currentTimeMillis());
                this.pending.putHead(rootId, info);
                UnanchoredSend.send(this.topology_context, this.sendTargets, this.transfer_fn, "__ack_init", JStormUtils.mk_list(new Object[]{rootId, JStormUtils.bit_xor_vals(edgeIds), this.task_id}));
            } else if (messageId != null) {
                // A message id was supplied but no ackers are configured: ack right away.
                TupleInfo info = newTupleInfo(streamId, values, messageId, 0L);
                new AckSpoutMsg(this.spout, info, this.task_stats, this.isDebug).run();
            }
            return targets;
        } finally {
            // Single stop point for the timer (covers early return and exceptions too).
            this.emitTotalTimer.stop();
        }
    }

    /** Builds a {@link TupleInfo} populated with the given stream, values, message id and timestamp. */
    private static TupleInfo newTupleInfo(String streamId, List<Object> values, Object messageId, long timestamp) {
        TupleInfo info = new TupleInfo();
        info.setStream(streamId);
        info.setValues(values);
        info.setMessageId(messageId);
        info.setTimestamp(timestamp);
        return info;
    }

    /**
     * Forwards an error raised by the spout to the task's error reporter.
     *
     * @param th the error to report
     */
    public void reportError(Throwable th) {
        this.report_error.report(th);
    }
}
