package com.alibaba.jstorm.schedule;

import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

/**
 * Periodic task run by a Nimbus instance that is taking part in leader election.
 *
 * <p>On every cycle (one per {@code sleepTime} milliseconds) the task does one of three things:
 * <ul>
 *   <li>if this instance is the leader, it unregisters itself from the nimbus host list and
 *       verifies that the leader entry in the cluster state really names this instance;</li>
 *   <li>if another leader exists, it synchronises the local stormdist directory with the
 *       current assignments and publishes a follower heartbeat;</li>
 *   <li>if no leader exists, it tries to become the leader.</li>
 * </ul>
 *
 * <p>The loop runs until {@link #clean()} is called.
 */
public class FollowerRunnable implements Runnable {
    private static final Logger LOG = Logger.getLogger(FollowerRunnable.class);

    /** Number of polls {@link #checkOwnMaster()} performs before halting the process. */
    private static final int OWN_MASTER_CHECK_ATTEMPTS = 10;

    private final NimbusData data;

    /** Pause between two cycles of {@link #run()}, in milliseconds. */
    private final int sleepTime;

    /** Loop flag; cleared by {@link #clean()}. Volatile because it is written from another thread. */
    private volatile boolean state = true;

    /** Callback handed to the assignments lookup; re-runs {@link #check()} while not leader. */
    private final RunnableCallback callback;

    /** {@code host:port} identity of this Nimbus, used for registration and leader comparison. */
    private final String hostPort;

    /**
     * Creates the task, immediately attempts to become leader and registers this Nimbus host.
     *
     * @param nimbusData shared Nimbus state (configuration, cluster state, leader flag)
     * @param i          pause between two cycles, in milliseconds
     * @throws RuntimeException if the leader attempt or the host registration fails; the
     *                          underlying exception is attached as the cause
     */
    public FollowerRunnable(final NimbusData nimbusData, int i) {
        this.data = nimbusData;
        this.sleepTime = i;

        String host = ConfigExtension.isNimbusUseIp(nimbusData.getConf())
                ? NetWorkUtils.ip()
                : NetWorkUtils.hostname();
        String port = String.valueOf(Utils.getInt(nimbusData.getConf().get("nimbus.thrift.port")));
        this.hostPort = host + ":" + port;

        // The two start-up steps are handled separately so that each failure is logged once,
        // under its own message, and keeps its original cause.
        try {
            tryToBeLeader(nimbusData.getConf());
        } catch (Exception e) {
            LOG.error("try to be leader error.", e);
            throw new RuntimeException(e);
        }

        try {
            nimbusData.getStormClusterState().register_nimbus_host(this.hostPort);
        } catch (Exception e) {
            LOG.error("register nimbus host fail!", e);
            throw new RuntimeException(e);
        }

        this.callback = new RunnableCallback() {
            public void run() {
                // Only a non-leader has to mirror the leader's topology code.
                if (!nimbusData.isLeader()) {
                    FollowerRunnable.this.check();
                }
            }
        };
    }

    /**
     * Runs the follower loop until {@link #clean()} is called. Exceptions raised inside a cycle
     * are logged and the loop continues with the next cycle.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Follower Thread starts!");
        while (this.state) {
            try {
                Thread.sleep(this.sleepTime);
                if (this.data.isLeader()) {
                    this.data.getStormClusterState().unregister_nimbus_host(this.hostPort);
                    checkOwnMaster();
                } else if (this.data.getStormClusterState().leader_existed()) {
                    check();
                    this.data.getStormClusterState().update_follower_hb(this.hostPort, this.data.uptime());
                } else {
                    tryToBeLeader(this.data.getConf());
                }
            } catch (InterruptedException ignored) {
                // Deliberately not re-interrupting: shutdown is signalled through clean(), and
                // restoring the interrupt flag would make every following sleep() fail at once,
                // turning this loop into a busy spin. The loop condition is re-evaluated next.
            } catch (Exception e) {
                // Errors raised after clean() are expected during shutdown and not reported.
                if (this.state) {
                    LOG.error("Unknow exception ", e);
                }
            }
        }
        LOG.info("Follower Thread has closed!");
    }

    /** Asks the loop in {@link #run()} to stop after the current cycle. */
    public void clean() {
        this.state = false;
    }

    /**
     * Brings the local stormdist directory in line with the current assignments: local
     * topologies without an assignment are deleted, assigned topologies without local code are
     * downloaded. Failures are logged, never propagated.
     */
    public synchronized void check() {
        StormClusterState clusterState = this.data.getStormClusterState();
        try {
            String stormdistRoot = StormConfig.masterStormdistRoot(this.data.getConf());

            // Work on private copies so the lists returned by the collaborators are not mutated.
            List<String> localIds = new ArrayList<String>(PathUtils.read_dir_contents(stormdistRoot));
            List<String> assignedIds = new ArrayList<String>(clusterState.assignments(this.callback));

            List<String> staleIds = new ArrayList<String>(localIds);
            staleIds.removeAll(assignedIds);

            List<String> missingIds = new ArrayList<String>(assignedIds);
            missingIds.removeAll(localIds);

            for (String topologyId : staleIds) {
                deleteLocalTopology(topologyId);
            }

            for (String topologyId : missingIds) {
                Assignment assignment = clusterState.assignment_info(topologyId, null);
                if (assignment == null) {
                    // Presumably the assignment vanished between listing and lookup; skipping it
                    // keeps one missing entry from aborting the remaining downloads.
                    LOG.warn("No assignment info for topology id " + topologyId + ", skip downloading code");
                    continue;
                }
                downloadCodeFromMaster(assignment, topologyId);
            }
        } catch (IOException e) {
            LOG.error("Get stormdist dir error!", e);
        } catch (Exception e) {
            LOG.error("Check error!", e);
        }
    }

    /**
     * Removes the local code directory of one topology. A failed removal is logged and not
     * propagated.
     *
     * @param str topology id
     * @throws IOException if the topology's directory path cannot be resolved
     */
    private void deleteLocalTopology(String str) throws IOException {
        String topologyDir = StormConfig.masterStormdistRoot(this.data.getConf(), str);
        try {
            PathUtils.rmr(topologyDir);
            LOG.info("delete:" + topologyDir + "successfully!");
        } catch (IOException e) {
            LOG.error("delete:" + topologyDir + "fail!", e);
        }
    }

    /**
     * Downloads the code of one topology into a fresh temporary directory under the master
     * inbox and then moves it to the topology's stormdist directory.
     *
     * @param assignment assignment describing where the master keeps the code; must not be null
     * @param str        topology id
     * @throws IOException if a path cannot be resolved or the directory move fails
     * @throws TException  if the download fails with a Thrift error
     */
    private void downloadCodeFromMaster(Assignment assignment, String str) throws IOException, TException {
        String masterCodeDir = assignment.getMasterCodeDir();
        String topologyDir = StormConfig.masterStormdistRoot(this.data.getConf(), str);
        String tmpPath = StormConfig.masterInbox(this.data.getConf()) + Cluster.ZK_SEPERATOR + UUID.randomUUID().toString();
        File tmpDir = new File(tmpPath);
        try {
            JStormServerUtils.downloadCodeFromMaster(this.data.getConf(), tmpPath, masterCodeDir, str, false);
            FileUtils.moveDirectory(tmpDir, new File(topologyDir));
            LOG.info("Finished downloading code for topology id " + str + " from " + masterCodeDir);
        } catch (TException e) {
            LOG.error(e + " downloadStormCode failed topologyId:" + str + "masterCodeDir:" + masterCodeDir, e);
            throw e;
        } finally {
            // After a successful move the temporary directory no longer exists; after a failed
            // download or move it would otherwise pile up in the inbox.
            if (tmpDir.exists()) {
                FileUtils.deleteQuietly(tmpDir);
            }
        }
    }

    /**
     * Attempts to take leadership and stores the outcome in the shared Nimbus state. The
     * callback passed to the cluster state repeats the attempt when it is invoked and halts
     * the process if that repeated attempt throws.
     *
     * @param map Nimbus configuration, handed on to the repeated attempt
     * @throws Exception if the cluster state rejects or fails the attempt
     */
    public void tryToBeLeader(final Map map) throws Exception {
        RunnableCallback retry = new RunnableCallback() {
            public void run() {
                try {
                    FollowerRunnable.this.tryToBeLeader(map);
                } catch (Exception e) {
                    FollowerRunnable.LOG.error("To be master error", e);
                    JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage());
                }
            }
        };
        boolean leader = this.data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, this.hostPort, retry);
        this.data.setLeader(leader);
    }

    /**
     * Verifies that the leader entry in the cluster state names this instance. Polls up to
     * {@link #OWN_MASTER_CHECK_ATTEMPTS} times, pausing {@code sleepTime} milliseconds between
     * polls, and halts the process when the entry never matches.
     *
     * @throws Exception if reading the cluster state fails
     */
    private void checkOwnMaster() throws Exception {
        StormClusterState clusterState = this.data.getStormClusterState();
        for (int attempt = 0; attempt < OWN_MASTER_CHECK_ATTEMPTS; attempt++) {
            if (clusterState.leader_existed()) {
                String leaderHost = clusterState.get_leader_host();
                if (this.hostPort.equals(leaderHost)) {
                    return;
                }
                LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" + leaderHost);
            }
            JStormUtils.sleepMs(this.sleepTime);
        }
        LOG.error("Current Nimubs fail to own nimbus_master, should halt process");
        JStormUtils.halt_process(0, "Current Nimubs fail to own nimbus_master, should halt process");
    }
}
