/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public class MqttPahoMessageDrivenChannelAdapter
extends AbstractMqttMessageDrivenChannelAdapter
implements MqttCallback,
ApplicationEventPublisherAware {
    public static final long DEFAULT_COMPLETION_TIMEOUT = 30000L;
    public static final long DISCONNECT_COMPLETION_TIMEOUT = 5000L;
    private static final int DEFAULT_RECOVERY_INTERVAL = 10000;
    private final MqttPahoClientFactory clientFactory;
    private int recoveryInterval = 10000;
    private long completionTimeout = 30000L;
    private long disconnectCompletionTimeout = 5000L;
    private boolean manualAcks;
    private volatile IMqttClient client;
    private volatile ScheduledFuture<?> reconnectFuture;
    private volatile boolean connected;
    private volatile boolean cleanSession;
    private volatile ConsumerStopAction consumerStopAction;
    private ApplicationEventPublisher applicationEventPublisher;

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(url, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactory clientFactory, String ... topic) {
        super(null, clientId, topic);
        this.clientFactory = clientFactory;
    }

    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String ... topic) {
        this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
    }

    public synchronized void setCompletionTimeout(long completionTimeout) {
        this.completionTimeout = completionTimeout;
    }

    public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
        this.disconnectCompletionTimeout = completionTimeout;
    }

    public synchronized void setRecoveryInterval(int recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    public void setManualAcks(boolean manualAcks) {
        this.manualAcks = manualAcks;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    protected void doStart() {
        Assert.state((this.getTaskScheduler() != null ? 1 : 0) != 0, (String)"A 'taskScheduler' is required");
        try {
            this.connectAndSubscribe();
        }
        catch (Exception e) {
            this.logger.error((Object)"Exception while connecting and subscribing, retrying", (Throwable)e);
            this.scheduleReconnect();
        }
    }

    protected synchronized void doStop() {
        this.cancelReconnect();
        if (this.client != null) {
            try {
                if (this.consumerStopAction.equals((Object)ConsumerStopAction.UNSUBSCRIBE_ALWAYS) || this.consumerStopAction.equals((Object)ConsumerStopAction.UNSUBSCRIBE_CLEAN) && this.cleanSession) {
                    this.client.unsubscribe(this.getTopic());
                }
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while unsubscribing", (Throwable)e);
            }
            try {
                this.client.disconnectForcibly(this.disconnectCompletionTimeout);
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while disconnecting", (Throwable)e);
            }
            this.client.setCallback(null);
            try {
                this.client.close();
            }
            catch (MqttException e) {
                this.logger.error((Object)"Exception while closing", (Throwable)e);
            }
            this.connected = false;
            this.client = null;
        }
    }

    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            if (this.client != null && this.client.isConnected()) {
                this.client.subscribe(topic, qos);
            }
        }
        catch (MqttException e) {
            super.removeTopic(topic);
            throw new MessagingException("Failed to subscribe to topic " + topic, (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    @Override
    public void removeTopic(String ... topic) {
        this.topicLock.lock();
        try {
            if (this.client != null && this.client.isConnected()) {
                this.client.unsubscribe(topic);
            }
            super.removeTopic(topic);
        }
        catch (MqttException e) {
            throw new MessagingException("Failed to unsubscribe from topic " + Arrays.asList(topic), (Throwable)e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    private synchronized void connectAndSubscribe() throws MqttException {
        MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
        this.cleanSession = connectionOptions.isCleanSession();
        this.consumerStopAction = this.clientFactory.getConsumerStopAction();
        if (this.consumerStopAction == null) {
            this.consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
        }
        Assert.state((this.getUrl() != null || connectionOptions.getServerURIs() != null ? 1 : 0) != 0, (String)"If no 'url' provided, connectionOptions.getServerURIs() must not be null");
        this.client = this.clientFactory.getClientInstance(this.getUrl(), this.getClientId());
        this.client.setCallback((MqttCallback)this);
        if (this.client instanceof MqttClient) {
            ((MqttClient)this.client).setTimeToWait(this.completionTimeout);
        }
        this.topicLock.lock();
        Object[] topics = this.getTopic();
        try {
            this.client.connect(connectionOptions);
            this.client.setManualAcks(this.manualAcks);
            int[] requestedQos = this.getQos();
            int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
            this.client.subscribe((String[])topics, grantedQos);
            for (int i = 0; i < requestedQos.length; ++i) {
                if (grantedQos[i] == requestedQos[i]) continue;
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn((Object)("Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics) + " requested: " + Arrays.toString(requestedQos) + " granted: " + Arrays.toString(grantedQos)));
                }
                break;
            }
        }
        catch (MqttException e) {
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent((Object)this, e));
            }
            this.logger.error((Object)("Error connecting or subscribing to " + Arrays.toString(topics)), (Throwable)e);
            this.client.disconnectForcibly(this.disconnectCompletionTimeout);
            try {
                this.client.setCallback(null);
                this.client.close();
            }
            catch (MqttException mqttException) {
                // empty catch block
            }
            this.client = null;
            throw e;
        }
        finally {
            this.topicLock.unlock();
        }
        if (this.client.isConnected()) {
            this.connected = true;
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)message);
            }
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttSubscribedEvent((Object)this, message));
            }
        }
    }

    private synchronized void cancelReconnect() {
        if (this.reconnectFuture != null) {
            this.reconnectFuture.cancel(false);
            this.reconnectFuture = null;
        }
    }

    private synchronized void scheduleReconnect() {
        this.cancelReconnect();
        try {
            this.reconnectFuture = this.getTaskScheduler().schedule(() -> {
                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)"Attempting reconnect");
                    }
                    MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = this;
                    synchronized (mqttPahoMessageDrivenChannelAdapter) {
                        if (!this.connected) {
                            this.connectAndSubscribe();
                            this.reconnectFuture = null;
                        }
                    }
                }
                catch (MqttException e) {
                    this.logger.error((Object)"Exception while connecting and subscribing", (Throwable)e);
                    this.scheduleReconnect();
                }
            }, new Date(System.currentTimeMillis() + (long)this.recoveryInterval));
        }
        catch (Exception e) {
            this.logger.error((Object)"Failed to schedule reconnect", (Throwable)e);
        }
    }

    public synchronized void connectionLost(Throwable cause) {
        if (this.isRunning()) {
            this.logger.error((Object)("Lost connection: " + cause.getMessage() + "; retrying..."));
            this.connected = false;
            if (this.client != null) {
                try {
                    this.client.setCallback(null);
                    this.client.close();
                }
                catch (MqttException mqttException) {
                    // empty catch block
                }
            }
            this.client = null;
            this.scheduleReconnect();
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent((ApplicationEvent)new MqttConnectionFailedEvent((Object)this, cause));
            }
        }
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> builder = this.getConverter().toMessageBuilder(topic, mqttMessage);
        if (this.manualAcks) {
            builder.setHeader("acknowledgmentCallback", (Object)new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
        }
        Message message = builder.build();
        try {
            this.sendMessage(message);
        }
        catch (RuntimeException e) {
            this.logger.error((Object)("Unhandled exception for " + message.toString()), (Throwable)e);
            throw e;
        }
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    private static class AcknowledgmentImpl
    implements SimpleAcknowledgment {
        private final int id;
        private final int qos;
        private final IMqttClient ackClient;

        AcknowledgmentImpl(int id, int qos, IMqttClient client) {
            this.id = id;
            this.qos = qos;
            this.ackClient = client;
        }

        public void acknowledge() {
            if (this.ackClient != null) {
                try {
                    this.ackClient.messageArrivedComplete(this.id, this.qos);
                }
                catch (MqttException e) {
                    throw new IllegalStateException(e);
                }
            } else {
                throw new IllegalStateException("Client has changed");
            }
        }
    }
}

