package com.xdja.rcs.sc.server.route;

import com.fasterxml.jackson.core.type.TypeReference;
import com.xdja.platform.cacheableQueue.CacheableQueue;
import com.xdja.platform.cacheableQueue.exception.QueueCacheAccessFailureException;
import com.xdja.platform.cacheableQueue.exception.UnknownElementClassException;
import com.xdja.platform.util.json.JSONException;
import com.xdja.platform.util.json.JSONUtil;
import com.xdja.rcs.sc.alarm.AlarmHelper;
import com.xdja.rcs.sc.core.bean.Message;
import com.xdja.rcs.sc.core.bean.MessageResponse;
import com.xdja.rcs.sc.queue.ScMessageQueueFactory;
import com.xdja.rcs.sc.remoting.api.RemotingAPI;
import com.xdja.rcs.sc.remoting.protocol.RemotingData;
import com.xdja.rcs.sc.server.config.ScServerConfigProxy;
import com.xdja.rcs.sc.server.config.node.ConsumerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xdja/rcs/sc/server/route/MessageRouteThread.class */
public final class MessageRouteThread implements Runnable {
    private CacheableQueue<Message> queue;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private final int retryInterval = 5000;
    private final int retryCount = 24;
    protected int retryTimes = 0;
    private String tag = null;

    public void setTag(String str) {
        this.tag = str;
        try {
            this.queue = ScMessageQueueFactory.getQueue(str, ScServerConfigProxy.getQueueConfig());
        } catch (UnknownElementClassException e) {
        } catch (QueueCacheAccessFailureException e2) {
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        Message message = null;
        while (true) {
            try {
                message = (Message) this.queue.take();
                if (null != message) {
                    ConsumerNode consumer = ScServerConfigProxy.getConsumer(this.tag);
                    RemotingData sendData = RemotingAPI.sendData(consumer.getAddr(), consumer.getPort(), JSONUtil.toJSONBytes(message));
                    if (null != sendData) {
                        MessageResponse messageResponse = (MessageResponse) JSONUtil.toJavaBean(sendData.getContent(), new TypeReference<MessageResponse>() { // from class: com.xdja.rcs.sc.server.route.MessageRouteThread.1
                        });
                        if (messageResponse.isSuccess()) {
                            this.retryTimes = 0;
                        } else {
                            this.logger.error("发送消息到消费者\"" + this.tag + "\"失败，返回的结果为：" + messageResponse.getReason());
                            retry(message);
                        }
                    }
                } else {
                    this.retryTimes = 0;
                }
            } catch (JSONException e) {
                this.logger.error("发送消息到消费者\"" + this.tag + "\"失败", e);
            } catch (Throwable th) {
                if (null != message) {
                    this.logger.error("发送消息到消费者\"" + this.tag + "\"失败：尝试重新发送", th);
                    retry(message);
                }
            }
        }
    }

    private void retry(Message message) {
        try {
            this.queue.put4FirstTaking(message);
            int i = this.retryTimes;
            this.retryTimes = i + 1;
            if (i >= 24) {
                AlarmHelper.sendAlarmEmail("告警内容：订阅中心发送消息到消费者\"" + this.tag + "\"失败");
                this.retryTimes = 0;
            }
        } catch (Throwable th) {
            this.logger.error("主题为\"" + message.getTopicId() + "\"的消息\"" + message.getMsgId() + "\"重新放回路由队列失败", th);
        }
        try {
            getClass();
            Thread.sleep(5000L);
        } catch (Throwable th2) {
            this.logger.warn("路由消息重试时线程休眠失败");
        }
    }
}
