package com.alibaba.jstorm.message.zeroMq;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/* loaded from: input_file:com/alibaba/jstorm/message/zeroMq/MQContext.class */
/**
 * {@link IContext} implementation that creates ZeroMQ-backed connections.
 *
 * <p>{@link #prepare(Map)} reads the ZeroMQ settings from the configuration map and creates the
 * shared {@code ZMQ.Context}; {@link #bind(String, int)} and
 * {@link #connect(String, String, int)} then create pull (receiver) and push (sender) sockets
 * on that context; {@link #term()} terminates the context.
 */
public class MQContext implements IContext {
    protected static final Logger LOG = LoggerFactory.getLogger(MQContext.class);

    /** Configuration map handed to {@link #prepare(Map)}. */
    protected Map storm_conf;
    /** Value of {@code zmq.threads}; passed to {@code ZeroMq.context} in {@link #init()}. */
    protected int zmqThreads;
    /** Value of {@code zmq.linger.millis}; applied to every sender socket. */
    protected int linger_ms;
    /** True when the cluster mode is {@code "local"}; selects {@code ipc://} over {@code tcp://}. */
    protected boolean ipc;
    /** Value of {@code storm.local.mode.zmq} (default false); not read anywhere in this class. */
    protected boolean virtportZmq = false;
    /** Value of {@code zmq.hwm} (default 1000); high-water mark applied to every socket. */
    protected int maxQueueMsg;
    /** Shared ZeroMQ context; null until {@link #init()} has run. */
    private ZMQ.Context context;

    protected MQContext() {
    }

    /**
     * Reads the ZeroMQ settings from {@code map} and creates the ZeroMQ context.
     *
     * <p>{@code zmq.threads} and {@code zmq.linger.millis} are parsed without a default.
     * NOTE(review): presumably a missing key makes {@code JStormUtils.parseInt} return null and
     * this method fail with a NullPointerException - confirm against JStormUtils.
     *
     * @param map configuration map; also stored in {@link #storm_conf}
     */
    public void prepare(Map map) {
        this.storm_conf = map;
        this.zmqThreads = JStormUtils.parseInt(map.get("zmq.threads")).intValue();
        this.linger_ms = JStormUtils.parseInt(map.get("zmq.linger.millis")).intValue();
        this.ipc = StormConfig.cluster_mode(map).equals("local");
        this.virtportZmq = JStormUtils.parseBoolean(map.get("storm.local.mode.zmq"), false);
        this.maxQueueMsg = JStormUtils.parseInt(map.get("zmq.hwm"), 1000).intValue();
        init();
        LOG.info("MQContext prepare done...");
    }

    /** Creates the ZeroMQ context using {@link #zmqThreads}. Called from {@link #prepare(Map)}. */
    protected void init() {
        this.context = ZeroMq.context(this.zmqThreads);
    }

    /**
     * Creates a receiving connection on the given port.
     *
     * @param str unused by this implementation
     * @param i   port number used to build the endpoint
     * @return a receive connection wrapping a bound pull socket
     */
    public IConnection bind(String str, int i) {
        return zmq_bind(true, i);
    }

    /**
     * Creates a pull socket, applies the high-water mark and binds it.
     *
     * @param z true for an {@code ipc://} or {@code tcp://*:} endpoint (depending on
     *          {@link #ipc}), false for an {@code inproc://} endpoint
     * @param i port number used to build the endpoint
     * @return a receive connection wrapping the bound socket
     */
    protected IConnection zmq_bind(boolean z, int i) {
        String url = endpoint(z, "*", i);
        ZMQ.Socket socket = ZeroMq.socket(this.context, ZeroMq.pull);
        try {
            // Socket options such as the HWM only take effect for subsequent binds/connects,
            // so the limit has to be in place before the endpoint is created.
            ZeroMq.set_hwm(socket, this.maxQueueMsg);
            ZeroMq.bind(socket, url);
        } catch (RuntimeException e) {
            // Do not leave a half-initialised native socket open on the shared context.
            socket.close();
            throw e;
        }
        LOG.info("Create zmq receiver {}", url);
        return new ZMQRecvConnection(socket);
    }

    /**
     * Creates a sending connection to the given host and port.
     *
     * @param str  unused by this implementation
     * @param str2 remote host name
     * @param i    remote port number
     * @return a send connection wrapping a connected push socket
     */
    public IConnection connect(String str, String str2, int i) {
        return zmq_connect(true, str2, i);
    }

    /**
     * Creates a push socket, applies linger and high-water mark, and connects it.
     *
     * @param z   true for an {@code ipc://} or {@code tcp://host:port} endpoint (depending on
     *            {@link #ipc}), false for an {@code inproc://} endpoint
     * @param str remote host name; only part of the endpoint in the tcp case, but always handed
     *            to the returned connection
     * @param i   remote port number
     * @return a send connection wrapping the connected socket
     */
    protected IConnection zmq_connect(boolean z, String str, int i) {
        String url = endpoint(z, str, i);
        ZMQ.Socket socket = ZeroMq.socket(this.context, ZeroMq.push);
        ZMQ.Socket connected;
        try {
            // As in zmq_bind: the HWM must be set before connecting to have any effect.
            ZeroMq.set_hwm(socket, this.maxQueueMsg);
            connected = ZeroMq.connect(ZeroMq.set_linger(socket, this.linger_ms), url);
        } catch (RuntimeException e) {
            // Do not leave a half-initialised native socket open on the shared context.
            socket.close();
            throw e;
        }
        LOG.info("Create zmq sender {}", url);
        return new ZMQSendConnection(connected, str, i);
    }

    /**
     * Terminates the ZeroMQ context. Does nothing beyond logging when the context was never
     * created, i.e. {@link #prepare(Map)} has not been called.
     */
    public void term() {
        LOG.info("ZMQ context terminates ");
        if (this.context != null) {
            this.context.term();
        }
    }

    /** Builds the ZeroMQ endpoint URL shared by the bind and connect paths. */
    private String endpoint(boolean distributed, String tcpHost, int port) {
        if (!distributed) {
            return "inproc://" + port;
        }
        if (this.ipc) {
            return "ipc://" + port + ".ipc";
        }
        return "tcp://" + tcpHost + ":" + port;
    }
}
