/*
 * Decompiled with CFR 0.152.
 */
package com.xdja.transfer.mq.impl;

import com.xdja.common.logger.LoggerUtil;
import com.xdja.transfer.constant.ExecuteTransferRequestStatus;
import com.xdja.transfer.context.TransferContext;
import com.xdja.transfer.entity.TransferMessage;
import com.xdja.transfer.mq.BaseMessageListenerImpl;
import com.xdja.transfer.util.Utils;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class MessageListenerConcurrentlyImpl
extends BaseMessageListenerImpl
implements MessageListenerConcurrently {
    private final TransferContext context;

    public MessageListenerConcurrentlyImpl(TransferContext context) {
        this.context = context;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (msgs == null || msgs.isEmpty()) {
            LoggerUtil.warn((String)"##### msgs is empty", (Object[])new Object[0]);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        LoggerUtil.info((String)"##### msgs's size is {}", (Object[])new Object[]{msgs.size()});
        ConsumeConcurrentlyStatus status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        for (MessageExt msg : msgs) {
            try {
                LoggerUtil.info((String)"##### queueId:{}, topic:{}, msgId:{}, DelayTimeLevel:{}, reconsumeTimes:{}", (Object[])new Object[]{msg.getQueueId(), msg.getTopic(), msg.getMsgId(), msg.getDelayTimeLevel(), msg.getReconsumeTimes()});
                TransferMessage message = this.createTransferMessage(msg);
                if (message == null) continue;
                ExecuteTransferRequestStatus exeStatus = this.executeTransferRequest(this.context, message);
                switch (exeStatus) {
                    case EXE_SUCCESS: {
                        status = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        break;
                    }
                    default: {
                        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        break;
                    }
                }
            }
            catch (Exception e) {
                LoggerUtil.error((String)"exception detail:{}", (Object[])new Object[]{Utils.getStackTrace(e)});
            }
        }
        return status;
    }
}

