package com.alibaba.jstorm.message.zeroMq;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.metric.JStormHistogram;
import com.alibaba.jstorm.metric.JStormTimer;
import com.alibaba.jstorm.metric.Metrics;
import com.alibaba.jstorm.utils.JStormServerUtils;
import java.util.Iterator;
import java.util.List;
import org.zeromq.ZMQ;

/* loaded from: input_file:com/alibaba/jstorm/message/zeroMq/ZMQSendConnection.class */
public class ZMQSendConnection implements IConnection {
    private ZMQ.Socket socket;
    private boolean closed = false;
    private JStormTimer timer;
    private JStormHistogram histogram;
    private String prefix;

    public ZMQSendConnection(ZMQ.Socket socket, String str, int i) {
        this.socket = socket;
        this.prefix = JStormServerUtils.getName(str, i);
        this.timer = Metrics.registerTimer(this.prefix, "ZMQ_Send_Time", (String) null, Metrics.MetricType.WORKER);
        this.histogram = Metrics.registerHistograms(this.prefix, "ZMQ_Send_MSG_Size", (String) null, Metrics.MetricType.WORKER);
    }

    public void close() {
        this.socket.close();
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void registerQueue(DisruptorQueue disruptorQueue) {
        throw new UnsupportedOperationException("recvTask() Client connection should not receive any messages");
    }

    public void enqueue(TaskMessage taskMessage) {
        throw new UnsupportedOperationException("recvTask() Client connection should not receive any messages");
    }

    public void send(List<TaskMessage> list) {
        this.timer.start();
        try {
            Iterator<TaskMessage> it = list.iterator();
            while (it.hasNext()) {
                ZeroMq.send(this.socket, it.next().message());
            }
        } finally {
            this.timer.stop();
            this.histogram.update(list.size());
        }
    }

    public void send(TaskMessage taskMessage) {
        this.timer.start();
        try {
            ZeroMq.send(this.socket, taskMessage.message());
            this.timer.stop();
            this.histogram.update(1);
        } catch (Throwable th) {
            this.timer.stop();
            this.histogram.update(1);
            throw th;
        }
    }

    public TaskMessage recv(int i) {
        throw new UnsupportedOperationException("recvTask() Client connection should not receive any messages");
    }
}
