/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change.event;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.UnPublisher;
import com.alipay.sofa.registry.server.data.change.ChangeData;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.event.ClientChangeEvent;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEvent;
import com.alipay.sofa.registry.server.data.change.event.DataChangeScopeEnum;
import com.alipay.sofa.registry.server.data.change.event.IDataChangeEvent;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.google.common.collect.Interners;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;

public class DataChangeEventQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeEventQueue.class);
    private final String name;
    private final BlockingQueue<IDataChangeEvent> eventQueue;
    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP = new ConcurrentHashMap<String, Map<String, ChangeData>>();
    private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue();
    private final int notifyIntervalMs;
    private final ReentrantLock lock = new ReentrantLock();
    private DataServerConfig dataServerConfig;

    public DataChangeEventQueue(int idx, DataServerConfig dataServerConfig) {
        this.name = String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), idx);
        this.dataServerConfig = dataServerConfig;
        int queueSize = dataServerConfig.getQueueSize();
        this.eventQueue = queueSize <= 0 ? new LinkedBlockingDeque<IDataChangeEvent>() : new LinkedBlockingDeque<IDataChangeEvent>(queueSize);
        this.notifyIntervalMs = dataServerConfig.getNotifyIntervalMs();
    }

    public void onChange(IDataChangeEvent event) {
        this.eventQueue.add(event);
    }

    public String getName() {
        return this.name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ChangeData take() throws InterruptedException {
        ChangeData changeData = (ChangeData)this.CHANGE_QUEUE.take();
        this.lock.lock();
        try {
            Datum datum = changeData.getDatum();
            this.CHANGE_DATA_MAP.get(datum.getDataCenter()).remove(datum.getDataInfoId());
            ChangeData changeData2 = changeData;
            return changeData2;
        }
        finally {
            this.lock.unlock();
        }
    }

    private ChangeData getChangeData(String dataCenter, String dataInfoId, DataSourceTypeEnum sourceType, DataChangeTypeEnum changeType) {
        ChangeData changeData;
        ConcurrentHashMap<String, ChangeData> newMap;
        ConcurrentHashMap<String, ChangeData> map = this.CHANGE_DATA_MAP.get(dataCenter);
        if (map == null && (map = (ConcurrentHashMap<String, ChangeData>)this.CHANGE_DATA_MAP.putIfAbsent(dataCenter, newMap = new ConcurrentHashMap<String, ChangeData>())) == null) {
            map = newMap;
        }
        if ((changeData = map.get(dataInfoId)) == null) {
            ChangeData newChangeData = new ChangeData(null, this.notifyIntervalMs, sourceType, changeType);
            changeData = map.putIfAbsent(dataInfoId, newChangeData);
            if (changeData == null) {
                changeData = newChangeData;
            }
            this.CHANGE_QUEUE.put(changeData);
        }
        return changeData;
    }

    public void start() {
        LOGGER.info("[{}] begin start DataChangeEventQueue", (Object)this.getName());
        Executor executor = ExecutorFactory.newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), this.getName()));
        executor.execute(() -> {
            while (true) {
                try {
                    while (true) {
                        IDataChangeEvent event;
                        DataChangeScopeEnum scope;
                        if ((scope = (event = this.eventQueue.take()).getScope()) == DataChangeScopeEnum.DATUM) {
                            DataChangeEvent dataChangeEvent = (DataChangeEvent)event;
                            this.handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
                            continue;
                        }
                        if (scope != DataChangeScopeEnum.CLIENT) continue;
                        this.handleHost((ClientChangeEvent)event);
                    }
                }
                catch (Throwable e) {
                    LOGGER.error("[{}] handle change event failed", (Object)this.getName(), (Object)e);
                    continue;
                }
                break;
            }
        });
        LOGGER.info("[{}] start DataChangeEventQueue success", (Object)this.getName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleHost(ClientChangeEvent event) {
        String clientHost = event.getHost();
        Object object = Interners.newWeakInterner().intern((Object)clientHost);
        synchronized (object) {
            Map<String, Publisher> pubMap = DatumCache.getByHost(clientHost);
            if (pubMap != null && !pubMap.isEmpty()) {
                int count = 0;
                for (Publisher publisher : pubMap.values()) {
                    DataServerNode dataServerNode = DataServerNodeFactory.computeDataServerNode(this.dataServerConfig.getLocalDataCenter(), publisher.getDataInfoId());
                    if (!DataServerConfig.IP.equals(dataServerNode.getIp())) continue;
                    Datum datum = new Datum((Publisher)new UnPublisher(publisher.getDataInfoId(), publisher.getRegisterId(), event.getOccurredTimestamp()), event.getDataCenter(), event.getVersion());
                    datum.setContainsUnPub(true);
                    this.handleDatum(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum);
                    ++count;
                }
                LOGGER.info("[{}] client off handle, host={}, occurTimestamp={},version={},handle pub size={}", new Object[]{this.getName(), clientHost, event.getOccurredTimestamp(), event.getVersion(), count});
            } else {
                LOGGER.info("[{}] no datum to handle, host={}", (Object)this.getName(), (Object)clientHost);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
        this.lock.lock();
        try {
            ChangeData changeData = this.getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType);
            Datum cacheDatum = changeData.getDatum();
            if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
                changeData.setDatum(targetDatum);
            } else {
                Map targetPubMap = targetDatum.getPubMap();
                Map cachePubMap = cacheDatum.getPubMap();
                for (Publisher pub : targetPubMap.values()) {
                    String registerId = pub.getRegisterId();
                    Publisher cachePub = (Publisher)cachePubMap.get(registerId);
                    if (cachePub != null && (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp() || !(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher) && pub.getSourceAddress().equals(cachePub.getSourceAddress()) && cachePub.getVersion() >= pub.getVersion())) continue;
                    cachePubMap.put(registerId, pub);
                    cacheDatum.setVersion(targetDatum.getVersion());
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }
}

