/*
 * Decompiled with CFR 0.152.
 */
package com.dangdang.ddframe.rdb.sharding.executor;

import com.dangdang.ddframe.rdb.sharding.config.ShardingProperties;
import com.dangdang.ddframe.rdb.sharding.config.ShardingPropertiesConstant;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.executor.ExecuteUnit;
import com.dangdang.ddframe.rdb.sharding.executor.ExecutorExceptionHandler;
import com.dangdang.ddframe.rdb.sharding.executor.MergeUnit;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link ExecuteUnit}s over a collection of inputs, executing the first input on the
 * calling thread and the remaining inputs on a fixed-size pool of daemon threads.
 */
public final class ExecutorEngine {
    private static final Logger log = LoggerFactory.getLogger(ExecutorEngine.class);
    private final ListeningExecutorService executorService;

    /**
     * Creates the engine and its worker pool.
     *
     * <p>The pool has a fixed number of threads taken from
     * {@link ShardingPropertiesConstant#EXECUTOR_SIZE}, an unbounded work queue, and daemon
     * threads named {@code ShardingJDBC-%d}. A delayed shutdown hook with a 60 second timeout
     * is registered for the pool.
     *
     * @param shardingProperties configuration from which the executor size is read
     */
    public ExecutorEngine(ShardingProperties shardingProperties) {
        int executorSize = (Integer)shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        this.executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
                executorSize, executorSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
        MoreExecutors.addDelayedShutdownHook(this.executorService, 60L, TimeUnit.SECONDS);
    }

    /**
     * Executes {@code executeUnit} once per input and collects the outputs.
     *
     * <p>The first input is executed synchronously on the calling thread while all remaining
     * inputs are submitted to the pool; the call then blocks until the pooled executions finish.
     *
     * @param inputs      inputs to process; an empty collection yields an empty list
     * @param executeUnit unit of work applied to every input
     * @param <I>         input type
     * @param <O>         output type
     * @return the output of the first input followed by the outputs of the remaining inputs,
     *         or {@code null} when an exception occurred and
     *         {@link ExecutorExceptionHandler#handleException(Exception)} returned normally
     */
    public <I, O> List<O> execute(Collection<I> inputs, ExecuteUnit<I, O> executeUnit) {
        Iterator<I> iterator = inputs.iterator();
        if (!iterator.hasNext()) {
            return Collections.emptyList();
        }
        I firstInput = iterator.next();
        // Submit the rest before running the first input so both proceed concurrently.
        ListenableFuture<List<O>> restListFuture = this.asyncRun(Lists.newArrayList(iterator), executeUnit);
        O firstOutput;
        List<O> restOutputs;
        try {
            firstOutput = executeUnit.execute(firstInput);
            restOutputs = restListFuture.get();
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Restore the interrupt flag before delegating; the broad catch would otherwise lose it.
                Thread.currentThread().interrupt();
            }
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        LinkedList<O> result = Lists.newLinkedList(restOutputs);
        result.addFirst(firstOutput);
        return result;
    }

    /**
     * Executes {@code executeUnit} for every input and merges the intermediate outputs.
     *
     * @param inputs      inputs to process
     * @param executeUnit unit of work applied to every input
     * @param mergeUnit   merges the list produced by {@link #execute(Collection, ExecuteUnit)};
     *                    it receives that method's return value unchanged
     * @param <I>         input type
     * @param <M>         intermediate output type
     * @param <O>         merged output type
     * @return the value returned by {@code mergeUnit}
     */
    public <I, M, O> O execute(Collection<I> inputs, ExecuteUnit<I, M> executeUnit, MergeUnit<M, O> mergeUnit) {
        return mergeUnit.merge(this.execute(inputs, executeUnit));
    }

    /**
     * Submits one pooled task per input and combines the resulting futures into a single one.
     *
     * @param inputs      inputs to process on the pool
     * @param executeUnit unit of work applied to every input
     * @return a future for the list of outputs of all submitted tasks
     */
    private <I, O> ListenableFuture<List<O>> asyncRun(Collection<I> inputs, final ExecuteUnit<I, O> executeUnit) {
        List<ListenableFuture<O>> result = new ArrayList<ListenableFuture<O>>(inputs.size());
        for (final I each : inputs) {
            result.add(this.executorService.submit(new Callable<O>(){

                @Override
                public O call() throws Exception {
                    return executeUnit.execute(each);
                }
            }));
        }
        return Futures.allAsList(result);
    }

    /**
     * Shuts the pool down immediately and waits up to 5 seconds for it to terminate.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored before the
     * termination check is made.
     *
     * @throws ShardingJdbcException if the pool has not terminated after the wait
     */
    public void shutdown() {
        this.executorService.shutdownNow();
        try {
            this.executorService.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException ignored) {
            // Keep the interrupt visible to callers instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        if (!this.executorService.isTerminated()) {
            throw new ShardingJdbcException("ExecutorEngine can not been terminated", new Object[0]);
        }
    }
}

