package com.alibaba.jstorm.daemon.worker;

import backtype.storm.generated.StormTopology;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.messaging.TransportFactory;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.worker.metrics.MetricReporter;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.zk.ZkTool;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.net.URL;
import java.security.InvalidParameterException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/WorkerData.class */
/**
 * Holder for the state shared by everything running inside one worker process:
 * configuration, cluster-state handles, the task assignment of this worker,
 * the topology definitions, and the queues/connection maps used to move tuples.
 *
 * <p>All heavy initialisation happens in the constructor; afterwards the object is
 * mostly accessed through plain getters. The getters hand out the internal
 * collections directly (no defensive copies), so callers share and may mutate them.
 */
public class WorkerData {
    private static final Logger LOG = Logger.getLogger(WorkerData.class);

    /** Configuration passed to the constructor, stored as-is. */
    private Map<Object, Object> conf;
    /** {@link #conf} overlaid with the topology configuration read from the supervisor. */
    private Map<Object, Object> stormConf;
    private IContext context;
    private final String topologyId;
    private final String supervisorId;
    private final Integer port;
    private final String workerId;
    private ClusterState zkClusterstate;
    private StormClusterState zkCluster;
    /** Task ids the assignment reports for this supervisor/port. */
    private Set<Integer> taskids;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    private ConcurrentSkipListSet<ResourceWorkerSlot> workerToResource;
    private Set<Integer> localNodeTasks;
    private ConcurrentHashMap<Integer, DisruptorQueue> innerTaskTransfer;
    private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
    /** Task id to component id, as read from the cluster state. */
    private HashMap<Integer, String> tasksToComponent;
    /** Reverse of {@link #tasksToComponent}; each task list is sorted ascending. */
    private Map<String, List<Integer>> componentToSortedTasks;
    private Map<String, Object> defaultResources;
    private Map<String, Object> userResources;
    private Map<String, Object> executorData;
    private Map registeredMetrics;
    private StormTopology rawTopology;
    private StormTopology sysTopology;
    private ContextMaker contextMaker;
    private DisruptorQueue transferQueue;
    private DisruptorQueue sendingQueue;
    /** Not set by the constructor; stays {@code null} until {@link #setShutdownTasks} is called. */
    private List<TaskShutdownDameon> shutdownTasks;
    private MetricReporter metricReporter;
    private final WorkerHaltRunable workHalt = new WorkerHaltRunable();
    private AtomicBoolean active = new AtomicBoolean(true);
    private StatusType topologyStatus = StatusType.active;

    /**
     * Builds the complete worker state.
     *
     * @param conf         base configuration; stored as-is and used as the base layer of
     *                     the merged storm configuration
     * @param context      messaging context; when {@code null} one is created through
     *                     {@link TransportFactory#makeContext} from the merged configuration
     * @param topologyId   id of the topology this worker belongs to
     * @param supervisorId id of the supervisor that owns this worker
     * @param port         port of this worker, used to look up its tasks in the assignment
     * @param workerId     id of this worker; used for the pid directory in distributed mode
     * @param jarPath      {@code ':'}-separated list of entries handed to the
     *                     {@link WorkerClassLoader} as {@code File:} URLs; blank entries are
     *                     skipped; may be {@code null}, in which case no URLs are passed
     * @throws InvalidParameterException if anything fails after the configuration has been
     *                                   merged (class loader, queues, assignment lookup,
     *                                   topology loading, ...); the original failure is
     *                                   attached as the cause
     * @throws Exception                 if creating the pid file, connecting to the cluster
     *                                   state or reading the topology configuration fails
     */
    public WorkerData(Map conf, IContext context, String topologyId, String supervisorId, int port, String workerId, String jarPath) throws Exception {
        this.conf = conf;
        this.context = context;
        this.topologyId = topologyId;
        this.supervisorId = supervisorId;
        this.port = port;
        this.workerId = workerId;

        if (StormConfig.cluster_mode(conf).equals("distributed")) {
            JStormServerUtils.createPid(StormConfig.worker_pids_root(conf, workerId));
        }

        this.zkClusterstate = ZkTool.mk_distributed_cluster_state(conf);
        this.zkCluster = Cluster.mk_storm_cluster_state(this.zkClusterstate);

        // Topology-level settings are applied on top of the base configuration.
        Map<? extends Object, ? extends Object> topologyConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
        this.stormConf = new HashMap<>();
        this.stormConf.putAll(conf);
        this.stormConf.putAll(topologyConf);
        LOG.info("Worker Configuration " + this.stormConf);

        try {
            // Class loader: a null jarPath simply results in an empty URL array.
            Set<URL> classLoaderUrls = new HashSet<>();
            if (jarPath != null) {
                for (String entry : jarPath.split(":")) {
                    if (!StringUtils.isBlank(entry)) {
                        classLoaderUrls.add(new URL("File:" + entry));
                    }
                }
            }
            WorkerClassLoader.mkInstance(classLoaderUrls.toArray(new URL[0]), ClassLoader.getSystemClassLoader(), ClassLoader.getSystemClassLoader().getParent(), ConfigExtension.isEnableTopologyClassLoader(this.stormConf));

            if (this.context == null) {
                this.context = TransportFactory.makeContext(this.stormConf);
            }

            // Global disruptor settings.
            boolean useSleep = ConfigExtension.isDisruptorUseSleep(this.stormConf);
            DisruptorQueue.setUseSleep(useSleep);
            boolean bufferSizeLimited = ConfigExtension.getTopologyBufferSizeLimited(this.stormConf);
            DisruptorQueue.setLimited(bufferSizeLimited);
            LOG.info("Disruptor use sleep:" + useSleep + ", limited size:" + bufferSizeLimited);

            // NOTE(review): buffer size and wait strategy are read from the base conf, not
            // from the merged stormConf, so topology-level values for these two keys are
            // not consulted here - confirm this is intended.
            int bufferSize = Utils.getInt(conf.get("topology.transfer.buffer.size")).intValue();
            WaitStrategy waitStrategy = (WaitStrategy) Utils.newInstance((String) conf.get("topology.disruptor.wait.strategy"));
            this.transferQueue = DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, bufferSize, waitStrategy);
            this.transferQueue.consumerStarted();
            this.sendingQueue = DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, bufferSize, waitStrategy);
            this.sendingQueue.consumerStarted();

            this.nodeportSocket = new ConcurrentHashMap<>();
            this.taskNodeport = new ConcurrentHashMap<>();
            this.workerToResource = new ConcurrentSkipListSet<>();
            this.innerTaskTransfer = new ConcurrentHashMap<>();
            this.deserializeQueues = new ConcurrentHashMap<>();

            // Assignment of this topology, and the tasks that belong to this worker.
            Assignment assignment = this.zkCluster.assignment_info(this.topologyId, null);
            if (assignment == null) {
                String errMsg = "Failed to get Assignment of " + this.topologyId;
                LOG.error(errMsg);
                throw new RuntimeException(errMsg);
            }
            this.workerToResource.addAll(assignment.getWorkers());
            this.taskids = assignment.getCurrentWorkerTasks(this.supervisorId, port);
            if (this.taskids == null || this.taskids.isEmpty()) {
                throw new RuntimeException("No tasks running current workers");
            }
            LOG.info("Current worker taskList:" + this.taskids);

            this.rawTopology = StormConfig.read_supervisor_topology_code(conf, topologyId);
            this.sysTopology = Common.system_topology(this.stormConf, this.rawTopology);
            generateMaps();
            this.contextMaker = new ContextMaker(this);
            this.metricReporter = new MetricReporter(this);
            LOG.info("Successfully create WorkerData");
        } catch (Exception e) {
            LOG.error("init jarClassLoader error!", e);
            // Keep the exception type callers already see, but carry the real reason along.
            // InvalidParameterException has no cause-taking constructor, hence initCause.
            InvalidParameterException failure = new InvalidParameterException(
                    "Failed to initialize WorkerData for topology " + topologyId + ": " + e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Loads the task-to-component mapping from the cluster state, derives the reverse
     * mapping with sorted task lists, and creates the empty resource/metric maps.
     *
     * @throws Exception if the task information cannot be read
     */
    private void generateMaps() throws Exception {
        this.tasksToComponent = Cluster.topology_task_info(this.zkCluster, this.topologyId);
        LOG.info("Map<taskId, component>:" + this.tasksToComponent);

        this.componentToSortedTasks = JStormUtils.reverse_map(this.tasksToComponent);
        for (List<Integer> componentTasks : this.componentToSortedTasks.values()) {
            Collections.sort(componentTasks);
        }

        this.defaultResources = new HashMap<>();
        this.userResources = new HashMap<>();
        this.executorData = new HashMap<>();
        this.registeredMetrics = new HashMap();
    }

    /** @return the configuration that was passed to the constructor */
    public Map<Object, Object> getConf() {
        return this.conf;
    }

    /** @return the flag object holding the active state; initially {@code true} */
    public AtomicBoolean getActive() {
        return this.active;
    }

    /**
     * Replaces the flag object itself (not just its value); references obtained earlier
     * through {@link #getActive()} keep pointing at the previous instance.
     */
    public void setActive(AtomicBoolean atomicBoolean) {
        this.active = atomicBoolean;
    }

    /** @return the last topology status set on this object; initially {@code StatusType.active} */
    public StatusType getTopologyStatus() {
        return this.topologyStatus;
    }

    public void setTopologyStatus(StatusType statusType) {
        this.topologyStatus = statusType;
    }

    /** @return the base configuration overlaid with the topology configuration */
    public Map<Object, Object> getStormConf() {
        return this.stormConf;
    }

    /** @return the messaging context, either the one supplied or the one created on construction */
    public IContext getContext() {
        return this.context;
    }

    public String getTopologyId() {
        return this.topologyId;
    }

    public String getSupervisorId() {
        return this.supervisorId;
    }

    public Integer getPort() {
        return this.port;
    }

    public String getWorkerId() {
        return this.workerId;
    }

    public ClusterState getZkClusterstate() {
        return this.zkClusterstate;
    }

    public StormClusterState getZkCluster() {
        return this.zkCluster;
    }

    /** @return the task ids assigned to this worker; non-empty after successful construction */
    public Set<Integer> getTaskids() {
        return this.taskids;
    }

    /** @return the live map from worker slot to connection; empty right after construction */
    public ConcurrentHashMap<WorkerSlot, IConnection> getNodeportSocket() {
        return this.nodeportSocket;
    }

    /** @return the live map from task id to worker slot; empty right after construction */
    public ConcurrentHashMap<Integer, WorkerSlot> getTaskNodeport() {
        return this.taskNodeport;
    }

    /** @return the live set of worker slots, seeded with the workers of the assignment */
    public ConcurrentSkipListSet<ResourceWorkerSlot> getWorkerToResource() {
        return this.workerToResource;
    }

    /** @return the live map from task id to queue; empty right after construction */
    public ConcurrentHashMap<Integer, DisruptorQueue> getInnerTaskTransfer() {
        return this.innerTaskTransfer;
    }

    /** @return the live map from task id to queue; empty right after construction */
    public ConcurrentHashMap<Integer, DisruptorQueue> getDeserializeQueues() {
        return this.deserializeQueues;
    }

    /** @return task id to component id, as read from the cluster state */
    public HashMap<Integer, String> getTasksToComponent() {
        return this.tasksToComponent;
    }

    /** @return the topology as read from the supervisor's local topology code */
    public StormTopology getRawTopology() {
        return this.rawTopology;
    }

    /** @return the topology produced by {@code Common.system_topology} from the raw topology */
    public StormTopology getSysTopology() {
        return this.sysTopology;
    }

    public ContextMaker getContextMaker() {
        return this.contextMaker;
    }

    public WorkerHaltRunable getWorkHalt() {
        return this.workHalt;
    }

    /** @return the queue created under the name "TotalTransfer" */
    public DisruptorQueue getTransferQueue() {
        return this.transferQueue;
    }

    /** @return the queue created under the name "TotalSending" */
    public DisruptorQueue getSendingQueue() {
        return this.sendingQueue;
    }

    /** @return component id to its task ids, each list sorted ascending */
    public Map<String, List<Integer>> getComponentToSortedTasks() {
        return this.componentToSortedTasks;
    }

    public Map<String, Object> getDefaultResources() {
        return this.defaultResources;
    }

    public Map<String, Object> getUserResources() {
        return this.userResources;
    }

    public Map<String, Object> getExecutorData() {
        return this.executorData;
    }

    public Map getRegisteredMetrics() {
        return this.registeredMetrics;
    }

    /** @return the shutdown handles, or {@code null} if {@link #setShutdownTasks} was never called */
    public List<TaskShutdownDameon> getShutdownTasks() {
        return this.shutdownTasks;
    }

    public void setShutdownTasks(List<TaskShutdownDameon> list) {
        this.shutdownTasks = list;
    }

    /** @return the local-node task ids, or {@code null} if {@link #setLocalNodeTasks} was never called */
    public Set<Integer> getLocalNodeTasks() {
        return this.localNodeTasks;
    }

    public void setLocalNodeTasks(Set<Integer> set) {
        this.localNodeTasks = set;
    }

    public void setMetricsReporter(MetricReporter metricReporter) {
        this.metricReporter = metricReporter;
    }

    public MetricReporter getMetricsReporter() {
        return this.metricReporter;
    }
}
