package com.xdja.drs.workflow.business;

import com.xdja.drs.dao.DaoService;
import com.xdja.drs.model.OutsideTable;
import com.xdja.drs.util.KafkaUtil;
import com.xdja.drs.util.ServiceException;
import com.xdja.drs.workflow.WorkFlow;
import com.xdja.drs.workflow.WorkSheet;
import com.xdja.drs.workflow.tools.OrganizeSql;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/xdja/drs/workflow/business/KafkaCommon.class */
public class KafkaCommon implements WorkFlow {
    private static final Logger logger = LoggerFactory.getLogger(KafkaCommon.class);
    private static Producer<String, String> producer;

    @Override // com.xdja.drs.workflow.WorkFlow
    public void process(WorkSheet workSheet) throws ServiceException {
        logger.debug("↓↓↓↓↓↓↓↓↓↓ start KafkaCommon ↓↓↓↓↓↓↓↓↓↓");
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        new OrganizeSql().process(workSheet);
        OutsideTable currOutTable = workSheet.getCurrOutTable();
        String url = DaoService.getDataSourceDao().getDS(currOutTable.getOutdsId()).getUrl();
        if (StringUtils.isEmpty(url)) {
            throw new ServiceException("kafka地址不能为空!");
        }
        producer = initProducer(url);
        String owner = currOutTable.getOwner();
        if (StringUtils.isEmpty(owner)) {
            throw new ServiceException("topic配置不能为空!");
        }
        String[] split = workSheet.getQueryParameters().getCondition().split("msgData\\s*=\\s*'");
        if (split.length != 2) {
            throw new ServiceException("传输condition 格式异常!");
        }
        String substring = split[1].substring(0, split[1].length() - 1);
        logger.debug("西藏DRS kafka msgData :" + substring);
        try {
            KafkaUtil kafkaUtil = new KafkaUtil(producer);
            kafkaUtil.sendMsgToKafka(owner, substring);
            kafkaUtil.closeKafkaProducer();
            logger.debug("↑↑↑↑↑↑↑↑↑↑ end KafkaCommon ↑↑↑↑↑↑↑↑↑↑共耗时:" + Long.valueOf(System.currentTimeMillis() - valueOf.longValue()) + "ms");
        } catch (Exception e) {
            throw new ServiceException("kafka消息发送失败:" + e.getMessage());
        }
    }

    private static Producer initProducer(String str) throws ServiceException {
        logger.debug("kafka.properties文件");
        InputStream inputStream = null;
        try {
            try {
                inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("/kafkaXz.properties");
                Properties properties = new Properties();
                properties.load(inputStream);
                properties.setProperty("bootstrap.servers", str);
                producer = new KafkaProducer(properties);
                Producer<String, String> producer2 = producer;
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                }
                return producer2;
            } catch (Exception e2) {
                logger.error("启动加载kafka.properties文件出错", e2);
                throw new ServiceException("kafka 启动加载失败:" + e2.getMessage());
            }
        } catch (Throwable th) {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Exception e3) {
                    logger.error(e3.getMessage());
                    throw th;
                }
            }
            throw th;
        }
    }
}
