package com.xdja.rcs.sc.client.core.producer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.xdja.platform.cacheableQueue.CacheableQueue;
import com.xdja.platform.cacheableQueue.exception.QueueCacheAccessFailureException;
import com.xdja.platform.cacheableQueue.exception.UninitializedException;
import com.xdja.platform.cacheableQueue.exception.UnknownElementClassException;
import com.xdja.platform.util.json.JSONException;
import com.xdja.platform.util.json.JSONUtil;
import com.xdja.rcs.sc.client.core.config.ScClientConfigProxy;
import com.xdja.rcs.sc.client.core.config.node.ScServerNode;
import com.xdja.rcs.sc.core.bean.Message;
import com.xdja.rcs.sc.core.bean.MessageResponse;
import com.xdja.rcs.sc.core.exception.UnrecoverableException;
import com.xdja.rcs.sc.queue.ScMessageQueueFactory;
import com.xdja.rcs.sc.remoting.api.RemotingAPI;
import com.xdja.rcs.sc.remoting.exception.RemotingConnectException;
import com.xdja.rcs.sc.remoting.exception.RemotingSendRequestException;
import com.xdja.rcs.sc.remoting.exception.RemotingTimeoutException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xdja/rcs/sc/client/core/producer/Producer.class */
public class Producer {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Map<String, ScServerNode> scServers;

    public Producer(Map<String, ScServerNode> map) throws UnrecoverableException {
        this.scServers = map;
        initQueueThreadPool(map.values());
    }

    public String calculateMsgId(String str) throws UnrecoverableException {
        if (this.scServers.containsKey(str)) {
            return this.scServers.get(str).getAppId() + "_" + UUID.randomUUID().toString().replaceAll("-", "");
        }
        throw UnrecoverableException.create("获取消息标识失败：订阅中心服务标识\"" + str + "\"对应的信息不存在");
    }

    public void sendMessage(String str, Message message) throws UnrecoverableException, RemotingSendRequestException, IllegalArgumentException {
        ScServerNode scServerNode = this.scServers.get(str);
        if (scServerNode == null) {
            throw UnrecoverableException.create("消息发送失败：未找到\"" + str + "\"对应的订阅中心服务信息");
        }
        if (StringUtils.isBlank(scServerNode.getAddr())) {
            throw new IllegalArgumentException("消息发送失败：\"" + str + "\"对应的订阅中心服务信息中缺少addr");
        }
        if (StringUtils.isBlank(scServerNode.getAppId())) {
            throw new IllegalArgumentException("消息发送失败：\"" + str + "\"对应的订阅中心服务信息中缺少appId(生产者标识)");
        }
        if (StringUtils.isBlank(message.getMsgId())) {
            message.setMsgId(calculateMsgId(str));
        }
        try {
            message.setProducer(scServerNode.getAppId());
            MessageResponse messageResponse = (MessageResponse) JSONUtil.toJavaBean(RemotingAPI.sendData(scServerNode.getAddr(), scServerNode.getPort(), JSONUtil.toJSONBytes(message)).getContent(), new TypeReference<MessageResponse>() { // from class: com.xdja.rcs.sc.client.core.producer.Producer.1
            });
            if (!messageResponse.isSuccess()) {
                throw new RemotingSendRequestException("消息发送失败：" + messageResponse.getReason(), messageResponse.isRecoverable());
            }
            this.logger.debug("主题为{}的消息{}发送成功", message.getTopicId(), message.getMsgId());
        } catch (InterruptedException e) {
            throw new RemotingSendRequestException("消息发送失败：线程中断异常", e, true);
        } catch (JSONException e2) {
            throw new RemotingSendRequestException("消息发送失败：json处理失败", e2);
        } catch (RemotingTimeoutException e3) {
            throw new RemotingSendRequestException("消息发送失败：等待服务端返回响应数据超时", e3, true);
        } catch (RemotingConnectException e4) {
            throw new RemotingSendRequestException("消息发送失败：连接服务端失败", e4, true);
        }
    }

    public void sendMessageAsync(String str, Message message) throws UnrecoverableException {
        try {
            ScMessageQueueFactory.getQueue(str, ScClientConfigProxy.getQueueConfig()).put(message);
        } catch (UnknownElementClassException e) {
        } catch (JSONException e2) {
            throw UnrecoverableException.create("把消息转换为json字符串失败", e2);
        } catch (UninitializedException e3) {
            throw UnrecoverableException.create("队列模块未初始化。", e3);
        } catch (QueueCacheAccessFailureException e4) {
            throw UnrecoverableException.create("队列缓存访问异常", e4);
        }
    }

    private void initQueueThreadPool(Collection<ScServerNode> collection) throws UnrecoverableException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(collection.size());
        for (ScServerNode scServerNode : collection) {
            try {
                CacheableQueue queue = ScMessageQueueFactory.getQueue(scServerNode.getId(), ScClientConfigProxy.getQueueConfig());
                queue.recover();
                this.logger.debug("开始启动异步发送消息线程...");
                newFixedThreadPool.execute(new SendMessageAsync(queue, scServerNode));
            } catch (JSONException e) {
                throw UnrecoverableException.create("初始消息队列失败：恢复/创建消息\"" + scServerNode.getId() + "\"的队列出现异常", e);
            } catch (QueueCacheAccessFailureException e2) {
                throw UnrecoverableException.create("初始消息队列失败：队列缓存访问异常", e2);
            } catch (UnknownElementClassException e3) {
                throw UnrecoverableException.create("初始消息队列失败：队列中元素的所属对象与队列标识所对应的队列初次创建时不一致", e3);
            }
        }
    }
}
