package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Assignment;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/RefreshConnections.class */
public class RefreshConnections extends RunnableCallback {
    private static Logger LOG = Logger.getLogger(RefreshConnections.class);
    private WorkerData workerData;
    private AtomicBoolean active;
    private Map conf;
    private StormClusterState zkCluster;
    private String topologyId;
    private Set<Integer> outboundTasks;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private IContext context;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    private Integer frequence;
    private String supervisorId;

    public RefreshConnections(WorkerData workerData, Set<Integer> set) {
        this.workerData = workerData;
        this.active = workerData.getActive();
        this.conf = workerData.getConf();
        this.zkCluster = workerData.getZkCluster();
        this.topologyId = workerData.getTopologyId();
        this.outboundTasks = set;
        this.nodeportSocket = workerData.getNodeportSocket();
        this.context = workerData.getContext();
        this.taskNodeport = workerData.getTaskNodeport();
        this.supervisorId = workerData.getSupervisorId();
        this.frequence = JStormUtils.parseInt(this.conf.get("task.refresh.poll.secs"), 5);
    }

    public void run() {
        if (this.active.get()) {
            try {
                synchronized (this) {
                    Assignment assignment_info = this.zkCluster.assignment_info(this.topologyId, this);
                    if (assignment_info == null) {
                        LOG.error("Failed to get Assignment of " + this.topologyId);
                        return;
                    }
                    Set<ResourceWorkerSlot> workers = assignment_info.getWorkers();
                    if (workers == null) {
                        LOG.error("Failed to get taskToResource of " + this.topologyId);
                        return;
                    }
                    this.workerData.getWorkerToResource().addAll(workers);
                    HashMap hashMap = new HashMap();
                    Map<String, String> nodeHost = assignment_info.getNodeHost();
                    HashSet<WorkerSlot> hashSet = new HashSet();
                    HashSet hashSet2 = new HashSet();
                    if (workers != null && this.outboundTasks != null) {
                        for (ResourceWorkerSlot resourceWorkerSlot : workers) {
                            if (this.supervisorId.equals(resourceWorkerSlot.getNodeId())) {
                                hashSet2.addAll(resourceWorkerSlot.getTasks());
                            }
                            for (Integer num : resourceWorkerSlot.getTasks()) {
                                if (this.outboundTasks.contains(num)) {
                                    hashMap.put(num, resourceWorkerSlot);
                                    hashSet.add(resourceWorkerSlot);
                                }
                            }
                        }
                    }
                    this.taskNodeport.putAll(hashMap);
                    this.workerData.setLocalNodeTasks(hashSet2);
                    Set<WorkerSlot> keySet = this.nodeportSocket.keySet();
                    HashSet<WorkerSlot> hashSet3 = new HashSet();
                    HashSet<WorkerSlot> hashSet4 = new HashSet();
                    for (WorkerSlot workerSlot : hashSet) {
                        if (!keySet.contains(workerSlot)) {
                            hashSet3.add(workerSlot);
                        }
                    }
                    for (WorkerSlot workerSlot2 : keySet) {
                        if (!hashSet.contains(workerSlot2)) {
                            hashSet4.add(workerSlot2);
                        }
                    }
                    for (WorkerSlot workerSlot3 : hashSet3) {
                        this.nodeportSocket.put(workerSlot3, this.context.connect(this.topologyId, nodeHost.get(workerSlot3.getNodeId()), workerSlot3.getPort()));
                        LOG.info("Add connection to " + workerSlot3);
                    }
                    for (WorkerSlot workerSlot4 : hashSet4) {
                        LOG.info("Remove connection to " + workerSlot4);
                        this.nodeportSocket.remove(workerSlot4).close();
                    }
                }
            } catch (Exception e) {
                LOG.error("Failed to refresh worker Connection", e);
                throw new RuntimeException(e);
            }
        }
    }

    public Object getResult() {
        if (this.active.get()) {
            return this.frequence;
        }
        return -1;
    }
}
