package com.xdja.kafka.consumer;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

/* loaded from: input_file:com/xdja/kafka/consumer/KafkaConsumerListenerContainer.class */
public class KafkaConsumerListenerContainer<K, V> implements MessageListenerContainer<K, V>, InitializingBean, DisposableBean {
    private final ConsumerFactory<K, V> consumerFactory;
    private ErrorHandler<K, V> errorHandler;
    private AsyncListenableTaskExecutor consumerTaskExecutor;
    private List<KafkaConsumerListener<K, V>> consumerListeners;
    protected final Log logger = LogFactory.getLog(getClass());
    private volatile boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xdja/kafka/consumer/KafkaConsumerListenerContainer$ListenerConsumer.class */
    public final class ListenerConsumer implements SchedulingAwareRunnable {
        private final Consumer<K, V> consumer;
        private final KafkaConsumerListener<K, V> listener;
        private final ErrorHandler<K, V> errorHandler;
        protected final Log logger;

        private ListenerConsumer(KafkaConsumerListener<K, V> kafkaConsumerListener) {
            this.logger = LogFactory.getLog(getClass());
            Consumer<K, V> createConsumer = KafkaConsumerListenerContainer.this.consumerFactory.createConsumer(kafkaConsumerListener.getConfigs());
            createConsumer.subscribe(Arrays.asList(kafkaConsumerListener.getTopic()));
            this.errorHandler = KafkaConsumerListenerContainer.this.errorHandler;
            this.listener = kafkaConsumerListener;
            this.consumer = createConsumer;
        }

        public void run() {
            KafkaConsumerListenerContainer.this.setRunning(true);
            while (KafkaConsumerListenerContainer.this.isRunning()) {
                try {
                    ConsumerRecords<K, V> poll = this.consumer.poll(1000L);
                    if (poll != null && poll.count() > 0) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Received: " + poll.count() + " records");
                        }
                        invokeRecords(poll);
                    }
                } catch (Exception e) {
                    this.logger.error("Container exception", e);
                    try {
                        Thread.sleep(300000L);
                    } catch (InterruptedException e2) {
                    }
                }
            }
            try {
                this.consumer.unsubscribe();
            } catch (WakeupException e3) {
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

        private void invokeRecords(ConsumerRecords<K, V> consumerRecords) {
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + consumerRecord);
                }
                try {
                    this.listener.onMessage(consumerRecord);
                } catch (Exception e) {
                    this.logger.error("Listener threw an exception for " + consumerRecord, e);
                    if (this.errorHandler != null) {
                        try {
                            this.errorHandler.handle(e, consumerRecord);
                        } catch (Exception e2) {
                            this.logger.error("Error handler threw an exception", e2);
                        }
                    }
                }
            }
        }

        public boolean isLongLived() {
            return true;
        }
    }

    public KafkaConsumerListenerContainer(ConsumerFactory<K, V> consumerFactory, List<KafkaConsumerListener<K, V>> list) {
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        Assert.notNull(list, "A MessageListener is required");
        this.consumerFactory = consumerFactory;
        this.consumerListeners = list;
    }

    public void setErrorHandler(ErrorHandler<K, V> errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override // com.xdja.kafka.consumer.MessageListenerContainer
    public void register(KafkaConsumerListener<K, V> kafkaConsumerListener) {
        invokerListener(kafkaConsumerListener);
        this.consumerListeners.add(kafkaConsumerListener);
    }

    public boolean isRunning() {
        return this.running;
    }

    protected void setRunning(boolean z) {
        this.running = z;
    }

    public void destroy() throws Exception {
        setRunning(false);
    }

    public void afterPropertiesSet() throws Exception {
        this.consumerTaskExecutor = new SimpleAsyncTaskExecutor("kafka-consumer");
        Iterator<KafkaConsumerListener<K, V>> it = this.consumerListeners.iterator();
        while (it.hasNext()) {
            invokerListener(it.next());
        }
        this.logger.info("setup kafka consumer listeners success");
    }

    private void invokerListener(KafkaConsumerListener<K, V> kafkaConsumerListener) {
        this.consumerTaskExecutor.submitListenable(new ListenerConsumer(kafkaConsumerListener));
    }

    public void setConsumerListeners(List<KafkaConsumerListener<K, V>> list) {
        this.consumerListeners = list;
    }
}
