package com.ejtone.mars.kernel.core.sender;

import com.ejtone.mars.kernel.core.fault.Err;
import com.ejtone.mars.kernel.core.fault.Fault;
import com.ejtone.mars.kernel.core.message.Request;
import com.ejtone.mars.kernel.util.Timer;
import com.ejtone.mars.kernel.util.config.ConfigUtils;
import com.ejtone.mars.kernel.util.flowctrl.DummyFlowControler;
import com.ejtone.mars.kernel.util.flowctrl.FlowControler;
import com.ejtone.mars.kernel.util.monitor.ExecutorMonitor;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ejtone/mars/kernel/core/sender/AbstractSender.class */
/**
 * Base {@link Sender} implementation that queues each request on a bounded thread pool and
 * dispatches it from a worker thread.
 *
 * <p>A worker first passes the request's flow number to the configured {@link FlowControler}
 * and then hands the request to {@link #requestAsync}, which subclasses implement.
 * Requests submitted while the sender is not running are refused with an
 * {@code Err.IllegalState} fault; requests the pool rejects are refused with an
 * {@code Err.SystemBusy} fault.
 */
public abstract class AbstractSender extends SenderStateListenerProvider implements Sender {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSender.class);

    private String name;
    private ThreadPoolExecutor threadPool;
    private FlowControler flowControler;
    // NOTE(review): the second argument is presumably the fallback used when the key is not
    // configured -- confirm against ConfigUtils.
    private int threadPoolCoreSize = ConfigUtils.getInt("thread_pool_core_size", 2);
    private int threadPoolQueueLimit = ConfigUtils.getInt("thread_pool_queue_limit", 4096);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ejtone/mars/kernel/core/sender/AbstractSender$DefaultSenderTask.class */
    /**
     * Pool task that applies flow control to one request and then dispatches it through
     * {@link AbstractSender#requestAsync}. Every failure is reported to the listener rather
     * than thrown to the pool.
     */
    public class DefaultSenderTask implements Runnable {
        private final Request request;
        private final RequestListener listener;

        /**
         * @param request the request to dispatch
         * @param requestListener the listener that receives any failure raised while dispatching
         */
        public DefaultSenderTask(Request request, RequestListener requestListener) {
            this.request = request;
            this.listener = requestListener;
        }

        /**
         * Dispatches the request if the sender is still running; otherwise reports
         * {@code Err.IllegalState} to the listener.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!AbstractSender.this.isRunning()) {
                    // The sender was stopped between submission and execution.
                    this.listener.requestException(this.request, Err.IllegalState.makeFault());
                    return;
                }
                AbstractSender.this.flowControler.control(this.request.getFlowNumber());
                AbstractSender.this.requestAsync(this.request, this.listener);
            } catch (InterruptedException e) {
                try {
                    this.listener.requestException(this.request, Err.IllegalState.makeFault());
                } finally {
                    // Restore the interrupt status so code further up the stack can still
                    // observe that this worker was asked to stop.
                    Thread.currentThread().interrupt();
                }
            } catch (Throwable th) {
                AbstractSender.logger.error("what's wrong?", th);
                this.listener.requestException(this.request, Err.InternalError.makeFault(th));
            }
        }
    }

    /**
     * @param str the sender name; also used as the key when registering the thread pool with
     *     {@link ExecutorMonitor}
     */
    public AbstractSender(String str) {
        this.name = str;
    }

    /** Returns the name given at construction time. */
    @Override // com.ejtone.mars.kernel.core.sender.Sender
    public String getName() {
        return this.name;
    }

    /**
     * Sets the number of threads for the pool created by {@link #doStart()}. Has no effect on
     * a pool that already exists.
     */
    public void setThreadPoolCoreSize(int i) {
        this.threadPoolCoreSize = i;
    }

    /**
     * Sets the queue capacity for the pool created by {@link #doStart()}. Has no effect on a
     * pool that already exists.
     */
    public void setThreadPoolQueueLimit(int i) {
        this.threadPoolQueueLimit = i;
    }

    /**
     * Queues a request for asynchronous dispatch; later failures go to
     * {@code RequestListener.NoopListener}.
     *
     * @param request the request to send
     * @throws Fault {@code Err.IllegalState} if the sender is not running, or
     *     {@code Err.SystemBusy} if the thread pool rejects the task
     */
    @Override // com.ejtone.mars.kernel.core.sender.Sender
    public void request(Request request) throws Fault {
        if (!isRunning()) {
            throw Err.IllegalState.makeFault();
        }
        try {
            this.threadPool.execute(newSenderTask(request, RequestListener.NoopListener));
        } catch (RejectedExecutionException e) {
            throw Err.SystemBusy.makeFault();
        }
    }

    /**
     * Queues a request for asynchronous dispatch, reporting failures to the listener instead
     * of throwing.
     *
     * @param request the request to send
     * @param requestListener receives {@code Err.IllegalState} if the sender is not running and
     *     {@code Err.SystemBusy} if the thread pool rejects the task
     */
    @Override // com.ejtone.mars.kernel.core.sender.Sender
    public void request(Request request, RequestListener requestListener) {
        if (!isRunning()) {
            requestListener.requestException(request, Err.IllegalState.makeFault());
            return;
        }
        try {
            this.threadPool.execute(newSenderTask(request, requestListener));
        } catch (RejectedExecutionException e) {
            requestListener.requestException(request, Err.SystemBusy.makeFault());
        }
    }

    /**
     * Prepares the flow controller and thread pool, registers the pool with
     * {@link ExecutorMonitor} and this sender with {@link DelegateSenderService}, and
     * announces {@code SenderState.available} if {@code isAvailable()} is true.
     */
    protected void doStart() throws Exception {
        super.doStart();
        if (this.flowControler == null) {
            this.flowControler = new DummyFlowControler();
        }
        // A pool left over from an earlier doStop() is already shut down and would reject
        // every task, so a restart needs a fresh one.
        if (this.threadPool == null || this.threadPool.isShutdown()) {
            this.threadPool = new ThreadPoolExecutor(
                    this.threadPoolCoreSize,
                    this.threadPoolCoreSize,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(this.threadPoolQueueLimit));
        }
        this.threadPool.prestartAllCoreThreads();
        ExecutorMonitor.getInstance().regist(this.name, this.threadPool);
        DelegateSenderService.getInstance().regist(this);
        if (isAvailable()) {
            notifyState(SenderState.available);
        }
    }

    /**
     * Announces {@code SenderState.unavailable}, unregisters the sender and shuts the pool
     * down. Queued work gets half of {@code getStopTimeout()} (in milliseconds) to finish
     * before the pool is stopped forcibly.
     */
    protected void doStop() throws Exception {
        Timer timer = new Timer();
        notifyState(SenderState.unavailable);
        DelegateSenderService.getInstance().unregist(this);
        if (this.threadPool != null) {
            ExecutorMonitor.getInstance().unregist(this.threadPool);
            this.threadPool.shutdown();
            this.threadPool.awaitTermination(getStopTimeout() / 2, TimeUnit.MILLISECONDS);
            // When the pool has not drained in time, stop it forcibly and count the tasks
            // that never started.
            List<Runnable> discarded = null;
            if (!this.threadPool.isTerminated()) {
                discarded = this.threadPool.shutdownNow();
            }
            if (discarded == null || discarded.isEmpty()) {
                logger.info("threadPool {} stopped safely", this.name);
            } else {
                logger.warn("threadPool {} stopped, {} request maybe discard", this.name, discarded.size());
            }
        }
        super.doStop();
        logger.info("sender {} stopped, cost {} ms", this.name, timer.cost());
    }

    /**
     * Performs the actual send. Called on a pool worker thread after flow control has been
     * applied.
     *
     * @param request the request to send
     * @param requestListener the listener supplied by the caller, or
     *     {@code RequestListener.NoopListener} when none was given
     */
    protected abstract void requestAsync(Request request, RequestListener requestListener);

    /**
     * Creates the task submitted to the pool for one request. Subclasses may override this to
     * supply a different task type.
     */
    protected Runnable newSenderTask(Request request, RequestListener requestListener) {
        return new DefaultSenderTask(request, requestListener);
    }

    /** Returns the current thread pool, or {@code null} if none has been set or created yet. */
    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    /**
     * Supplies a custom thread pool to use instead of the one {@link #doStart()} would create.
     *
     * @throws IllegalStateException if the sender is already running
     */
    public void setThreadPool(ThreadPoolExecutor threadPoolExecutor) {
        if (isRunning()) {
            throw new IllegalStateException("can not set threadpool after sender started");
        }
        this.threadPool = threadPoolExecutor;
    }

    /** Returns the flow controller, or {@code null} if none has been set or created yet. */
    public FlowControler getFlowControler() {
        return this.flowControler;
    }

    /**
     * Sets the flow controller applied to each request before dispatch. If none is set,
     * {@link #doStart()} installs a {@link DummyFlowControler}.
     */
    public void setFlowControler(FlowControler flowControler) {
        this.flowControler = flowControler;
    }
}
