package org.pentaho.di.sdk.myplugins.jobentries.ftpplus;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.annotations.JobEntry;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.sdk.myplugins.jobentries.commons.AbstractFileDownloadCommonJobEntry;
import org.pentaho.di.sdk.myplugins.jobentries.commons.Constant;
import org.pentaho.di.sdk.myplugins.jobentries.commons.ProgressUtil;
import org.pentaho.di.sdk.myplugins.jobentries.commons.StrUtil;
import org.pentaho.di.sdk.myplugins.jobentries.ftpcommon.FileInfo;
import org.pentaho.di.sdk.myplugins.jobentries.ftpcommon.FtpClientUtil;
import org.pentaho.di.sdk.myplugins.jobentries.ftpcommon.Progress;
import org.pentaho.di.sdk.myplugins.jobentries.ftpcommon.core.FtpClientFactory;
import org.pentaho.di.sdk.myplugins.jobentries.statistics.PhaseEnum;

@JobEntry(id = "JobEntryFtpPlus", name = "JobEntryFtpPlus.Name", description = "JobEntryFtpPlus.TooltipDesc", categoryDescription = "i18n:org.pentaho.di.job:JobCategory.Category.FileTransfer", i18nPackageName = "org.pentaho.di.sdk.myplugins.jobentries.ftpplus", documentationUrl = "JobEntryFtpPlus.DocumentationURL", casesUrl = "JobEntryFtpPlus.CasesURL", forumUrl = "JobEntryFtpPlus.ForumURL")
/* loaded from: input_file:org/pentaho/di/sdk/myplugins/jobentries/ftpplus/JobEntryFtpPlus.class */
public class JobEntryFtpPlus extends AbstractFileDownloadCommonJobEntry {
    /** Job parameters; populated in {@code execute} by parsing {@code configInfo} as JSON. */
    private JobEntryFtpPlusParamsDO jobEntryFtpPlusParamsDO;
    // Reference to this class. Not read anywhere in this file — presumably kept for
    // Kettle's i18n message lookup convention; confirm before removing.
    private static Class<?> PKG = JobEntryFtpPlus.class;

    /**
     * Runs the FTP download job entry.
     *
     * <p>Parses {@code configInfo} into {@link JobEntryFtpPlusParamsDO}, validates it, initialises the
     * keyword filter, connects to the FTP server, downloads the configured file list and finally
     * publishes the download result into the parent job's extension data map.
     *
     * @param result the result object to populate; it is stored on this entry and returned
     * @param i      not used by this implementation
     * @return the populated result: {@code result=true, nrErrors=0} on success,
     *         {@code nrErrors=1} on any failure
     */
    public Result execute(Result result, int i) {
        logBasic("开始");
        long startNanos = System.nanoTime();
        this.result = result;
        this.result.setResult(false);
        this.jobEntryFtpPlusParamsDO = JSON.parseObject(this.configInfo, JobEntryFtpPlusParamsDO.class);

        // Short-circuit keeps the original order: the keyword filter is only initialised
        // when the parameters are valid.
        if (!paramsCheck() || !initKeywordFilter()) {
            logBasic("结束");
            this.result.setNrErrors(1L);
            return this.result;
        }

        FTPClient ftpClient = FtpClientUtil.connectFtpServer(this.log, this.jobEntryFtpPlusParamsDO);
        if (ftpClient == null) {
            logError("FTP连接失败");
            checkFail(this.log, this.jobEntryFtpPlusParamsDO.getFileInfoList());
            this.result.setNrErrors(1L);
            return this.result;
        }

        // FTP reply 530 means "not logged in". Release the connection before reporting;
        // previously this path returned without ever closing it.
        if (ftpClient.getReplyCode() == 530) {
            FtpClientUtil.closeFTPConnect(ftpClient);
            logError("请检查FTP账号密码是否正确");
            checkFail(this.log, this.jobEntryFtpPlusParamsDO.getFileInfoList());
            this.result.setNrErrors(1L);
            return this.result;
        }

        try {
            makeLocalDirByTargetDir();
            Progress progress = new Progress();
            List<FileInfo> filesToDownload = this.jobEntryFtpPlusParamsDO.getFileInfoList();
            if (CollectionUtils.isNotEmpty(filesToDownload)) {
                syncLocalDirByFtpFileList(ftpClient, this.jobEntryFtpPlusParamsDO, filesToDownload, progress);
            }
            this.result.setNrErrors(0L);
            this.result.setResult(true);
        } catch (Exception e) {
            // Pass the throwable along so the stack trace is not lost.
            logError("下载文件出现异常，原因:" + e.getMessage(), e);
            this.result.setNrErrors(1L);
            this.result.setResult(false);
        } finally {
            // Single place that releases the connection, whatever the outcome.
            FtpClientUtil.closeFTPConnect(ftpClient);
        }

        // Publish the outcome for downstream job entries.
        this.downloadResult.setStrategyId(this.jobEntryFtpPlusParamsDO.getStrategyId());
        this.downloadResult.setJobId(this.jobEntryFtpPlusParamsDO.getJobId());
        logDebug("downloadResult: " + this.downloadResult.toString());
        Map<String, Object> extensionData = this.parentJob.getExtensionDataMap();
        extensionData.put(Constant.FILE_DOWNLOAD_RESULT_KEY, this.downloadResult);
        extensionData.put(Constant.FILE_DOWNLOAD_KEYWORD_ABS_DIRECTORY, this.jobEntryFtpPlusParamsDO.getKeywordAbsDirectory());
        extensionData.put(Constant.FILE_DOWNLOAD_KEYSWITCH, this.jobEntryFtpPlusParamsDO.getKeySwitch());
        writeDownloadResultToFile();
        unloadKeywordFilterModule();

        // Throughput statistics (debug level only).
        double elapsedSeconds = (System.nanoTime() - startNanos) / 1.0E9d;
        logDebug("Job.execute elapsedTime(s): sec " + elapsedSeconds);
        logDebug("Job.execute 并发线程数: " + this.threadNumber);
        logDebug("Job.execute 成功下载总大小: " + StrUtil.stringify(this.bytesFilesRetrieved.longValue()));
        logDebug("Job.execute 平均每秒下载大小: " + StrUtil.stringify((long) (this.bytesFilesRetrieved.longValue() / elapsedSeconds)) + "/s");
        logDebug("Job.execute 成功下载总文件数: " + this.nrFilesRetrieved);
        logDebug("Job.execute 平均每秒下载文件数: " + (this.nrFilesRetrieved.longValue() / elapsedSeconds));
        logBasic("结束");
        return this.result;
    }

    /**
     * Validates the parsed job parameters.
     *
     * @return {@code false} (after logging an error) when no parameters were parsed;
     *         otherwise the outcome of {@code commonParamsCheck()}
     */
    private boolean paramsCheck() {
        if (this.jobEntryFtpPlusParamsDO == null) {
            this.log.logError("参数为空");
            return false;
        }
        Boolean commonCheckPassed = commonParamsCheck();
        return commonCheckPassed.booleanValue();
    }

    /**
     * Logs an all-zero progress summary in which every entry of type {@code 0} from the given
     * list is reported as failed. Called when the FTP connection cannot be used at all.
     *
     * @param logChannelInterface not used; kept for signature compatibility
     * @param list                files that were supposed to be downloaded; {@code null} is
     *                            treated as an empty list
     */
    private void checkFail(LogChannelInterface logChannelInterface, List<FileInfo> list) {
        // The list comes straight from the parsed JSON parameters and may be absent.
        List<Progress.FileItem> failedItems = new ArrayList<>(list == null ? 0 : list.size());
        if (list != null) {
            for (FileInfo fileInfo : list) {
                // Only entries whose type is 0 are reported, as before.
                if (fileInfo != null && fileInfo.getType() == 0) {
                    failedItems.add(new Progress.FileItem(fileInfo.getName(), "upload file failed", fileInfo.getRelativePath(), fileInfo.getSize(), fileInfo.getTime(), 1, fileInfo.getIncrementType()));
                }
            }
        }
        StringBuilder summary = new StringBuilder("totalCount: 0, successCount: 0, totalBytes: 0, successBytes: 0");
        summary.append(", failList: ").append(JSON.toJSONString(failedItems));
        logBasic(summary.toString());
        logBasic("结束");
    }

    /**
     * Downloads one file over the given (shared) FTP connection and tracks it in {@code progress}.
     *
     * @param fTPClient               connected FTP client to use
     * @param jobEntryFtpPlusParamsDO job parameters; supplies the remote source directory
     * @param str                     local target directory
     * @param fileInfo                file to download; when its step is {@code 2} it is counted
     *                                as successful without any transfer
     * @param progress                progress accumulator updated with success/failure details
     * @return {@code true} when the file is counted as done, {@code false} on failure
     */
    private Boolean downloadFileFromFtp(FTPClient fTPClient, JobEntryFtpPlusParamsDO jobEntryFtpPlusParamsDO, String str, FileInfo fileInfo, Progress progress) {
        if (fileInfo.getStep() == 2) {
            progress.setSuccessCount(progress.getSuccessCount() + 1);
            progress.setSuccessBytes(progress.getSuccessBytes() + fileInfo.getSize());
            addFilenameToResultFilenames(fileInfo);
            return true;
        }
        if (!fTPClient.isConnected() || !fTPClient.isAvailable()) {
            logError("FTP链接不可用");
            return false;
        }
        String relativePath = fileInfo.getRelativePath();
        if (StringUtils.isBlank(str) || StringUtils.isBlank(relativePath)) {
            logError("本地目录或FTP目录不存在");
            return false;
        }
        String sourceDirectory = jobEntryFtpPlusParamsDO.getSourceDirectory();
        logDebug("ftpFileDir: " + sourceDirectory);
        try {
            return retrieveToLocal(fTPClient, sourceDirectory, str, fileInfo, progress);
        } catch (IOException e) {
            logError("文件下载发生异常，路径：" + relativePath + "，原因：" + e);
            progress.setFailCount(progress.getFailCount() + 1);
            progress.getFailList().add(new Progress.FileItem(relativePath, "download fail happened exception, detail:" + e.getMessage(), fileInfo.getRelativePath(), fileInfo.getSize(), fileInfo.getTime(), 1, 1));
            return false;
        }
    }

    /**
     * Downloads one file using a client borrowed from the given pool; intended to be called from
     * worker threads. The borrowed client is always handed back to the pool.
     *
     * @param genericObjectPool pool of FTP clients
     * @param str               local target directory
     * @param fileInfo          file to download; when its step is {@code 2} only its size is
     *                          added to the retrieved-bytes counter
     * @return {@code true} when the file is counted as done, {@code false} on failure
     */
    private Boolean downloadFileFromFtpMultiThread(GenericObjectPool<FTPClient> genericObjectPool, String str, FileInfo fileInfo) {
        // Decide whether a connection is needed BEFORE borrowing one: the early returns
        // below used to leak the borrowed client, eventually starving the pool.
        if (fileInfo.getStep() == 2) {
            addBytesFilesRetrieved(fileInfo.getSize());
            return true;
        }
        String relativePath = fileInfo.getRelativePath();
        if (StringUtils.isBlank(str) || StringUtils.isBlank(relativePath)) {
            logError("本地目录或FTP目录不存在");
            return false;
        }

        FTPClient client;
        try {
            client = genericObjectPool.borrowObject();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this worker thread.
                Thread.currentThread().interrupt();
            }
            // Previously swallowed, which led to a NullPointerException right afterwards.
            logError("获取FTP连接失败，原因：" + e);
            return false;
        }

        try {
            if (!client.isConnected() || !client.isAvailable()) {
                logError("FTP链接不可用");
                return false;
            }
            String sourceDirectory = this.jobEntryDownloadParams.getSourceDirectory();
            logDebug("ftpFileDir: " + sourceDirectory);
            return retrieveToLocal(client, sourceDirectory, str, fileInfo, null);
        } catch (IOException e) {
            logError("文件下载发生异常，路径：" + relativePath + "，原因：" + e);
            return false;
        } finally {
            // NOTE(review): after an I/O failure the client may be unusable; consider
            // invalidating it instead, unless the pool validates clients on borrow.
            genericObjectPool.returnObject(client);
        }
    }

    /**
     * Shared transfer routine: locates the remote file and streams it into the local directory.
     *
     * @param client          connected FTP client
     * @param sourceDirectory remote base directory; the file's relative path is appended to it
     * @param localDir        local target directory
     * @param fileInfo        file being downloaded
     * @param progress        progress accumulator for the single-connection path, or
     *                        {@code null} when called from the pooled multi-thread path
     * @return {@code false} when the data stream cannot be opened or the server does not confirm
     *         the transfer; {@code true} otherwise
     * @throws IOException on any FTP or local I/O failure
     */
    private boolean retrieveToLocal(FTPClient client, String sourceDirectory, String localDir, FileInfo fileInfo, Progress progress) throws IOException {
        String relativePath = fileInfo.getRelativePath();
        String remotePath = sourceDirectory.concat(relativePath);

        FTPFile remoteFile = findRemoteFile(client, remotePath);
        if (remoteFile == null || !remoteFile.isFile()) {
            // NOTE(review): a missing or non-regular remote entry is reported as success,
            // exactly as before. Confirm this is intended (e.g. for directory entries).
            return true;
        }

        File localFile = new File(localDir, relativePath);
        File localParent = localFile.getParentFile();
        if (!localParent.exists()) {
            localParent.mkdirs();
        }

        String workingDirectory = sourceDirectory;
        if (relativePath.indexOf("/") >= 0) {
            workingDirectory = parentDirectoryOf(remotePath);
        }
        client.changeWorkingDirectory(workingDirectory);

        FileOutputStream localOut = null;
        InputStream remoteIn = null;
        try {
            localOut = new FileOutputStream(localFile);
            remoteIn = client.retrieveFileStream(remoteFile.getName());
            if (remoteIn == null) {
                // The data connection could not be opened; no transfer is pending, so
                // completePendingCommand() must not be called.
                logError("文件下载失败，无法打开数据连接，路径：" + relativePath + "，FTP响应：" + client.getReplyString());
                return false;
            }
            if (progress != null) {
                monitorStreamDownload(fileInfo, progress, remoteIn, localOut);
                logBasic(Constant.DOWNLOAD_MONITOR + JSON.toJSONString(progress.extractDownladMonitorInfo()));
            } else {
                monitorStreamDownload(fileInfo, remoteIn, localOut);
            }
        } finally {
            // The data stream has to be closed before completePendingCommand() is called.
            IOUtils.closeQuietly(remoteIn);
            IOUtils.closeQuietly(localOut);
        }
        return client.completePendingCommand();
    }

    /**
     * Looks up the listing entry for a remote path.
     *
     * <p>Paths containing {@code Constant.BLANK} are resolved by changing into the parent
     * directory and matching the file name against its listing; all other paths are listed
     * directly.
     *
     * @return the matching entry, or {@code null} when none is found
     * @throws IOException if an FTP command fails
     */
    private FTPFile findRemoteFile(FTPClient client, String remotePath) throws IOException {
        if (!remotePath.contains(Constant.BLANK)) {
            FTPFile[] listed = client.listFiles(remotePath);
            return (listed != null && listed.length >= 1) ? listed[0] : null;
        }
        int lastSlash = remotePath.lastIndexOf('/');
        if (lastSlash >= 0) {
            client.changeWorkingDirectory(parentDirectoryOf(remotePath));
        }
        // Without any slash the whole path is the file name and the current directory is listed.
        String fileName = remotePath.substring(lastSlash + 1);
        FTPFile[] listed = client.listFiles();
        if (listed == null) {
            return null;
        }
        for (FTPFile candidate : listed) {
            // Entries that failed to parse may be null in some Commons Net versions.
            if (candidate != null && fileName.equals(candidate.getName())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns everything before the last {@code '/'} of {@code remotePath}; {@code "/"} for a
     * file directly under the root, and the path itself when it contains no slash.
     */
    private static String parentDirectoryOf(String remotePath) {
        int lastSlash = remotePath.lastIndexOf('/');
        if (lastSlash < 0) {
            return remotePath;
        }
        return lastSlash == 0 ? "/" : remotePath.substring(0, lastSlash);
    }

    /**
     * Downloads the given files into the configured local target directory, either one by one
     * over {@code fTPClient} or through the multi-threaded path, and logs a progress summary.
     *
     * @param fTPClient               connected FTP client
     * @param jobEntryFtpPlusParamsDO job parameters (target directory and configured file list)
     * @param list                    files iterated by the single-threaded path
     * @param progress                progress accumulator that is filled in and then summarised
     * @throws KettleException declared by the signature; propagated from the helpers called here
     */
    public void syncLocalDirByFtpFileList(FTPClient fTPClient, JobEntryFtpPlusParamsDO jobEntryFtpPlusParamsDO, List<FileInfo> list, Progress progress) throws KettleException {
        boolean connectionUsable = fTPClient.isConnected() && fTPClient.isAvailable();
        if (!connectionUsable) {
            logError("FTP链接不可用");
            return;
        }

        String localTargetDir = jobEntryFtpPlusParamsDO.getTargetDirectory();
        logDebug("ftp server directory file count: " + list.size() + ", file: " + JSON.toJSONString(list));

        // NOTE(review): progress set-up and the multi-threaded path use the list from the
        // parameters, while the single-threaded loop uses the 'list' argument.
        List<FileInfo> configuredFiles = jobEntryFtpPlusParamsDO.getFileInfoList();
        ProgressUtil.getProgressByFileList(configuredFiles, progress);
        mkdirAndDeleteHandle(progress, localTargetDir, configuredFiles);

        boolean multiThreaded = this.jobEntryDownloadParams.getMultiThreadDownload() != null
                && this.jobEntryDownloadParams.getMultiThreadDownload().booleanValue();
        if (multiThreaded) {
            multiThreadHandle(configuredFiles, progress, localTargetDir, fTPClient);
        } else {
            for (FileInfo candidate : list) {
                if (!checkFileInfoForDownload(candidate)) {
                    continue;
                }
                logDebug("ready to download, file: " + candidate.getRelativePath());
                Boolean downloaded = downloadFileFromFtp(fTPClient, jobEntryFtpPlusParamsDO, localTargetDir, candidate, progress);
                if (downloaded.booleanValue()) {
                    updateRetrievedFiles();
                }
                afterDownloadHandle(progress, candidate, downloaded);
            }
        }

        StringBuilder summary = new StringBuilder();
        summary.append(String.format("totalCount: %d, successCount: %d, totalBytes: %d, successBytes: %d",
                progress.getTotalCount(), progress.getSuccessCount(), progress.getTotalBytes(), progress.getSuccessBytes()));
        if (!progress.getFailList().isEmpty()) {
            summary.append(", failList: ");
            summary.append(JSON.toJSONString(progress.getFailList()));
        }
        if (!progress.getFilterList().isEmpty()) {
            summary.append(", filterList: ");
            summary.append(JSON.toJSONString(progress.getFilterList()));
        }
        logBasic(summary.toString());
    }

    /**
     * Processes the files concurrently. Each eligible file goes through a chain of phases —
     * download, file-type check, virus detection, keyword filter — and a phase only runs when
     * the previous one passed. The per-file outcome is recorded in the file's phase result map.
     *
     * @param list      files to process
     * @param progress  progress accumulator handed to {@code resultCombine}
     * @param str       local target directory
     * @param fTPClient not used here; workers borrow their own clients from a pool
     */
    private void multiThreadHandle(List<FileInfo> list, Progress progress, String str, FTPClient fTPClient) {
        int threadCount = Runtime.getRuntime().availableProcessors() << 1;
        this.threadNumber = threadCount;
        // NOTE(review): the pool keeps its default limits, so it may hand out fewer
        // connections than there are worker threads; the surplus workers wait in borrowObject.
        GenericObjectPool<FTPClient> clientPool = new GenericObjectPool<FTPClient>(new FtpClientFactory(this.jobEntryFtpPlusParamsDO));
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            long startNanos = System.nanoTime();
            // Slots of files that are not eligible for download stay null, as before.
            @SuppressWarnings("unchecked")
            CompletableFuture<Boolean>[] futures = new CompletableFuture[list.size()];
            for (int i = 0; i < list.size(); i++) {
                final FileInfo fileInfo = list.get(i);
                if (!checkFileInfoForDownload(fileInfo)) {
                    continue;
                }
                futures[i] = CompletableFuture
                        .supplyAsync(() -> {
                            Boolean downloaded = runPhaseIfPassed(fileInfo, PhaseEnum.DOWNLOAD, Boolean.TRUE,
                                    file -> downloadFileFromFtpMultiThread(clientPool, str, file));
                            if (downloaded.booleanValue()) {
                                updateRetrievedFiles();
                            }
                            return downloaded;
                        }, executor)
                        .thenApply(passed -> runPhaseIfPassed(fileInfo, PhaseEnum.FILE_TYPE_CHECK, passed, file -> fileTypeCheck(file)))
                        .thenApply(passed -> runPhaseIfPassed(fileInfo, PhaseEnum.VIRUS_DETECTION, passed, file -> virusDetect(file)))
                        .thenApply(passed -> runPhaseIfPassed(fileInfo, PhaseEnum.KEYWORD_FILTER, passed, file -> keywordFilter(file)));
                futures[i].whenComplete((outcome, error) ->
                        logDebug(String.format("文件: %s 处理完成, result: %s", fileInfo.getName(), outcome)));
            }
            completableFutureMonitor(startNanos, futures);
            resultCombine(list, progress);
        } finally {
            // Always release worker threads and pooled connections, even when monitoring
            // or result combining throws.
            executor.shutdown();
            clientPool.close();
        }
    }

    /**
     * Runs one processing phase for a file unless the previous phase failed.
     *
     * @param fileInfo       file being processed
     * @param phase          phase to mark as current and to record the outcome for
     * @param previousPassed outcome of the preceding phase
     * @param check          the phase's work; returns whether the file passed
     * @return {@code false} without touching the file when {@code previousPassed} is false;
     *         otherwise the outcome of {@code check}
     */
    private Boolean runPhaseIfPassed(FileInfo fileInfo, PhaseEnum phase, Boolean previousPassed, Function<FileInfo, Boolean> check) {
        if (!previousPassed.booleanValue()) {
            return false;
        }
        fileInfo.setCurrentPhase(phase);
        boolean passed = check.apply(fileInfo);
        fileInfo.getPhaseResultMap().put(phase, passed);
        return passed;
    }

    /**
     * @return the job parameters currently held by this entry; may be {@code null} if none
     *         have been parsed or set yet
     */
    public JobEntryFtpPlusParamsDO getJobEntryFtpPlusParamsDO() {
        return jobEntryFtpPlusParamsDO;
    }

    /**
     * Replaces the job parameters held by this entry.
     *
     * @param jobEntryFtpPlusParamsDO the new parameters
     */
    public void setJobEntryFtpPlusParamsDO(JobEntryFtpPlusParamsDO jobEntryFtpPlusParamsDO) {
        this.jobEntryFtpPlusParamsDO = jobEntryFtpPlusParamsDO;
    }

    /** Returns a short description of this entry that includes its current parameters. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JobEntryFtpPlus(jobEntryFtpPlusParamsDO=");
        text.append(getJobEntryFtpPlusParamsDO());
        text.append(")");
        return text.toString();
    }
}
