package backtype.storm.utils;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.HashMap;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.log4j.Logger;

/* loaded from: input_file:backtype/storm/utils/DisruptorWrapBlockingQueue.class */
public class DisruptorWrapBlockingQueue extends DisruptorQueue {
    private static final Logger LOG = Logger.getLogger(DisruptorWrapBlockingQueue.class);
    private static final long QUEUE_CAPACITY = 512;
    private LinkedBlockingDeque<Object> queue = new LinkedBlockingDeque<>();
    private String queueName;

    /* loaded from: input_file:backtype/storm/utils/DisruptorWrapBlockingQueue$ObjectEventFactory.class */
    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public MutableObject m526newInstance() {
            return new MutableObject();
        }
    }

    public DisruptorWrapBlockingQueue(String str, ProducerType producerType, int i, WaitStrategy waitStrategy) {
        this.queueName = str;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public String getName() {
        return this.queueName;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatch(EventHandler<Object> eventHandler) {
        consumeBatchToCursor(0L, eventHandler);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void haltWithInterrupt() {
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public Object poll() {
        return this.queue.poll();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public Object take() {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            return null;
        }
    }

    public void drainQueue(Object obj, EventHandler<Object> eventHandler) {
        while (obj != null) {
            try {
                eventHandler.onEvent(obj, 0L, false);
                obj = this.queue.poll();
            } catch (InterruptedException e) {
                LOG.warn("Occur interrupt error, " + obj);
                return;
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        Object poll = this.queue.poll();
        if (poll == null) {
            try {
                poll = this.queue.take();
            } catch (InterruptedException e) {
                LOG.warn("Occur interrupt error, " + poll);
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
        drainQueue(poll, eventHandler);
    }

    public void consumeBatchToCursor(long j, EventHandler<Object> eventHandler) {
        drainQueue(this.queue.poll(), eventHandler);
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj) {
        boolean offer = this.queue.offer(obj);
        while (!offer) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
            }
            offer = this.queue.offer(obj);
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        if (!this.queue.offer(obj)) {
            throw InsufficientCapacityException.INSTANCE;
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void publish(Object obj, boolean z) throws InsufficientCapacityException {
        if (z) {
            publish(obj);
        } else {
            tryPublish(obj);
        }
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void consumerStarted() {
    }

    private void flushCache() {
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public void clear() {
        this.queue.clear();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long population() {
        return this.queue.size();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long capacity() {
        long size = this.queue.size();
        return size < QUEUE_CAPACITY ? QUEUE_CAPACITY : size;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long writePos() {
        return 0L;
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public long readPos() {
        return this.queue.size();
    }

    @Override // backtype.storm.utils.DisruptorQueue
    public float pctFull() {
        long size = this.queue.size();
        if (size < QUEUE_CAPACITY) {
            return (1.0f * ((float) size)) / 512.0f;
        }
        return 1.0f;
    }

    @Override // backtype.storm.metric.api.IStatefulObject
    public Object getState() {
        HashMap hashMap = new HashMap();
        long readPos = readPos();
        long writePos = writePos();
        hashMap.put("capacity", Long.valueOf(capacity()));
        hashMap.put("population", Long.valueOf(writePos - readPos));
        hashMap.put("write_pos", Long.valueOf(writePos));
        hashMap.put("read_pos", Long.valueOf(readPos));
        return hashMap;
    }
}
