package com.alibaba.jstorm.utils;

import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.metric.JStormTimer;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/utils/DisruptorRunable.class */
public abstract class DisruptorRunable extends RunnableCallback implements EventHandler {
    private static final Logger LOG = Logger.getLogger(DisruptorRunable.class);
    protected DisruptorQueue queue;
    protected String idStr;
    protected AtomicBoolean active;
    protected JStormTimer timer;

    public DisruptorRunable(DisruptorQueue disruptorQueue, JStormTimer jStormTimer, String str, AtomicBoolean atomicBoolean) {
        this.queue = disruptorQueue;
        this.timer = jStormTimer;
        this.idStr = str;
        this.active = atomicBoolean;
    }

    public abstract void handleEvent(Object obj, boolean z) throws Exception;

    public void onEvent(Object obj, long j, boolean z) throws Exception {
        if (obj == null) {
            return;
        }
        this.timer.start();
        try {
            handleEvent(obj, z);
            this.timer.stop();
        } catch (Throwable th) {
            this.timer.stop();
            throw th;
        }
    }

    public void run() {
        LOG.info("Successfully start thread " + this.idStr);
        this.queue.consumerStarted();
        while (this.active.get()) {
            try {
                this.queue.consumeBatchWhenAvailable(this);
            } catch (Exception e) {
                if (this.active.get()) {
                    LOG.error("DrainerRunable send error", e);
                    throw new RuntimeException(e);
                }
            }
        }
        LOG.info("Successfully exit thread " + this.idStr);
    }

    public Object getResult() {
        return this.active.get() ? 0 : -1;
    }
}
