package com.alibaba.jstorm.message.zeroMq;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import java.util.List;
import org.apache.log4j.Logger;
import org.zeromq.ZMQ;

/* loaded from: input_file:com/alibaba/jstorm/message/zeroMq/ZMQRecvConnection.class */
/**
 * Receive-only {@link IConnection} backed by a ZeroMQ socket.
 *
 * <p>Constructing an instance hands it to an {@link AsyncLoopThread}; its {@link #run()} method
 * then repeatedly reads frames from the socket and publishes the resulting {@link TaskMessage}s
 * to the queue supplied through {@link #registerQueue(DisruptorQueue)}. Both {@code send}
 * overloads are unsupported.
 *
 * <p>{@code closed} and {@code recvQueue} are written by callers and read by the receive loop,
 * so both are {@code volatile} to make updates visible across threads.
 */
public class ZMQRecvConnection extends RunnableCallback implements IConnection {
    private static final Logger LOG = Logger.getLogger(ZMQRecvConnection.class);

    /** Socket that frames are read from; never reassigned. */
    private final ZMQ.Socket socket;

    /** Set by {@link #close()}, polled by the receive loop; volatile for cross-thread visibility. */
    private volatile boolean closed = false;

    /** Destination for received messages; {@code null} until {@link #registerQueue} is called. */
    private volatile DisruptorQueue recvQueue;

    /**
     * Wraps the given socket and creates the {@link AsyncLoopThread} that drives {@link #run()}.
     *
     * @param socket ZeroMQ socket to receive from
     */
    public ZMQRecvConnection(ZMQ.Socket socket) {
        this.socket = socket;
        // NOTE(review): the arguments are presumably (callback, daemon, priority, start), which
        // would make 10 equal to Thread.MAX_PRIORITY and start the thread immediately -
        // confirm against AsyncLoopThread.
        new AsyncLoopThread(this, true, 10, true);
    }

    /**
     * Reads one frame from the socket and wraps it in a {@link TaskMessage}.
     *
     * @param i flags forwarded unchanged to {@code ZeroMq.recv}
     * @return the decoded message, or {@code null} when nothing was received or the frame is
     *         4 bytes or shorter
     */
    public TaskMessage recv(int i) {
        byte[] data = ZeroMq.recv(this.socket, i);
        // NOTE(review): the 4-byte threshold presumably corresponds to the task-id header read
        // by deserializeTaskId - confirm against KryoTupleDeserializer.
        if (data == null || data.length <= 4) {
            return null;
        }
        return new TaskMessage(KryoTupleDeserializer.deserializeTaskId(data), data);
    }

    /**
     * Marks the connection as closed and closes the underlying socket.
     */
    public void close() {
        // Publish the flag before touching the socket so that a receive failure caused by the
        // close is observed with isClosed() already true, and the flag is set even if
        // socket.close() throws.
        this.closed = true;
        this.socket.close();
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Unsupported: this connection only receives.
     *
     * @throws UnsupportedOperationException always
     */
    public void send(List<TaskMessage> list) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * Unsupported: this connection only receives.
     *
     * @throws UnsupportedOperationException always
     */
    public void send(TaskMessage taskMessage) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * Sets the queue that received messages are published to.
     *
     * @param disruptorQueue destination queue
     */
    public void registerQueue(DisruptorQueue disruptorQueue) {
        this.recvQueue = disruptorQueue;
    }

    /**
     * Publishes a message to the registered queue. {@code null} messages are ignored. If no
     * queue has been registered yet, the message is dropped and a warning is logged.
     *
     * @param taskMessage message to publish, may be {@code null}
     */
    public void enqueue(TaskMessage taskMessage) {
        if (taskMessage == null) {
            return;
        }
        // Read the volatile field once so the null check and the publish use the same reference.
        DisruptorQueue queue = this.recvQueue;
        if (queue == null) {
            LOG.warn("Drop received message, no receive queue has been registered");
            return;
        }
        queue.publish(taskMessage);
    }

    /**
     * Receive loop: reads frames with flags {@code 0} and enqueues them until the connection is
     * closed. Any exception from a single iteration is logged and the loop continues.
     */
    public void run() {
        LOG.info("Successfully start ZMQ Recv thread");
        while (!isClosed()) {
            try {
                enqueue(recv(0));
            } catch (Exception e) {
                LOG.warn("ZMQ Recv thread receive error", e);
            }
        }
        LOG.info("Successfully shutdown ZMQ Recv thread");
    }

    /**
     * Logs that shutdown is beginning and returns {@code -1}.
     *
     * @return {@code -1}; NOTE(review): presumably interpreted by the driving thread as a
     *         request to stop looping - confirm against AsyncLoopThread/RunnableCallback
     */
    public Object getResult() {
        LOG.info("Begin to shutdown ZMQ Recv thread");
        return -1;
    }
}
