package com.xdja.rcs.sc.client.core.producer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.xdja.platform.cacheableQueue.CacheableQueue;
import com.xdja.platform.util.json.JSONException;
import com.xdja.platform.util.json.JSONUtil;
import com.xdja.rcs.sc.alarm.AlarmHelper;
import com.xdja.rcs.sc.client.api.ScClientAPI;
import com.xdja.rcs.sc.client.core.config.node.ScServerNode;
import com.xdja.rcs.sc.core.bean.Message;
import com.xdja.rcs.sc.core.bean.MessageResponse;
import com.xdja.rcs.sc.remoting.api.RemotingAPI;
import com.xdja.rcs.sc.remoting.protocol.RemotingData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/sc-client-1.1-20150413.030425-2.jar:com/xdja/rcs/sc/client/core/producer/SendMessageAsync.class */
public class SendMessageAsync implements Runnable {
    private CacheableQueue<Message> queue;
    private ScServerNode server;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private final int retryInterval = 5000;
    private final int retryCount = 24;
    protected int retryTimes = 0;

    public SendMessageAsync(CacheableQueue<Message> cacheableQueue, ScServerNode scServerNode) {
        this.queue = cacheableQueue;
        this.server = scServerNode;
    }

    @Override // java.lang.Runnable
    public void run() {
        String id = this.server.getId();
        Message message = null;
        while (true) {
            try {
                message = this.queue.take();
                if (null != message) {
                    RemotingData sendData = RemotingAPI.sendData(this.server.getAddr(), this.server.getPort(), JSONUtil.toJSONBytes(message));
                    if (null != sendData) {
                        MessageResponse messageResponse = (MessageResponse) JSONUtil.toJavaBean(sendData.getContent(), new TypeReference<MessageResponse>() { // from class: com.xdja.rcs.sc.client.core.producer.SendMessageAsync.1
                        });
                        if (messageResponse.isSuccess()) {
                            this.retryTimes = 0;
                        } else if (messageResponse.isRecoverable()) {
                            this.logger.error("发送消息到SC\"{}\"失败，但可以重新发送，返回的结果为：{}", id, messageResponse.getReason());
                            retry(message);
                        } else {
                            ScClientAPI.logUnrecoverableMsg(this.server, message, messageResponse.getReason());
                            this.logger.error("发送消息到SC\"{}\"失败且不可重新发送，返回的结果为：{}", id, messageResponse.getReason());
                            this.retryTimes = 0;
                        }
                    }
                    this.logger.debug("主题为{}的消息{}发送成功", message.getTopicId(), message.getMsgId());
                }
            } catch (JSONException e) {
                ScClientAPI.logUnrecoverableMsg(this.server, message, e);
                this.logger.error("发送消息到SC\"" + id + "\"失败且不可重新发送", (Throwable) e);
            } catch (Throwable th) {
                if (null != message) {
                    this.logger.error("发送消息到SC\"" + id + "\"失败，尝试重新发送", th);
                    retry(message);
                }
            }
        }
    }

    private void retry(Message message) {
        try {
            this.queue.put4FirstTaking(message);
            int i = this.retryTimes;
            this.retryTimes = i + 1;
            if (i >= 24) {
                AlarmHelper.sendAlarmEmail("告警内容：生产者发送消息到SC\"" + this.server.getId() + "\"失败");
                this.retryTimes = 0;
            }
        } catch (Throwable th) {
            String format = String.format("主题为\"{}\"的消息\"{}\"重新放回路由队列失败", message.getTopicId(), message.getMsgId());
            ScClientAPI.logUnrecoverableMsg(this.server, message, new Exception(format, th));
            this.logger.error(format, th);
        }
        try {
            getClass();
            Thread.sleep(5000L);
        } catch (Throwable th2) {
            this.logger.warn("消息重试时线程休眠失败");
        }
    }
}
