package com.xdja.kafka.producer;

import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/xdja/kafka/producer/KafkaProducerTemplate.class */
/**
 * Template that sends records to Kafka through a single, lazily created
 * {@link Producer} obtained from a {@link ProducerFactory}.
 *
 * <p>The producer is created on the first call to {@link #getProducer()} and
 * then reused for every subsequent send. Creation is guarded by
 * {@code synchronized (this)} with a re-check of the volatile field, so at most
 * one producer is created per template instance.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaProducerTemplate<K, V> implements KafkaProducerOperations<K, V> {
    /** Logger named after the runtime class, so subclasses log under their own name. */
    protected final Log logger;
    /** Factory used once to create the shared producer. */
    private final ProducerFactory<K, V> producerFactory;
    /** When {@code true}, {@link #doSend(ProducerRecord)} flushes after every send. */
    private final boolean autoFlush;
    /** Lazily initialised producer; {@code null} until the first {@link #getProducer()} call. */
    private volatile Producer<K, V> producer;

    /**
     * Creates a template with auto-flush disabled.
     *
     * @param producerFactory the factory used to create the underlying producer
     */
    public KafkaProducerTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Creates a template.
     *
     * @param producerFactory the factory used to create the underlying producer
     * @param z               the auto-flush flag: {@code true} to flush the producer
     *                        after each send issued through {@link #doSend(ProducerRecord)}
     */
    public KafkaProducerTemplate(ProducerFactory<K, V> producerFactory, boolean z) {
        this.logger = LogFactory.getLog(getClass());
        this.producerFactory = producerFactory;
        this.autoFlush = z;
    }

    /**
     * Sends a value without a key to the given topic.
     *
     * @param str the topic name
     * @param v   the record value
     * @return a future completed with the {@link SendResult}, or completed
     *         exceptionally with a {@link KafkaProducerException} if the send fails
     */
    @Override // com.xdja.kafka.producer.KafkaProducerOperations
    public ListenableFuture<SendResult<K, V>> send(String str, V v) {
        return doSend(new ProducerRecord<K, V>(str, v));
    }

    /**
     * Sends a value without a key to the given topic, handing the caller's
     * callback straight to the underlying producer.
     *
     * <p>Unlike the other {@code send} overloads this one does not go through
     * {@link #doSend(ProducerRecord)}, so the auto-flush setting is not applied
     * and failures are not logged by this template.
     *
     * @param str      the topic name
     * @param v        the record value
     * @param callback the callback passed to {@link Producer#send(ProducerRecord, Callback)}
     * @return the future returned by the underlying producer
     */
    @Override // com.xdja.kafka.producer.KafkaProducerOperations
    public Future<RecordMetadata> send(String str, V v, Callback callback) {
        return getProducer().send(new ProducerRecord<K, V>(str, v), callback);
    }

    /**
     * Sends a keyed value to the given topic.
     *
     * @param str the topic name
     * @param k   the record key
     * @param v   the record value
     * @return a future completed with the {@link SendResult}, or completed
     *         exceptionally with a {@link KafkaProducerException} if the send fails
     */
    @Override // com.xdja.kafka.producer.KafkaProducerOperations
    public ListenableFuture<SendResult<K, V>> send(String str, K k, V v) {
        return doSend(new ProducerRecord<K, V>(str, k, v));
    }

    /**
     * Flushes the underlying producer, if one has been created.
     *
     * <p>The producer is created lazily, so this method may be called before any
     * record has been sent. In that case there is nothing buffered and the call
     * is a no-op rather than a {@link NullPointerException}.
     */
    @Override // com.xdja.kafka.producer.KafkaProducerOperations
    public void flush() {
        // Read the volatile field once so the null check and the call see the same reference.
        final Producer<K, V> current = this.producer;
        if (current != null) {
            current.flush();
        }
    }

    /**
     * Sends the record through the shared producer and adapts the producer
     * callback to a {@link ListenableFuture}.
     *
     * <p>On success the future is set to a {@link SendResult} holding the record
     * and its metadata. On failure the future is completed with a
     * {@link KafkaProducerException} wrapping the cause, and the failure is
     * logged at error level. If auto-flush is enabled the producer is flushed
     * before this method returns.
     *
     * @param producerRecord the record to send
     * @return the future tracking the outcome of the send
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        // NOTE(review): SettableListenableFuture is declared outside this file and is used raw
        // here, as in the original; parameterise it with SendResult<K, V> if it is generic.
        final SettableListenableFuture settableListenableFuture = new SettableListenableFuture();
        getProducer().send(producerRecord, new Callback() { // from class: com.xdja.kafka.producer.KafkaProducerTemplate.1
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    settableListenableFuture.set(new SendResult<K, V>(producerRecord, recordMetadata));
                    return;
                }
                // Complete the future first so waiters are released before the log I/O happens.
                settableListenableFuture.setException(new KafkaProducerException(producerRecord, "Failed to send", exc));
                if (KafkaProducerTemplate.this.logger.isErrorEnabled()) {
                    KafkaProducerTemplate.this.logger.error("Exception thrown when sending a message,with value=" + ObjectUtils.nullSafeToString(producerRecord.value()) + " to topic " + producerRecord.topic(), exc);
                }
            }
        });
        if (this.autoFlush) {
            flush();
        }
        return settableListenableFuture;
    }

    /**
     * Returns the shared producer, creating it from the factory on first use.
     *
     * @return the producer used by this template
     */
    protected Producer<K, V> getProducer() {
        if (this.producer == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (this.producer == null) {
                    this.producer = this.producerFactory.createProducer();
                }
            }
        }
        return this.producer;
    }
}
