/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.FilterMessageContext;
import com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.alibaba.rocketmq.client.impl.consumer.PullResultExt;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

public class PullAPIWrapper {
    private final Logger log = ClientLogger.getLog();
    private ConcurrentHashMap<MessageQueue, AtomicLong> pullFromWhichNodeTable = new ConcurrentHashMap(32);
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = 0L;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList();

    public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
        this.mQClientFactory = mQClientFactory;
        this.consumerGroup = consumerGroup;
        this.unitMode = unitMode;
    }

    public void updatePullFromWhichNode(MessageQueue mq, long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }

    public int randomNum() {
        int value = this.random.nextInt();
        if (value < 0 && (value = Math.abs(value)) < 0) {
            value = 0;
        }
        return value;
    }

    private String computPullFromWhichFilterServer(String topic, String brokerAddr) throws MQClientException {
        TopicRouteData topicRouteData;
        List list;
        ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
        if (topicRouteTable != null && (list = (List)(topicRouteData = topicRouteTable.get(topic)).getFilterServerTable().get(brokerAddr)) != null && !list.isEmpty()) {
            return (String)list.get(this.randomNum() % list.size());
        }
        throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: " + topic, null);
    }

    public PullResult processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt)pullResult;
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ArrayList<MessageExt> msgList;
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            ArrayList<MessageExt> msgListFilterAgain = msgList = MessageDecoder.decodes((ByteBuffer)byteBuffer);
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() == null || !subscriptionData.getTagsSet().contains(msg.getTags())) continue;
                    msgListFilterAgain.add(msg);
                }
            }
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(this.unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }
            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty((Message)msg, (String)"MIN_OFFSET", (String)Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty((Message)msg, (String)"MAX_OFFSET", (String)Long.toString(pullResult.getMaxOffset()));
            }
            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }
        pullResultExt.setMessageBinary(null);
        return pullResult;
    }

    public long recalculatePullFromWhichNode(MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }
        return 0L;
    }

    public PullResult pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        }
        if (findBrokerResult != null) {
            int sysFlagInner = sysFlag;
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag((int)sysFlagInner);
            }
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(Integer.valueOf(mq.getQueueId()));
            requestHeader.setQueueOffset(Long.valueOf(offset));
            requestHeader.setMaxMsgNums(Integer.valueOf(maxNums));
            requestHeader.setSysFlag(Integer.valueOf(sysFlagInner));
            requestHeader.setCommitOffset(Long.valueOf(commitOffset));
            requestHeader.setSuspendTimeoutMillis(Long.valueOf(brokerSuspendMaxTimeMillis));
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(Long.valueOf(subVersion));
            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag((int)sysFlagInner)) {
                brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
            return pullResult;
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    public boolean hasHook() {
        return !this.filterMessageHookList.isEmpty();
    }

    public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
        this.filterMessageHookList = filterMessageHookList;
    }

    public void executeHook(FilterMessageContext context) {
        if (!this.filterMessageHookList.isEmpty()) {
            for (FilterMessageHook hook : this.filterMessageHookList) {
                try {
                    hook.filterMessage(context);
                }
                catch (Throwable e) {
                    this.log.error("execute hook error. hookName={}", (Object)hook.hookName());
                }
            }
        }
    }

    public long getDefaultBrokerId() {
        return this.defaultBrokerId;
    }

    public void setDefaultBrokerId(long defaultBrokerId) {
        this.defaultBrokerId = defaultBrokerId;
    }

    public boolean isConnectBrokerByUser() {
        return this.connectBrokerByUser;
    }

    public void setConnectBrokerByUser(boolean connectBrokerByUser) {
        this.connectBrokerByUser = connectBrokerByUser;
    }
}

