package com.alibaba.jstorm.daemon.nimbus;

import backtype.storm.generated.Nimbus;
import backtype.storm.scheduler.INimbus;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.supervisor.Httpserver;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.daemon.worker.metrics.AlimonitorClient;
import com.alibaba.jstorm.daemon.worker.metrics.MetricSendClient;
import com.alibaba.jstorm.daemon.worker.metrics.UploadMetricFromZK;
import com.alibaba.jstorm.schedule.CleanRunnable;
import com.alibaba.jstorm.schedule.FollowerRunnable;
import com.alibaba.jstorm.schedule.MonitorRunnable;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.SmartThread;
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.thrift7.protocol.TBinaryProtocol;
import org.apache.thrift7.server.THsHaServer;
import org.apache.thrift7.transport.TNonblockingServerSocket;
import org.apache.thrift7.transport.TTransportException;

/* loaded from: input_file:com/alibaba/jstorm/daemon/nimbus/NimbusServer.class */
public class NimbusServer {
    private static final Logger LOG = Logger.getLogger(NimbusServer.class);
    private NimbusData data;
    private ServiceHandler serviceHandler;
    private TopologyAssign topologyAssign;
    private THsHaServer thriftServer;
    private FollowerRunnable follower;
    private Httpserver hs;
    private UploadMetricFromZK uploadMetric;
    private List<SmartThread> smartThreads = new ArrayList();
    private AtomicBoolean isShutdown = new AtomicBoolean(false);

    public static void main(String[] strArr) throws Exception {
        Map readStormConfig = Utils.readStormConfig();
        JStormServerUtils.startTaobaoJvmMonitor();
        new NimbusServer().launchServer(readStormConfig, new DefaultInimbus());
    }

    private void createPid(Map map) throws Exception {
        JStormServerUtils.createPid(StormConfig.masterPids(map));
    }

    private void launchServer(Map map, INimbus iNimbus) {
        LOG.info("Begin to start nimbus with conf " + map);
        try {
            try {
                StormConfig.validate_distributed_mode(map);
                createPid(map);
                initShutdownHook();
                iNimbus.prepare(map, StormConfig.masterInimbus(map));
                this.data = createNimbusData(map, iNimbus);
                initFollowerThread(map);
                this.hs = new Httpserver(ConfigExtension.getNimbusDeamonHttpserverPort(map).intValue(), map);
                this.hs.start();
                initContainerHBThread(map);
                while (!this.data.isLeader()) {
                    Utils.sleep(5000L);
                }
                initUploadMetricThread(this.data);
                init(map);
                cleanup();
            } catch (Throwable th) {
                LOG.error("Fail to run nimbus ", th);
                cleanup();
            }
            LOG.info("Quit nimbus");
        } catch (Throwable th2) {
            cleanup();
            throw th2;
        }
    }

    public ServiceHandler launcherLocalServer(Map map, INimbus iNimbus) throws Exception {
        LOG.info("Begin to start nimbus on local model");
        StormConfig.validate_local_mode(map);
        iNimbus.prepare(map, StormConfig.masterInimbus(map));
        this.data = createNimbusData(map, iNimbus);
        init(map);
        return this.serviceHandler;
    }

    private void initContainerHBThread(Map map) throws IOException {
        SmartThread mkNimbusInstance = SyncContainerHb.mkNimbusInstance(map);
        if (mkNimbusInstance != null) {
            this.smartThreads.add(mkNimbusInstance);
        }
    }

    private void init(Map map) throws Exception {
        NimbusUtils.cleanupCorruptTopologies(this.data);
        initTopologyAssign();
        initTopologyStatus();
        initCleaner(map);
        this.serviceHandler = new ServiceHandler(this.data);
        if (this.data.isLocalMode()) {
            return;
        }
        initMonitor(map);
        initThrift(map);
    }

    private NimbusData createNimbusData(Map map, INimbus iNimbus) throws Exception {
        TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = new TimeCacheMap.ExpiredCallback<Object, Object>() { // from class: com.alibaba.jstorm.daemon.nimbus.NimbusServer.1
            public void expire(Object obj, Object obj2) {
                try {
                    NimbusServer.LOG.info("Close file " + String.valueOf(obj));
                    if (obj2 != null) {
                        if (obj2 instanceof Channel) {
                            ((Channel) obj2).close();
                        } else if (obj2 instanceof BufferFileInputStream) {
                            ((BufferFileInputStream) obj2).close();
                        }
                    }
                } catch (IOException e) {
                    NimbusServer.LOG.error(e.getMessage(), e);
                }
            }
        };
        int intValue = JStormUtils.parseInt(map.get("nimbus.file.copy.expiration.secs"), 30).intValue();
        return new NimbusData(map, new TimeCacheMap(intValue, expiredCallback), new TimeCacheMap(intValue, expiredCallback), iNimbus);
    }

    private void initTopologyAssign() {
        this.topologyAssign = TopologyAssign.getInstance();
        this.topologyAssign.init(this.data);
    }

    private void initTopologyStatus() throws Exception {
        List<String> active_storms = this.data.getStormClusterState().active_storms();
        if (active_storms != null) {
            Iterator<String> it = active_storms.iterator();
            while (it.hasNext()) {
                NimbusUtils.transition(this.data, it.next(), false, StatusType.startup, new Object[0]);
            }
        }
        LOG.info("Successfully init topology status");
    }

    private void initMonitor(Map map) {
        this.data.getScheduExec().scheduleAtFixedRate(new MonitorRunnable(this.data), 0L, JStormUtils.parseInt(map.get("nimbus.monitor.freq.secs"), 10).intValue(), TimeUnit.SECONDS);
        LOG.info("Successfully init Monitor thread");
    }

    private void initCleaner(Map map) throws IOException {
        ScheduledExecutorService scheduExec = this.data.getScheduExec();
        String masterInbox = StormConfig.masterInbox(map);
        scheduExec.scheduleAtFixedRate(new CleanRunnable(masterInbox, JStormUtils.parseInt(map.get("nimbus.inbox.jar.expiration.secs"), 3600).intValue()), 0L, JStormUtils.parseInt(map.get("nimbus.cleanup.inbox.freq.secs"), 600).intValue(), TimeUnit.SECONDS);
        LOG.info("Successfully init " + masterInbox + " cleaner");
    }

    private void initThrift(Map map) throws TTransportException {
        TNonblockingServerSocket tNonblockingServerSocket = new TNonblockingServerSocket(JStormUtils.parseInt(map.get("nimbus.thrift.port")).intValue());
        Integer parseInt = JStormUtils.parseInt(map.get("nimbus.thrift.max_buffer_size"));
        THsHaServer.Args args = new THsHaServer.Args(tNonblockingServerSocket);
        args.workerThreads(64);
        args.protocolFactory(new TBinaryProtocol.Factory(false, true, parseInt.intValue()));
        args.processor(new Nimbus.Processor(this.serviceHandler));
        args.maxReadBufferBytes = parseInt.intValue();
        this.thriftServer = new THsHaServer(args);
        LOG.info("Successfully started nimbus: started Thrift server...");
        this.thriftServer.serve();
    }

    private void initFollowerThread(Map map) {
        this.follower = new FollowerRunnable(this.data, 5000);
        Thread thread = new Thread(this.follower);
        thread.setDaemon(true);
        thread.start();
        LOG.info("Successfully init Follower thread");
    }

    private void initShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.alibaba.jstorm.daemon.nimbus.NimbusServer.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                NimbusServer.this.cleanup();
            }
        });
    }

    private void initUploadMetricThread(NimbusData nimbusData) {
        ScheduledExecutorService scheduExec = nimbusData.getScheduExec();
        this.uploadMetric = new UploadMetricFromZK(nimbusData, ConfigExtension.isAlimonitorMetricsPost(nimbusData.getConf()) ? new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR, AlimonitorClient.DEFAULT_PORT, true) : new MetricSendClient());
        scheduExec.scheduleWithFixedDelay(this.uploadMetric, 120L, 60L, TimeUnit.SECONDS);
        LOG.info("Successfully init metrics uploading thread");
    }

    public void cleanup() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            LOG.info("Notify to quit nimbus");
            return;
        }
        LOG.info("Begin to shutdown nimbus");
        for (SmartThread smartThread : this.smartThreads) {
            smartThread.cleanup();
            JStormUtils.sleepMs(10L);
            smartThread.interrupt();
            try {
                smartThread.join();
            } catch (InterruptedException e) {
                LOG.error("join thread", e);
            }
        }
        if (this.serviceHandler != null) {
            this.serviceHandler.shutdown();
        }
        if (this.topologyAssign != null) {
            this.topologyAssign.cleanup();
            LOG.info("Successfully shutdown TopologyAssign thread");
        }
        if (this.follower != null) {
            this.follower.clean();
            LOG.info("Successfully shutdown follower thread");
        }
        if (this.uploadMetric != null) {
            this.uploadMetric.clean();
            LOG.info("Successfully shutdown UploadMetric thread");
        }
        if (this.data != null) {
            this.data.cleanup();
            LOG.info("Successfully shutdown NimbusData");
        }
        if (this.thriftServer != null) {
            this.thriftServer.stop();
            LOG.info("Successfully shutdown thrift server");
        }
        if (this.hs != null) {
            this.hs.shutdown();
            LOG.info("Successfully shutdown httpserver");
        }
        LOG.info("Successfully shutdown nimbus");
        JStormUtils.halt_process(0, "!!!Shutdown!!!");
    }
}
