/*
 * Decompiled with CFR 0.152.
 */
package com.xdja.csagent.engine;

import com.google.common.collect.Sets;
import com.xdja.csagent.dataswap.core.SwapManager;
import com.xdja.csagent.engine.IRoutePacketListener;
import com.xdja.csagent.engine.IWidget;
import com.xdja.csagent.engine.packet.Packet;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AgentRoute
implements IWidget {
    public static final Logger LOGGER = LoggerFactory.getLogger(AgentRoute.class);
    protected final ExecutorService executorService = Executors.newFixedThreadPool(2, (ThreadFactory)new DefaultThreadFactory("AgentRoute-Loop"));
    private final Set<IRoutePacketListener> packetListenerSet = Sets.newConcurrentHashSet();
    private final SwapManager swap;
    private volatile boolean isStop;
    private Future<?> remoteFuture;
    private Future<?> localFuture;
    private BlockingQueue<Packet> localPacketQueue = new LinkedBlockingQueue<Packet>(10000);

    public AgentRoute(SwapManager swap) {
        this.swap = swap;
    }

    public boolean addPacketListener(IRoutePacketListener packetListener) {
        boolean rst = this.packetListenerSet.add(packetListener);
        if (rst) {
            packetListener.init(this);
        }
        return rst;
    }

    private void doRoute(Packet bean) {
        for (IRoutePacketListener packetListener : this.packetListenerSet) {
            if (!packetListener.isReceive(bean)) continue;
            packetListener.onReceiveFromRoute(bean, this);
            break;
        }
    }

    public boolean isAvailable(Packet packet) {
        return packet.isLocal() || this.swap.isSwapConnected();
    }

    public boolean isSwapConnected() {
        return this.swap.isSwapConnected();
    }

    public void removePacketListener(IRoutePacketListener listener) {
        this.packetListenerSet.remove(listener);
        listener.close();
    }

    public boolean send(Packet bean) {
        boolean result = false;
        try {
            if (bean.isLocal()) {
                this.localPacketQueue.put(bean);
                result = true;
            } else {
                result = this.swap.send((Serializable)bean);
            }
        }
        catch (Exception e) {
            LOGGER.error("send data to swap module error!", (Throwable)e);
        }
        return result;
    }

    @Override
    public void shutdown() {
        this.isStop = true;
        for (IRoutePacketListener one : this.packetListenerSet) {
            one.close();
        }
        this.remoteFuture.cancel(true);
        this.localFuture.cancel(true);
        this.executorService.shutdown();
    }

    @Override
    public boolean isRunning() {
        return !this.isStop;
    }

    @Override
    public void startup() throws Exception {
        LOGGER.info("Start AgentRoute.....");
        this.isStop = false;
        this.remoteFuture = this.executorService.submit(new Runnable(){

            @Override
            public void run() {
                while (!AgentRoute.this.isStop) {
                    try {
                        Serializable take = AgentRoute.this.swap.receive();
                        Packet bean = (Packet)take;
                        AgentRoute.this.doRoute(bean);
                    }
                    catch (Exception e) {
                        if (AgentRoute.this.isStop) {
                            LOGGER.info("AgentDistribute remote thread stop !");
                            continue;
                        }
                        LOGGER.warn("AgentDistribute-Loop thread remote error!", (Throwable)e);
                    }
                }
            }
        });
        this.localFuture = this.executorService.submit(new Runnable(){

            @Override
            public void run() {
                while (!AgentRoute.this.isStop) {
                    try {
                        Serializable take = (Serializable)AgentRoute.this.localPacketQueue.take();
                        Packet bean = (Packet)take;
                        AgentRoute.this.doRoute(bean);
                    }
                    catch (Exception e) {
                        if (AgentRoute.this.isStop) {
                            LOGGER.info("AgentDistribute local thread stop !");
                            continue;
                        }
                        LOGGER.warn("AgentDistribute-Loop thread local error!", (Throwable)e);
                    }
                }
            }
        });
    }
}

