package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.KillOptions;
import backtype.storm.generated.MonitorOptions;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.SupervisorWorkers;
import backtype.storm.generated.TaskMetricData;
import backtype.storm.generated.TaskSummary;
import backtype.storm.generated.TopologyAssignException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.generated.TopologyMetricInfo;
import backtype.storm.generated.WorkerMetricData;
import backtype.storm.generated.WorkerSummary;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.DaemonCommon;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.daemon.worker.WorkerMetricInfo;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TaskMetricInfo;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.Thrift;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/ServiceHandler.class */
public class ServiceHandler implements Nimbus.Iface, Shutdownable, DaemonCommon {
    /** log4j logger shared by every handler method of this class. */
    private static final Logger LOG = Logger.getLogger(ServiceHandler.class);
    /** NOTE(review): presumably the Thrift server thread count; not referenced in the visible part of this class - confirm against callers. */
    public static final int THREAD_NUM = 64;
    /** Nimbus state object supplied to the constructor. */
    private NimbusData data;
    /** Configuration map read from {@code nimbusData.getConf()} at construction time. */
    private Map<Object, Object> conf;

    /**
     * Creates a handler bound to the given Nimbus state.
     *
     * @param nimbusData Nimbus state; its configuration map is cached in {@code conf}
     */
    public ServiceHandler(NimbusData nimbusData) {
        this.data = nimbusData;
        this.conf = nimbusData.getConf();
    }

    /**
     * Submits a topology with the initial status {@code ACTIVE}.
     *
     * <p>Delegates to {@link #submitTopologyWithOpts} with freshly built {@link SubmitOptions}.
     *
     * @param str topology name
     * @param str2 location of the uploaded jar
     * @param str3 JSON-serialized topology configuration
     * @param stormTopology topology definition
     */
    public void submitTopology(String str, String str2, String str3, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException {
        submitTopologyWithOpts(str, str2, str3, stormTopology, new SubmitOptions(TopologyInitialStatus.ACTIVE));
    }

    /**
     * Pushes a non-scratch assignment event for the topology and waits for it to finish.
     *
     * @param str topology name
     * @param str2 topology id
     * @param topologyInitialStatus initial status, converted to a storm status for the event
     * @throws FailedAssignTopologyException when {@code waitFinish()} reports failure; carries the event's error message
     */
    private void makeAssignment(String str, String str2, TopologyInitialStatus topologyInitialStatus) throws FailedAssignTopologyException {
        TopologyAssignEvent assignEvent = new TopologyAssignEvent();
        assignEvent.setTopologyId(str2);
        assignEvent.setScratch(false);
        assignEvent.setTopologyName(str);
        assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(topologyInitialStatus));
        TopologyAssign.push(assignEvent);

        boolean finished = assignEvent.waitFinish();
        if (finished) {
            LOG.info("Finish submit for " + str);
            return;
        }
        throw new FailedAssignTopologyException(assignEvent.getErrorMsg());
    }

    /**
     * Validates, stores and assigns a newly submitted topology.
     *
     * <p>The liveness check is guarded separately from the submission itself, so that
     * {@link InvalidTopologyException}, {@link TopologyAssignException} and
     * {@link InvalidParameterException} raised during submission reach the caller unchanged
     * instead of being re-wrapped as a generic {@link TException}.
     *
     * @param str topology name
     * @param str2 location of the uploaded jar
     * @param str3 JSON-serialized topology configuration
     * @param stormTopology topology definition
     * @param submitOptions options supplying the initial status
     * @throws AlreadyAliveException if a topology with this name is already active
     * @throws InvalidTopologyException if the configuration cannot be parsed or validation rejects the topology
     * @throws TopologyAssignException if assignment fails or any other error occurs during submission
     * @throws TException if the liveness check itself fails
     */
    public void submitTopologyWithOpts(String str, String str2, String str3, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException {
        LOG.info("Receive " + str + ", uploadedJarLocation:" + str2);
        try {
            checkTopologyActive(this.data, str, false);
        } catch (AlreadyAliveException e) {
            LOG.info(str + " is already exist ");
            throw e;
        } catch (Throwable th) {
            LOG.info("Failed to check whether topology is alive or not", th);
            throw new TException(th);
        }

        String topologyId = Common.TopologyNameToId(str, this.data.getSubmittedCount().incrementAndGet());
        try {
            Map submittedConf = (Map) JStormUtils.from_json(str3);
            if (submittedConf == null) {
                LOG.warn("Failed to serialized Configuration");
                throw new InvalidTopologyException("Failed to serilaze topology configuration");
            }
            submittedConf.put("topology.id", topologyId);
            submittedConf.put("topology.name", str);
            Map<Object, Object> normalizedConf = NimbusUtils.normalizeConf(this.conf, submittedConf, stormTopology);
            // Cluster configuration overlaid with the topology's own settings, used for validation only.
            HashMap totalConf = new HashMap(this.conf);
            totalConf.putAll(normalizedConf);
            StormTopology normalizedTopology = NimbusUtils.normalizeTopology(normalizedConf, stormTopology, false);
            Common.validate_basic(normalizedTopology, totalConf, topologyId);
            StormClusterState stormClusterState = this.data.getStormClusterState();
            setupStormCode(this.conf, topologyId, str2, normalizedConf, normalizedTopology);
            setupZkTaskInfo(this.conf, topologyId, stormClusterState);
            LOG.info("Submit for " + str + " with conf " + submittedConf);
            makeAssignment(str, topologyId, submitOptions.get_initial_status());
        } catch (FailedAssignTopologyException e) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology, Root cause:");
            // A null message is reported as a timeout.
            if (e.getMessage() == null) {
                sb.append("submit timeout");
            } else {
                sb.append(e.getMessage());
            }
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId);
            sb.append(", uploadedJarLocation:" + str2 + "\n");
            LOG.error(sb.toString(), e);
            throw new TopologyAssignException(sb.toString());
        } catch (InvalidParameterException e) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology ");
            sb.append(e.getMessage());
            sb.append(", cause:" + e.getCause());
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId);
            sb.append(", uploadedJarLocation:" + str2 + "\n");
            LOG.error(sb.toString(), e);
            throw new InvalidParameterException(sb.toString());
        } catch (InvalidTopologyException e) {
            LOG.error("Topology is invalid. " + e.get_msg());
            throw e;
        } catch (Throwable th) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology ");
            sb.append(th.getMessage());
            sb.append(", cause:" + th.getCause());
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId);
            sb.append(", uploadedJarLocation:" + str2 + "\n");
            LOG.error(sb.toString(), th);
            throw new TopologyAssignException(sb.toString());
        }
    }

    /**
     * Re-submits a topology from the temporary copy made by {@link #restart}.
     *
     * <p>The temporary directory is copied into a new stormdist directory under a fresh topology id.
     * When {@code str2} is non-null it is merged into the saved configuration, and both the
     * configuration and the topology code are rewritten. The temporary directory is deleted
     * whether or not the submission succeeds.
     *
     * <p>If the topology is still alive the method logs and returns without submitting.
     *
     * @param str topology name
     * @param str2 JSON configuration overrides, or {@code null} to reuse the stored files as they are
     * @throws TopologyAssignException if assignment fails or any other error occurs during submission
     * @throws TException if the liveness check itself fails
     */
    public void submitTopologyAfterRestart(String str, String str2) throws InvalidTopologyException, TopologyAssignException, TException {
        LOG.info("Restart " + str);
        // Only the liveness check is guarded here, so submission errors below are not re-wrapped as TException.
        try {
            checkTopologyActive(this.data, str, false);
        } catch (AlreadyAliveException e) {
            LOG.info("Fail to kill " + str + " before restarting");
            return;
        } catch (Throwable th) {
            LOG.info("Failed to check whether topology is alive or not", th);
            throw new TException(th);
        }

        String topologyId = str + "-" + this.data.getSubmittedCount().incrementAndGet() + "-" + TimeUtils.current_time_secs();
        String stormTmpDir = null;
        try {
            String stormDistDir = StormConfig.masterStormdistRoot(this.conf, topologyId);
            stormTmpDir = StormConfig.masterStormTmpRoot(this.conf, str);
            FileUtils.copyDirectory(new File(stormTmpDir), new File(stormDistDir));
            StormTopology storedTopology = StormConfig.read_nimbus_topology_code(this.conf, topologyId);
            if (str2 != null) {
                Map savedConf = StormConfig.read_nimbusTmp_topology_conf(this.conf, str);
                savedConf.putAll((Map) JStormUtils.from_json(str2));
                Map<Object, Object> normalizedConf = NimbusUtils.normalizeConf(this.conf, savedConf, storedTopology);
                File confFile = new File(StormConfig.stormconf_path(stormDistDir));
                if (confFile.exists()) {
                    confFile.delete();
                }
                FileUtils.writeByteArrayToFile(confFile, Utils.serialize(normalizedConf));
                StormTopology normalizedTopology = NimbusUtils.normalizeTopology(normalizedConf, storedTopology, true);
                File codeFile = new File(StormConfig.stormcode_path(stormDistDir));
                if (codeFile.exists()) {
                    codeFile.delete();
                }
                FileUtils.writeByteArrayToFile(codeFile, Utils.serialize(normalizedTopology));
            }
            setupZkTaskInfo(this.conf, topologyId, this.data.getStormClusterState());
            LOG.info("Submit for " + str + " with conf " + str2);
            makeAssignment(str, topologyId, TopologyInitialStatus.ACTIVE);
        } catch (FailedAssignTopologyException e) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology, Root cause:");
            // A null message is reported as a timeout.
            if (e.getMessage() == null) {
                sb.append("submit timeout");
            } else {
                sb.append(e.getMessage());
            }
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId + "\n");
            LOG.error(sb.toString(), e);
            throw new TopologyAssignException(sb.toString());
        } catch (InvalidParameterException e) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology ");
            sb.append(e.getMessage());
            sb.append(", cause:" + e.getCause());
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId + "\n");
            LOG.error(sb.toString(), e);
            throw new InvalidParameterException(sb.toString());
        } catch (Throwable th) {
            StringBuilder sb = new StringBuilder();
            sb.append("Fail to sumbit topology ");
            sb.append(th.getMessage());
            sb.append(", cause:" + th.getCause());
            sb.append("\n\n");
            sb.append("topologyId:" + topologyId + "\n");
            LOG.error(sb.toString(), th);
            throw new TopologyAssignException(sb.toString());
        } finally {
            // Best-effort cleanup of the temporary copy; a failure here must not mask the real outcome.
            if (stormTmpDir != null) {
                try {
                    File tmpDir = new File(stormTmpDir);
                    if (tmpDir.exists()) {
                        FileUtils.deleteDirectory(tmpDir);
                    }
                } catch (Exception e) {
                    LOG.error("Failed to delete stormTmpDir=" + stormTmpDir, e);
                }
            }
        }
    }

    /**
     * Kills the named topology using default {@link KillOptions}.
     *
     * @param str topology name
     */
    public void killTopology(String str) throws NotAliveException, TException {
        killTopologyWithOpts(str, new KillOptions());
    }

    /**
     * Requests the {@code kill} status transition for an active topology.
     *
     * @param str topology name
     * @param killOptions options; the wait time is forwarded only when it is set
     * @throws NotAliveException if the topology is not alive
     * @throws TException on any other failure
     */
    public void killTopologyWithOpts(String str, KillOptions killOptions) throws NotAliveException, TException {
        try {
            checkTopologyActive(this.data, str, true);
            Integer waitSecs = null;
            if (killOptions.is_set_wait_secs()) {
                waitSecs = Integer.valueOf(killOptions.get_wait_secs());
            }
            NimbusUtils.transitionName(this.data, str, true, StatusType.kill, waitSecs);
        } catch (NotAliveException e) {
            // The specific handler must come before catch (Exception), otherwise it is unreachable.
            String msg = "KillTopology Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to kill topology " + str;
            LOG.error(msg, e);
            throw new TException(msg);
        }
    }

    /**
     * Requests the {@code activate} status transition for the named topology.
     *
     * @param str topology name
     * @throws NotAliveException if the transition reports that the topology is not alive
     * @throws TException on any other failure
     */
    public void activate(String str) throws NotAliveException, TException {
        try {
            NimbusUtils.transitionName(this.data, str, true, StatusType.activate, new Object[0]);
        } catch (NotAliveException e) {
            // The specific handler must come before catch (Exception), otherwise it is unreachable.
            String msg = "Activate Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to active topology " + str;
            LOG.error(msg, e);
            throw new TException(msg);
        }
    }

    /**
     * Requests the {@code inactivate} status transition for the named topology.
     *
     * @param str topology name
     * @throws NotAliveException if the transition reports that the topology is not alive
     * @throws TException on any other failure
     */
    public void deactivate(String str) throws NotAliveException, TException {
        try {
            NimbusUtils.transitionName(this.data, str, true, StatusType.inactivate, new Object[0]);
        } catch (NotAliveException e) {
            // The specific handler must come before catch (Exception), otherwise it is unreachable.
            String msg = "Deactivate Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to deactivate topology " + str;
            LOG.error(msg, e);
            throw new TException(msg);
        }
    }

    /**
     * Requests the {@code rebalance} status transition for an active topology.
     *
     * @param str topology name
     * @param rebalanceOptions options, may be {@code null}; the wait time is forwarded only when it is set
     * @throws NotAliveException if the topology is not alive
     * @throws TException on any other failure
     */
    public void rebalance(String str, RebalanceOptions rebalanceOptions) throws NotAliveException, TException, InvalidTopologyException {
        try {
            checkTopologyActive(this.data, str, true);
            Integer waitSecs = null;
            if (rebalanceOptions != null && rebalanceOptions.is_set_wait_secs()) {
                waitSecs = Integer.valueOf(rebalanceOptions.get_wait_secs());
            }
            NimbusUtils.transitionName(this.data, str, true, StatusType.rebalance, waitSecs);
        } catch (NotAliveException e) {
            // The specific handler must come before catch (Exception), otherwise it is unreachable.
            String msg = "Rebalance Error, no this topology " + str;
            LOG.error(msg, e);
            throw new NotAliveException(msg);
        } catch (Exception e) {
            String msg = "Failed to rebalance topology " + str;
            LOG.error(msg, e);
            throw new TException(msg);
        }
    }

    /**
     * Restarts a topology by saving its files, killing it and submitting it again.
     *
     * <p>Steps, in order: copy the stormdist directory to the temporary root, deactivate,
     * sleep 5 s, kill with a 1 s wait, sleep for the supervisor monitor period plus 5 s,
     * then call {@link #submitTopologyAfterRestart}.
     *
     * @param str topology name
     * @param str2 JSON configuration overrides passed on to the re-submission
     * @throws InvalidTopologyException for every failure; the message is that of the underlying exception
     */
    public void restart(String str, String str2) throws NotAliveException, InvalidTopologyException, TopologyAssignException, TException {
        try {
            String topologyId = NimbusUtils.findTopoFileInStormdist(this.data, str);
            if (topologyId == null) {
                throw new InvalidTopologyException("Topology=" + str + " is not exist!");
            }

            // Keep a copy of the code and configuration before the topology is killed.
            File distDir = new File(StormConfig.masterStormdistRoot(this.conf, topologyId));
            File tmpDir = new File(StormConfig.masterStormTmpRoot(this.conf, str));
            FileUtils.copyDirectory(distDir, tmpDir);

            deactivate(str);
            JStormUtils.sleepMs(5000L);

            KillOptions killOptions = new KillOptions();
            killOptions.set_wait_secs(1);
            killTopologyWithOpts(str, killOptions);

            int monitorFrequencySecs = JStormUtils.parseInt(this.conf.get("supervisor.monitor.frequency.secs")).intValue();
            JStormUtils.sleepMs((monitorFrequencySecs * 1000) + 5000);

            submitTopologyAfterRestart(str, str2);
        } catch (Exception e) {
            LOG.info("InvalidTopologyException: " + e.getMessage());
            throw new InvalidTopologyException(e.getMessage());
        }
    }

    /**
     * Opens a channel for writing to the given path and registers it under that path in the uploader cache.
     *
     * @param str path of the file to be written
     * @throws TException wrapping the {@link FileNotFoundException} if the file cannot be opened
     */
    public void beginLibUpload(String str) throws TException {
        try {
            TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
            WritableByteChannel channel = Channels.newChannel(new FileOutputStream(str));
            uploaders.put(str, channel);
            LOG.info("Begin upload file from client to " + str);
        } catch (FileNotFoundException e) {
            LOG.error("Fail to upload jar " + str, e);
            throw new TException(e);
        }
    }

    /**
     * Prepares a fresh, empty inbox directory and opens an upload channel for its jar file.
     *
     * <p>The channel is registered under the jar path {@code <dir>/stormjar-<uuid>.jar}.
     * The value returned is the directory, not the jar path.
     *
     * @return path of the newly created inbox directory
     * @throws TException wrapping any I/O failure
     */
    public String beginFileUpload() throws TException {
        // Stays null until the jar path is known, so the error logs show how far we got.
        String jarPath = null;
        try {
            String key = UUID.randomUUID().toString();
            String uploadDir = StormConfig.masterInbox(this.conf) + Cluster.ZK_SEPERATOR + key;
            FileUtils.forceMkdir(new File(uploadDir));
            FileUtils.cleanDirectory(new File(uploadDir));
            jarPath = uploadDir + "/stormjar-" + key + ".jar";
            TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
            WritableByteChannel channel = Channels.newChannel(new FileOutputStream(jarPath));
            uploaders.put(jarPath, channel);
            LOG.info("Begin upload file from client to " + jarPath);
            return uploadDir;
        } catch (FileNotFoundException e) {
            LOG.error("File not found: " + jarPath, e);
            throw new TException(e);
        } catch (IOException e) {
            LOG.error("Upload file error: " + jarPath, e);
            throw new TException(e);
        }
    }

    /**
     * Writes one chunk to the upload channel registered for the given location.
     *
     * @param str location key under which the channel was registered
     * @param byteBuffer data to write
     * @throws TException if no channel is registered, the cached object is not a channel, or the write fails
     */
    public void uploadChunk(String str, ByteBuffer byteBuffer) throws TException {
        TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        Object cached = uploaders.get(str);
        if (cached == null) {
            throw new TException("File for that location does not exist (or timed out) " + str);
        }
        if (!(cached instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + str);
        }

        WritableByteChannel channel = (WritableByteChannel) cached;
        try {
            channel.write(byteBuffer);
        } catch (IOException e) {
            LOG.error(" WritableByteChannel write filed when uploadChunk " + str);
            throw new TException(e);
        }
        // Store the channel again under the same key after a successful write.
        uploaders.put(str, channel);
    }

    /**
     * Closes the upload channel registered for the given location and removes it from the cache.
     *
     * <p>A failure to close is reported to the caller, because the uploaded file may then be incomplete.
     *
     * @param str location key under which the channel was registered
     * @throws TException if no channel is registered, the cached object is not a channel, or closing fails
     */
    public void finishFileUpload(String str) throws TException {
        TimeCacheMap<Object, Object> uploaders = this.data.getUploaders();
        Object obj = uploaders.get(str);
        if (obj == null) {
            throw new TException("File for that location does not exist (or timed out)");
        }
        if (!(obj instanceof WritableByteChannel)) {
            throw new TException("Object isn't WritableByteChannel for " + str);
        }
        try {
            ((WritableByteChannel) obj).close();
            uploaders.remove(str);
            LOG.info("Finished uploading file from client: " + str);
        } catch (IOException e) {
            // Do not swallow: the client must not treat a failed close as a successful upload.
            LOG.error(" WritableByteChannel close failed when finishFileUpload " + str, e);
            throw new TException(e);
        }
    }

    /**
     * Opens a buffered stream on the given file and registers it under a random id.
     *
     * <p>The buffer size is half of {@code nimbus.thrift.max_buffer_size}, which defaults to 1048576.
     *
     * @param str path of the file to download
     * @return id to pass to {@link #downloadChunk}
     * @throws TException wrapping the {@link FileNotFoundException} if the file cannot be opened
     */
    public String beginFileDownload(String str) throws TException {
        try {
            int maxBufferSize = JStormUtils.parseInt(this.conf.get("nimbus.thrift.max_buffer_size"), 1048576).intValue();
            BufferFileInputStream input = new BufferFileInputStream(str, maxBufferSize / 2);
            String downloadId = UUID.randomUUID().toString();
            this.data.getDownloaders().put(downloadId, input);
            return downloadId;
        } catch (FileNotFoundException e) {
            LOG.error(e + "file:" + str + " not found");
            throw new TException(e);
        }
    }

    /**
     * Reads the next chunk from the download stream registered under the given id.
     *
     * @param str id returned by {@link #beginFileDownload}
     * @return the chunk, or an empty buffer when the stream's {@code read()} returns {@code null}
     * @throws TException if no stream is registered, the cached object is not a stream, or the read fails
     */
    public ByteBuffer downloadChunk(String str) throws TException {
        TimeCacheMap<Object, Object> downloaders = this.data.getDownloaders();
        Object cached = downloaders.get(str);
        if (cached == null) {
            throw new TException("Could not find input stream for that id");
        }
        if (!(cached instanceof BufferFileInputStream)) {
            throw new TException("Object isn't BufferFileInputStream for " + str);
        }

        BufferFileInputStream input = (BufferFileInputStream) cached;
        byte[] chunk;
        try {
            chunk = input.read();
        } catch (IOException e) {
            LOG.error("BufferFileInputStream read failed when downloadChunk ", e);
            throw new TException(e);
        }
        if (chunk == null) {
            return ByteBuffer.wrap(new byte[0]);
        }
        // Store the stream again under the same id only when data was returned.
        downloaders.put(str, input);
        return ByteBuffer.wrap(chunk);
    }

    /**
     * Builds a summary of the cluster: supervisors, Nimbus uptime and one entry per topology.
     *
     * <p>Topologies without an assignment are logged and left out. If the last-error timestamps of a
     * topology cannot be read, the method tries to remove them and builds that topology's summary
     * with a {@code null} map.
     *
     * @return the cluster summary, with the Nimbus version set
     * @throws TException on any failure
     */
    public ClusterSummary getClusterInfo() throws TException {
        try {
            StormClusterState clusterState = this.data.getStormClusterState();
            HashMap assignments = new HashMap();
            int nimbusUptime = this.data.uptime();
            ArrayList topologySummaries = new ArrayList();

            for (Map.Entry<String, StormBase> topology : Cluster.topology_bases(clusterState).entrySet()) {
                String topologyId = topology.getKey();
                StormBase base = topology.getValue();
                Assignment assignment = clusterState.assignment_info(topologyId, null);
                if (assignment == null) {
                    LOG.error("Failed to get assignment of " + topologyId);
                    continue;
                }
                assignments.put(topologyId, assignment);

                Map<Integer, String> lastErrTimes = null;
                try {
                    lastErrTimes = clusterState.topo_lastErr_time(topologyId);
                } catch (Exception readFailure) {
                    LOG.error("Failed to get last error timestamp map for " + topologyId + ", and begin to remove the corrupt data", readFailure);
                    try {
                        clusterState.remove_lastErr_time(topologyId);
                    } catch (Exception removeFailure) {
                        LOG.error("Failed to remove last error timestamp in ZK for " + topologyId, removeFailure);
                    }
                }

                int topologyUptime = TimeUtils.time_delta(base.getLanchTimeSecs());
                topologySummaries.add(NimbusUtils.mkTopologySummary(assignment, topologyId, base.getStormName(), base.getStatusString(), topologyUptime, lastErrTimes));
            }

            ClusterSummary summary = new ClusterSummary(NimbusUtils.mkSupervisorSummaries(Cluster.allSupervisorInfo(clusterState, null), assignments), nimbusUptime, topologySummaries);
            summary.set_version(NimbusUtils.getNimbusVersion());
            return summary;
        } catch (TException e) {
            LOG.info("Failed to get ClusterSummary ", e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get ClusterSummary ", e);
            throw new TException(e);
        }
    }

    /**
     * Lists the workers, with their tasks, that run on the supervisor of the given host.
     *
     * <p>The supervisor is found by comparing each supervisor's host name with both the IP and the
     * host-name form of {@code str}. Workers are returned in ascending port order.
     *
     * @param str host name or IP of the supervisor
     * @return the supervisor summary together with its worker summaries
     * @throws TException if no supervisor matches the host or any other failure occurs
     */
    public SupervisorWorkers getSupervisorWorkers(String str) throws NotAliveException, TException {
        try {
            StormClusterState stormClusterState = this.data.getStormClusterState();
            String supervisorId = null;
            SupervisorInfo supervisorInfo = null;
            String ip = NetWorkUtils.host2Ip(str);
            String hostName = NetWorkUtils.ip2Host(str);
            for (Map.Entry<String, SupervisorInfo> entry : Cluster.allSupervisorInfo(stormClusterState, null).entrySet()) {
                SupervisorInfo candidate = entry.getValue();
                if (candidate.getHostName().equals(hostName) || candidate.getHostName().equals(ip)) {
                    supervisorId = entry.getKey();
                    supervisorInfo = candidate;
                    break;
                }
            }
            if (supervisorId == null) {
                throw new TException("No supervisor of " + str);
            }

            // Gather the assignment of every topology that has one.
            Map<String, Assignment> assignments = new HashMap<String, Assignment>();
            for (String topologyId : Cluster.topology_bases(stormClusterState).keySet()) {
                Assignment assignment = stormClusterState.assignment_info(topologyId, null);
                if (assignment == null) {
                    LOG.error("Failed to get assignment of " + topologyId);
                } else {
                    assignments.put(topologyId, assignment);
                }
            }

            // Keyed by port so the result is sorted by port.
            Map<Integer, WorkerSummary> workersByPort = new TreeMap<Integer, WorkerSummary>();
            Map<String, Integer> usedSlots = new HashMap<String, Integer>();
            for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
                String topologyId = entry.getKey();
                Assignment assignment = entry.getValue();
                HashMap<Integer, String> taskToComponent = Cluster.topology_task_info(stormClusterState, topologyId);
                HashMap<Integer, String> taskToType = Cluster.topology_task_compType(stormClusterState, topologyId);
                for (ResourceWorkerSlot worker : assignment.getWorkers()) {
                    if (!supervisorId.equals(worker.getNodeId())) {
                        continue;
                    }
                    Integer used = usedSlots.get(supervisorId);
                    usedSlots.put(supervisorId, Integer.valueOf((used == null ? 0 : used.intValue()) + 1));

                    Integer port = Integer.valueOf(worker.getPort());
                    WorkerSummary workerSummary = workersByPort.get(port);
                    if (workerSummary == null) {
                        workerSummary = new WorkerSummary();
                        workerSummary.set_port(port.intValue());
                        workerSummary.set_topology(topologyId);
                        workerSummary.set_tasks(new ArrayList<TaskSummary>());
                        workersByPort.put(port, workerSummary);
                    }
                    for (Integer taskId : worker.getTasks()) {
                        int taskUptime = TimeUtils.time_delta(assignment.getTaskStartTimeSecs().get(taskId).intValue());
                        workerSummary.get_tasks().add(NimbusUtils.mkSimpleTaskSummary(worker, taskId.intValue(), taskToComponent.get(taskId), taskToType.get(taskId), str, taskUptime));
                    }
                }
            }
            List<WorkerSummary> workers = new ArrayList<WorkerSummary>(workersByPort.values());
            return new SupervisorWorkers(NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, usedSlots), workers);
        } catch (TException e) {
            LOG.info("Failed to get SupervisorWorkers of " + str, e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get SupervisorWorkers of " + str, e);
            throw new TException(e);
        }
    }

    /**
     * Returns id, name, uptime, status, tasks and workers of a topology.
     *
     * @param str topology id
     * @return the populated topology info
     * @throws NotAliveException if no storm base exists for the id
     * @throws TException if the assignment is missing or any other failure occurs
     */
    public TopologyInfo getTopologyInfo(String str) throws NotAliveException, TException {
        TopologyInfo topologyInfo = new TopologyInfo();
        StormClusterState stormClusterState = this.data.getStormClusterState();
        try {
            StormBase base = stormClusterState.storm_base(str, null);
            if (base == null) {
                throw new NotAliveException("No topology of " + str);
            }
            topologyInfo.set_id(str);
            topologyInfo.set_name(base.getStormName());
            topologyInfo.set_uptime_secs(TimeUtils.time_delta(base.getLanchTimeSecs()));
            topologyInfo.set_status(base.getStatusString());
            Assignment assignment = stormClusterState.assignment_info(str, null);
            if (assignment == null) {
                throw new TException("Failed to get assignment from ZK of " + str);
            }
            Map<Integer, TaskSummary> taskSummaries = NimbusUtils.mkTaskSummary(stormClusterState, assignment, Cluster.topology_task_info(stormClusterState, str), str);
            topologyInfo.set_tasks(new ArrayList<TaskSummary>(taskSummaries.values()));
            topologyInfo.set_workers(NimbusUtils.mkWorkerSummary(str, assignment, taskSummaries));
            return topologyInfo;
        } catch (NotAliveException e) {
            // Rethrow as declared; otherwise catch (Exception) below would turn it into a TException.
            LOG.info("Failed to get topologyInfo " + str, e);
            throw e;
        } catch (TException e) {
            LOG.info("Failed to get topologyInfo " + str, e);
            throw e;
        } catch (Exception e) {
            LOG.info("Failed to get topologyInfo " + str, e);
            throw new TException("Failed to get topologyInfo" + str);
        }
    }

    /**
     * Returns the stored configuration of a topology as JSON.
     *
     * @param str topology id
     * @return JSON form of the configuration read from Nimbus
     * @throws TException wrapping the {@link IOException} if the configuration cannot be read
     */
    public String getTopologyConf(String str) throws NotAliveException, TException {
        String json;
        try {
            json = JStormUtils.to_json(StormConfig.read_nimbus_topology_conf(this.conf, str));
        } catch (IOException e) {
            LOG.info("Failed to get configuration of " + str, e);
            throw new TException(e);
        }
        return json;
    }

    /**
     * Loads the stored topology code and returns the result of {@code Common.system_topology} for it.
     *
     * @param str topology id
     * @return the topology built from the stored configuration and code
     * @throws TException with a fixed message on any failure, including missing topology code
     */
    public StormTopology getTopology(String str) throws NotAliveException, TException {
        try {
            StormTopology storedCode = StormConfig.read_nimbus_topology_code(this.conf, str);
            if (storedCode != null) {
                return Common.system_topology(StormConfig.read_nimbus_topology_conf(this.conf, str), storedCode);
            }
            // This exception is caught by the handler below and replaced by its generic message.
            throw new TException("topology:" + str + "is null");
        } catch (Exception e) {
            LOG.error("Failed to get topology " + str + ",", e);
            throw new TException("Failed to get system_topology");
        }
    }

    /** Logs the start and end of shutdown; no other work is done here. */
    public void shutdown() {
        LOG.info("Begin to shut down master");
        LOG.info("Successfully shut down master");
    }

    /**
     * Always reports {@code false}.
     *
     * @return {@code false}
     */
    @Override // com.alibaba.jstorm.cluster.DaemonCommon
    public boolean waiting() {
        return false;
    }

    /**
     * Verifies that the topology's liveness matches the expected state.
     *
     * @param nimbusData Nimbus state providing the cluster state
     * @param str topology name
     * @param z expected liveness: {@code true} means the topology must be alive
     * @throws NotAliveException if it should be alive but is not
     * @throws AlreadyAliveException if it should not be alive but is
     * @throws Exception if the liveness lookup fails
     */
    public void checkTopologyActive(NimbusData nimbusData, String str, boolean z) throws Exception {
        boolean alive = isTopologyActive(nimbusData.getStormClusterState(), str);
        if (alive == z) {
            return;
        }
        if (z) {
            throw new NotAliveException(str + " is not alive");
        }
        throw new AlreadyAliveException(str + " is already active");
    }

    /**
     * Reports whether a topology id can be resolved for the given name.
     *
     * @param stormClusterState cluster state to query
     * @param str topology name
     * @return {@code true} when {@code Cluster.get_topology_id} returns a non-null id
     * @throws Exception if the lookup fails
     */
    public boolean isTopologyActive(StormClusterState stormClusterState, String str) throws Exception {
        return Cluster.get_topology_id(stormClusterState, str) != null;
    }

    /**
     * Prepares the master distribution directory of a topology and writes the
     * serialized topology and its configuration into it.
     *
     * <p>Steps, in order: create the directory if needed, empty it, install
     * the jar via {@link #setupJar}, then write the serialized topology and
     * the serialized configuration to the paths given by
     * {@code StormConfig.stormcode_path} and {@code StormConfig.stormconf_path}.
     *
     * @param map           configuration used to resolve the distribution root
     * @param str           identifier used to resolve the distribution root
     * @param str2          source location forwarded to {@link #setupJar}
     * @param map2          configuration to serialize next to the code
     * @param stormTopology topology to serialize
     * @throws IOException if any file-system operation fails
     */
    private void setupStormCode(Map<Object, Object> map, String str, String str2, Map<Object, Object> map2, StormTopology stormTopology) throws IOException {
        String distRoot = StormConfig.masterStormdistRoot(map, str);
        File distDir = new File(distRoot);
        FileUtils.forceMkdir(distDir);
        FileUtils.cleanDirectory(distDir);

        setupJar(map, str2, distRoot);

        File codeFile = new File(StormConfig.stormcode_path(distRoot));
        FileUtils.writeByteArrayToFile(codeFile, Utils.serialize(stormTopology));

        File confFile = new File(StormConfig.stormconf_path(distRoot));
        FileUtils.writeByteArrayToFile(confFile, Utils.serialize(map2));
    }

    /**
     * Installs an uploaded topology jar, and the directory it was uploaded
     * into, under the topology's distribution root.
     *
     * <p>Does nothing when {@code StormConfig.local_mode(map)} is true.
     * Otherwise the jar is expected at
     * {@code <str>/stormjar-<last segment of str>.jar}, where the segments
     * come from splitting {@code str} on {@code Cluster.ZK_SEPERATOR}. The jar
     * is copied to {@code StormConfig.stormjar_path(str2)} and then deleted
     * from the upload directory, after which the upload directory is copied
     * to {@code StormConfig.stormlib_path(str2)}.
     *
     * @param map  configuration consulted for local mode
     * @param str  upload directory containing the jar
     * @param str2 distribution root that receives the jar and the lib copy
     * @throws IOException              if a copy fails
     * @throws IllegalArgumentException if the jar or the upload directory
     *                                  does not exist
     */
    private void setupJar(Map<Object, Object> map, String str, String str2) throws IOException {
        if (StormConfig.local_mode(map)) {
            return;
        }
        String[] pathSegments = str.split(Cluster.ZK_SEPERATOR);
        String uploadedJarPath = str + "/stormjar-" + pathSegments[pathSegments.length - 1] + ".jar";
        File uploadedJar = new File(uploadedJarPath);
        if (!uploadedJar.exists()) {
            throw new IllegalArgumentException(uploadedJarPath + " to copy to " + str2 + " does not exist!");
        }
        FileUtils.copyFile(uploadedJar, new File(StormConfig.stormjar_path(str2)));
        // The jar is removed so that the directory copy below does not carry it
        // along. File.delete() reports failure only through its return value,
        // so surface it instead of dropping it silently; the install itself
        // still proceeds as before.
        if (!uploadedJar.delete()) {
            LOG.warn("Failed to delete " + uploadedJarPath + " after copying it to " + str2);
        }
        String libPath = StormConfig.stormlib_path(str2);
        File uploadDir = new File(str);
        if (!uploadDir.exists()) {
            throw new IllegalArgumentException(str + " to copy to " + str2 + " does not exist!");
        }
        FileUtils.copyDirectory(uploadDir, new File(libPath));
    }

    /**
     * Registers heartbeat storage and every task of a topology in the
     * cluster state.
     *
     * <p>Calls {@code setup_heartbeats} first, then builds the task map with
     * {@link #mkTaskComponentAssignments} and stores each entry through
     * {@code set_task}.
     *
     * @param map               configuration forwarded to
     *                          {@link #mkTaskComponentAssignments}
     * @param str               topology identifier
     * @param stormClusterState cluster state that receives the task entries
     * @throws InvalidTopologyException if no task map could be produced
     * @throws Exception                anything raised by the cluster state or
     *                                  by building the task map
     */
    public void setupZkTaskInfo(Map<Object, Object> map, String str, StormClusterState stormClusterState) throws Exception {
        stormClusterState.setup_heartbeats(str);

        Map<Integer, TaskInfo> tasksById = mkTaskComponentAssignments(map, str);
        if (tasksById == null) {
            throw new InvalidTopologyException("Failed to generate TaskIDs map");
        }

        for (Map.Entry<Integer, TaskInfo> task : tasksById.entrySet()) {
            int taskId = task.getKey().intValue();
            TaskInfo taskInfo = task.getValue();
            stormClusterState.set_task(str, taskId, taskInfo);
        }
    }

    /**
     * Builds the task-id to task-info map for a topology.
     *
     * <p>The topology configuration and code are read from the master's
     * storage, expanded with {@code Common.system_topology}, and then task ids
     * are handed out by {@link #mkTaskMaker} in this order: bolts, spouts,
     * state spouts. Numbering starts after {@code 0}, and each group continues
     * from the last id used by the previous group.
     *
     * @param map configuration used to locate the stored topology
     * @param str topology identifier
     * @return a sorted map from task id to its {@link TaskInfo}
     * @throws IOException              if the stored conf or code cannot be read
     * @throws InvalidTopologyException if declared by the topology expansion
     */
    public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> map, String str) throws IOException, InvalidTopologyException {
        Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(map, str);
        StormTopology storedTopology = StormConfig.read_nimbus_topology_code(map, str);
        // Parameterized instead of a raw TreeMap so the compiler checks what
        // mkTaskMaker puts in and what callers get back.
        Map<Integer, TaskInfo> tasksById = new TreeMap<Integer, TaskInfo>();
        StormTopology systemTopology = Common.system_topology(topologyConf, storedTopology);

        Integer lastTaskId = mkTaskMaker(topologyConf, systemTopology.get_bolts(), tasksById, 0);
        lastTaskId = mkTaskMaker(topologyConf, systemTopology.get_spouts(), tasksById, lastTaskId);
        mkTaskMaker(topologyConf, systemTopology.get_state_spouts(), tasksById, lastTaskId);
        return tasksById;
    }

    /**
     * Assigns consecutive task ids to every component of one component map.
     *
     * <p>For each entry the component's {@link ComponentCommon} is extracted
     * ({@link Bolt}, {@link SpoutSpec} and {@link StateSpoutSpec} are
     * recognised). The number of tasks is the component's parallelism hint,
     * capped by the {@code topology.max.task.parallelism} setting when that
     * setting parses to a non-null value. Each task gets the next id after
     * {@code num} and is recorded in {@code map3} with the component id and a
     * type label ({@code "bolt"} or {@code "spout"}).
     *
     * @param map  topology configuration holding the optional parallelism cap
     * @param map2 component id to component spec; {@code null} is logged and
     *             treated as having no components
     * @param map3 output map that receives task id to {@link TaskInfo} entries
     * @param num  the last task id already in use
     * @return the last task id in use after this call ({@code num} itself
     *         when nothing was added)
     * @throws RuntimeException if a component has no {@link ComponentCommon}
     */
    public Integer mkTaskMaker(Map<Object, Object> map, Map<String, ?> map2, Map<Integer, TaskInfo> map3, Integer num) {
        if (map2 == null) {
            LOG.warn("Component map is empty");
            return num;
        }
        for (Map.Entry<String, ?> entry : map2.entrySet()) {
            Object component = entry.getValue();
            ComponentCommon common = null;
            String componentType = "bolt";
            if (component instanceof Bolt) {
                common = ((Bolt) component).get_common();
            } else if (component instanceof SpoutSpec) {
                common = ((SpoutSpec) component).get_common();
                componentType = "spout";
            } else if (component instanceof StateSpoutSpec) {
                common = ((StateSpoutSpec) component).get_common();
                componentType = "spout";
            }
            if (common == null) {
                throw new RuntimeException("No ComponentCommon of " + entry.getKey());
            }

            int taskCount = Thrift.parallelismHint(common);
            // Read the cap straight from the configuration; copying the whole
            // map into a fresh HashMap for every component only to look up one
            // key was wasted work. The lookup stays inside the loop so it is
            // still skipped entirely when there are no components.
            Integer maxParallelism = JStormUtils.parseInt(map.get("topology.max.task.parallelism"));
            if (maxParallelism != null) {
                taskCount = Math.min(maxParallelism.intValue(), taskCount);
            }

            for (int i = 0; i < taskCount; i++) {
                num = Integer.valueOf(num.intValue() + 1);
                map3.put(num, new TaskInfo(entry.getKey(), componentType));
            }
        }
        return num;
    }

    /**
     * Returns the Nimbus configuration.
     *
     * <p>NOTE(review): this body returns {@code null} unconditionally, which
     * looks like an unimplemented stub — confirm before relying on it.
     *
     * @return always {@code null}
     */
    public String getNimbusConf() throws TException {
        return null;
    }

    /**
     * Returns the user-submitted topology for the given identifier.
     *
     * <p>NOTE(review): this body ignores {@code str} and returns {@code null}
     * unconditionally, which looks like an unimplemented stub — confirm before
     * relying on it.
     *
     * @param str topology identifier (unused by this body)
     * @return always {@code null}
     */
    public StormTopology getUserTopology(String str) throws NotAliveException, TException {
        return null;
    }

    /**
     * Enables or disables metric monitoring for a topology.
     *
     * <p>The topology name is resolved to an id through
     * {@code Cluster.get_topology_id}; the enable flag from
     * {@code monitorOptions} is then applied with
     * {@code NimbusUtils.updateMetricMonitorStatus}.
     *
     * @param str            topology name
     * @param monitorOptions carries the enable/disable flag
     * @throws NotAliveException if no topology id is found for {@code str}
     * @throws TException        wrapping any other failure
     */
    public void metricMonitor(String str, MonitorOptions monitorOptions) throws NotAliveException, TException {
        boolean enable = monitorOptions.is_isEnable();
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            String topologyId = Cluster.get_topology_id(clusterState, str);
            if (topologyId == null) {
                throw new NotAliveException("Failed to update metricsMonitor status as " + str + " is not alive");
            }
            NimbusUtils.updateMetricMonitorStatus(clusterState, topologyId, enable);
        } catch (NotAliveException e) {
            // Rethrow as-is: the generic handler below would otherwise wrap it
            // in a TException and callers would never see the declared
            // NotAliveException.
            LOG.error("Failed to update metricsMonitor " + str, e);
            throw e;
        } catch (Exception e) {
            LOG.error("Failed to update metricsMonitor " + str, e);
            throw new TException(e);
        }
    }

    /**
     * Collects the task and worker metrics recorded for a topology.
     *
     * <p>Every task metric from the cluster state is converted with
     * {@code NimbusUtils.updateTaskMetricData} and tagged with the component
     * id of the task it belongs to (looked up in the topology's task-info
     * list by the metric's task id). Every worker metric is converted with
     * {@code NimbusUtils.updateWorkerMetricData}.
     *
     * @param str topology id
     * @return a {@link TopologyMetricInfo} holding the topology id and the
     *         converted task and worker metrics
     * @throws TException wrapping any failure while reading or converting
     */
    public TopologyMetricInfo getTopologyMetric(String str) throws NotAliveException, TException {
        LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " + str);
        TopologyMetricInfo result = new TopologyMetricInfo();
        StormClusterState clusterState = this.data.getStormClusterState();
        result.set_topology_id(str);
        try {
            Map<Integer, TaskInfo> tasksById = clusterState.task_info_list(str);

            for (TaskMetricInfo taskMetric : clusterState.get_task_metric_list(str)) {
                TaskMetricData taskData = new TaskMetricData();
                NimbusUtils.updateTaskMetricData(taskData, taskMetric);
                Integer taskId = Integer.valueOf(taskMetric.getTaskId());
                TaskInfo owningTask = tasksById.get(taskId);
                taskData.set_component_id(owningTask.getComponentId());
                result.add_to_task_metric_list(taskData);
            }

            for (WorkerMetricInfo workerMetric : clusterState.get_worker_metric_list(str)) {
                WorkerMetricData workerData = new WorkerMetricData();
                NimbusUtils.updateWorkerMetricData(workerData, workerMetric);
                result.add_to_worker_metric_list(workerData);
            }
        } catch (Exception failure) {
            LOG.error("Failed to get topology Metric Data " + str, failure);
            throw new TException(failure);
        }
        return result;
    }
}
