/*
 * Decompiled with CFR 0.152.
 */
package com.xdja.csagent.engine;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.xdja.csagent.engine.AgentConnection;
import com.xdja.csagent.engine.AgentRoute;
import com.xdja.csagent.engine.IRoutePacketListener;
import com.xdja.csagent.engine.IWidget;
import com.xdja.csagent.engine.MakerUtils;
import com.xdja.csagent.engine.Utils;
import com.xdja.csagent.engine.bean.EngineParams;
import com.xdja.csagent.engine.consts.PacketSource;
import com.xdja.csagent.engine.metrics.MetricsFactory;
import com.xdja.csagent.engine.packet.ChannelPacket;
import com.xdja.csagent.engine.packet.ConnectBegin;
import com.xdja.csagent.engine.packet.FeedbackPacket;
import com.xdja.csagent.engine.packet.Packet;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Backend widget of the agent engine.
 *
 * <p>Receives {@link ChannelPacket}s from the {@link AgentRoute}. A {@link ConnectBegin} for an
 * unknown channel id opens an outbound Netty connection to the host/port named in the packet;
 * packets for an already registered channel id are handed to the matching
 * {@link AgentConnection}. Connections are registered in {@link #connectionMap} when their
 * channel becomes active and removed when it becomes inactive.
 */
public class AgentBackend implements IWidget, IRoutePacketListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(AgentBackend.class);

    /**
     * Status code placed in the {@link FeedbackPacket} sent when an outbound connect fails.
     * NOTE(review): the meaning of the value 2 is inferred from how it is used here; confirm
     * against FeedbackPacket.
     */
    private static final int FEEDBACK_CONNECT_FAILURE = 2;

    /** Event loop group used for all outbound backend connections. */
    public final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("AgentBackend-Connect"));
    /** Executor on which route packets are dispatched and connects are initiated. */
    protected final ExecutorService executorService;
    private final EngineParams engineParams;
    /** Active backend connections keyed by {@link AgentConnection#id()}. */
    private final ConcurrentMap<String, AgentConnection> connectionMap = new ConcurrentHashMap<String, AgentConnection>();
    private final Meter createMeter = MetricsFactory.metrics().meter("backend.connection.create.meter");
    private final Meter closeMeter = MetricsFactory.metrics().meter("backend.connection.close.meter");
    private final Timer connTimer = MetricsFactory.metrics().timer("backend.connection.timer");
    private AgentRoute route;

    /**
     * Creates a backend with its own fixed pool of 10 worker threads.
     *
     * @param engineParams engine configuration (connect timeout, idle timeout)
     */
    public AgentBackend(EngineParams engineParams) {
        this(Executors.newFixedThreadPool(10, new DefaultThreadFactory("AgentBackend")), engineParams);
    }

    /**
     * Creates a backend that runs its tasks on the supplied executor.
     *
     * @param executorService executor used for packet dispatch and connect attempts; it is shut
     *                        down by {@link #close()}
     * @param engineParams    engine configuration (connect timeout, idle timeout)
     */
    public AgentBackend(ExecutorService executorService, EngineParams engineParams) {
        this.executorService = executorService;
        this.engineParams = engineParams;
    }

    /**
     * @return number of connections currently registered
     */
    public int getConnectedCount() {
        return this.connectionMap.size();
    }

    /**
     * Looks up a registered connection.
     *
     * @param id connection id
     * @return the connection, or {@code null} if none is registered under {@code id}
     */
    public AgentConnection getConnection(String id) {
        return this.connectionMap.get(id);
    }

    private PacketSource getPacketSource() {
        return PacketSource.Backend;
    }

    /**
     * Populates the pipeline of a freshly created outbound channel: optional wire logging, an
     * all-idle timeout, and a handler that registers/unregisters the {@link AgentConnection}
     * and maintains the connection metrics.
     *
     * @param ch           the channel being initialised
     * @param connectBegin the packet that requested this connection
     */
    private void initBackendConnect(SocketChannel ch, final ConnectBegin connectBegin) throws Exception {
        if (Utils.EnableLogging) {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
        }
        // Reader/writer idle detection is disabled (0); only the all-idle timeout is set, in seconds.
        ch.pipeline().addLast(new IdleStateHandler(0, 0, this.engineParams.getConnectIdleMillis() / 1000));
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            // Both stay null if channelActive fails before assigning them.
            private Timer.Context time;
            private AgentConnection conn;

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                this.time = AgentBackend.this.connTimer.time();
                AgentBackend.this.createMeter.mark();
                this.conn = MakerUtils.getMaker(connectBegin.getAgentType()).makeBackend(ctx.channel(), connectBegin.getChannelId(), connectBegin.isLocal(), AgentBackend.this);
                AgentBackend.this.connectionMap.put(this.conn.id(), this.conn);
                super.channelActive(ctx);
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                super.channelInactive(ctx);
                // conn is null when makeBackend threw in channelActive; the metrics below must
                // still be updated in that case.
                if (this.conn != null) {
                    AgentBackend.this.connectionMap.remove(this.conn.id());
                }
                AgentBackend.this.closeMeter.mark();
                if (this.time != null) {
                    this.time.stop();
                }
            }

            /**
             * Closes the channel on an all-idle event. Any other user event is not forwarded
             * further down the pipeline.
             */
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
                    // Null-safe id: a failure while logging must never prevent the close below.
                    Object id = this.conn != null ? this.conn.id() : String.valueOf(ctx.channel());
                    LOGGER.warn("Connection {} idle {} seconds , will close !!!", id, AgentBackend.this.engineParams.getConnectIdleMillis() / 1000);
                    ctx.close();
                }
            }
        });
    }

    /**
     * Accepts every {@link ChannelPacket} that did not originate from the backend itself.
     */
    @Override
    public boolean isReceive(Packet packet) {
        if (packet instanceof ChannelPacket) {
            ChannelPacket bean = (ChannelPacket) packet;
            return this.getPacketSource() != bean.getSource();
        }
        return false;
    }

    @Override
    public void init(AgentRoute route) {
        this.route = route;
    }

    /**
     * Stops the worker executor immediately and blocks until the event loop group has shut down.
     */
    @Override
    public void close() {
        LOGGER.info("close AgentBackend begin");
        this.executorService.shutdownNow();
        this.nioEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
        LOGGER.info("close AgentBackend over");
    }

    /**
     * Handles a packet delivered by the route.
     *
     * <ul>
     *   <li>If a connection is registered for the packet's channel id, the packet is passed to
     *       it on the worker executor.</li>
     *   <li>Otherwise, a {@link ConnectBegin} triggers an asynchronous connect; on failure a
     *       {@link FeedbackPacket} is sent back through {@code route}.</li>
     *   <li>Anything else is logged and dropped.</li>
     * </ul>
     *
     * @param packet the packet from the route; non-{@link ChannelPacket}s are ignored
     * @param route  the route used to report a connect failure
     */
    @Override
    public void onReceiveFromRoute(Packet packet, final AgentRoute route) {
        if (!(packet instanceof ChannelPacket)) {
            LOGGER.warn("Ignore non-channel packet {}", packet);
            return;
        }
        final ChannelPacket bean = (ChannelPacket) packet;
        final AgentConnection connection = this.connectionMap.get(bean.getChannelId());
        if (connection != null) {
            this.executorService.submit(new Runnable() {

                @Override
                public void run() {
                    try {
                        connection.onReceiveFromRoute(bean);
                    } catch (RuntimeException e) {
                        // The Future returned by submit() is discarded, so without this the
                        // failure would vanish silently.
                        LOGGER.error("dispatch packet to connection " + bean.getChannelId() + " failed", e);
                    }
                }
            });
        } else if (bean instanceof ConnectBegin) {
            final ConnectBegin connectBegin = (ConnectBegin) bean;
            final String connectHost = Utils.extractHost(connectBegin.getHost());
            final Integer connectPort = connectBegin.getPort();
            this.executorService.submit(new Runnable() {

                @Override
                public void run() {
                    try {
                        AgentBackend.this.connect(connectBegin, connectHost, connectPort, route);
                    } catch (RuntimeException e) {
                        // E.g. a missing port or a bootstrap error: the peer must still be told
                        // that the connection could not be established.
                        LOGGER.error("connect " + connectHost + ":" + connectPort + " aborted by unexpected error", e);
                        AgentBackend.this.notifyConnectFailure(connectBegin, route);
                    }
                }
            });
        } else {
            LOGGER.warn("Ignore packet for {} !! class name is {}", bean.getChannelId(), bean.getClass().getSimpleName());
        }
    }

    /**
     * Starts an asynchronous connect to {@code connectHost:connectPort} and reports a failed
     * attempt back through {@code route}.
     */
    private void connect(final ConnectBegin connectBegin, final String connectHost, final Integer connectPort, final AgentRoute route) {
        LOGGER.debug("try connect {}:{}", connectHost, connectPort);
        Bootstrap b = new Bootstrap();
        b.group(this.nioEventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        AgentBackend.this.initBackendConnect(ch, connectBegin);
                    }
                })
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.engineParams.getConnectTimeoutMillis())
                // Auto-read is off; reads have to be requested explicitly
                // (presumably by the AgentConnection - TODO confirm).
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(connectHost, connectPort.intValue());
        f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    LOGGER.warn("connect {}:{} failure!!!", connectHost, connectPort);
                    AgentBackend.this.notifyConnectFailure(connectBegin, route);
                    LOGGER.debug("close self channel");
                    future.channel().close();
                }
            }
        });
    }

    /**
     * Sends a connect-failure {@link FeedbackPacket} for {@code connectBegin} through {@code route}.
     */
    private void notifyConnectFailure(ConnectBegin connectBegin, AgentRoute route) {
        LOGGER.debug("notify: connect failure");
        FeedbackPacket connectFailure = new FeedbackPacket(connectBegin.getChannelId(), FEEDBACK_CONNECT_FAILURE);
        connectFailure.setSource(PacketSource.Backend);
        connectFailure.setLocal(connectBegin.isLocal());
        route.send(connectFailure);
    }

    /**
     * Stamps the packet with this backend as source and with the local flag of its connection,
     * then forwards it to the route.
     *
     * @param packet packet to send; its channel id must belong to a registered connection
     * @return the result of {@link AgentRoute#send}, or {@code false} if the connection is no
     *         longer registered, no route has been set, or the route is unavailable for a
     *         non-local packet
     */
    public boolean sendToRoute(ChannelPacket packet) {
        AgentConnection connection = this.connectionMap.get(packet.getChannelId());
        if (connection == null) {
            // The connection may have been removed concurrently by channelInactive.
            LOGGER.warn("Drop packet for {}: connection is not registered", packet.getChannelId());
            return false;
        }
        packet.setLocal(connection.isRouteLocal());
        packet.setSource(this.getPacketSource());
        if (this.route != null && (packet.isLocal() || this.route.isAvailable())) {
            return this.route.send(packet);
        }
        return false;
    }

    /** No-op; resources are released by {@link #close()}. */
    @Override
    public void shutdown() {
    }

    /**
     * @return {@code true} while either the worker executor or the event loop group has not
     *         terminated
     */
    @Override
    public boolean isRunning() {
        return !this.executorService.isTerminated() || !this.nioEventLoopGroup.isTerminated();
    }

    /** Only logs; connections are created on demand in {@link #onReceiveFromRoute}. */
    @Override
    public void startup() throws Exception {
        LOGGER.info("Start AgentBackend ....");
    }
}

