/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.event.handler;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyFetchDatumRequest;
import com.alipay.sofa.registry.common.model.dataserver.NotifyOnlineRequest;
import com.alipay.sofa.registry.common.model.metaserver.DataNode;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.consistency.hash.ConsistentHash;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.BackupTriad;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.correction.LocalDataServerCleanHandler;
import com.alipay.sofa.registry.server.data.event.LocalDataServerChangeEvent;
import com.alipay.sofa.registry.server.data.event.handler.AbstractEventHandler;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.DataNodeExchanger;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Event handler for {@link LocalDataServerChangeEvent}.
 *
 * <p>{@link #doHandle(LocalDataServerChangeEvent)} only records that a change happened and
 * enqueues the event. The actual processing is done by a {@link LocalClusterDataSyncer} that
 * {@link #start()} submits to a single-thread executor. When several events are queued, only the
 * last one is fully processed.
 */
public class LocalDataServerChangeEventHandler
extends AbstractEventHandler<LocalDataServerChangeEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalDataServerChangeEventHandler.class);
    @Autowired
    private DataServerConfig dataServerBootstrapConfig;
    @Autowired
    private LocalDataServerCleanHandler localDataServerCleanHandler;
    @Autowired
    private DataServerCache dataServerCache;
    @Autowired
    private DataNodeExchanger dataNodeExchanger;
    @Autowired
    private DataNodeStatus dataNodeStatus;
    /** Events waiting to be consumed by the syncer thread. */
    private final BlockingQueue<LocalDataServerChangeEvent> events = new LinkedBlockingDeque<LocalDataServerChangeEvent>();
    /**
     * Set to {@code true} whenever a new event is received and reset to {@code false} when the
     * syncer starts processing an event; the syncer polls it to abandon work for an outdated event.
     */
    private final AtomicBoolean isChanged = new AtomicBoolean(false);

    /**
     * @return the event class this handler is registered for
     */
    @Override
    public Class interest() {
        return LocalDataServerChangeEvent.class;
    }

    /**
     * Marks the state as changed, resets the local clean handler and enqueues the event for the
     * syncer thread.
     *
     * @param localDataServerChangeEvent the received event
     */
    @Override
    public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
        this.isChanged.set(true);
        this.localDataServerCleanHandler.reset();
        this.events.offer(localDataServerChangeEvent);
    }

    /**
     * Runs the parent initialization and then starts the syncer thread.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        this.start();
    }

    /**
     * Creates a single-thread executor named after this class and submits a
     * {@link LocalClusterDataSyncer} to it.
     */
    public void start() {
        Executor executor = ExecutorFactory.newSingleThreadExecutor(LocalDataServerChangeEventHandler.class.getSimpleName());
        executor.execute(new LocalClusterDataSyncer());
    }

    /**
     * Consumes queued {@link LocalDataServerChangeEvent}s and notifies the other data servers.
     */
    private class LocalClusterDataSyncer
    implements Runnable {
        private LocalClusterDataSyncer() {
        }

        /**
         * Processes events until the thread is interrupted. Any other failure is logged and the
         * loop carries on with the next event.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    this.handleNextEvent();
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: keep the flag set for the owner of the
                    // thread and leave the loop instead of blocking on the queue again.
                    Thread.currentThread().interrupt();
                    LOGGER.error("sync local data error", e);
                    return;
                } catch (Throwable t) {
                    LOGGER.error("sync local data error", t);
                }
            }
        }

        /**
         * Takes one event from the queue (blocking) and processes it.
         *
         * <p>If the event lists this server ({@code DataServerConfig.IP}) as newly joined and the
         * node status is not {@code INITIAL}, the status is set to {@code INITIAL}. If more events
         * are already queued, the current one is then dropped so that only the latest is handled.
         * Otherwise a {@code WORKING} node notifies the others to fetch data, while a node in any
         * other status announces itself online and stores the new server list.
         *
         * @throws InterruptedException if interrupted while waiting for an event
         */
        private void handleNextEvent() throws InterruptedException {
            LocalDataServerChangeEvent event = events.take();
            Set<String> newJoined = event.getNewJoined();
            if (newJoined.contains(DataServerConfig.IP) && dataNodeStatus.getStatus() != LocalServerStatusEnum.INITIAL) {
                dataNodeStatus.setStatus(LocalServerStatusEnum.INITIAL);
            }
            if (!events.isEmpty()) {
                // A newer event is pending; skip this one.
                return;
            }
            long changeVersion = event.getVersion();
            LOGGER.info("begin handle dataserver change, version={},localDataServer={}", changeVersion, event.getLocalDataServerMap().keySet());
            isChanged.set(false);
            if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) {
                this.notifyToFetch(event, changeVersion);
                return;
            }
            dataServerCache.checkAndUpdateStatus(changeVersion);
            this.notifyOnline(changeVersion);
            dataServerCache.updateItem(event.getLocalDataServerMap(), event.getLocalDataCenterversion(), dataServerBootstrapConfig.getLocalDataCenter());
        }

        /**
         * Notifies the data servers of the event's server list so that they can fetch data.
         *
         * <p>Servers returned by {@link #getToBeSyncMap(ConsistentHash)} for which a local datum
         * version exists receive those versions; every remaining server of the list receives an
         * empty version map. Nothing is sent if a newer event arrived while the sync map was being
         * computed, and the cached server list is only updated if no newer event arrived by the
         * end.
         *
         * @param event         the event being processed
         * @param changeVersion the version carried by the event
         */
        private void notifyToFetch(LocalDataServerChangeEvent event, long changeVersion) {
            Map<String, DataNode> dataServerMapIn = event.getLocalDataServerMap();
            List<DataNode> dataServerNodeList = Lists.newArrayList(dataServerMapIn.values());
            ConsistentHash<DataNode> consistentHash = new ConsistentHash<DataNode>(dataServerBootstrapConfig.getNumberOfReplicas(), dataServerNodeList);
            // Working copy: servers that get a version notification are removed, the rest get an
            // empty notification below.
            Map<String, DataNode> dataServerMap = new ConcurrentHashMap<String, DataNode>(dataServerMapIn);
            Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = this.getToBeSyncMap(consistentHash);
            if (isChanged.get()) {
                return;
            }
            for (Map.Entry<String, Map<String, Map<String, BackupTriad>>> toBeSyncEntry : toBeSyncMap.entrySet()) {
                String ip = toBeSyncEntry.getKey();
                Map<String, Map<String, Long>> allVersionMap = this.collectLocalVersions(toBeSyncEntry.getValue());
                if (allVersionMap.isEmpty()) {
                    continue;
                }
                dataServerMap.remove(ip);
                if (this.doNotify(ip, allVersionMap, changeVersion)) {
                    dataServerCache.removeNotifyNewStatusNode(ip);
                }
            }
            for (String targetIp : dataServerMap.keySet()) {
                if (this.doNotify(targetIp, new HashMap<String, Map<String, Long>>(), changeVersion)) {
                    dataServerCache.removeNotifyNewStatusNode(targetIp);
                }
            }
            if (!isChanged.get()) {
                dataServerCache.updateItem(dataServerMapIn, event.getLocalDataCenterversion(), dataServerBootstrapConfig.getLocalDataCenter());
            }
        }

        /**
         * Looks up the locally cached version of every dataInfoId in the given map.
         *
         * @param dataInfoMap dataCenter -> dataInfoId -> backup triad
         * @return dataCenter -> dataInfoId -> version, containing only data ids that are present
         *         in {@link DatumCache} and only data centers with at least one such id
         */
        private Map<String, Map<String, Long>> collectLocalVersions(Map<String, Map<String, BackupTriad>> dataInfoMap) {
            Map<String, Map<String, Long>> allVersionMap = new HashMap<String, Map<String, Long>>();
            for (Map.Entry<String, Map<String, BackupTriad>> dataCenterEntry : dataInfoMap.entrySet()) {
                String dataCenter = dataCenterEntry.getKey();
                Map<String, Long> versionMap = new HashMap<String, Long>();
                for (String dataInfoId : dataCenterEntry.getValue().keySet()) {
                    Datum datum = DatumCache.get(dataCenter, dataInfoId);
                    if (datum != null) {
                        versionMap.put(dataInfoId, datum.getVersion());
                    }
                }
                if (!versionMap.isEmpty()) {
                    allVersionMap.put(dataCenter, versionMap);
                }
            }
            return allVersionMap;
        }

        /**
         * Determines, for every datum in {@link DatumCache}, which nodes are newly responsible
         * for it under the given hash ring.
         *
         * @param consistentHash hash ring built from the new server list
         * @return ip -> dataCenter -> dataInfoId -> old backup triad; an empty map if a newer
         *         event arrives while computing
         */
        private Map<String, Map<String, Map<String, BackupTriad>>> getToBeSyncMap(ConsistentHash<DataNode> consistentHash) {
            Map<String, Map<String, Map<String, BackupTriad>>> toBeSyncMap = new HashMap<String, Map<String, Map<String, BackupTriad>>>();
            // Hash lookups are memoized per dataInfoId across data centers.
            Map<String, List<DataNode>> triadCache = new HashMap<String, List<DataNode>>();
            Map<String, Map<String, Datum>> allMap = DatumCache.getAll();
            for (Map.Entry<String, Map<String, Datum>> dataCenterEntry : allMap.entrySet()) {
                String dataCenter = dataCenterEntry.getKey();
                Map<String, Datum> datumMap = dataCenterEntry.getValue();
                for (String dataInfoId : datumMap.keySet()) {
                    if (isChanged.get()) {
                        // Outdated: a newer event will trigger a fresh computation.
                        return new HashMap<String, Map<String, Map<String, BackupTriad>>>();
                    }
                    List<DataNode> backupNodes;
                    if (triadCache.containsKey(dataInfoId)) {
                        backupNodes = triadCache.get(dataInfoId);
                    } else {
                        backupNodes = consistentHash.getNUniqueNodesFor(dataInfoId, dataServerBootstrapConfig.getStoreNodes());
                        triadCache.put(dataInfoId, backupNodes);
                    }
                    BackupTriad backupTriad = dataServerCache.calculateOldBackupTriad(dataInfoId, dataServerBootstrapConfig.getLocalDataCenter(), dataServerBootstrapConfig);
                    if (backupTriad == null) {
                        continue;
                    }
                    List<DataNode> newJoinedNodes = backupTriad.getNewJoined(backupNodes, dataServerCache.getNotWorking());
                    LOGGER.info("DataInfoId {} has got newJoinedNodes={}  for backupNodes={},now backupTriad is {}", dataInfoId, newJoinedNodes, backupNodes, backupTriad);
                    for (DataNode node : newJoinedNodes) {
                        String ip = node.getIp();
                        Map<String, Map<String, BackupTriad>> dataInfoMap = toBeSyncMap.get(ip);
                        if (dataInfoMap == null) {
                            dataInfoMap = new HashMap<String, Map<String, BackupTriad>>();
                            toBeSyncMap.put(ip, dataInfoMap);
                        }
                        Map<String, BackupTriad> triadMap = dataInfoMap.get(dataCenter);
                        if (triadMap == null) {
                            triadMap = new HashMap<String, BackupTriad>();
                            dataInfoMap.put(dataCenter, triadMap);
                        }
                        triadMap.put(dataInfoId, backupTriad);
                    }
                }
            }
            LOGGER.info("Get to Be SyncMap {}", toBeSyncMap);
            return toBeSyncMap;
        }

        /**
         * Sends a {@link NotifyFetchDatumRequest} to the given server, retrying after a random
         * delay on failure for as long as no newer event has arrived.
         *
         * @param targetIp         ip of the server to notify
         * @param notifyVersionMap dataCenter -> dataInfoId -> version to send
         * @param version          server list version to send
         * @return {@code true} if the server answered successfully; {@code false} if the server
         *         is unknown or has no connection, or if a newer event arrived before success
         */
        private boolean doNotify(String targetIp, final Map<String, Map<String, Long>> notifyVersionMap, final long version) {
            while (!isChanged.get()) {
                final DataServerNode targetNode = DataServerNodeFactory.getDataServerNode(dataServerBootstrapConfig.getLocalDataCenter(), targetIp);
                if (targetNode == null || targetNode.getConnection() == null) {
                    LOGGER.info("notify version change to sync has not connect,targetNode={}, map={}", targetIp, notifyVersionMap);
                    return false;
                }
                try {
                    CommonResponse response = (CommonResponse)dataNodeExchanger.request(new Request(){

                        @Override
                        public Object getRequestBody() {
                            return new NotifyFetchDatumRequest(notifyVersionMap, DataServerConfig.IP, version);
                        }

                        @Override
                        public URL getRequestUrl() {
                            return new URL(targetNode.getConnection().getRemoteIP(), targetNode.getConnection().getRemotePort());
                        }
                    }).getResult();
                    if (response.isSuccess()) {
                        LOGGER.info("notify {} version change to sync,current node list version={}, map={}", targetNode.getIp(), version, notifyVersionMap);
                        return true;
                    }
                    // Route an unsuccessful response through the same log-and-retry path as
                    // transport errors.
                    throw new RuntimeException(response.getMessage());
                } catch (Throwable e) {
                    LOGGER.error("notify {} to fetch datum error", targetIp, e);
                    TimeUtil.randomDelay(500);
                }
            }
            return false;
        }

        /**
         * Sends a {@link NotifyOnlineRequest} to every data server of the local data center.
         *
         * <p>Each server is retried until it answers successfully; entries with a {@code null}
         * node are skipped. NOTE(review): there is no retry limit and {@code isChanged} is not
         * consulted here, so a server whose connection never becomes usable blocks the syncer
         * thread indefinitely.
         *
         * @param changeVersion server list version to send
         */
        private void notifyOnline(final long changeVersion) {
            Map<String, DataServerNode> dataServerNodeMap = DataServerNodeFactory.getDataServerNodes(dataServerBootstrapConfig.getLocalDataCenter());
            for (Map.Entry<String, DataServerNode> serverEntry : dataServerNodeMap.entrySet()) {
                while (true) {
                    String ip = serverEntry.getKey();
                    final DataServerNode dataServerNode = serverEntry.getValue();
                    if (dataServerNode == null) {
                        break;
                    }
                    try {
                        if (dataServerNode.getConnection() == null || !dataServerNode.getConnection().isFine()) {
                            // Connection not usable yet: wait and try this server again.
                            TimeUtil.randomDelay(1000);
                            continue;
                        }
                        CommonResponse response = (CommonResponse)dataNodeExchanger.request(new Request(){

                            @Override
                            public Object getRequestBody() {
                                return new NotifyOnlineRequest(DataServerConfig.IP, changeVersion);
                            }

                            @Override
                            public URL getRequestUrl() {
                                return new URL(dataServerNode.getConnection().getRemoteIP(), dataServerNode.getConnection().getRemotePort());
                            }
                        }).getResult();
                        if (response.isSuccess()) {
                            LOGGER.info("notify {} that i am newer success,version={}", ip, changeVersion);
                            break;
                        }
                        throw new RuntimeException(response.getMessage());
                    } catch (Exception e) {
                        LOGGER.info("notify {} that i am newer failed", ip);
                        LOGGER.error("notify {} that i am newer error", ip, e);
                        TimeUtil.randomDelay(500);
                    }
                }
            }
        }
    }
}

