package com.alibaba.jstorm.daemon.supervisor;

import backtype.storm.messaging.IContext;
import backtype.storm.utils.LocalState;
import backtype.storm.utils.Time;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
import com.alibaba.jstorm.daemon.worker.State;
import com.alibaba.jstorm.daemon.worker.Worker;
import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat;
import com.alibaba.jstorm.daemon.worker.metrics.AlimonitorClient;
import com.alibaba.jstorm.message.zeroMq.MQContext;
import com.alibaba.jstorm.task.LocalAssignment;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeFormat;
import com.alibaba.jstorm.utils.TimeUtils;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.class */
public class SyncProcessEvent extends ShutdownWork {
    private static Logger LOG = Logger.getLogger(SyncProcessEvent.class);
    private LocalState localState;
    private Map conf;
    private ConcurrentHashMap<String, String> workerThreadPids;
    private String supervisorId;
    private IContext sharedContext;
    private CgroupManager cgroupManager;
    private SandBoxMaker sandBoxMaker;

    public SyncProcessEvent(String str, Map map, LocalState localState, ConcurrentHashMap<String, String> concurrentHashMap, IContext iContext) {
        this.supervisorId = str;
        this.conf = map;
        this.localState = localState;
        this.workerThreadPids = concurrentHashMap;
        this.sharedContext = iContext;
        this.sandBoxMaker = new SandBoxMaker(map);
        if (ConfigExtension.isEnableCgroup(map)) {
            this.cgroupManager = new CgroupManager(map);
        }
    }

    public void run() {
        LOG.debug("Syncing processes");
        try {
            try {
                Map<Integer, LocalAssignment> map = (Map) this.localState.get(Common.LS_LOCAL_ASSIGNMENTS);
                if (map == null) {
                    map = new HashMap();
                }
                LOG.debug("Assigned tasks: " + map);
                try {
                    Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(this.conf, this.localState, map);
                    LOG.debug("Allocated: " + localWorkerStats);
                    startNewWorkers(killUselessWorkers(localWorkerStats), map);
                } catch (Exception e) {
                    LOG.error("Failed to get Local worker stats");
                    throw e;
                }
            } catch (IOException e2) {
                LOG.error("Failed to get LOCAL_ASSIGNMENTS from LocalState", e2);
                throw e2;
            }
        } catch (Exception e3) {
            LOG.error("Failed Sync Process", e3);
        }
    }

    public void waitForWorkersLaunch(Map map, Collection<String> collection) throws IOException, InterruptedException {
        int current_time_secs = TimeUtils.current_time_secs();
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            waitForWorkerLaunch(map, it.next(), current_time_secs);
        }
    }

    public void waitForWorkerLaunch(Map map, String str, int i) throws IOException, InterruptedException {
        LocalState worker_state = StormConfig.worker_state(map, str);
        while (((WorkerHeartbeat) worker_state.get(Common.LS_WORKER_HEARTBEAT)) == null && TimeUtils.current_time_secs() - i < JStormUtils.parseInt(map.get("supervisor.worker.start.timeout.secs")).intValue()) {
            LOG.info(str + " still hasn't started");
            Time.sleep(500L);
        }
        if (((WorkerHeartbeat) worker_state.get(Common.LS_WORKER_HEARTBEAT)) == null) {
            LOG.error("Failed to start Worker " + str);
        } else {
            LOG.info("Successfully start worker " + str);
        }
    }

    public Map<String, StateHeartbeat> getLocalWorkerStats(Map map, LocalState localState, Map<Integer, LocalAssignment> map2) throws Exception {
        HashMap hashMap = new HashMap();
        int current_time_secs = TimeUtils.current_time_secs();
        for (Map.Entry<String, WorkerHeartbeat> entry : readWorkerHeartbeats(map).entrySet()) {
            String str = entry.getKey().toString();
            WorkerHeartbeat value = entry.getValue();
            State state = value == null ? State.notStarted : !matchesAssignment(value, map2) ? State.disallowed : current_time_secs - value.getTimeSecs() > JStormUtils.parseInt(map.get("supervisor.worker.timeout.secs")).intValue() ? State.timedOut : State.valid;
            if (state != State.valid) {
                LOG.info("Worker:" + str + " state:" + state + " WorkerHeartbeat: " + value + " at supervisor time-secs " + current_time_secs);
            } else {
                LOG.debug("Worker:" + str + " state:" + state + " WorkerHeartbeat: " + value + " at supervisor time-secs " + current_time_secs);
            }
            hashMap.put(str, new StateHeartbeat(state, value));
        }
        return hashMap;
    }

    public boolean matchesAssignment(WorkerHeartbeat workerHeartbeat, Map<Integer, LocalAssignment> map) {
        boolean z = true;
        LocalAssignment localAssignment = map.get(workerHeartbeat.getPort());
        if (localAssignment == null) {
            z = false;
        } else if (!workerHeartbeat.getTopologyId().equals(localAssignment.getTopologyId())) {
            LOG.info("topology id not equal whb=" + workerHeartbeat.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId());
            z = false;
        } else if (!workerHeartbeat.getTaskIds().equals(localAssignment.getTaskIds())) {
            LOG.info("task-id isn't equal whb=" + workerHeartbeat.getTaskIds() + ",localAssignment=" + localAssignment.getTaskIds());
            z = false;
        }
        return z;
    }

    public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map map) throws Exception {
        HashMap hashMap = new HashMap();
        String worker_root = StormConfig.worker_root(map);
        List<String> read_dir_contents = PathUtils.read_dir_contents(worker_root);
        if (read_dir_contents == null) {
            LOG.info("No worker dir under " + worker_root);
            return hashMap;
        }
        for (String str : read_dir_contents) {
            hashMap.put(str, readWorkerHeartbeat(map, str));
        }
        return hashMap;
    }

    public WorkerHeartbeat readWorkerHeartbeat(Map map, String str) throws Exception {
        try {
            return (WorkerHeartbeat) StormConfig.worker_state(map, str).get(Common.LS_WORKER_HEARTBEAT);
        } catch (IOException e) {
            LOG.error("Failed to get worker Heartbeat", e);
            return null;
        }
    }

    public void launchWorker(Map map, IContext iContext, String str, String str2, Integer num, String str3, ConcurrentHashMap<String, String> concurrentHashMap) throws Exception {
        String uuid = UUID.randomUUID().toString();
        ProcessSimulator.registerProcess(uuid, Worker.mk_worker(map, iContext, str, str2, num.intValue(), str3, null));
        concurrentHashMap.put(str3, uuid);
    }

    private String getClassPath(String str, String str2, Map map) {
        String[] split = JStormUtils.current_classpath().split(":");
        HashSet<String> hashSet = new HashSet();
        for (String str3 : split) {
            hashSet.add(str3);
        }
        if (str2 != null) {
            for (String str4 : PathUtils.read_dir_contents(str2)) {
                if (str4.endsWith(".jar")) {
                    hashSet.add(str2 + File.separator + str4);
                }
            }
            for (String str5 : PathUtils.read_dir_contents(str2 + File.separator + "lib")) {
                if (str5.endsWith(".jar")) {
                    hashSet.add(str2 + File.separator + "lib" + File.separator + str5);
                }
            }
        }
        CharSequence charSequence = null;
        String str6 = (String) map.get("storm.messaging.transport");
        if (str6.equals(MQContext.class.getCanonicalName())) {
            charSequence = "jeromq";
        } else if (str6.equals("com.alibaba.jstorm.message.jeroMq.JMQContext")) {
            charSequence = "jzmq";
        }
        StringBuilder sb = new StringBuilder();
        if (charSequence != null) {
            for (String str7 : hashSet) {
                if (!str7.contains(charSequence)) {
                    sb.append(str7 + ":");
                }
            }
        } else {
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                sb.append(((String) it.next()) + ":");
            }
        }
        if (ConfigExtension.isEnableTopologyClassLoader(map)) {
            return sb.toString().substring(0, sb.length() - 1);
        }
        sb.append(str);
        return sb.toString();
    }

    public String getChildOpts(Map map) {
        String str = " ";
        if (map.get("topology.worker.childopts") != null) {
            str = str + ((String) map.get("topology.worker.childopts"));
        } else if (ConfigExtension.getWorkerGc(map) != null) {
            str = str + ConfigExtension.getWorkerGc(map);
        }
        return str;
    }

    private String getGcDumpParam(Map map) {
        String logDir = JStormUtils.getLogDir();
        return " -Xloggc:" + logDir + File.separator + "%TOPOLOGYID%-worker-%ID%-" + TimeFormat.getSecond(new Date()) + "-gc.log -verbose:gc -XX:HeapDumpPath=" + logDir + " ";
    }

    public void launchWorker(Map map, IContext iContext, String str, String str2, Integer num, String str3, LocalAssignment localAssignment) throws IOException {
        String supervisor_stormdist_root = StormConfig.supervisor_stormdist_root(map, str);
        String stormjar_path = StormConfig.stormjar_path(supervisor_stormdist_root);
        Map read_supervisor_topology_conf = StormConfig.read_supervisor_topology_conf(map, str);
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.putAll(read_supervisor_topology_conf);
        String property = System.getProperty("jstorm.home");
        long mem = localAssignment.getMem();
        int cpu = localAssignment.getCpu();
        String replace = (getChildOpts(hashMap) + getGcDumpParam(hashMap)).replace("%ID%", num.toString()).replace("%TOPOLOGYID%", str);
        String replace2 = property != null ? replace.replace(SandBoxMaker.JSTORM_HOME_KEY, property) : replace.replace(SandBoxMaker.JSTORM_HOME_KEY, "./");
        HashMap hashMap2 = new HashMap();
        if (ConfigExtension.getWorkerRedirectOutput(hashMap)) {
            hashMap2.put("REDIRECT", "true");
        } else {
            hashMap2.put("REDIRECT", "false");
        }
        String genLogName = JStormUtils.genLogName(localAssignment.getTopologyName(), num);
        hashMap2.put("LD_LIBRARY_PATH", (String) hashMap.get("java.library.path"));
        StringBuilder sb = new StringBuilder();
        try {
            if (this.cgroupManager != null) {
                sb.append(this.cgroupManager.startNewWorker(cpu, str3));
            }
            sb.append("java -server ");
            sb.append(" -Xms" + mem);
            sb.append(" -Xmx" + mem + " ");
            sb.append(" -Xmn" + (mem / 3) + " ");
            sb.append(" -XX:PermSize=" + (mem / 16));
            sb.append(" -XX:MaxPermSize=" + (mem / 8));
            sb.append(" " + replace2);
            sb.append(" " + (localAssignment.getJvm() == null ? AlimonitorClient.DEFAULT_ERROR_INFO : localAssignment.getJvm()));
            sb.append(" -Djava.library.path=");
            sb.append((String) hashMap.get("java.library.path"));
            sb.append(" -Dlogfile.name=");
            sb.append(genLogName);
            if (property != null) {
                sb.append(" -Dlog4j.configuration=File:" + property + "/conf/jstorm.log4j.properties");
                sb.append(" -Djstorm.home=");
                sb.append(property);
            } else {
                sb.append(" -Dlog4j.configuration=File:jstorm.log4j.properties");
            }
            String classPath = getClassPath(stormjar_path, property, hashMap);
            String str4 = (String) hashMap.get("worker.classpath");
            List list = (List) read_supervisor_topology_conf.get("topology.lib.name");
            StringBuilder sb2 = new StringBuilder();
            if (list != null) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    sb2.append(StormConfig.stormlib_path(supervisor_stormdist_root, (String) it.next())).append(":");
                }
            }
            String str5 = str4 + ":" + sb2.toString();
            HashMap hashMap3 = new HashMap();
            hashMap3.put(SandBoxMaker.CLASS_PATH_KEY, classPath + ":" + str5);
            sb.append(this.sandBoxMaker.sandboxPolicy(str3, hashMap3));
            sb.append(" -cp ");
            sb.append(classPath);
            if (!ConfigExtension.isEnableTopologyClassLoader(hashMap)) {
                sb.append(":").append(str5);
            }
            sb.append(" com.alibaba.jstorm.daemon.worker.Worker ");
            sb.append(str);
            sb.append(" ");
            sb.append(str2);
            sb.append(" ");
            sb.append(num);
            sb.append(" ");
            sb.append(str3);
            sb.append(" ");
            sb.append(str5 + ":" + stormjar_path);
            LOG.info("Launching worker with command: " + ((Object) sb));
            LOG.info("Environment:" + hashMap2.toString());
            JStormUtils.launch_process(sb.toString(), hashMap2, true);
        } catch (Exception e) {
            LOG.error("fail to prepare cgroup to workerId: " + str3, e);
        }
    }

    private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> map) {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (Map.Entry<String, StateHeartbeat> entry : map.entrySet()) {
            String key = entry.getKey();
            StateHeartbeat value = entry.getValue();
            if (value.getState().equals(State.valid)) {
                hashSet.add(value.getHeartbeat().getPort());
            } else {
                if (value.getHeartbeat() != null) {
                    hashMap.put(key, value.getHeartbeat().getTopologyId());
                } else {
                    hashMap.put(key, null);
                }
                StringBuilder sb = new StringBuilder();
                sb.append("Shutting down and clearing state for id ");
                sb.append(key);
                sb.append(";State:");
                sb.append(value);
                LOG.info(sb);
            }
        }
        shutWorker(this.conf, this.supervisorId, hashMap, this.workerThreadPids, this.cgroupManager);
        Iterator<String> it = hashMap.keySet().iterator();
        while (it.hasNext()) {
            map.remove(it.next());
        }
        return hashSet;
    }

    private void startNewWorkers(Set<Integer> set, Map<Integer, LocalAssignment> map) throws Exception {
        Map select_keys_pred = JStormUtils.select_keys_pred(set, map);
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : select_keys_pred.entrySet()) {
            Integer num = (Integer) entry.getKey();
            LocalAssignment localAssignment = (LocalAssignment) entry.getValue();
            String uuid = UUID.randomUUID().toString();
            hashMap.put(num, uuid);
            try {
                StormConfig.worker_pids_root(this.conf, uuid);
                StringBuilder sb = new StringBuilder();
                sb.append("Launching worker with assiangment ");
                sb.append(localAssignment.toString());
                sb.append(" for the supervisor ");
                sb.append(this.supervisorId);
                sb.append(" on port ");
                sb.append(num);
                sb.append(" with id ");
                sb.append(uuid);
                LOG.info(sb);
                try {
                    String cluster_mode = StormConfig.cluster_mode(this.conf);
                    if (cluster_mode.equals("distributed")) {
                        launchWorker(this.conf, this.sharedContext, localAssignment.getTopologyId(), this.supervisorId, num, uuid, localAssignment);
                    } else if (cluster_mode.equals("local")) {
                        launchWorker(this.conf, this.sharedContext, localAssignment.getTopologyId(), this.supervisorId, num, uuid, this.workerThreadPids);
                    }
                } catch (Exception e) {
                    LOG.error("Failed to launchWorker workerId:" + uuid + ":" + num, e);
                    throw e;
                }
            } catch (IOException e2) {
                LOG.error("Failed to create " + uuid + " localdir", e2);
                throw e2;
            }
        }
        try {
            waitForWorkersLaunch(this.conf, hashMap.values());
        } catch (IOException e3) {
            LOG.error(e3 + " waitForWorkersLaunch failed");
        } catch (InterruptedException e4) {
            LOG.error(e4 + " waitForWorkersLaunch failed");
        }
    }
}
