package com.alibaba.jstorm.task.execute;

import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.stats.CommonStatsRolling;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/task/execute/BaseExecutors.class */
public class BaseExecutors extends RunnableCallback {
    private static Logger LOG = Logger.getLogger(BaseExecutors.class);
    protected final String component_id;
    protected final int taskId;
    protected final boolean isDebugRecv;
    protected final boolean isDebug;
    protected final String idStr;
    protected Map storm_conf;
    protected DisruptorQueue deserializeQueue;
    protected KryoTupleDeserializer deserializer;
    protected AsyncLoopThread deserializeThread;
    protected JStormTimer deserializeTimer;
    protected TopologyContext userTopologyCtx;
    protected CommonStatsRolling task_stats;
    protected volatile TaskStatus taskStatus;
    protected int message_timeout_secs;
    protected Throwable error = null;
    protected ITaskReportErr report_error;
    protected DisruptorQueue exeQueue;
    protected Map<Integer, DisruptorQueue> innerTaskTransfer;

    /* loaded from: input_file:com/alibaba/jstorm/task/execute/BaseExecutors$DeserializeRunnable.class */
    class DeserializeRunnable extends RunnableCallback implements EventHandler {
        DisruptorQueue deserializeQueue;
        DisruptorQueue exeQueue;

        DeserializeRunnable(DisruptorQueue disruptorQueue, DisruptorQueue disruptorQueue2) {
            this.deserializeQueue = disruptorQueue;
            this.exeQueue = disruptorQueue2;
        }

        public String getThreadName() {
            return BaseExecutors.this.idStr + "-deserializer";
        }

        protected Tuple deserialize(byte[] bArr) {
            BaseExecutors.this.deserializeTimer.start();
            try {
                if (bArr == null) {
                    BaseExecutors.this.deserializeTimer.stop();
                    return null;
                }
                try {
                    if (bArr.length == 0) {
                        BaseExecutors.this.deserializeTimer.stop();
                        return null;
                    }
                    if (bArr.length == 1) {
                        byte b = bArr[0];
                        BaseExecutors.LOG.info("Change task status as " + ((int) b));
                        BaseExecutors.this.taskStatus.setStatus(b);
                        BaseExecutors.this.deserializeTimer.stop();
                        return null;
                    }
                    Tuple deserialize = BaseExecutors.this.deserializer.deserialize(bArr);
                    if (BaseExecutors.this.isDebugRecv) {
                        BaseExecutors.LOG.info(BaseExecutors.this.idStr + " receive " + deserialize.toString());
                    }
                    BaseExecutors.this.deserializeTimer.stop();
                    return deserialize;
                } catch (Throwable th) {
                    if (!BaseExecutors.this.taskStatus.isShutdown()) {
                        BaseExecutors.LOG.error(BaseExecutors.this.idStr + " recv thread error " + JStormUtils.toPrintableString(bArr) + "\n", th);
                    }
                    BaseExecutors.this.deserializeTimer.stop();
                    return null;
                }
            } catch (Throwable th2) {
                BaseExecutors.this.deserializeTimer.stop();
                throw th2;
            }
        }

        public void onEvent(Object obj, long j, boolean z) throws Exception {
            Tuple deserialize = deserialize((byte[]) obj);
            if (deserialize != null) {
                this.exeQueue.publish(deserialize);
            }
        }

        public void run() {
            WorkerClassLoader.switchThreadContext();
            BaseExecutors.LOG.info("Successfully start recvThread of " + BaseExecutors.this.idStr);
            while (!BaseExecutors.this.taskStatus.isShutdown()) {
                try {
                    this.deserializeQueue.consumeBatchWhenAvailable(this);
                } catch (Throwable th) {
                    if (!BaseExecutors.this.taskStatus.isShutdown()) {
                        BaseExecutors.LOG.error("Unknow exception ", th);
                        BaseExecutors.this.report_error.report(th);
                    }
                }
            }
            WorkerClassLoader.restoreThreadContext();
            BaseExecutors.LOG.info("Successfully shutdown recvThread of " + BaseExecutors.this.idStr);
        }

        public Object getResult() {
            BaseExecutors.LOG.info("Begin to shutdown recvThread of " + BaseExecutors.this.idStr);
            return -1;
        }
    }

    public BaseExecutors(TaskTransfer taskTransfer, Map map, DisruptorQueue disruptorQueue, Map<Integer, DisruptorQueue> map2, TopologyContext topologyContext, TopologyContext topologyContext2, CommonStatsRolling commonStatsRolling, TaskStatus taskStatus, ITaskReportErr iTaskReportErr) {
        this.message_timeout_secs = 30;
        this.storm_conf = map;
        this.deserializeQueue = disruptorQueue;
        this.userTopologyCtx = topologyContext2;
        this.task_stats = commonStatsRolling;
        this.taskId = topologyContext.getThisTaskId();
        this.innerTaskTransfer = map2;
        this.component_id = topologyContext.getThisComponentId();
        this.idStr = JStormServerUtils.getName(this.component_id, this.taskId);
        this.taskStatus = taskStatus;
        this.report_error = iTaskReportErr;
        this.deserializer = new KryoTupleDeserializer(this.storm_conf, topologyContext);
        this.isDebugRecv = ConfigExtension.isTopologyDebugRecvTuple(this.storm_conf).booleanValue();
        this.isDebug = JStormUtils.parseBoolean(this.storm_conf.get("topology.debug"), false);
        this.message_timeout_secs = JStormUtils.parseInt(this.storm_conf.get("topology.message.timeout.secs"), 30).intValue();
        this.exeQueue = DisruptorQueue.mkInstance(this.idStr, ProducerType.MULTI, Utils.getInt(this.storm_conf.get("topology.executor.receive.buffer.size"), 256).intValue(), (WaitStrategy) Utils.newInstance((String) this.storm_conf.get("topology.disruptor.wait.strategy")));
        this.exeQueue.consumerStarted();
        registerInnerTransfer(this.exeQueue);
        this.deserializeThread = new AsyncLoopThread(new DeserializeRunnable(disruptorQueue, this.exeQueue));
        this.deserializeTimer = Metrics.registerTimer(this.idStr, "Deserialize_Time", String.valueOf(this.taskId), Metrics.MetricType.TASK);
        Metrics.registerQueue(this.idStr, "Deserialize_Queue", disruptorQueue, String.valueOf(this.taskId), Metrics.MetricType.TASK);
        Metrics.registerQueue(this.idStr, "Executor_Queue", this.exeQueue, String.valueOf(this.taskId), Metrics.MetricType.TASK);
    }

    public void run() {
        throw new RuntimeException("Should implement this function");
    }

    public Object getResult() {
        if (!this.taskStatus.isRun() && !this.taskStatus.isPause()) {
            if (this.taskStatus.isShutdown()) {
                shutdown();
                return -1;
            }
            LOG.info("Unknow TaskStatus, shutdown executing thread of " + this.idStr);
            shutdown();
            return -1;
        }
        return 0;
    }

    public Exception error() {
        if (this.error == null) {
            return null;
        }
        return new Exception(this.error);
    }

    public void shutdown() {
        LOG.info("Shutdown executing thread of " + this.idStr);
        if (!this.taskStatus.isShutdown()) {
            LOG.info("Taskstatus isn't shutdown, but enter shutdown method, Occur exception");
        }
        unregistorInnerTransfer();
    }

    protected void registerInnerTransfer(DisruptorQueue disruptorQueue) {
        LOG.info("Registor inner transfer for executor thread of " + this.idStr);
        DisruptorQueue disruptorQueue2 = this.innerTaskTransfer.get(Integer.valueOf(this.taskId));
        if (disruptorQueue2 != null) {
            LOG.info("Exist inner task transfer for executing thread of " + this.idStr);
            if (disruptorQueue2 != disruptorQueue) {
                throw new RuntimeException("Inner task transfer must be only one in executing thread of " + this.idStr);
            }
        }
        this.innerTaskTransfer.put(Integer.valueOf(this.taskId), disruptorQueue);
    }

    protected void unregistorInnerTransfer() {
        LOG.info("Unregistor inner transfer for executor thread of " + this.idStr);
        this.innerTaskTransfer.remove(Integer.valueOf(this.taskId));
    }

    public AsyncLoopThread getDeserlizeThread() {
        return this.deserializeThread;
    }
}
