package com.xdja.csagent.dataswap.core.swapManager.ftpSimplex;

import com.xdja.csagent.dataswap.comm.TransferUtils;
import com.xdja.csagent.dataswap.comm.bean.Heartbeat;
import com.xdja.csagent.dataswap.comm.bean.TransferBean;
import com.xdja.csagent.dataswap.core.AbstractSwapManager;
import com.xdja.csagent.dataswap.core.SwapConfig;
import com.xdja.csagent.dataswap.core.SwapManager;
import com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.ftpClient.FtpTool;
import com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.ftpServer.FtpReceiver;
import com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.ftpServerMina.FtpServerMina;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.OneTimeTask;
import java.io.IOException;
import java.io.Serializable;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/xdja/csagent/dataswap/core/swapManager/ftpSimplex/FtpSimplexSwapManager.class */
public class FtpSimplexSwapManager extends AbstractSwapManager implements ReceiveCallback {
    private static final int Transfer_Timeout_Second = Integer.parseInt(System.getProperty("transfer.timeout", "50"));
    private static final int Transfer_Check_Period_Second = Integer.parseInt(System.getProperty("transfer.checkPeriod", "10"));
    private static final int Heartbeat_Period_Second = Integer.parseInt(System.getProperty("transfer.heartPeriod", "10"));
    private static final int Connect_Timeout_Second = Integer.parseInt(System.getProperty("transfer.connectTimeout", "60"));
    public static final int TransferMaxCount = 2000;
    private final int Upload_Timeout_Seconds = 10;
    private long lastReceiveTime;
    private ScheduledFuture<?> connectTimeoutTaskFuture;
    private EventLoopGroup eventloop;
    private Promise<String> disconnectPromise;
    private volatile AbstractSwapManager.State state;
    private FtpTool sender;
    private FtpServerMina receiver;
    private EventExecutorGroup executorGroup;
    private Promise<SwapManager> connectedPromise;
    private Logger logger;
    private IdSequence idSequence;
    private List<Serializable> sendBuffer;
    private ScheduledExecutorService sendLoopExecutor;

    public FtpSimplexSwapManager(SwapConfig swapConfig) {
        super(swapConfig);
        this.Upload_Timeout_Seconds = 10;
        this.logger = LoggerFactory.getLogger(getClass());
        this.sendBuffer = new ArrayList(TransferMaxCount);
        this.state = AbstractSwapManager.State.init;
        this.idSequence = new IdSequence();
    }

    private FtpTool getSendFtpTool(SwapConfig swapConfig) {
        String destHost = swapConfig.getDestHost();
        Assert.hasText(destHost);
        int destPort = swapConfig.getDestPort();
        Assert.isTrue(destPort > 0, "destPort > 0");
        String destUsername = swapConfig.getDestUsername();
        Assert.hasText(destUsername);
        String destPassword = swapConfig.getDestPassword();
        Assert.hasText(destPassword);
        return new FtpTool(destHost, destPort, destUsername, destPassword);
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public SocketAddress remoteAddress() {
        return new FtpSimplexAddress(this.receiver.remoteAddress(), this.sender.remoteAddress());
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public SocketAddress localAddress() {
        return new FtpSimplexAddress(this.sender.localAddress(), this.receiver.localAddress());
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public String getMode() {
        return "#";
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public Future<SwapManager> startSwap() throws Exception {
        if (this.state != AbstractSwapManager.State.init) {
            throw new IllegalStateException(this.state.name());
        }
        if (this.executorGroup == null) {
            this.executorGroup = new DefaultEventExecutorGroup(3);
        }
        if (this.eventloop == null) {
            this.eventloop = new LocalEventLoopGroup(1);
        }
        this.disconnectPromise = this.executorGroup.next().newPromise();
        this.connectedPromise = this.executorGroup.next().newPromise();
        try {
            this.receiver = new FtpServerMina();
            this.receiver.start(getSwapConfig().getLocalPort(), new FtpReceiver() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.1
                @Override // com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.ftpServer.FtpReceiver
                public void onFileReceive(String str, byte[] bArr) {
                    FtpSimplexSwapManager.this.logger.debug("ftp server receive {}", str);
                    if (str.startsWith(IdSequence.PREFIX)) {
                        FtpSimplexSwapManager.this.onReceive(bArr);
                    }
                }
            }, getSwapConfig().getLocalAccounts());
            this.connectTimeoutTaskFuture = this.eventloop.schedule(new OneTimeTask() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.2
                public void run() {
                    if (FtpSimplexSwapManager.this.state == AbstractSwapManager.State.connecting) {
                        FtpSimplexSwapManager.this.logger.error("FtpSimplex connect failure , timeout !");
                        FtpSimplexSwapManager.this.state = AbstractSwapManager.State.init;
                        FtpSimplexSwapManager.this.executorGroup.submit(new OneTimeTask() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.2.1
                            public void run() {
                                FtpSimplexSwapManager.this.stopAndReset();
                                FtpSimplexSwapManager.this.connectedPromise.setFailure(new ConnectTimeoutException());
                            }
                        });
                    }
                }
            }, Connect_Timeout_Second, TimeUnit.SECONDS);
            this.state = AbstractSwapManager.State.connecting;
            this.sender = getSendFtpTool(getSwapConfig());
            startSender();
            return this.connectedPromise;
        } catch (Exception e) {
            this.logger.error("ftp server start failure !");
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendToServer(List<Serializable> list, int i) throws InterruptedException {
        final String nextId = this.idSequence.nextId();
        TransferBean transferBean = new TransferBean();
        transferBean.setRetainCount(i);
        transferBean.setDataList(list);
        final byte[] serialize = TransferUtils.serialize(transferBean);
        boolean z = false;
        do {
            Future submit = this.executorGroup.submit(new Callable<Void>() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    FtpSimplexSwapManager.this.sender.upload(nextId, serialize);
                    return null;
                }
            });
            try {
                submit.get(10L, TimeUnit.SECONDS);
                this.logger.debug("ftp client send " + nextId + " , size " + serialize.length);
                z = true;
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                this.logger.warn("ftp client send exception : {}", e2.getMessage());
                submit.cancel(true);
                restartSender();
            }
            if (z) {
                return;
            }
        } while (!Thread.currentThread().isInterrupted());
    }

    private void restartSender() throws InterruptedException {
        this.logger.debug("restart ftp sender");
        int i = 10;
        while (true) {
            int i2 = i;
            i--;
            if (i2 <= 0) {
                return;
            }
            this.sender.disconnect();
            try {
                this.sender.connect();
                this.logger.debug("ftp sender reconnected !");
                return;
            } catch (IOException e) {
                TimeUnit.SECONDS.sleep(5L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startSender() {
        this.eventloop.submit(new Callable<Void>() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.4
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                FtpSimplexSwapManager.this.sender.connect();
                FtpSimplexSwapManager.this.logger.info("ftp sender connected ! {}:{}", FtpSimplexSwapManager.this.sender.getHost(), Integer.valueOf(FtpSimplexSwapManager.this.sender.getPort()));
                return null;
            }
        }).addListener(new GenericFutureListener<Future<? super Void>>() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.5
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (future.isSuccess()) {
                    FtpSimplexSwapManager.this.startSendLoop();
                    FtpSimplexSwapManager.this.eventloop.scheduleAtFixedRate(new Runnable() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.5.1
                        @Override // java.lang.Runnable
                        public void run() {
                            FtpSimplexSwapManager.this.getSendQueue().add(Heartbeat.REQ);
                        }
                    }, 0L, FtpSimplexSwapManager.Heartbeat_Period_Second, TimeUnit.SECONDS);
                } else {
                    FtpSimplexSwapManager.this.logger.warn("ftp sender connect failure [{}:{}#{}/{}]  , will retry after 5 seconds # {}", new Object[]{FtpSimplexSwapManager.this.getSwapConfig().getDestHost(), Integer.valueOf(FtpSimplexSwapManager.this.getSwapConfig().getDestPort()), FtpSimplexSwapManager.this.getSwapConfig().getDestUsername(), FtpSimplexSwapManager.this.getSwapConfig().getDestPassword(), future.cause()});
                    FtpSimplexSwapManager.this.eventloop.schedule(new OneTimeTask() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.5.2
                        public void run() {
                            FtpSimplexSwapManager.this.startSender();
                        }
                    }, 5L, TimeUnit.SECONDS);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startSendLoop() {
        this.sendLoopExecutor = Executors.newSingleThreadScheduledExecutor();
        this.sendLoopExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.6
            @Override // java.lang.Runnable
            public void run() {
                while (FtpSimplexSwapManager.this.collectDataList()) {
                    try {
                        FtpSimplexSwapManager.this.sendToServer(FtpSimplexSwapManager.this.sendBuffer, 0);
                        FtpSimplexSwapManager.this.sendBuffer.clear();
                    } catch (InterruptedException e) {
                        FtpSimplexSwapManager.this.logger.error("send schedule interrupted!");
                        return;
                    }
                }
                if (!FtpSimplexSwapManager.this.sendBuffer.isEmpty()) {
                    FtpSimplexSwapManager.this.sendToServer(FtpSimplexSwapManager.this.sendBuffer, 0);
                    FtpSimplexSwapManager.this.sendBuffer.clear();
                }
            }
        }, 0L, getSwapConfig().getFtpSendPeriodMill(), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean collectDataList() {
        do {
            Serializable poll = getSendQueue().poll();
            if (poll == null) {
                return false;
            }
            this.sendBuffer.add(poll);
        } while (this.sendBuffer.size() < 2000);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopAndReset() {
        if (this.sender != null) {
            this.logger.debug("stop ftp sender");
            this.sender.disconnect();
        }
        if (this.receiver != null) {
            this.logger.debug("stop ftp server");
            this.receiver.stop();
        }
        if (this.sendLoopExecutor != null) {
            this.logger.debug("cancel send future");
            this.sendLoopExecutor.shutdownNow();
            this.sendLoopExecutor = null;
        }
        if (this.eventloop != null) {
            this.logger.debug("shutdown event loop");
            this.eventloop.shutdownGracefully().syncUninterruptibly();
            this.eventloop = null;
        }
        this.sender = null;
        this.receiver = null;
        this.logger.debug("stopAndReset over ...");
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public void addDisconnectedListener(GenericFutureListener genericFutureListener) {
        if (this.disconnectPromise == null) {
            throw new IllegalStateException("not connected!");
        }
        this.disconnectPromise.addListener(genericFutureListener);
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public void stopSwap() throws Exception {
        this.logger.debug("stop swap start");
        this.state = AbstractSwapManager.State.init;
        if (this.sender != null) {
            this.logger.debug("stop ftp sender");
            this.sender.disconnect();
        }
        if (this.receiver != null) {
            this.logger.debug("stop ftp server");
            this.receiver.stop();
        }
        if (this.sendLoopExecutor != null) {
            this.logger.debug("cancel send future");
            this.sendLoopExecutor.shutdownNow();
            this.sendLoopExecutor = null;
        }
        if (this.eventloop != null) {
            this.logger.debug("shutdown event loop");
            this.eventloop.shutdownGracefully().sync();
            this.eventloop = null;
        }
        if (this.executorGroup != null) {
            this.logger.debug("shutdown main thread group");
            this.executorGroup.shutdownGracefully().sync();
            this.executorGroup = null;
        }
        this.sender = null;
        this.receiver = null;
        this.logger.info("stop swap over");
    }

    @Override // com.xdja.csagent.dataswap.core.SwapManager
    public boolean isSwapConnected() {
        return this.state == AbstractSwapManager.State.connected;
    }

    @Override // com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.ReceiveCallback
    public void onReceive(byte[] bArr) {
        TransferBean transferBean = (TransferBean) TransferUtils.deserialize(bArr, TransferBean.class);
        if (transferBean != null && transferBean.getDataList().size() > 0) {
            for (Serializable serializable : transferBean.getDataList()) {
                if (!(serializable instanceof Heartbeat)) {
                    getReceiveQueue().offer(serializable);
                } else if (((Heartbeat) serializable).getType() == 0) {
                    getSendQueue().add(Heartbeat.RESP);
                } else if (((Heartbeat) serializable).getType() == 1) {
                    this.lastReceiveTime = System.currentTimeMillis();
                    if (this.state == AbstractSwapManager.State.connecting) {
                        this.eventloop.submit(new OneTimeTask() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.7
                            public void run() {
                                FtpSimplexSwapManager.this.tryMakeConnected();
                            }
                        });
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void tryMakeConnected() {
        if (this.state == AbstractSwapManager.State.connecting) {
            this.state = AbstractSwapManager.State.connected;
            this.logger.info("ftp simplex swap manager connected ! local:{} # remote:{}", localAddress().toString(), remoteAddress().toString());
            if (this.connectTimeoutTaskFuture != null) {
                this.connectTimeoutTaskFuture.cancel(false);
            }
            this.eventloop.scheduleWithFixedDelay(new Runnable() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.8
                @Override // java.lang.Runnable
                public void run() {
                    if (System.currentTimeMillis() - FtpSimplexSwapManager.this.lastReceiveTime > FtpSimplexSwapManager.Transfer_Timeout_Second * 1000) {
                        FtpSimplexSwapManager.this.logger.error("FtpSimplex connection disconnected , timeout !");
                        FtpSimplexSwapManager.this.state = AbstractSwapManager.State.init;
                        FtpSimplexSwapManager.this.executorGroup.submit(new OneTimeTask() { // from class: com.xdja.csagent.dataswap.core.swapManager.ftpSimplex.FtpSimplexSwapManager.8.1
                            public void run() {
                                FtpSimplexSwapManager.this.stopAndReset();
                                FtpSimplexSwapManager.this.disconnectPromise.setFailure(new TimeoutException());
                            }
                        });
                    }
                }
            }, 0L, Transfer_Check_Period_Second, TimeUnit.SECONDS);
            if (this.connectedPromise != null) {
                this.connectedPromise.setSuccess(getThisSwapManager());
            }
        }
    }

    private SwapManager getThisSwapManager() {
        return this;
    }
}
