package com.xdja.rcs.sc.client.core.consumer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.xdja.platform.cacheableQueue.CacheableQueue;
import com.xdja.platform.cacheableQueue.exception.QueueCacheAccessFailureException;
import com.xdja.platform.cacheableQueue.exception.UninitializedException;
import com.xdja.platform.cacheableQueue.exception.UnknownElementClassException;
import com.xdja.platform.singleton.thread.SingleThreadFactory;
import com.xdja.platform.util.json.JSONException;
import com.xdja.platform.util.json.JSONUtil;
import com.xdja.rcs.sc.client.core.config.ScClientConfigProxy;
import com.xdja.rcs.sc.core.bean.Message;
import com.xdja.rcs.sc.core.bean.MessageResponse;
import com.xdja.rcs.sc.queue.ScMessageQueueFactory;
import com.xdja.rcs.sc.remoting.RemotingServerCallback;
import com.xdja.rcs.sc.remoting.protocol.RemotingData;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xdja/rcs/sc/client/core/consumer/ConsumerRemotingServerCallback.class */
/**
 * Remoting callback used on the consumer side: decodes an incoming {@link Message},
 * puts it on the queue obtained for its topic, and answers with a {@link MessageResponse}.
 *
 * <p>The highest accepted {@code order} value is remembered per topic so that a message whose
 * order is not greater than the last accepted one is acknowledged without being queued again.
 * NOTE(review): the check and the later cache update are two separate map operations, so two
 * concurrent deliveries for the same topic could both pass the check — confirm whether the
 * remoting layer serializes calls per topic.
 */
public class ConsumerRemotingServerCallback implements RemotingServerCallback {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Last accepted message order, keyed by topic id. */
    private final Map<String, Long> msgOrderCache = new ConcurrentHashMap<String, Long>();

    /**
     * Handles a message delivery that expects a response.
     *
     * @param remotingData request whose content is the JSON form of a {@link Message}
     * @return a response wrapping the serialized {@link MessageResponse}; {@code null} when the
     *         response itself cannot be serialized to JSON
     */
    public RemotingData callWithResponse(RemotingData remotingData) {
        MessageResponse messageResponse = new MessageResponse();
        try {
            Message message = (Message) JSONUtil.toJavaBean(remotingData.getContent(), new TypeReference<Message>() {
            });
            if (message == null) {
                // A payload that decodes to null would otherwise fail below with a NullPointerException.
                this.logger.error("消息从byte[]转换为Message失败");
                messageResponse.setReason("消息从byte[]转换为Message失败");
            } else if (StringUtils.isBlank(message.getTopicId())) {
                messageResponse.setReason("发送的消息中不包含主题标识");
            } else {
                String topicId = message.getTopicId();
                // Single lookup: a missing entry means no message was accepted for this topic yet.
                Long lastOrder = this.msgOrderCache.get(topicId);
                if (lastOrder == null || message.getOrder() > lastOrder.longValue()) {
                    CacheableQueue queue = ScMessageQueueFactory.getQueue(topicId, ScClientConfigProxy.getQueueConfig());
                    try {
                        queue.put(message);
                    } catch (UninitializedException e) {
                        // Retry once after recovering the queue; a second failure is reported by the outer catch.
                        this.logger.warn("主题{}对应的消费队列未初始化，重新进行初始化", topicId);
                        queue.recover();
                        queue.put(message);
                    }
                    // Presumably makes sure a consume thread is running for this topic — confirm against SingleThreadFactory.
                    SingleThreadFactory.run(MessageConsumeThread.class, topicId);
                    this.msgOrderCache.put(topicId, Long.valueOf(message.getOrder()));
                    this.logger.debug("收到订阅中心路由的主题为\"{}\"的消息\"{}\",并加入对应的消费队列成功", topicId, message.getMsgId());
                } else {
                    // Order not greater than the last accepted one: acknowledge without queueing again.
                    this.logger.debug("消息{}已经被接收并处理过", message.getMsgId());
                }
                messageResponse.setSuccess(true);
                messageResponse.setTopicId(topicId);
                messageResponse.setMsgId(message.getMsgId());
            }
        } catch (JSONException e2) {
            this.logger.error("消息从byte[]转换为Message失败", e2);
            messageResponse.setReason("消息从byte[]转换为Message失败");
        } catch (UninitializedException e3) {
            this.logger.error("消费队列模块未初始化", e3);
            messageResponse.setReason("消费队列模块未初始化");
        } catch (QueueCacheAccessFailureException e4) {
            this.logger.error("消费队列缓存访问异常", e4);
            messageResponse.setReason("消费队列缓存访问异常");
        } catch (UnknownElementClassException e5) {
            // Previously swallowed silently, leaving the caller with a failure that had no reason.
            this.logger.error("消费队列无法识别消息元素类型", e5);
            messageResponse.setReason("消费队列无法识别消息元素类型");
        }
        try {
            return RemotingData.createResponse(JSONUtil.toJSONBytes(messageResponse));
        } catch (JSONException e6) {
            this.logger.error("响应数据从MessageResponse转换为byte[]失败", e6);
            return null;
        }
    }

    /**
     * Handles a one-way delivery. Intentionally does nothing in this implementation.
     *
     * @param remotingData the incoming request; ignored
     */
    public void callWithoutResponse(RemotingData remotingData) {
        // No-op: this callback only acts on deliveries that expect a response.
    }
}
