package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.TaskMetricData;
import backtype.storm.generated.TaskSummary;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.WorkerMetricData;
import backtype.storm.generated.WorkerSummary;
import backtype.storm.utils.ThriftTopologyUtils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormMonitor;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.daemon.worker.WorkerMetricInfo;
import com.alibaba.jstorm.daemon.worker.metrics.AlimonitorClient;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.task.TaskMetricInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/NimbusUtils.class */
public class NimbusUtils {
    /** log4j logger shared by all static helpers in this class. */
    private static Logger LOG = Logger.getLogger(NimbusUtils.class);

    /**
     * Flattens a list of serialization entries into one map.
     *
     * <p>Entries that are themselves maps are merged in as-is; every other entry
     * becomes a key mapped to {@code null}.
     *
     * @param list entries to flatten; may be {@code null}
     * @return a new map (empty when {@code list} is {@code null} or empty)
     */
    private static Map mapifySerializations(List list) {
        HashMap registrations = new HashMap();
        if (list == null) {
            return registrations;
        }
        for (Object item : list) {
            if (item instanceof Map) {
                registrations.putAll((Map) item);
                continue;
            }
            registrations.put(item, null);
        }
        return registrations;
    }

    /**
     * Builds the normalized topology configuration.
     *
     * <p>Kryo registrations ({@code topology.kryo.register}) and decorators
     * ({@code topology.kryo.decorators}) are collected from the merged
     * {@code map}+{@code map2} view and from every component's JSON conf, then
     * written back, together with the acker count and the max task parallelism,
     * onto a copy of {@code map2}.
     *
     * @param map           base configuration (presumably the cluster conf; confirm with callers)
     * @param map2          topology configuration; overrides {@code map} and is the basis of the result
     * @param stormTopology topology whose component JSON confs are scanned
     * @return a new map; neither input map is modified
     * @throws Exception if a component's JSON conf cannot be deserialized
     */
    public static Map normalizeConf(Map map, Map map2, StormTopology stormTopology) throws Exception {
        ArrayList kryoRegistrations = new ArrayList();
        ArrayList kryoDecorators = new ArrayList();

        // Merged view: entries of map2 win over entries of map.
        HashMap merged = new HashMap(map);
        merged.putAll(map2);

        Object topoRegister = merged.get("topology.kryo.register");
        if (topoRegister != null) {
            LOG.info("topology:" + map2.get("topology.name") + ", TOPOLOGY_KRYO_REGISTER" + topoRegister.getClass().getName());
            JStormUtils.mergeList(kryoRegistrations, topoRegister);
        }

        Object topoDecorators = merged.get("topology.kryo.decorators");
        if (topoDecorators != null) {
            LOG.info("topology:" + map2.get("topology.name") + ", TOPOLOGY_KRYO_DECORATOR" + topoDecorators.getClass().getName());
            JStormUtils.mergeList(kryoDecorators, topoDecorators);
        }

        for (String componentId : ThriftTopologyUtils.getComponentIds(stormTopology)) {
            String jsonConf = ThriftTopologyUtils.getComponentCommon(stormTopology, componentId).get_json_conf();
            if (jsonConf == null) {
                continue;
            }

            Map componentConf = (Map) JStormUtils.from_json(jsonConf);
            if (componentConf == null) {
                String message = "Failed to deserilaize " + componentId + " json configuration: " + jsonConf;
                LOG.info(message);
                throw new Exception(message);
            }

            Object componentRegister = componentConf.get("topology.kryo.register");
            if (componentRegister != null) {
                LOG.info("topology:" + map2.get("topology.name") + ", componentId:" + componentId + ", TOPOLOGY_KRYO_REGISTER" + componentRegister.getClass().getName());
                JStormUtils.mergeList(kryoRegistrations, componentRegister);
            }

            Object componentDecorators = componentConf.get("topology.kryo.decorators");
            if (componentDecorators != null) {
                LOG.info("topology:" + map2.get("topology.name") + ", componentId:" + componentId + ", TOPOLOGY_KRYO_DECORATOR" + componentDecorators.getClass().getName());
                JStormUtils.mergeList(kryoDecorators, componentDecorators);
            }
        }

        Map registerMap = mapifySerializations(kryoRegistrations);
        List decoratorList = JStormUtils.distinctList(kryoDecorators);

        // Fall back to a single acker when the setting is absent.
        Integer ackerExecutors = JStormUtils.parseInt(merged.get("topology.acker.executors"));
        if (ackerExecutors == null) {
            ackerExecutors = 1;
        }

        HashMap normalized = new HashMap(map2);
        normalized.put("topology.kryo.decorators", decoratorList);
        normalized.put("topology.kryo.register", registerMap);
        normalized.put("topology.acker.executors", ackerExecutors);
        normalized.put("topology.max.task.parallelism", merged.get("topology.max.task.parallelism"));
        return normalized;
    }

    /**
     * Computes the effective parallelism of one component.
     *
     * <p>The component's parallelism hint is capped by
     * {@code topology.max.task.parallelism} when that key is present in
     * {@code map} or in the component's own JSON conf (the component conf wins).
     * The hint itself is used as reported by the component; no default is applied.
     *
     * @param map             topology configuration
     * @param componentCommon component whose hint and JSON conf are read
     * @return the hint, or {@code min(hint, max parallelism)} when a maximum is configured
     * @throws IllegalArgumentException if the component's JSON conf cannot be deserialized
     */
    public static Integer componentParalism(Map map, ComponentCommon componentCommon) {
        HashMap merged = new HashMap();
        merged.putAll(map);

        String jsonConf = componentCommon.get_json_conf();
        if (jsonConf != null) {
            Map componentConf = (Map) JStormUtils.from_json(jsonConf);
            if (componentConf == null) {
                // Previously this surfaced as a bare NullPointerException out of putAll(null).
                throw new IllegalArgumentException("Failed to deserialize component json configuration: " + jsonConf);
            }
            merged.putAll(componentConf);
        }

        // Integer.valueOf(int) never yields null, so no null fallback is needed here.
        Integer parallelismHint = Integer.valueOf(componentCommon.get_parallelism_hint());

        Object maxParallelism = merged.get("topology.max.task.parallelism");
        if (maxParallelism == null) {
            return parallelismHint;
        }
        return Integer.valueOf(Math.min(JStormUtils.parseInt(maxParallelism).intValue(), parallelismHint.intValue()));
    }

    /**
     * Returns a deep copy of the topology with each component's parallelism normalized.
     *
     * <p>For every bolt, spout and state spout of the copy:
     * <ol>
     *   <li>when {@code z} is set, a per-component override from {@link ConfigExtension}
     *       (bolt or spout parallelism) replaces the parallelism hint;</li>
     *   <li>the effective parallelism from {@link #componentParalism(Map, ComponentCommon)}
     *       is stored as the hint and as {@code topology.tasks} in the component's JSON conf.</li>
     * </ol>
     * The input topology is not modified.
     *
     * @param map           topology configuration
     * @param stormTopology topology to normalize
     * @param z             whether to apply the per-component parallelism overrides
     * @return the normalized copy
     * @throws IllegalArgumentException if a component is of an unsupported type, has no
     *         {@code ComponentCommon}, or carries a JSON conf that cannot be deserialized
     */
    public static StormTopology normalizeTopology(Map map, StormTopology stormTopology, boolean z) {
        StormTopology normalized = stormTopology.deepCopy();
        for (Map.Entry entry : ThriftTopologyUtils.getComponents(normalized).entrySet()) {
            Object component = entry.getValue();
            String componentId = (String) entry.getKey();

            boolean isBolt = component instanceof Bolt;
            ComponentCommon common = null;
            if (isBolt) {
                common = ((Bolt) component).get_common();
            } else if (component instanceof SpoutSpec) {
                common = ((SpoutSpec) component).get_common();
            } else if (component instanceof StateSpoutSpec) {
                common = ((StateSpoutSpec) component).get_common();
            }
            if (common == null) {
                // Fail with context instead of an anonymous NullPointerException further down.
                String type = component == null ? "null" : component.getClass().getName();
                throw new IllegalArgumentException("Component " + componentId + " has no ComponentCommon, component type: " + type);
            }

            if (z) {
                // Spouts and state spouts share the spout override; bolts have their own.
                Integer override;
                if (isBolt) {
                    override = ConfigExtension.getBoltParallelism(map, componentId);
                } else {
                    override = ConfigExtension.getSpoutParallelism(map, componentId);
                }
                if (override != null) {
                    common.set_parallelism_hint(override.intValue());
                }
            }

            HashMap componentConf = new HashMap();
            String jsonConf = common.get_json_conf();
            if (jsonConf != null) {
                Map parsed = (Map) JStormUtils.from_json(jsonConf);
                if (parsed == null) {
                    throw new IllegalArgumentException("Failed to deserialize " + componentId + " json configuration: " + jsonConf);
                }
                componentConf.putAll(parsed);
            }

            Integer parallelism = componentParalism(map, common);
            componentConf.put("topology.tasks", parallelism);
            common.set_parallelism_hint(parallelism.intValue());
            LOG.info("Set " + componentId + " parallelism " + parallelism);
            common.set_json_conf(JStormUtils.to_json(componentConf));
        }
        return normalized;
    }

    /**
     * Removes cluster state for topologies that are listed as active but have no
     * entry under the master stormdist root.
     *
     * <p>Note: the list returned by {@code active_storms()} is modified in place.
     *
     * @param nimbusData source of the cluster state and configuration
     * @throws Exception propagated from the directory listing or cluster-state calls
     */
    public static void cleanupCorruptTopologies(NimbusData nimbusData) throws Exception {
        StormClusterState clusterState = nimbusData.getStormClusterState();
        List localTopologies = PathUtils.read_dir_contents(StormConfig.masterStormdistRoot(nimbusData.getConf()));
        List<String> corrupt = nimbusData.getStormClusterState().active_storms();

        boolean hasActive = corrupt != null && !corrupt.isEmpty();
        if (hasActive) {
            // Whatever is active but not present locally is considered corrupt.
            if (localTopologies != null) {
                corrupt.removeAll(localTopologies);
            }
            Iterator<String> ids = corrupt.iterator();
            while (ids.hasNext()) {
                String topologyId = ids.next();
                LOG.info("Corrupt topology " + topologyId + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
                clusterState.remove_storm(topologyId);
            }
        }
        LOG.info("Successfully cleanup all old toplogies");
    }

    /**
     * Lists the entries of the master stormdist root directory.
     *
     * @param nimbusData source of the configuration used to locate the directory
     * @return whatever {@code PathUtils.read_dir_contents} reports for that directory
     * @throws Exception propagated from the path lookup or directory listing
     */
    public static List<String> listStormdistFiles(NimbusData nimbusData) throws Exception {
        List<String> entries = PathUtils.read_dir_contents(StormConfig.masterStormdistRoot(nimbusData.getConf()));
        return entries;
    }

    /**
     * Finds the first stormdist entry whose name contains {@code str}.
     *
     * @param nimbusData source of the configuration used to locate the directory
     * @param str        substring to look for
     * @return the first matching entry name, or {@code null} when none matches
     * @throws Exception propagated from {@link #listStormdistFiles(NimbusData)}
     */
    public static String findTopoFileInStormdist(NimbusData nimbusData, String str) throws Exception {
        for (String fileName : listStormdistFiles(nimbusData)) {
            if (fileName.contains(str)) {
                return fileName;
            }
        }
        return null;
    }

    /**
     * Decides whether a task should be treated as dead, based on its ZooKeeper
     * heartbeat and nimbus' local heartbeat cache.
     *
     * <p>Decision order:
     * <ol>
     *   <li>No cache entry for the task: dead if there is no ZK heartbeat either;
     *       otherwise a cache entry is created from the heartbeat and the task is alive.</li>
     *   <li>Cache entry but no ZK heartbeat: alive while
     *       {@code now - taskAssignedTime <= nimbus.task.launch.secs}, dead afterwards.</li>
     *   <li>Cache entry never stamped ({@code nimbusTime == 0}) or the reported time
     *       changed: the cache is refreshed and the task is alive.</li>
     *   <li>Reported time unchanged: dead once
     *       {@code now - nimbusTime > nimbus.task.timeout.secs}.</li>
     * </ol>
     * Any exception while evaluating is logged and the task is reported dead.
     *
     * @param nimbusData source of cluster state, heartbeat cache and configuration
     * @param str        topology id
     * @param num        task id
     * @return {@code true} when the task is considered dead
     */
    public static boolean isTaskDead(NimbusData nimbusData, String str, Integer num) {
        String idStr = " topology:" + str + ",taskid:" + num;
        Integer zkReportTime = null;
        try {
            TaskHeartbeat zkHeartbeat = nimbusData.getStormClusterState().task_heartbeat(str, num.intValue());
            if (zkHeartbeat != null) {
                zkReportTime = Integer.valueOf(zkHeartbeat.getTimeSecs());
            }

            Map<Integer, TkHbCacheTime> taskCache = nimbusData.getTaskHeartbeatsCache().get(str);
            if (taskCache == null) {
                LOG.info("No task heartbeat cache " + str);
                taskCache = new HashMap<Integer, TkHbCacheTime>();
                nimbusData.getTaskHeartbeatsCache().put(str, taskCache);
            }

            TkHbCacheTime cached = taskCache.get(num);
            if (cached == null) {
                LOG.info("No task heartbeat cache " + idStr);
                if (zkHeartbeat == null) {
                    LOG.info("No ZK task hearbeat " + idStr);
                    return true;
                }
                TkHbCacheTime fresh = new TkHbCacheTime();
                fresh.update(zkHeartbeat);
                taskCache.put(num, fresh);
                return false;
            }

            if (zkReportTime == null) {
                LOG.debug("No ZK task heartbeat " + idStr);
                // Still within the launch grace period measured from assignment time?
                int launchSecs = JStormUtils.parseInt(nimbusData.getConf().get("nimbus.task.launch.secs")).intValue();
                if (TimeUtils.current_time_secs() - cached.getTaskAssignedTime() <= launchSecs) {
                    return false;
                }
                LOG.info(idStr + " failed to init ");
                return true;
            }

            int nimbusTime = cached.getNimbusTime();
            int lastReportedTime = cached.getTaskReportedTime();
            int nowSecs = TimeUtils.current_time_secs();

            if (nimbusTime == 0) {
                cached.setNimbusTime(nowSecs);
                cached.setTaskReportedTime(zkReportTime.intValue());
                LOG.info("Update taskheartbeat to nimbus cache " + idStr);
                return false;
            }

            if (lastReportedTime != zkReportTime.intValue()) {
                cached.setNimbusTime(nowSecs);
                cached.setTaskReportedTime(zkReportTime.intValue());
                LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" + zkReportTime + ",report:" + lastReportedTime);
                return false;
            }

            int timeoutSecs = JStormUtils.parseInt(nimbusData.getConf().get("nimbus.task.timeout.secs")).intValue();
            if (nowSecs - nimbusTime <= timeoutSecs) {
                return false;
            }

            // Multiply as long: int seconds * 1000 overflows and would log a bogus date.
            Date lastSeen = new Date(nimbusTime * 1000L);
            Date now = new Date(nowSecs * 1000L);
            StringBuilder sb = new StringBuilder();
            sb.append(idStr);
            sb.append(" last tasktime is ");
            sb.append(nimbusTime);
            sb.append(":").append(lastSeen);
            sb.append(",current ");
            sb.append(nowSecs);
            sb.append(":").append(now);
            LOG.info(sb.toString());
            return true;
        } catch (Exception e) {
            LOG.error("Failed to get ZK task hearbeat " + idStr, e);
            return true;
        }
    }

    /**
     * Copies each task's start time from the assignment into the heartbeat cache
     * as its "assigned time", creating the per-topology cache and per-task
     * entries when they do not exist yet.
     *
     * @param nimbusData holder of the heartbeat cache
     * @param assignment assignment providing the task start times
     * @param str        topology id
     */
    public static void updateTaskHbStartTime(NimbusData nimbusData, Assignment assignment, String str) {
        Map<Integer, TkHbCacheTime> taskCache = nimbusData.getTaskHeartbeatsCache().get(str);
        if (taskCache == null) {
            taskCache = new HashMap<Integer, TkHbCacheTime>();
            nimbusData.getTaskHeartbeatsCache().put(str, taskCache);
        }

        Iterator<Map.Entry<Integer, Integer>> startTimes = assignment.getTaskStartTimeSecs().entrySet().iterator();
        while (startTimes.hasNext()) {
            Map.Entry<Integer, Integer> startTime = startTimes.next();
            Integer taskId = startTime.getKey();
            Integer assignedSecs = startTime.getValue();

            TkHbCacheTime cached = taskCache.get(taskId);
            if (cached == null) {
                cached = new TkHbCacheTime();
                taskCache.put(taskId, cached);
            }
            cached.setTaskAssignedTime(assignedSecs.intValue());
        }
    }

    /**
     * Resolves a topology name to its id and runs a status transition on it.
     *
     * @param nimbusData source of the cluster state
     * @param str        topology name
     * @param z          passed through unchanged to {@code transition}
     * @param statusType target status event
     * @param tArr       extra transition arguments
     * @throws NotAliveException when no topology id is found for the name
     * @throws Exception propagated from the id lookup
     */
    public static <T> void transitionName(NimbusData nimbusData, String str, boolean z, StatusType statusType, T... tArr) throws Exception {
        String topologyId = Cluster.get_topology_id(nimbusData.getStormClusterState(), str);
        if (topologyId != null) {
            transition(nimbusData, topologyId, z, statusType, tArr);
            return;
        }
        throw new NotAliveException(str);
    }

    /**
     * Runs a status transition for the given topology id. Failures are logged
     * and not propagated to the caller.
     *
     * @param nimbusData source of the status-transition handler
     * @param str        topology id
     * @param z          passed through unchanged to the handler
     * @param statusType target status event
     * @param tArr       extra transition arguments
     */
    public static <T> void transition(NimbusData nimbusData, String str, boolean z, StatusType statusType, T... tArr) {
        try {
            nimbusData.getStatusTransition().transition(str, z, statusType, tArr);
        } catch (Exception failure) {
            // Best effort by design: the caller is never interrupted by a failed transition.
            LOG.error("Failed to do status transition,", failure);
        }
    }

    /**
     * Builds a {@link TopologySummary} from an assignment.
     *
     * <p>Worker and task counts are taken from the assignment's worker slots.
     * The error flag starts as {@code AlimonitorClient.DEFAULT_ERROR_INFO} and
     * becomes {@code "Y"} as soon as one entry of {@code map} satisfies
     * {@code now - value < key}, i.e. the timestamp stored as the value (seconds,
     * as a decimal string) is newer than the window stored as the key (seconds).
     *
     * @param assignment topology assignment
     * @param str        first constructor argument of the summary (presumably the topology id; confirm)
     * @param str2       second constructor argument (presumably the topology name; confirm)
     * @param str3       third constructor argument (presumably the status; confirm)
     * @param i          fourth constructor argument (presumably the uptime in seconds; confirm)
     * @param map        window-seconds to last-error-timestamp entries; may be {@code null}
     * @return the populated summary
     * @throws NumberFormatException if a timestamp value in {@code map} is not a valid long
     */
    public static TopologySummary mkTopologySummary(Assignment assignment, String str, String str2, String str3, int i, Map<Integer, String> map) {
        int workerCount = assignment.getWorkers().size();
        int taskCount = 0;
        for (ResourceWorkerSlot worker : assignment.getWorkers()) {
            taskCount += worker.getTasks().size();
        }

        long nowSecs = System.currentTimeMillis() / 1000;
        String errorFlag = AlimonitorClient.DEFAULT_ERROR_INFO;
        if (map != null) {
            for (Map.Entry<Integer, String> lastError : map.entrySet()) {
                // Bind the entry once: both its key and its value are needed below.
                long lastErrorSecs = Long.parseLong(lastError.getValue());
                if (nowSecs - lastErrorSecs < lastError.getKey().intValue()) {
                    errorFlag = "Y";
                    break;
                }
            }
        }
        return new TopologySummary(str, str2, str3, i, taskCount, workerCount, errorFlag);
    }

    /**
     * Builds the summary of one supervisor.
     *
     * @param supervisorInfo supervisor details (host name, uptime, worker ports)
     * @param str            supervisor id; also the lookup key into {@code map}
     * @param map            per-supervisor counts; a missing entry counts as 0
     * @return the populated summary
     */
    public static SupervisorSummary mkSupervisorSummary(SupervisorInfo supervisorInfo, String str, Map<String, Integer> map) {
        Integer counted = map.get(str);
        int usedCount = 0;
        if (counted != null) {
            usedCount = counted.intValue();
        }
        return new SupervisorSummary(supervisorInfo.getHostName(), str, supervisorInfo.getUptimeSecs(), supervisorInfo.getWorkerPorts().size(), usedCount);
    }

    /**
     * Builds one summary per supervisor, sorted by host name.
     *
     * <p>The number of assigned worker slots is counted per node id across all
     * assignments; slots on nodes that are not in {@code map} are ignored.
     *
     * @param map  supervisor id to supervisor info
     * @param map2 topology id to assignment
     * @return summaries ordered by {@code get_host()}
     */
    public static List<SupervisorSummary> mkSupervisorSummaries(Map<String, SupervisorInfo> map, Map<String, Assignment> map2) {
        Map<String, Integer> slotsPerNode = new HashMap<String, Integer>();
        for (Assignment assignment : map2.values()) {
            for (ResourceWorkerSlot worker : assignment.getWorkers()) {
                String nodeId = worker.getNodeId();
                if (map.get(nodeId) == null) {
                    continue;
                }
                Integer soFar = slotsPerNode.get(nodeId);
                int next = (soFar == null) ? 1 : soFar.intValue() + 1;
                slotsPerNode.put(nodeId, Integer.valueOf(next));
            }
        }

        List<SupervisorSummary> summaries = new ArrayList<SupervisorSummary>();
        for (Map.Entry<String, SupervisorInfo> supervisor : map.entrySet()) {
            summaries.add(mkSupervisorSummary(supervisor.getValue(), supervisor.getKey(), slotsPerNode));
        }

        Collections.sort(summaries, new Comparator<SupervisorSummary>() {
            @Override
            public int compare(SupervisorSummary left, SupervisorSummary right) {
                return left.get_host().compareTo(right.get_host());
            }
        });
        return summaries;
    }

    /**
     * Builds a task summary carrying only identity, location and uptime, with an
     * empty error list.
     *
     * @param resourceWorkerSlot worker slot supplying the port
     * @param i                  task id
     * @param str                component id
     * @param str2               component type
     * @param str3               host
     * @param i2                 uptime in seconds
     * @return the populated summary
     */
    public static TaskSummary mkSimpleTaskSummary(ResourceWorkerSlot resourceWorkerSlot, int i, String str, String str2, String str3, int i2) {
        TaskSummary summary = new TaskSummary();
        // Identity
        summary.set_task_id(i);
        summary.set_component_id(str);
        summary.set_component_type(str2);
        // Location
        summary.set_host(str3);
        summary.set_port(resourceWorkerSlot.getPort());
        // Runtime data
        summary.set_uptime_secs(i2);
        List<ErrorInfo> noErrors = new ArrayList<ErrorInfo>();
        summary.set_errors(noErrors);
        return summary;
    }

    /**
     * Builds a summary for every task of an assignment, keyed and ordered by task id.
     *
     * <p>For each task the recorded errors and the current heartbeat are read
     * from the cluster state. A task without a heartbeat is marked
     * {@code TASK_STATUS_STARTING}; otherwise uptime and stats are copied from the
     * heartbeat and the task is marked {@code TASK_STATUS_ACTIVE}.
     *
     * @param stormClusterState cluster state to query
     * @param assignment        assignment listing workers and their tasks
     * @param map               task id to component id
     * @param str               topology id
     * @return task id to summary, in ascending task-id order
     * @throws Exception propagated from the cluster-state calls
     */
    public static Map<Integer, TaskSummary> mkTaskSummary(StormClusterState stormClusterState, Assignment assignment, Map<Integer, String> map, String str) throws Exception {
        Map<Integer, TaskSummary> summaries = new TreeMap<Integer, TaskSummary>();
        for (ResourceWorkerSlot worker : assignment.getWorkers()) {
            for (Integer taskId : worker.getTasks()) {
                TaskSummary summary = new TaskSummary();
                summary.set_task_id(taskId.intValue());
                summary.set_component_id(map.get(taskId));
                summary.set_host(worker.getHostname());
                summary.set_port(worker.getPort());

                List<TaskError> recorded = stormClusterState.task_errors(str, taskId.intValue());
                List<ErrorInfo> errors = new ArrayList<ErrorInfo>();
                if (recorded != null) {
                    for (TaskError error : recorded) {
                        errors.add(new ErrorInfo(error.getError(), error.getTimSecs()));
                    }
                }
                summary.set_errors(errors);

                TaskHeartbeat heartbeat = stormClusterState.task_heartbeat(str, taskId.intValue());
                if (heartbeat != null) {
                    summary.set_uptime_secs(heartbeat.getUptimeSecs());
                    summary.set_stats(heartbeat.getStats().getTaskStats());
                    summary.set_status(ConfigExtension.TASK_STATUS_ACTIVE);
                } else {
                    LOG.warn("Topology " + str + " task " + taskId + " hasn't been started");
                    summary.set_status(ConfigExtension.TASK_STATUS_STARTING);
                }
                summaries.put(taskId, summary);
            }
        }
        return summaries;
    }

    /**
     * Builds one summary per worker slot of the assignment, each listing the
     * summaries of its tasks. Tasks without an entry in {@code map} are skipped.
     *
     * @param str        topology value stored on every worker summary
     * @param assignment assignment listing the worker slots
     * @param map        task id to task summary
     * @return worker summaries in the iteration order of the assignment's workers
     */
    public static List<WorkerSummary> mkWorkerSummary(String str, Assignment assignment, Map<Integer, TaskSummary> map) {
        List<WorkerSummary> summaries = new ArrayList<WorkerSummary>();
        for (ResourceWorkerSlot worker : assignment.getWorkers()) {
            WorkerSummary summary = new WorkerSummary();
            summary.set_topology(str);
            summary.set_port(worker.getPort());

            // The list is attached first and filled afterwards, as before.
            List<TaskSummary> workerTasks = new ArrayList<TaskSummary>();
            summary.set_tasks(workerTasks);
            for (Integer taskId : worker.getTasks()) {
                TaskSummary taskSummary = map.get(taskId);
                if (taskSummary == null) {
                    continue;
                }
                workerTasks.add(taskSummary);
            }
            summaries.add(summary);
        }
        return summaries;
    }

    /**
     * Stores a new {@link StormMonitor} built from {@code z} for the given topology.
     *
     * @param stormClusterState cluster state to write to
     * @param str               topology id
     * @param z                 flag passed to the {@code StormMonitor} constructor
     * @throws Exception propagated from the cluster-state write
     */
    public static void updateMetricMonitorStatus(StormClusterState stormClusterState, String str, boolean z) throws Exception {
        StormMonitor monitor = new StormMonitor(z);
        stormClusterState.set_storm_monitor(str, monitor);
    }

    /**
     * Removes metric entries that no longer match the given assignment.
     *
     * <p>Metric task ids not among the assigned tasks are removed, as are metric
     * worker ids and metric users that are not one of the assignment's
     * {@code hostname:port} worker ids. Failures are logged and not propagated.
     *
     * @param nimbusData source of the cluster state
     * @param str        topology id
     * @param assignment current assignment of the topology
     */
    public static void updateMetricsInfo(NimbusData nimbusData, String str, Assignment assignment) {
        List<Integer> assignedTasks = new ArrayList<Integer>();
        List<String> assignedWorkers = new ArrayList<String>();
        StormClusterState clusterState = nimbusData.getStormClusterState();
        for (ResourceWorkerSlot worker : assignment.getWorkers()) {
            assignedWorkers.add(worker.getHostname() + ":" + worker.getPort());
            assignedTasks.addAll(worker.getTasks());
        }

        try {
            for (String metricTaskId : clusterState.get_metric_taskIds(str)) {
                Integer taskId = Integer.valueOf(metricTaskId);
                if (assignedTasks.contains(taskId)) {
                    continue;
                }
                clusterState.remove_metric_task(str, String.valueOf(taskId));
            }

            for (String metricWorkerId : clusterState.get_metric_workerIds(str)) {
                if (assignedWorkers.contains(metricWorkerId)) {
                    continue;
                }
                clusterState.remove_metric_worker(str, metricWorkerId);
            }

            // Metric users are matched against the worker ids as well.
            for (String metricUser : clusterState.get_metric_users(str)) {
                if (assignedWorkers.contains(metricUser)) {
                    continue;
                }
                clusterState.remove_metric_user(str, metricUser);
            }
        } catch (Exception failure) {
            LOG.error("Failed to update metrics info when rebalance or reassignment, topologyId=" + str, failure);
        }
    }

    /**
     * Copies task id, component id and all metric groups from a
     * {@link TaskMetricInfo} into a {@link TaskMetricData}.
     *
     * @param taskMetricData destination, modified in place
     * @param taskMetricInfo source of the values
     */
    public static void updateTaskMetricData(TaskMetricData taskMetricData, TaskMetricInfo taskMetricInfo) {
        // Identity first: the task id conversion is the only step that converts a value.
        Integer taskId = Integer.valueOf(taskMetricInfo.getTaskId());
        taskMetricData.set_task_id(taskId.intValue());
        taskMetricData.set_component_id(taskMetricInfo.getComponent());

        // Metric groups are copied over unchanged.
        taskMetricData.set_histogram(taskMetricInfo.getHistogramData());
        taskMetricData.set_timer(taskMetricInfo.getTimerData());
        taskMetricData.set_meter(taskMetricInfo.getMeterData());
        taskMetricData.set_counter(taskMetricInfo.getCounterData());
        taskMetricData.set_gauge(taskMetricInfo.getGaugeData());
    }

    /**
     * Copies host, port and all metric groups from a {@link WorkerMetricInfo}
     * into a {@link WorkerMetricData}, then adds {@code Used_Cpu} and
     * {@code Used_Memory} to the destination's gauge map.
     *
     * @param workerMetricData destination, modified in place
     * @param workerMetricInfo source of the values
     */
    public static void updateWorkerMetricData(WorkerMetricData workerMetricData, WorkerMetricInfo workerMetricInfo) {
        workerMetricData.set_hostname(workerMetricInfo.getHostName());
        workerMetricData.set_port(workerMetricInfo.getPort().intValue());

        workerMetricData.set_histogram(workerMetricInfo.getHistogramData());
        workerMetricData.set_timer(workerMetricInfo.getTimerData());
        workerMetricData.set_meter(workerMetricInfo.getMeterData());
        workerMetricData.set_counter(workerMetricInfo.getCounterData());
        workerMetricData.set_gauge(workerMetricInfo.getGaugeData());

        // Resource usage goes into the gauge map that was just set on the destination.
        Map gauges = workerMetricData.get_gauge();
        Double usedCpu = Double.valueOf(workerMetricInfo.getUsedCpu());
        gauges.put("Used_Cpu", usedCpu);
        Double usedMemory = Double.valueOf(Long.valueOf(workerMetricInfo.getUsedMem()).doubleValue());
        gauges.put("Used_Memory", usedMemory);
    }

    /**
     * Reads the first line of {@code <jstorm.home>/RELEASE}, where
     * {@code jstorm.home} is a system property.
     *
     * @return the first line of the file, or {@code null} when it cannot be read
     *         (the failure is logged as a warning)
     */
    public static String getNimbusVersion() {
        String version = null;
        BufferedReader reader = null;
        try {
            String releasePath = System.getProperty("jstorm.home") + "/RELEASE";
            reader = new BufferedReader(new FileReader(new File(releasePath)));
            version = reader.readLine();
        } catch (Exception readFailure) {
            LOG.warn("Failed to get nimbus version", readFailure);
        } finally {
            // Single cleanup point for the success, failure and rethrow paths.
            if (reader != null) {
                try {
                    reader.close();
                } catch (Exception closeFailure) {
                    LOG.error("Failed to close the reader of RELEASE", closeFailure);
                }
            }
        }
        return version;
    }
}
