package com.xdja.rcs.sc.server.process.dispatcher;

import com.xdja.platform.cacheableQueue.exception.QueueCacheAccessFailureException;
import com.xdja.platform.cacheableQueue.exception.UninitializedException;
import com.xdja.platform.cacheableQueue.exception.UnknownElementClassException;
import com.xdja.platform.singleton.thread.SingleThreadFactory;
import com.xdja.platform.util.json.JSONException;
import com.xdja.rcs.sc.core.bean.Message;
import com.xdja.rcs.sc.queue.ScMessageQueueFactory;
import com.xdja.rcs.sc.server.config.ScServerConfigProxy;
import com.xdja.rcs.sc.server.config.node.ConsumerNode;
import com.xdja.rcs.sc.server.exception.MessageProcessException;
import com.xdja.rcs.sc.server.process.MessageDispatcher;
import com.xdja.rcs.sc.server.route.MessageRouteThread;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xdja/rcs/sc/server/process/dispatcher/MessageDispatcherImpl.class */
public class MessageDispatcherImpl implements MessageDispatcher {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override // com.xdja.rcs.sc.server.process.MessageDispatcher
    public void dispatch(Message message) throws MessageProcessException {
        String[] strArr = null;
        for (ConsumerNode consumerNode : ScServerConfigProxy.getTopic(message.getTopicId()).getConsumers()) {
            if (ArrayUtils.isNotEmpty(consumerNode.getTags()) && ArrayUtils.isNotEmpty(message.getTags())) {
                strArr = (String[]) ArrayUtils.removeElements(message.getTags(), ArrayUtils.removeElements(message.getTags(), consumerNode.getTags()));
            }
            if (null == strArr || strArr.length > 0) {
                Message clone = message.clone();
                if (null != strArr) {
                    clone.setTags(strArr);
                }
                clone.setOrder(System.currentTimeMillis());
                try {
                    ScMessageQueueFactory.getQueue(consumerNode.getAppId(), ScServerConfigProxy.getQueueConfig()).put(clone);
                } catch (UnknownElementClassException e) {
                } catch (JSONException e2) {
                    throw new MessageProcessException("把消息转换为json字符串失败", e2);
                } catch (UninitializedException e3) {
                    throw new MessageProcessException("消费者\"" + consumerNode.getAppId() + "\"的消息队列模块未初始化", e3);
                } catch (QueueCacheAccessFailureException e4) {
                    throw new MessageProcessException("访问队列缓存出现异常", e4).setRecoverable(true);
                }
                SingleThreadFactory.run(MessageRouteThread.class, consumerNode.getAppId());
                strArr = null;
            } else {
                strArr = null;
            }
        }
        this.logger.debug("收到生产者应用\"{}\"发送的主题为\"{}\"的消息\"{}\",并加入对应的路由队列成功", new Object[]{message.getProducer(), message.getTopicId(), message.getMsgId()});
    }

    @Override // com.xdja.rcs.sc.server.process.MessageDispatcher
    public int getOrder() {
        return 0;
    }
}
