/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.store;

import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.AppendMessageCallback;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageLock;
import org.apache.rocketmq.store.PutMessageReentrantLock;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.StoreStatsService;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;

public class CommitLog {
    public static final int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger((String)"RocketmqStore");
    protected static final int BLANK_MAGIC_CODE = -875286124;
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;
    private final FlushCommitLogService commitLogService;
    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    protected HashMap<String, Long> topicQueueTable = new HashMap(1024);
    protected volatile long confirmOffset = -1L;
    private volatile long beginTimeInLock = 0L;
    protected final PutMessageLock putMessageLock;

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
        this.defaultMessageStore = defaultMessageStore;
        this.flushCommitLogService = FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType() ? new GroupCommitService() : new FlushRealTimeService();
        this.commitLogService = new CommitRealTimeService();
        this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        this.batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>(){

            @Override
            protected MessageExtBatchEncoder initialValue() {
                return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            }
        };
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    }

    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }

    public void start() {
        this.flushCommitLogService.start();
        if (this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }

    public void shutdown() {
        if (this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.shutdown();
        }
        this.flushCommitLogService.shutdown();
    }

    public long flush() {
        this.mappedFileQueue.commit(0);
        this.mappedFileQueue.flush(0);
        return this.mappedFileQueue.getFlushedWhere();
    }

    public long getMaxOffset() {
        return this.mappedFileQueue.getMaxOffset();
    }

    public long remainHowManyDataToCommit() {
        return this.mappedFileQueue.remainHowManyDataToCommit();
    }

    public long remainHowManyDataToFlush() {
        return this.mappedFileQueue.remainHowManyDataToFlush();
    }

    public int deleteExpiredFile(long expiredTime, int deleteFilesInterval, long intervalForcibly, boolean cleanImmediately) {
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
    }

    public SelectMappedBufferResult getData(long offset) {
        return this.getData(offset, offset == 0L);
    }

    public SelectMappedBufferResult getData(long offset, boolean returnFirstOnNotFound) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            int pos = (int)(offset % (long)mappedFileSize);
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }
        return null;
    }

    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            long mappedFileOffset;
            long processOffset;
            block8: {
                int index = mappedFiles.size() - 3;
                if (index < 0) {
                    index = 0;
                }
                MappedFile mappedFile = mappedFiles.get(index);
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                processOffset = mappedFile.getFileFromOffset();
                mappedFileOffset = 0L;
                while (true) {
                    DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                    int size = dispatchRequest.getMsgSize();
                    if (dispatchRequest.isSuccess() && size > 0) {
                        mappedFileOffset += (long)size;
                        continue;
                    }
                    if (dispatchRequest.isSuccess() && size == 0) {
                        if (++index >= mappedFiles.size()) {
                            log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                            break block8;
                        }
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0L;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                        continue;
                    }
                    if (!dispatchRequest.isSuccess()) break;
                }
                log.info("recover physics file end, " + mappedFile.getFileName());
            }
            this.mappedFileQueue.setFlushedWhere(processOffset += mappedFileOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", (Object)maxPhyOffsetOfConsumeQueue, (Object)processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0L);
            this.mappedFileQueue.setCommittedWhere(0L);
            this.defaultMessageStore.destroyLogics();
        }
    }

    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean checkCRC) {
        return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
    }

    private void doNothingForDeadCode(Object obj) {
        if (obj != null) {
            log.debug(String.valueOf(obj.hashCode()));
        }
    }

    public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, boolean checkCRC, boolean readBody) {
        try {
            int readLength;
            int totalSize = byteBuffer.getInt();
            int magicCode = byteBuffer.getInt();
            switch (magicCode) {
                case -626843481: {
                    break;
                }
                case -875286124: {
                    return new DispatchRequest(0, true);
                }
                default: {
                    log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                    return new DispatchRequest(-1, false);
                }
            }
            byte[] bytesContent = new byte[totalSize];
            int bodyCRC = byteBuffer.getInt();
            int queueId = byteBuffer.getInt();
            int flag = byteBuffer.getInt();
            long queueOffset = byteBuffer.getLong();
            long physicOffset = byteBuffer.getLong();
            int sysFlag = byteBuffer.getInt();
            long bornTimeStamp = byteBuffer.getLong();
            ByteBuffer byteBuffer1 = (sysFlag & 0x10) == 0 ? byteBuffer.get(bytesContent, 0, 8) : byteBuffer.get(bytesContent, 0, 20);
            long storeTimestamp = byteBuffer.getLong();
            ByteBuffer byteBuffer2 = (sysFlag & 0x20) == 0 ? byteBuffer.get(bytesContent, 0, 8) : byteBuffer.get(bytesContent, 0, 20);
            int reconsumeTimes = byteBuffer.getInt();
            long preparedTransactionOffset = byteBuffer.getLong();
            int bodyLen = byteBuffer.getInt();
            if (bodyLen > 0) {
                if (readBody) {
                    int crc;
                    byteBuffer.get(bytesContent, 0, bodyLen);
                    if (checkCRC && (crc = UtilAll.crc32((byte[])bytesContent, (int)0, (int)bodyLen)) != bodyCRC) {
                        log.warn("CRC check failed. bodyCRC={}, currentCRC={}", (Object)crc, (Object)bodyCRC);
                        return new DispatchRequest(-1, false);
                    }
                } else {
                    byteBuffer.position(byteBuffer.position() + bodyLen);
                }
            }
            byte topicLen = byteBuffer.get();
            byteBuffer.get(bytesContent, 0, topicLen);
            String topic = new String(bytesContent, 0, (int)topicLen, MessageDecoder.CHARSET_UTF8);
            long tagsCode = 0L;
            String keys = "";
            String uniqKey = null;
            short propertiesLength = byteBuffer.getShort();
            Map propertiesMap = null;
            if (propertiesLength > 0) {
                byteBuffer.get(bytesContent, 0, propertiesLength);
                String properties = new String(bytesContent, 0, (int)propertiesLength, MessageDecoder.CHARSET_UTF8);
                propertiesMap = MessageDecoder.string2messageProperties((String)properties);
                keys = (String)propertiesMap.get("KEYS");
                uniqKey = (String)propertiesMap.get("UNIQ_KEY");
                String tags = (String)propertiesMap.get("TAGS");
                if (tags != null && tags.length() > 0) {
                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType((int)sysFlag), tags);
                }
                String t = (String)propertiesMap.get("DELAY");
                if ("SCHEDULE_TOPIC_XXXX".equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);
                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }
                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);
                    }
                }
            }
            if (totalSize != (readLength = CommitLog.calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength))) {
                this.doNothingForDeadCode(reconsumeTimes);
                this.doNothingForDeadCode(flag);
                this.doNothingForDeadCode(bornTimeStamp);
                this.doNothingForDeadCode(byteBuffer1);
                this.doNothingForDeadCode(byteBuffer2);
                log.error("[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", new Object[]{totalSize, readLength, bodyLen, topicLen, propertiesLength});
                return new DispatchRequest(totalSize, false);
            }
            return new DispatchRequest(topic, queueId, physicOffset, totalSize, tagsCode, storeTimestamp, queueOffset, keys, uniqKey, sysFlag, preparedTransactionOffset, propertiesMap);
        }
        catch (Exception exception) {
            return new DispatchRequest(-1, false);
        }
    }

    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        int bornhostLength = (sysFlag & 0x10) == 0 ? 8 : 20;
        int storehostAddressLength = (sysFlag & 0x20) == 0 ? 8 : 20;
        int msgLen = 48 + bornhostLength + 8 + storehostAddressLength + 4 + 8 + 4 + (bodyLength > 0 ? bodyLength : 0) + 1 + topicLength + 2 + (propertiesLength > 0 ? propertiesLength : 0) + 0;
        return msgLen;
    }

    public long getConfirmOffset() {
        return this.confirmOffset;
    }

    public void setConfirmOffset(long phyOffset) {
        this.confirmOffset = phyOffset;
    }

    @Deprecated
    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            long mappedFileOffset;
            long processOffset;
            block9: {
                int index;
                MappedFile mappedFile = null;
                for (index = mappedFiles.size() - 1; index >= 0; --index) {
                    mappedFile = mappedFiles.get(index);
                    if (!this.isMappedFileMatchedRecover(mappedFile)) continue;
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
                if (index < 0) {
                    index = 0;
                    mappedFile = mappedFiles.get(index);
                }
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                processOffset = mappedFile.getFileFromOffset();
                mappedFileOffset = 0L;
                while (true) {
                    DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                    int size = dispatchRequest.getMsgSize();
                    if (!dispatchRequest.isSuccess()) break;
                    if (size > 0) {
                        mappedFileOffset += (long)size;
                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() >= this.defaultMessageStore.getConfirmOffset()) continue;
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                            continue;
                        }
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                        continue;
                    }
                    if (size != 0) continue;
                    if (++index >= mappedFiles.size()) {
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break block9;
                    }
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0L;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
            }
            this.mappedFileQueue.setFlushedWhere(processOffset += mappedFileOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", (Object)maxPhyOffsetOfConsumeQueue, (Object)processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0L);
            this.mappedFileQueue.setCommittedWhere(0L);
            this.defaultMessageStore.destroyLogics();
        }
    }

    private boolean isMappedFileMatchedRecover(MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        int magicCode = byteBuffer.getInt(4);
        if (magicCode != -626843481) {
            return false;
        }
        int sysFlag = byteBuffer.getInt(36);
        int bornhostLength = (sysFlag & 0x10) == 0 ? 8 : 20;
        int msgStoreTimePos = 48 + bornhostLength;
        long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
        if (0L == storeTimestamp) {
            return false;
        }
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}", (Object)storeTimestamp, (Object)UtilAll.timeMillisToHumanString((long)storeTimestamp));
                return true;
            }
        } else if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
            log.info("find check timestamp, {} {}", (Object)storeTimestamp, (Object)UtilAll.timeMillisToHumanString((long)storeTimestamp));
            return true;
        }
        return false;
    }

    private void notifyMessageArriving() {
    }

    public boolean resetOffset(long offset) {
        return this.mappedFileQueue.resetOffset(offset);
    }

    public long getBeginTimeInLock() {
        return this.beginTimeInLock;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32((byte[])msg.getBody()));
        AppendMessageResult result = null;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        int tranType = MessageSysFlag.getTransactionValue((int)msg.getSysFlag());
        if ((tranType == 0 || tranType == 8) && msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            topic = "SCHEDULE_TOPIC_XXXX";
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            MessageAccessor.putProperty((Message)msg, (String)"REAL_TOPIC", (String)msg.getTopic());
            MessageAccessor.putProperty((Message)msg, (String)"REAL_QID", (String)String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String((Map)msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
        long elapsedTimeInLock = 0L;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        this.putMessageLock.lock();
        try {
            long beginLockTimestamp;
            this.beginTimeInLock = beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            msg.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                this.beginTimeInLock = 0L;
                CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
                return completableFuture;
            }
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK: {
                    break;
                }
                case END_OF_FILE: {
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        this.beginTimeInLock = 0L;
                        CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                        return completableFuture;
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                }
                case MESSAGE_SIZE_EXCEEDED: 
                case PROPERTIES_SIZE_EXCEEDED: {
                    this.beginTimeInLock = 0L;
                    CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                    return completableFuture;
                }
                case UNKNOWN_ERROR: {
                    this.beginTimeInLock = 0L;
                    CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    return completableFuture;
                }
                default: {
                    this.beginTimeInLock = 0L;
                    CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    return completableFuture;
                }
            }
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            this.beginTimeInLock = 0L;
        }
        finally {
            this.putMessageLock.unlock();
        }
        if (elapsedTimeInLock > 500L) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", new Object[]{elapsedTimeInLock, msg.getBody().length, result});
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        CompletableFuture<PutMessageStatus> flushResultFuture = this.submitFlushRequest(result, putMessageResult, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = this.submitReplicaRequest(result, putMessageResult, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus((PutMessageStatus)((Object)replicaStatus));
            }
            return putMessageResult;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        AppendMessageResult result;
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        int tranType = MessageSysFlag.getTransactionValue((int)messageExtBatch.getSysFlag());
        if (tranType != 0) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }
        long elapsedTimeInLock = 0L;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        MessageExtBatchEncoder batchEncoder = this.batchEncoderThreadLocal.get();
        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
        this.putMessageLock.lock();
        try {
            long beginLockTimestamp;
            this.beginTimeInLock = beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
            }
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", (Object)messageExtBatch.getTopic(), (Object)messageExtBatch.getBornHostString());
                this.beginTimeInLock = 0L;
                CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
                return completableFuture;
            }
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK: {
                    break;
                }
                case END_OF_FILE: {
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
                    if (null == mappedFile) {
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", (Object)messageExtBatch.getTopic(), (Object)messageExtBatch.getBornHostString());
                        this.beginTimeInLock = 0L;
                        CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                        return completableFuture;
                    }
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
                    break;
                }
                case MESSAGE_SIZE_EXCEEDED: 
                case PROPERTIES_SIZE_EXCEEDED: {
                    this.beginTimeInLock = 0L;
                    CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                    return completableFuture;
                }
                default: {
                    this.beginTimeInLock = 0L;
                    CompletableFuture<PutMessageResult> completableFuture = CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    return completableFuture;
                }
            }
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            this.beginTimeInLock = 0L;
        }
        finally {
            this.putMessageLock.unlock();
        }
        if (elapsedTimeInLock > 500L) {
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", new Object[]{elapsedTimeInLock, messageExtBatch.getBody().length, result});
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
        CompletableFuture<PutMessageStatus> flushOKFuture = this.submitFlushRequest(result, putMessageResult, (MessageExt)messageExtBatch);
        CompletableFuture<PutMessageStatus> replicaOKFuture = this.submitReplicaRequest(result, putMessageResult, (MessageExt)messageExtBatch);
        return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus((PutMessageStatus)((Object)replicaStatus));
            }
            return putMessageResult;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        InetSocketAddress storeSocketAddress;
        InetSocketAddress bornSocketAddress;
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32((byte[])msg.getBody()));
        AppendMessageResult result = null;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        int tranType = MessageSysFlag.getTransactionValue((int)msg.getSysFlag());
        if ((tranType == 0 || tranType == 8) && msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            topic = "SCHEDULE_TOPIC_XXXX";
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            MessageAccessor.putProperty((Message)msg, (String)"REAL_TOPIC", (String)msg.getTopic());
            MessageAccessor.putProperty((Message)msg, (String)"REAL_QID", (String)String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String((Map)msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
        if ((bornSocketAddress = (InetSocketAddress)msg.getBornHost()).getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }
        if ((storeSocketAddress = (InetSocketAddress)msg.getStoreHost()).getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }
        long elapsedTimeInLock = 0L;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        this.putMessageLock.lock();
        try {
            long beginLockTimestamp;
            this.beginTimeInLock = beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            msg.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                this.beginTimeInLock = 0L;
                PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                return putMessageResult;
            }
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK: {
                    break;
                }
                case END_OF_FILE: {
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        this.beginTimeInLock = 0L;
                        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        return putMessageResult;
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                }
                case MESSAGE_SIZE_EXCEEDED: 
                case PROPERTIES_SIZE_EXCEEDED: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    return putMessageResult;
                }
                case UNKNOWN_ERROR: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    return putMessageResult;
                }
                default: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    return putMessageResult;
                }
            }
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            this.beginTimeInLock = 0L;
        }
        finally {
            this.putMessageLock.unlock();
        }
        if (elapsedTimeInLock > 500L) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", new Object[]{elapsedTimeInLock, msg.getBody().length, result});
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        this.handleDiskFlush(result, putMessageResult, msg);
        this.handleHA(result, putMessageResult, msg);
        return putMessageResult;
    }

    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService service = (GroupCommitService)this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + (long)result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                service.putRequest(request);
                return request.future();
            }
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.flushCommitLogService.wakeup();
        } else {
            this.commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK((long)result.getWroteBytes() + result.getWroteOffset())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + (long)result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                }
                return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService service = (GroupCommitService)this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + (long)result.getWroteBytes());
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException | ExecutionException | TimeoutException exception) {
                    // empty catch block
                }
                if (flushStatus != PutMessageStatus.PUT_OK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        } else if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.flushCommitLogService.wakeup();
        } else {
            this.commitLogService.wakeup();
        }
    }

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK(result.getWroteOffset() + (long)result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + (long)result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    PutMessageStatus replicaStatus = null;
                    try {
                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException | ExecutionException | TimeoutException exception) {
                        // empty catch block
                    }
                    if (replicaStatus != PutMessageStatus.PUT_OK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                } else {
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        AppendMessageResult result;
        InetSocketAddress storeSocketAddress;
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        int tranType = MessageSysFlag.getTransactionValue((int)messageExtBatch.getSysFlag());
        if (tranType != 0) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        InetSocketAddress bornSocketAddress = (InetSocketAddress)messageExtBatch.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setBornHostV6Flag();
        }
        if ((storeSocketAddress = (InetSocketAddress)messageExtBatch.getStoreHost()).getAddress() instanceof Inet6Address) {
            messageExtBatch.setStoreHostAddressV6Flag();
        }
        long elapsedTimeInLock = 0L;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        MessageExtBatchEncoder batchEncoder = this.batchEncoderThreadLocal.get();
        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
        this.putMessageLock.lock();
        try {
            long beginLockTimestamp;
            this.beginTimeInLock = beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
            }
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", (Object)messageExtBatch.getTopic(), (Object)messageExtBatch.getBornHostString());
                this.beginTimeInLock = 0L;
                PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                return putMessageResult;
            }
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK: {
                    break;
                }
                case END_OF_FILE: {
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0L);
                    if (null == mappedFile) {
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", (Object)messageExtBatch.getTopic(), (Object)messageExtBatch.getBornHostString());
                        this.beginTimeInLock = 0L;
                        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        return putMessageResult;
                    }
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
                    break;
                }
                case MESSAGE_SIZE_EXCEEDED: 
                case PROPERTIES_SIZE_EXCEEDED: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    return putMessageResult;
                }
                case UNKNOWN_ERROR: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    return putMessageResult;
                }
                default: {
                    this.beginTimeInLock = 0L;
                    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    return putMessageResult;
                }
            }
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            this.beginTimeInLock = 0L;
        }
        finally {
            this.putMessageLock.unlock();
        }
        if (elapsedTimeInLock > 500L) {
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", new Object[]{elapsedTimeInLock, messageExtBatch.getBody().length, result});
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
        this.handleDiskFlush(result, putMessageResult, (MessageExt)messageExtBatch);
        this.handleHA(result, putMessageResult, (MessageExt)messageExtBatch);
        return putMessageResult;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long pickupStoreTimestamp(long offset, int size) {
        SelectMappedBufferResult result;
        if (offset >= this.getMinOffset() && null != (result = this.getMessage(offset, size))) {
            try {
                int sysFlag = result.getByteBuffer().getInt(36);
                int bornhostLength = (sysFlag & 0x10) == 0 ? 8 : 20;
                int msgStoreTimePos = 48 + bornhostLength;
                long l = result.getByteBuffer().getLong(msgStoreTimePos);
                return l;
            }
            finally {
                result.release();
            }
        }
        return -1L;
    }

    public long getMinOffset() {
        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
        if (mappedFile != null) {
            if (mappedFile.isAvailable()) {
                return mappedFile.getFileFromOffset();
            }
            return this.rollNextFile(mappedFile.getFileFromOffset());
        }
        return -1L;
    }

    public SelectMappedBufferResult getMessage(long offset, int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0L);
        if (mappedFile != null) {
            int pos = (int)(offset % (long)mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size);
        }
        return null;
    }

    public long rollNextFile(long offset) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        return offset + (long)mappedFileSize - offset % (long)mappedFileSize;
    }

    public HashMap<String, Long> getTopicQueueTable() {
        return this.topicQueueTable;
    }

    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
        this.topicQueueTable = topicQueueTable;
    }

    public void destroy() {
        this.mappedFileQueue.destroy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean appendData(long startOffset, byte[] data) {
        this.putMessageLock.lock();
        try {
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
            if (null == mappedFile) {
                log.error("appendData getLastMappedFile error  " + startOffset);
                boolean bl = false;
                return bl;
            }
            boolean bl = mappedFile.appendMessage(data);
            return bl;
        }
        finally {
            this.putMessageLock.unlock();
        }
    }

    public boolean retryDeleteFirstFile(long intervalForcibly) {
        return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeQueueFromTopicQueueTable(String topic, int queueId) {
        String key = topic + "-" + queueId;
        CommitLog commitLog = this;
        synchronized (commitLog) {
            this.topicQueueTable.remove(key);
        }
        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", (Object)topic, (Object)queueId);
    }

    public void checkSelf() {
        this.mappedFileQueue.checkSelf();
    }

    public long lockTimeMills() {
        long diff = 0L;
        long begin = this.beginTimeInLock;
        if (begin > 0L) {
            diff = this.defaultMessageStore.now() - begin;
        }
        if (diff < 0L) {
            diff = 0L;
        }
        return diff;
    }

    public static class MessageExtBatchEncoder {
        private final ByteBuffer msgBatchMemory;
        private final int maxMessageSize;

        MessageExtBatchEncoder(int size) {
            this.msgBatchMemory = ByteBuffer.allocateDirect(size);
            this.maxMessageSize = size;
        }

        public ByteBuffer encode(MessageExtBatch messageExtBatch) {
            this.msgBatchMemory.clear();
            int totalMsgLen = 0;
            ByteBuffer messagesByteBuff = messageExtBatch.wrap();
            int sysFlag = messageExtBatch.getSysFlag();
            int bornHostLength = (sysFlag & 0x10) == 0 ? 8 : 20;
            int storeHostLength = (sysFlag & 0x20) == 0 ? 8 : 20;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
            while (messagesByteBuff.hasRemaining()) {
                messagesByteBuff.getInt();
                messagesByteBuff.getInt();
                messagesByteBuff.getInt();
                int flag = messagesByteBuff.getInt();
                int bodyLen = messagesByteBuff.getInt();
                int bodyPos = messagesByteBuff.position();
                int bodyCrc = UtilAll.crc32((byte[])messagesByteBuff.array(), (int)bodyPos, (int)bodyLen);
                messagesByteBuff.position(bodyPos + bodyLen);
                short propertiesLen = messagesByteBuff.getShort();
                int propertiesPos = messagesByteBuff.position();
                messagesByteBuff.position(propertiesPos + propertiesLen);
                byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
                int topicLength = topicData.length;
                int msgLen = CommitLog.calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
                if (msgLen > this.maxMessageSize) {
                    log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + ", maxMessageSize: " + this.maxMessageSize);
                    throw new RuntimeException("message size exceeded");
                }
                if ((totalMsgLen += msgLen) > this.maxMessageSize) {
                    throw new RuntimeException("message size exceeded");
                }
                this.msgBatchMemory.putInt(msgLen);
                this.msgBatchMemory.putInt(-626843481);
                this.msgBatchMemory.putInt(bodyCrc);
                this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
                this.msgBatchMemory.putInt(flag);
                this.msgBatchMemory.putLong(0L);
                this.msgBatchMemory.putLong(0L);
                this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
                this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
                this.resetByteBuffer(bornHostHolder, bornHostLength);
                this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
                this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
                this.resetByteBuffer(storeHostHolder, storeHostLength);
                this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
                this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
                this.msgBatchMemory.putLong(0L);
                this.msgBatchMemory.putInt(bodyLen);
                if (bodyLen > 0) {
                    this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
                }
                this.msgBatchMemory.put((byte)topicLength);
                this.msgBatchMemory.put(topicData);
                this.msgBatchMemory.putShort(propertiesLen);
                if (propertiesLen <= 0) continue;
                this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
            }
            this.msgBatchMemory.flip();
            return this.msgBatchMemory;
        }

        private void resetByteBuffer(ByteBuffer byteBuffer, int limit) {
            byteBuffer.flip();
            byteBuffer.limit(limit);
        }
    }

    class DefaultAppendMessageCallback
    implements AppendMessageCallback {
        private static final int END_FILE_MIN_BLANK_LENGTH = 8;
        private final ByteBuffer msgIdMemory;
        private final ByteBuffer msgIdV6Memory;
        private final ByteBuffer msgStoreItemMemory;
        private final int maxMessageSize;
        private final StringBuilder keyBuilder = new StringBuilder();
        private final StringBuilder msgIdBuilder = new StringBuilder();

        DefaultAppendMessageCallback(int size) {
            this.msgIdMemory = ByteBuffer.allocate(16);
            this.msgIdV6Memory = ByteBuffer.allocate(28);
            this.msgStoreItemMemory = ByteBuffer.allocate(size + 8);
            this.maxMessageSize = size;
        }

        public ByteBuffer getMsgStoreItemMemory() {
            return this.msgStoreItemMemory;
        }

        @Override
        public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer, int maxBlank, MessageExtBrokerInner msgInner) {
            int propertiesLength;
            long wroteOffset = fileFromOffset + (long)byteBuffer.position();
            int sysflag = msgInner.getSysFlag();
            int bornHostLength = (sysflag & 0x10) == 0 ? 8 : 20;
            int storeHostLength = (sysflag & 0x20) == 0 ? 8 : 20;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId = (sysflag & 0x20) == 0 ? MessageDecoder.createMessageId((ByteBuffer)this.msgIdMemory, (ByteBuffer)msgInner.getStoreHostBytes(storeHostHolder), (long)wroteOffset) : MessageDecoder.createMessageId((ByteBuffer)this.msgIdV6Memory, (ByteBuffer)msgInner.getStoreHostBytes(storeHostHolder), (long)wroteOffset);
            this.keyBuilder.setLength(0);
            this.keyBuilder.append(msgInner.getTopic());
            this.keyBuilder.append('-');
            this.keyBuilder.append(msgInner.getQueueId());
            String key = this.keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }
            int tranType = MessageSysFlag.getTransactionValue((int)msgInner.getSysFlag());
            switch (tranType) {
                case 4: 
                case 12: {
                    queueOffset = 0L;
                    break;
                }
            }
            byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
            int n = propertiesLength = propertiesData == null ? 0 : propertiesData.length;
            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", (Object)propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }
            byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            int topicLength = topicData.length;
            int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
            int msgLen = CommitLog.calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
            if (msgLen > this.maxMessageSize) {
                log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }
            if (msgLen + 8 > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                this.msgStoreItemMemory.putInt(maxBlank);
                this.msgStoreItemMemory.putInt(-875286124);
                long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }
            this.resetByteBuffer(this.msgStoreItemMemory, msgLen);
            this.msgStoreItemMemory.putInt(msgLen);
            this.msgStoreItemMemory.putInt(-626843481);
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            this.msgStoreItemMemory.putLong(queueOffset);
            this.msgStoreItemMemory.putLong(fileFromOffset + (long)byteBuffer.position());
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0) {
                this.msgStoreItemMemory.put(msgInner.getBody());
            }
            this.msgStoreItemMemory.put((byte)topicLength);
            this.msgStoreItemMemory.put(topicData);
            this.msgStoreItemMemory.putShort((short)propertiesLength);
            if (propertiesLength > 0) {
                this.msgStoreItemMemory.put(propertiesData);
            }
            long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            switch (tranType) {
                case 4: 
                case 12: {
                    break;
                }
                case 0: 
                case 8: {
                    queueOffset = queueOffset + 1L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                    break;
                }
            }
            return result;
        }

        @Override
        public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer, int maxBlank, MessageExtBatch messageExtBatch) {
            byteBuffer.mark();
            long wroteOffset = fileFromOffset + (long)byteBuffer.position();
            this.keyBuilder.setLength(0);
            this.keyBuilder.append(messageExtBatch.getTopic());
            this.keyBuilder.append('-');
            this.keyBuilder.append(messageExtBatch.getQueueId());
            String key = this.keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }
            long beginQueueOffset = queueOffset;
            int totalMsgLen = 0;
            int msgNum = 0;
            this.msgIdBuilder.setLength(0);
            long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
            int sysFlag = messageExtBatch.getSysFlag();
            int storeHostLength = (sysFlag & 0x20) == 0 ? 8 : 20;
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder);
            messagesByteBuff.mark();
            while (messagesByteBuff.hasRemaining()) {
                int msgPos = messagesByteBuff.position();
                int msgLen = messagesByteBuff.getInt();
                int bodyLen = msgLen - 40;
                if (msgLen > this.maxMessageSize) {
                    log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen + ", maxMessageSize: " + this.maxMessageSize);
                    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
                }
                if ((totalMsgLen += msgLen) + 8 > maxBlank) {
                    this.resetByteBuffer(this.msgStoreItemMemory, 8);
                    this.msgStoreItemMemory.putInt(maxBlank);
                    this.msgStoreItemMemory.putInt(-875286124);
                    messagesByteBuff.reset();
                    byteBuffer.reset();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, this.msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
                messagesByteBuff.position(msgPos + 20);
                messagesByteBuff.putLong(queueOffset);
                messagesByteBuff.putLong(wroteOffset + (long)totalMsgLen - (long)msgLen);
                storeHostBytes.rewind();
                String msgId = (sysFlag & 0x20) == 0 ? MessageDecoder.createMessageId((ByteBuffer)this.msgIdMemory, (ByteBuffer)storeHostBytes, (long)(wroteOffset + (long)totalMsgLen - (long)msgLen)) : MessageDecoder.createMessageId((ByteBuffer)this.msgIdV6Memory, (ByteBuffer)storeHostBytes, (long)(wroteOffset + (long)totalMsgLen - (long)msgLen));
                if (this.msgIdBuilder.length() > 0) {
                    this.msgIdBuilder.append(',').append(msgId);
                } else {
                    this.msgIdBuilder.append(msgId);
                }
                Long l = queueOffset;
                Long l2 = queueOffset = Long.valueOf(queueOffset + 1L);
                ++msgNum;
                messagesByteBuff.position(msgPos + msgLen);
            }
            messagesByteBuff.position(0);
            messagesByteBuff.limit(totalMsgLen);
            byteBuffer.put(messagesByteBuff);
            messageExtBatch.setEncodedBuff(null);
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, this.msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            result.setMsgNum(msgNum);
            CommitLog.this.topicQueueTable.put(key, queueOffset);
            return result;
        }

        private void resetByteBuffer(ByteBuffer byteBuffer, int limit) {
            byteBuffer.flip();
            byteBuffer.limit(limit);
        }
    }

    class GroupCommitService
    extends FlushCommitLogService {
        private volatile List<GroupCommitRequest> requestsWrite;
        private volatile List<GroupCommitRequest> requestsRead;

        GroupCommitService() {
            this.requestsWrite = new ArrayList<GroupCommitRequest>();
            this.requestsRead = new ArrayList<GroupCommitRequest>();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public synchronized void putRequest(GroupCommitRequest request) {
            List<GroupCommitRequest> list = this.requestsWrite;
            synchronized (list) {
                this.requestsWrite.add(request);
            }
            if (this.hasNotified.compareAndSet(false, true)) {
                this.waitPoint.countDown();
            }
        }

        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doCommit() {
            List<GroupCommitRequest> list = this.requestsRead;
            synchronized (list) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; ++i) {
                            boolean bl = flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                            if (flushOK) continue;
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                        req.wakeupCustomer(flushOK);
                    }
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0L) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    this.requestsRead.clear();
                } else {
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10L);
                    this.doCommit();
                }
                catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", (Throwable)e);
                }
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                log.warn("GroupCommitService Exception, ", (Throwable)e);
            }
            GroupCommitService groupCommitService = this;
            synchronized (groupCommitService) {
                this.swapRequests();
            }
            this.doCommit();
            log.info(this.getServiceName() + " service end");
        }

        protected void onWaitEnd() {
            this.swapRequests();
        }

        public String getServiceName() {
            return GroupCommitService.class.getSimpleName();
        }

        public long getJointime() {
            return 300000L;
        }
    }

    public static class GroupCommitRequest {
        private final long nextOffset;
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture();
        private final long startTimestamp = System.currentTimeMillis();
        private long timeoutMillis = Long.MAX_VALUE;

        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            this.timeoutMillis = timeoutMillis;
        }

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        public long getNextOffset() {
            return this.nextOffset;
        }

        public void wakeupCustomer(boolean flushOK) {
            long endTimestamp = System.currentTimeMillis();
            PutMessageStatus result = flushOK && endTimestamp - this.startTimestamp <= this.timeoutMillis ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
            this.flushOKFuture.complete(result);
        }

        public CompletableFuture<PutMessageStatus> future() {
            return this.flushOKFuture;
        }
    }

    class FlushRealTimeService
    extends FlushCommitLogService {
        private long lastFlushTimestamp;
        private long printTimes;

        FlushRealTimeService() {
            this.lastFlushTimestamp = 0L;
            this.printTimes = 0L;
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
                boolean printFlushProgress = false;
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= this.lastFlushTimestamp + (long)flushPhysicQueueThoroughInterval) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = this.printTimes++ % 10L == 0L;
                }
                try {
                    long past;
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }
                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }
                    long begin = System.currentTimeMillis();
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0L) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    if ((past = System.currentTimeMillis() - begin) <= 500L) continue;
                    log.info("Flush data to disk costs {} ms", (Object)past);
                }
                catch (Throwable e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }
            boolean result = false;
            for (int i = 0; i < 10 && !result; ++i) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            this.printFlushProgress();
            log.info(this.getServiceName() + " service end");
        }

        public String getServiceName() {
            return FlushRealTimeService.class.getSimpleName();
        }

        private void printFlushProgress() {
        }

        public long getJointime() {
            return 300000L;
        }
    }

    class CommitRealTimeService
    extends FlushCommitLogService {
        private long lastCommitTimestamp;

        CommitRealTimeService() {
            this.lastCommitTimestamp = 0L;
        }

        public String getServiceName() {
            return CommitRealTimeService.class.getSimpleName();
        }

        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
                int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
                long begin = System.currentTimeMillis();
                if (begin >= this.lastCommitTimestamp + (long)commitDataThoroughInterval) {
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }
                try {
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {
                        this.lastCommitTimestamp = end;
                        CommitLog.this.flushCommitLogService.wakeup();
                    }
                    if (end - begin > 500L) {
                        log.info("Commit data to file costs {} ms", (Object)(end - begin));
                    }
                    this.waitForRunning(interval);
                }
                catch (Throwable e) {
                    log.error(this.getServiceName() + " service has exception. ", e);
                }
            }
            boolean result = false;
            for (int i = 0; i < 10 && !result; ++i) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            log.info(this.getServiceName() + " service end");
        }
    }

    abstract class FlushCommitLogService
    extends ServiceThread {
        protected static final int RETRY_TIMES_OVER = 10;

        FlushCommitLogService() {
        }
    }
}

