/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

public class ConcurrentMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<KafkaMessageListenerContainer<K, V>>();
    private final List<AsyncListenableTaskExecutor> executors = new ArrayList<AsyncListenableTaskExecutor>();
    private int concurrency = 1;
    private boolean alwaysClientIdSuffix = true;

    public ConcurrentMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, (String)"A ConsumerFactory must be provided");
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int concurrency) {
        Assert.isTrue((concurrency > 0 ? 1 : 0) != 0, (String)"concurrency must be greater than 0");
        this.concurrency = concurrency;
    }

    public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
        this.alwaysClientIdSuffix = alwaysClientIdSuffix;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            return Collections.unmodifiableList(new ArrayList<KafkaMessageListenerContainer<K, V>>(this.containers));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Collection<TopicPartition> getAssignedPartitions() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            return this.containers.stream().map(KafkaMessageListenerContainer::getAssignedPartitions).filter(Objects::nonNull).flatMap(Collection::stream).collect(Collectors.toList());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            HashMap<String, Collection<TopicPartition>> assignments = new HashMap<String, Collection<TopicPartition>>();
            this.containers.forEach(container -> {
                Map<String, Collection<TopicPartition>> byClientId = container.getAssignmentsByClientId();
                if (byClientId != null) {
                    assignments.putAll(byClientId);
                }
            });
            return assignments;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isContainerPaused() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            boolean paused = this.isPaused();
            if (paused) {
                for (AbstractMessageListenerContainer abstractMessageListenerContainer : this.containers) {
                    if (abstractMessageListenerContainer.isContainerPaused()) continue;
                    return false;
                }
            }
            return paused;
        }
    }

    @Override
    public boolean isChildRunning() {
        if (!this.isRunning()) {
            return false;
        }
        for (MessageListenerContainer messageListenerContainer : this.containers) {
            if (!messageListenerContainer.isRunning()) continue;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            HashMap<String, Map<MetricName, Metric>> metrics = new HashMap<String, Map<MetricName, Metric>>();
            for (KafkaMessageListenerContainer<K, V> container : this.containers) {
                metrics.putAll(container.metrics());
            }
            return Collections.unmodifiableMap(metrics);
        }
    }

    @Override
    protected void doStart() {
        if (!this.isRunning()) {
            this.checkTopics();
            ContainerProperties containerProperties = this.getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            this.setRunning(true);
            for (int i = 0; i < this.concurrency; ++i) {
                KafkaMessageListenerContainer<K, V> container = this.constructContainer(containerProperties, topicPartitions, i);
                this.configureChildContainer(i, container);
                if (this.isPaused()) {
                    container.pause();
                }
                container.start();
                this.containers.add(container);
            }
        }
    }

    private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
        ApplicationEventPublisher publisher;
        String beanName = this.getBeanName();
        beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
        container.setBeanName(beanName);
        ApplicationContext applicationContext = this.getApplicationContext();
        if (applicationContext != null) {
            container.setApplicationContext(applicationContext);
        }
        if ((publisher = this.getApplicationEventPublisher()) != null) {
            container.setApplicationEventPublisher(publisher);
        }
        container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
        container.setGenericErrorHandler(this.getGenericErrorHandler());
        container.setCommonErrorHandler(this.getCommonErrorHandler());
        container.setAfterRollbackProcessor(this.getAfterRollbackProcessor());
        container.setRecordInterceptor(this.getRecordInterceptor());
        container.setBatchInterceptor(this.getBatchInterceptor());
        container.setInterceptBeforeTx(this.isInterceptBeforeTx());
        container.setEmergencyStop(() -> this.stopAbnormally(() -> {}));
        AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
        if (exec == null) {
            if (this.executors.size() > index) {
                exec = this.executors.get(index);
            } else {
                exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
                this.executors.add(exec);
            }
            container.getContainerProperties().setConsumerTaskExecutor(exec);
        }
    }

    private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties, @Nullable TopicPartitionOffset[] topicPartitions, int i) {
        KafkaMessageListenerContainer container = topicPartitions == null ? new KafkaMessageListenerContainer(this, this.consumerFactory, containerProperties) : new KafkaMessageListenerContainer(this, this.consumerFactory, containerProperties, this.partitionSubset(containerProperties, i));
        return container;
    }

    private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) {
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (this.concurrency == 1) {
            return topicPartitions;
        }
        int numPartitions = topicPartitions.length;
        if (numPartitions == this.concurrency) {
            return new TopicPartitionOffset[]{topicPartitions[index]};
        }
        int perContainer = numPartitions / this.concurrency;
        TopicPartitionOffset[] subset = index == this.concurrency - 1 ? Arrays.copyOfRange(topicPartitions, index * perContainer, topicPartitions.length) : Arrays.copyOfRange(topicPartitions, index * perContainer, (index + 1) * perContainer);
        return subset;
    }

    @Override
    protected void doStop(Runnable callback, boolean normal) {
        AtomicInteger count = new AtomicInteger();
        if (this.isRunning()) {
            boolean childRunning = this.isChildRunning();
            this.setRunning(false);
            if (!childRunning) {
                callback.run();
            }
            for (KafkaMessageListenerContainer<K, V> container : this.containers) {
                if (!container.isRunning()) continue;
                count.incrementAndGet();
            }
            for (KafkaMessageListenerContainer<K, V> container : this.containers) {
                if (!container.isRunning()) continue;
                if (normal) {
                    container.stop(() -> {
                        if (count.decrementAndGet() <= 0) {
                            callback.run();
                        }
                    });
                    continue;
                }
                container.stopAbnormally(() -> {
                    if (count.decrementAndGet() <= 0) {
                        callback.run();
                    }
                });
            }
            this.containers.clear();
            this.setStoppedNormally(normal);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void pause() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            super.pause();
            this.containers.forEach(AbstractMessageListenerContainer::pause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resume() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            super.resume();
            this.containers.forEach(AbstractMessageListenerContainer::resume);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void pausePartition(TopicPartition topicPartition) {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            super.pausePartition(topicPartition);
            this.containers.stream().filter(container -> this.containsPartition(topicPartition, (KafkaMessageListenerContainer<K, V>)container)).forEach(container -> container.pausePartition(topicPartition));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resumePartition(TopicPartition topicPartition) {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            super.resumePartition(topicPartition);
            this.containers.stream().filter(container -> this.containsPartition(topicPartition, (KafkaMessageListenerContainer<K, V>)container)).forEach(container -> container.resumePartition(topicPartition));
        }
    }

    @Override
    public boolean isPartitionPaused(TopicPartition topicPartition) {
        return this.containers.stream().anyMatch(container -> container.isPartitionPaused(topicPartition));
    }

    @Override
    public boolean isInExpectedState() {
        return (this.isRunning() || this.isStoppedNormally()) && this.containers.stream().map(container -> container.isInExpectedState()).allMatch(bool -> Boolean.TRUE.equals(bool));
    }

    private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> container) {
        Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
        return assignedPartitions != null && assignedPartitions.contains(topicPartition);
    }

    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + this.getBeanName() + ", running=" + this.isRunning() + "]";
    }
}

