package org.apache.james.mailetcontainer.impl;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.tree.ImmutableNode;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/mailetcontainer/impl/JamesMailSpooler.class */
public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
    public static final String SPOOL_PROCESSING = "spoolProcessing";
    private int numThreads;
    private final AtomicInteger processingActive = new AtomicInteger(0);
    private final MetricFactory metricFactory;
    private final MailProcessor mailProcessor;
    private final MailQueueFactory<?> queueFactory;
    private reactor.core.Disposable disposable;
    private Scheduler spooler;
    private int parallelismLevel;
    private MailQueue queue;

    @Inject
    public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> mailQueueFactory) {
        this.metricFactory = metricFactory;
        this.mailProcessor = mailProcessor;
        this.queueFactory = mailQueueFactory;
    }

    public void configure(HierarchicalConfiguration<ImmutableNode> hierarchicalConfiguration) {
        this.numThreads = hierarchicalConfiguration.getInt("threads", 100);
        this.parallelismLevel = Math.max(1, this.numThreads - 2);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("init...");
        this.queue = this.queueFactory.createQueue(MailQueueFactory.SPOOL);
        this.spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(this.numThreads, NamedThreadFactory.withName("spooler")));
        LOGGER.info("uses {} Thread(s)", Integer.valueOf(this.numThreads));
        run();
    }

    private void run() {
        LOGGER.info("Queue={}", this.queue);
        this.disposable = Flux.from(this.queue.deQueue()).flatMap(mailQueueItem -> {
            return handleOnQueueItem(mailQueueItem).subscribeOn(this.spooler);
        }, this.parallelismLevel).onErrorContinue((th, obj) -> {
            LOGGER.error("Exception processing mail while spooling {}", obj, th);
        }).subscribeOn(this.spooler).subscribe();
    }

    private Mono<Void> handleOnQueueItem(MailQueue.MailQueueItem mailQueueItem) {
        TimeMetric timer = this.metricFactory.timer(SPOOL_PROCESSING);
        try {
            AtomicInteger atomicInteger = this.processingActive;
            Objects.requireNonNull(atomicInteger);
            return Mono.fromCallable(atomicInteger::incrementAndGet).flatMap(num -> {
                return processMail(mailQueueItem).subscribeOn(this.spooler);
            }).doOnSuccess(r4 -> {
                timer.stopAndPublish().logWhenExceedP99(TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD);
            }).doOnSuccess(r3 -> {
                this.processingActive.decrementAndGet();
            });
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    private Mono<Void> processMail(MailQueue.MailQueueItem mailQueueItem) {
        Objects.requireNonNull(mailQueueItem);
        return Mono.using(mailQueueItem::getMail, mail -> {
            return Mono.just(mail).doOnNext(mail -> {
                LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
            }).map(mail2 -> {
                return performProcessMail(mailQueueItem, mail2);
            }).doOnNext(mail3 -> {
                LOGGER.debug("==== End processing mail {} ====", mail3.getName());
            });
        }, (v0) -> {
            LifecycleUtil.dispose(v0);
        }).then();
    }

    private Mail performProcessMail(MailQueue.MailQueueItem mailQueueItem, Mail mail) {
        try {
            this.mailProcessor.service(mail);
        } catch (Exception e) {
            try {
                mailQueueItem.done(false);
            } catch (MailQueue.MailQueueException e2) {
                throw new RuntimeException(e);
            }
        }
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("Thread has been interrupted");
        }
        mailQueueItem.done(true);
        return mail;
    }

    @PreDestroy
    public void dispose() {
        LOGGER.info("start dispose() ...");
        this.disposable.dispose();
        this.spooler.dispose();
        try {
            this.queue.close();
        } catch (IOException e) {
            LOGGER.debug("error closing queue", e);
        }
        LOGGER.info("thread shutdown completed.");
    }

    public int getThreadCount() {
        return this.numThreads;
    }

    public int getCurrentSpoolCount() {
        return this.processingActive.get();
    }
}
