package com.alibaba.jstorm.task;

import backtype.storm.messaging.IContext;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import clojure.lang.Atom;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormZkClusterState;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.daemon.worker.WorkerHaltRunable;
import com.alibaba.jstorm.stats.CommonStatsRolling;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.error.TaskReportError;
import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
import com.alibaba.jstorm.task.heartbeat.TaskStats;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/Task.class */
public class Task {
    private static final Logger LOG = Logger.getLogger(Task.class);
    private Map<Object, Object> stormConf;
    private TopologyContext topologyContext;
    private TopologyContext userContext;
    private String topologyid;
    private IContext context;
    private TaskTransfer taskTransfer;
    private Map<Integer, DisruptorQueue> innerTaskTransfer;
    private Map<Integer, DisruptorQueue> deserializeQueues;
    private WorkerHaltRunable workHalt;
    private Integer taskid;
    private String componentid;
    private StormClusterState zkCluster;
    private Object taskObj;
    private CommonStatsRolling taskStats;
    private WorkerData workerData;
    private String componentType;
    private UptimeComputer uptime = new UptimeComputer();
    private Atom openOrPrepareWasCalled = new Atom(false);
    private volatile TaskStatus taskStatus = new TaskStatus();

    public Task(WorkerData workerData, int i) throws Exception {
        this.workerData = workerData;
        this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), Integer.valueOf(i), this.openOrPrepareWasCalled);
        this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), Integer.valueOf(i), this.openOrPrepareWasCalled);
        this.taskid = Integer.valueOf(i);
        this.componentid = this.topologyContext.getThisComponentId();
        this.taskTransfer = getSendingTransfer(workerData);
        this.innerTaskTransfer = workerData.getInnerTaskTransfer();
        this.deserializeQueues = workerData.getDeserializeQueues();
        this.topologyid = workerData.getTopologyId();
        this.context = workerData.getContext();
        this.workHalt = workerData.getWorkHalt();
        this.zkCluster = new StormZkClusterState(workerData.getZkClusterstate());
        this.stormConf = Common.component_conf(workerData.getStormConf(), this.topologyContext, this.componentid);
        this.taskObj = Common.get_task_object(this.topologyContext.getRawTopology(), this.componentid, WorkerClassLoader.getInstance());
        this.taskStats = new CommonStatsRolling(Integer.valueOf(StormConfig.sampling_rate(this.stormConf).intValue()));
        LOG.info("Loading task " + this.componentid + ":" + this.taskid);
    }

    private void setComponentType() {
        if (this.taskObj instanceof IBolt) {
            this.componentType = "bolt";
        } else if (this.taskObj instanceof ISpout) {
            this.componentType = "spout";
        }
    }

    private TaskSendTargets makeSendTargets() {
        return new TaskSendTargets(this.stormConf, this.topologyContext.getThisComponentId(), Common.outbound_components(this.topologyContext, this.workerData), this.topologyContext, JStormUtils.reverse_map(this.topologyContext.getTaskToComponent()), this.taskStats);
    }

    private TaskTransfer getSendingTransfer(WorkerData workerData) {
        return new TaskTransfer(JStormServerUtils.getName(this.componentid, this.taskid.intValue()), new KryoTupleSerializer(workerData.getStormConf(), this.topologyContext), this.taskStatus, workerData);
    }

    public TaskSendTargets echoToSystemBolt() {
        ArrayList arrayList = new ArrayList();
        arrayList.add("startup");
        TaskSendTargets makeSendTargets = makeSendTargets();
        UnanchoredSend.send(this.topologyContext, makeSendTargets, this.taskTransfer, Common.SYSTEM_STREAM_ID, arrayList);
        return makeSendTargets;
    }

    public boolean isSingleThread(Map map) {
        if (JStormServerUtils.isOnePending(map)) {
            return true;
        }
        return ConfigExtension.isSpoutSingleThread(map);
    }

    public RunnableCallback mk_executors(DisruptorQueue disruptorQueue, TaskSendTargets taskSendTargets, ITaskReportErr iTaskReportErr) {
        if (this.taskObj instanceof IBolt) {
            return new BoltExecutors((IBolt) this.taskObj, this.taskTransfer, this.innerTaskTransfer, this.stormConf, disruptorQueue, taskSendTargets, this.taskStatus, this.topologyContext, this.userContext, this.taskStats, iTaskReportErr);
        }
        if (this.taskObj instanceof ISpout) {
            return isSingleThread(this.stormConf) ? new SingleThreadSpoutExecutors((ISpout) this.taskObj, this.taskTransfer, this.innerTaskTransfer, this.stormConf, disruptorQueue, taskSendTargets, this.taskStatus, this.topologyContext, this.userContext, this.taskStats, iTaskReportErr) : new MultipleThreadSpoutExecutors((ISpout) this.taskObj, this.taskTransfer, this.innerTaskTransfer, this.stormConf, disruptorQueue, taskSendTargets, this.taskStatus, this.topologyContext, this.userContext, this.taskStats, iTaskReportErr);
        }
        return null;
    }

    private RunnableCallback mkExecutor(DisruptorQueue disruptorQueue, TaskSendTargets taskSendTargets) {
        return mk_executors(disruptorQueue, taskSendTargets, new TaskReportErrorAndDie(new TaskReportError(this.zkCluster, this.topologyid, this.taskid.intValue()), this.workHalt));
    }

    public DisruptorQueue registerDisruptorQueue() {
        DisruptorQueue mkInstance = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.SINGLE, JStormUtils.parseInt(this.stormConf.get("topology.executor.receive.buffer.size"), 256).intValue(), (WaitStrategy) Utils.newInstance((String) this.stormConf.get("topology.disruptor.wait.strategy")));
        this.deserializeQueues.put(this.taskid, mkInstance);
        return mkInstance;
    }

    public TaskShutdownDameon execute() throws Exception {
        setComponentType();
        DisruptorQueue registerDisruptorQueue = registerDisruptorQueue();
        RunnableCallback mkExecutor = mkExecutor(registerDisruptorQueue, echoToSystemBolt());
        AsyncLoopThread asyncLoopThread = new AsyncLoopThread(mkExecutor, false, 10, true);
        ArrayList arrayList = new ArrayList();
        arrayList.add(asyncLoopThread);
        TaskHeartbeatRunable.registerTaskStats(this.taskid.intValue(), new TaskStats(this.componentType, this.taskStats));
        LOG.info("Finished loading task " + this.componentid + ":" + this.taskid);
        return getShutdown(arrayList, registerDisruptorQueue, mkExecutor);
    }

    public TaskShutdownDameon getShutdown(List<AsyncLoopThread> list, DisruptorQueue disruptorQueue, RunnableCallback runnableCallback) {
        AsyncLoopThread ackerRunnableThread;
        if ((runnableCallback instanceof SpoutExecutors) && (ackerRunnableThread = ((SpoutExecutors) runnableCallback).getAckerRunnableThread()) != null) {
            list.add(ackerRunnableThread);
        }
        list.add(((BaseExecutors) runnableCallback).getDeserlizeThread());
        list.add(this.taskTransfer.getSerializeThread());
        return new TaskShutdownDameon(this.taskStatus, this.topologyid, this.taskid, list, this.zkCluster, this.taskObj);
    }

    public static TaskShutdownDameon mk_task(WorkerData workerData, int i) throws Exception {
        return new Task(workerData, i).execute();
    }
}
