/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.time.Clock;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Prepares {@link ConcurrentKafkaListenerContainerFactory} instances for use with retry
 * topics: it installs a container customizer that wraps the container's message listener
 * in a {@link KafkaBackoffAwareMessageListenerAdapter} and installs a
 * {@link DefaultErrorHandler} built around a {@link DeadLetterPublishingRecoverer}.
 *
 * <p>Factories configured through {@link #configure} are remembered in a static
 * (class-wide) cache and are returned untouched on subsequent calls. Access to the cache
 * is guarded by synchronizing on the cache set itself; the "is cached" check and the
 * later insertion are two separate synchronized sections.
 */
public class ListenerContainerFactoryConfigurer {
    private static final Set<ConcurrentKafkaListenerContainerFactory<?, ?>> CONFIGURED_FACTORIES_CACHE = new HashSet<>();
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ListenerContainerFactoryConfigurer.class));
    private static final int MIN_POLL_TIMEOUT_VALUE = 100;
    private static final int MAX_POLL_TIMEOUT_VALUE = 5000;
    private static final int POLL_TIMEOUT_DIVISOR = 4;
    private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;
    // NOTE(review): presumably mirrors the container's default poll timeout, so that any
    // other value is treated as a user customization - confirm against ContainerProperties.
    private static final long DEFAULT_POLL_TIMEOUT = 5000L;

    private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {};
    private Consumer<CommonErrorHandler> errorHandlerCustomizer = errorHandler -> {};
    private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
    private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
    private final Clock clock;

    /**
     * Creates a configurer.
     *
     * @param kafkaConsumerBackoffManager the back-off manager handed to every listener adapter
     * @param deadLetterPublishingRecovererFactory the factory used to create the recoverer for the error handler
     * @param clock the clock handed to every listener adapter
     */
    ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, @Qualifier(value="internalBackOffClock") Clock clock) {
        this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
        this.deadLetterPublishingRecovererFactory = deadLetterPublishingRecovererFactory;
        this.clock = clock;
    }

    /**
     * Configures the given factory using the back-off values of {@code configuration} and
     * records it in the cache. A factory that is already cached is returned unchanged.
     *
     * @param containerFactory the factory to configure
     * @param configuration the holder of the back-off values
     * @return the same {@code containerFactory} instance
     */
    public ConcurrentKafkaListenerContainerFactory<?, ?> configure(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
        if (this.isCached(containerFactory)) {
            return containerFactory;
        }
        return this.addToCache(this.doConfigure(containerFactory, configuration.backOffValues));
    }

    /**
     * Configures the given factory with an empty list of back-off values, which leaves the
     * container's poll timeout and idle partition event interval untouched. Unlike
     * {@link #configure}, the factory is NOT added to the cache; a factory that is already
     * cached is returned unchanged.
     *
     * @param containerFactory the factory to configure
     * @param configuration not read by this method
     * @return the same {@code containerFactory} instance
     */
    public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
        if (this.isCached(containerFactory)) {
            return containerFactory;
        }
        return this.doConfigure(containerFactory, Collections.emptyList());
    }

    /**
     * Installs the container customizer and the error handler on the factory.
     *
     * @param containerFactory the factory to configure
     * @param backOffValues the back-off values passed on to the container customizer
     * @return the same {@code containerFactory} instance
     */
    private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
        // Implicitly typed lambda: the parameter type is inferred from the factory's customizer type.
        containerFactory.setContainerCustomizer(container -> this.setupBackoffAwareMessageListenerAdapter(container, backOffValues));
        containerFactory.setCommonErrorHandler(this.createErrorHandler(this.deadLetterPublishingRecovererFactory.create()));
        return containerFactory;
    }

    /**
     * @param containerFactory the factory to look up
     * @return {@code true} if the factory is present in the class-wide cache
     */
    private boolean isCached(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory) {
        synchronized (CONFIGURED_FACTORIES_CACHE) {
            return CONFIGURED_FACTORIES_CACHE.contains(containerFactory);
        }
    }

    /**
     * Adds the factory to the class-wide cache.
     *
     * @param containerFactory the factory to remember
     * @return the same {@code containerFactory} instance
     */
    private ConcurrentKafkaListenerContainerFactory<?, ?> addToCache(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory) {
        synchronized (CONFIGURED_FACTORIES_CACHE) {
            CONFIGURED_FACTORIES_CACHE.add(containerFactory);
            return containerFactory;
        }
    }

    /**
     * Sets a callback that is invoked with each container after its listener has been
     * wrapped in the back-off aware adapter.
     *
     * @param containerCustomizer the callback; must not be {@code null}
     */
    public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
        Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
        this.containerCustomizer = containerCustomizer;
    }

    /**
     * Sets a callback that is invoked with each error handler created by this configurer,
     * after its defaults have been applied.
     *
     * @param errorHandlerCustomizer the callback; must not be {@code null}
     */
    public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
        // Fail fast here, consistently with setContainerCustomizer, instead of with a
        // NullPointerException when the next error handler is created.
        Assert.notNull(errorHandlerCustomizer, "'errorHandlerCustomizer' cannot be null");
        this.errorHandlerCustomizer = errorHandlerCustomizer;
    }

    /**
     * Builds the error handler installed on configured factories: a
     * {@link DefaultErrorHandler} around the given recoverer with commit-recovered enabled
     * and DEBUG log level, then passed to the error handler customizer.
     *
     * @param deadLetterPublishingRecoverer the recoverer the handler delegates to
     * @return the customized error handler
     */
    private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        // FixedBackOff(interval = 0, maxAttempts = 0): the handler is expected to hand failed
        // records to the recoverer without retrying in place.
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0L, 0L));
        errorHandler.setCommitRecovered(true);
        errorHandler.setLogLevel(KafkaException.Level.DEBUG);
        this.errorHandlerCustomizer.accept(errorHandler);
        return errorHandler;
    }

    /**
     * Container customizer body: adjusts the container's timing properties, wraps the
     * container's current message listener in a {@link KafkaBackoffAwareMessageListenerAdapter}
     * and finally applies the user supplied container customizer.
     *
     * @param container the container being created by the factory
     * @param backOffValues the back-off values used to derive the timing properties
     */
    private void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container, List<Long> backOffValues) {
        AcknowledgingConsumerAwareMessageListener<?, ?> listener = this.checkAndCast(container.getContainerProperties().getMessageListener(), AcknowledgingConsumerAwareMessageListener.class);
        this.configurePollTimeoutAndIdlePartitionInterval(container, backOffValues);
        container.setupMessageListener(new KafkaBackoffAwareMessageListenerAdapter<>(listener, this.kafkaConsumerBackoffManager, container.getListenerId(), this.clock));
        this.containerCustomizer.accept(container);
    }

    /**
     * Sets the container's poll timeout and idle partition event interval based on the
     * back-off values. Does nothing when {@code backOffValues} is empty.
     *
     * @param container the container whose properties are updated
     * @param backOffValues the back-off values
     */
    private void configurePollTimeoutAndIdlePartitionInterval(ConcurrentMessageListenerContainer<?, ?> container, List<Long> backOffValues) {
        if (backOffValues.isEmpty()) {
            return;
        }
        ContainerProperties containerProperties = container.getContainerProperties();
        long pollTimeoutValue = this.getPollTimeoutValue(containerProperties, backOffValues);
        long idlePartitionEventInterval = this.getIdlePartitionInterval(containerProperties, pollTimeoutValue);
        LOGGER.debug(() -> "pollTimeout and idlePartitionEventInterval for back off values " + backOffValues + " will be set to " + pollTimeoutValue + " and " + idlePartitionEventInterval);
        containerProperties.setIdlePartitionEventInterval(idlePartitionEventInterval);
        containerProperties.setPollTimeout(pollTimeoutValue);
    }

    /**
     * @param containerProperties the properties to read the current interval from
     * @param pollTimeoutValue the fallback value
     * @return the interval already set on the properties when it is non-null and positive,
     * otherwise {@code pollTimeoutValue}
     */
    private long getIdlePartitionInterval(ContainerProperties containerProperties, long pollTimeoutValue) {
        Long idlePartitionEventInterval = containerProperties.getIdlePartitionEventInterval();
        return idlePartitionEventInterval != null && idlePartitionEventInterval > 0L ? idlePartitionEventInterval : pollTimeoutValue;
    }

    /**
     * Determines the poll timeout for a container.
     *
     * <p>A poll timeout different from {@code DEFAULT_POLL_TIMEOUT} is returned as is.
     * Otherwise the lowest back-off value decides: above {@code LOWEST_BACKOFF_THRESHOLD}
     * the result is that value divided by {@code POLL_TIMEOUT_DIVISOR}, clamped by
     * {@link #applyLimits(long)}; at or below the threshold it is
     * {@code MIN_POLL_TIMEOUT_VALUE}.
     *
     * @param containerProperties the properties holding the current poll timeout
     * @param backOffValues the back-off values
     * @return the poll timeout to apply
     * @throws IllegalArgumentException if {@code backOffValues} is empty
     */
    private long getPollTimeoutValue(ContainerProperties containerProperties, List<Long> backOffValues) {
        if (containerProperties.getPollTimeout() != DEFAULT_POLL_TIMEOUT) {
            return containerProperties.getPollTimeout();
        }
        Long lowestBackOff = backOffValues.stream().min(Comparator.naturalOrder()).orElseThrow(() -> new IllegalArgumentException("No back off values found!"));
        return lowestBackOff > LOWEST_BACKOFF_THRESHOLD ? this.applyLimits(lowestBackOff / POLL_TIMEOUT_DIVISOR) : MIN_POLL_TIMEOUT_VALUE;
    }

    /**
     * @param pollTimeoutValue the candidate poll timeout
     * @return the value clamped to the range
     * [{@code MIN_POLL_TIMEOUT_VALUE}, {@code MAX_POLL_TIMEOUT_VALUE}]
     */
    private long applyLimits(long pollTimeoutValue) {
        return Math.min(Math.max(pollTimeoutValue, MIN_POLL_TIMEOUT_VALUE), MAX_POLL_TIMEOUT_VALUE);
    }

    /**
     * Asserts that {@code obj} is an instance of {@code clazz} and returns it as such.
     *
     * @param obj the object to check; must not be {@code null}
     * @param clazz the required type
     * @param <T> the required type
     * @return {@code obj} cast to {@code T}
     * @throws IllegalArgumentException if {@code obj}'s class is not assignable to {@code clazz}
     */
    private <T> T checkAndCast(Object obj, Class<T> clazz) {
        Assert.isAssignable(clazz, obj.getClass(), () -> String.format("The provided class %s is not assignable to %s", obj.getClass().getSimpleName(), clazz.getSimpleName()));
        return clazz.cast(obj);
    }

    /**
     * Holder of the back-off values used when configuring a factory.
     */
    static class Configuration {
        private final List<Long> backOffValues;

        /**
         * @param backOffValues the back-off values; the list is stored as given, not copied
         */
        Configuration(List<Long> backOffValues) {
            this.backOffValues = backOffValues;
        }
    }
}

