package com.xdja.platform.redis.reliablequeue;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.xdja.platform.redis.core.RedisClient;
import com.xdja.platform.redis.core.action.JedisActionNoResult;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;

/* loaded from: input_file:WEB-INF/lib/platform-redis-2.0.2.jar:com/xdja/platform/redis/reliablequeue/ConsumerThread.class */
class ConsumerThread implements Runnable {
    private static final String NULL_VALUE = "_nil_";
    private String queueName;
    private String queueNameBak;
    private RedisClient redisClient;
    private ConsumeCallback callback;
    static final long DEFAULT_MONITOR_PERIOD = 60000;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private boolean isRunning = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerThread(String str, RedisClient redisClient) {
        this.queueName = str;
        this.queueNameBak = str + "_bak";
        this.redisClient = redisClient;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.isRunning = true;
        while (this.isRunning) {
            try {
                try {
                    String brpoplpush = this.redisClient.brpoplpush(this.queueName, this.queueNameBak);
                    if (StringUtils.isNotBlank(brpoplpush)) {
                        this.redisClient.lpush(this.queueNameBak, NULL_VALUE);
                        if (this.callback.consume(this.queueName, brpoplpush)) {
                            this.redisClient.execute(new JedisActionNoResult() { // from class: com.xdja.platform.redis.reliablequeue.ConsumerThread.1
                                @Override // com.xdja.platform.redis.core.action.JedisActionNoResult
                                public void action(Jedis jedis) {
                                    jedis.ltrim(ConsumerThread.this.queueNameBak, 2L, -1L);
                                }
                            });
                        } else {
                            this.redisClient.lpop(this.queueNameBak);
                        }
                    } else {
                        this.redisClient.lpop(this.queueNameBak);
                    }
                    try {
                        this.redisClient.remAll(this.queueNameBak, NULL_VALUE);
                    } catch (Throwable th) {
                        this.logger.warn("从可靠队列{}中删除所有的\"_nil_\"失败，下次操作可靠队列时会继续删除", this.queueName);
                    }
                } catch (Throwable th2) {
                    this.logger.error("可靠队列{}消费过程中出现异常：{}", this.queueName, th2.getMessage());
                    try {
                        this.redisClient.remAll(this.queueNameBak, NULL_VALUE);
                    } catch (Throwable th3) {
                        this.logger.warn("从可靠队列{}中删除所有的\"_nil_\"失败，下次操作可靠队列时会继续删除", this.queueName);
                    }
                }
            } catch (Throwable th4) {
                try {
                    this.redisClient.remAll(this.queueNameBak, NULL_VALUE);
                } catch (Throwable th5) {
                    this.logger.warn("从可靠队列{}中删除所有的\"_nil_\"失败，下次操作可靠队列时会继续删除", this.queueName);
                }
                throw th4;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start(ConsumeCallback consumeCallback, long j) {
        if (this.isRunning) {
            return;
        }
        this.callback = consumeCallback;
        Executors.newSingleThreadExecutor().execute(this);
        startMonitor(j);
    }

    private void startMonitor(long j) {
        new Timer().schedule(new TimerTask() { // from class: com.xdja.platform.redis.reliablequeue.ConsumerThread.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                List<String> emptyList = Collections.emptyList();
                try {
                    emptyList = ConsumerThread.this.redisClient.lrange(ConsumerThread.this.queueNameBak, 0L, -1L);
                } catch (JedisException e) {
                    ConsumerThread.this.logger.error("从缓存中加载数据失败", (Throwable) e);
                }
                if (emptyList.isEmpty()) {
                    return;
                }
                int i = ConsumerThread.NULL_VALUE.equals(emptyList.get(0)) ? 2 : 0;
                for (int size = emptyList.size() - 1; size >= i; size--) {
                    String str = emptyList.get(size);
                    try {
                        if (StringUtils.isBlank(str) || ConsumerThread.NULL_VALUE.equals(str) || ConsumerThread.this.callback.consume(ConsumerThread.this.queueName, str)) {
                            ConsumerThread.this.redisClient.rremOne(ConsumerThread.this.queueNameBak, str);
                        }
                    } catch (Throwable th) {
                        ConsumerThread.this.logger.error("可靠队列{}重试消费过程中出现异常：{}", ConsumerThread.this.queueName, th.getMessage());
                    }
                }
            }
        }, 0L, (j < AbstractComponentTracker.LINGERING_TIMEOUT || j > 180000) ? 60000L : j);
    }
}
