From 54f110d05cfb626fdd704569ba0c4ed90fb8e92a Mon Sep 17 00:00:00 2001
From: James Sutton This class implements the non-blocking {@link IMqttAsyncClient} client interface
- * allowing applications to initiate MQTT actions and then carry on working while the
- * MQTT action completes on a background thread.
- * This implementation is compatible with all Java SE runtimes from 1.7 and up.
+ *
+ * This class implements the non-blocking {@link IMqttAsyncClient} client
+ * interface allowing applications to initiate MQTT actions and then carry on
+ * working while the MQTT action completes on a background thread. This
+ * implementation is compatible with all Java SE runtimes from 1.7 and up.
+ *
+ * An application can connect to an MQTT server using:
* An application can connect to an MQTT server using: To enable messages to be delivered even across network and client restarts
- * messages need to be safely stored until the message has been delivered at the requested
- * quality of service. A pluggable persistence mechanism is provided to store the messages.
+ *
+ * To enable messages to be delivered even across network and client restarts
+ * messages need to be safely stored until the message has been delivered at the
+ * requested quality of service. A pluggable persistence mechanism is provided
+ * to store the messages.
* By default {@link MqttDefaultFilePersistence} is used to store messages to a file.
- * If persistence is set to null then messages are stored in memory and hence can be lost
- * if the client, Java runtime or device shuts down.
+ *
+ * By default {@link MqttDefaultFilePersistence} is used to store messages to a
+ * file. If persistence is set to null then messages are stored in memory and
+ * hence can be lost if the client, Java runtime or device shuts down.
* If connecting with {@link MqttConnectOptions#setCleanSession(boolean)} set to true it
- * is safe to use memory persistence as all state is cleared when a client disconnects. If
- * connecting with cleanSession set to false in order to provide reliable message delivery
- * then a persistent message store such as the default one should be used.
+ *
+ * If connecting with {@link MqttConnectOptions#setCleanSession(boolean)} set to
+ * true it is safe to use memory persistence as all state is cleared when a
+ * client disconnects. If connecting with cleanSession set to false in order to
+ * provide reliable message delivery then a persistent message store such as the
+ * default one should be used.
* The message store interface is pluggable. Different stores can be used by implementing
- * the {@link MqttClientPersistence} interface and passing it to the clients constructor.
+ *
+ * The message store interface is pluggable. Different stores can be used by
+ * implementing the {@link MqttClientPersistence} interface and passing it to
+ * the clients constructor.
*
- * The address of a server can be specified on the constructor. Alternatively
- * a list containing one or more servers can be specified using the
- * {@link MqttConnectOptions#setServerURIs(String[]) setServerURIs} method
- * on MqttConnectOptions.
+ * The address of a server can be specified on the constructor.
+ * Alternatively a list containing one or more servers can be specified
+ * using the {@link MqttConnectOptions#setServerURIs(String[])
+ * setServerURIs} method on MqttConnectOptions.
*
- * The
+ * The The address of the server to connect to is specified as a URI. Two types of
- * connection are supported
+ * The address of the server to connect to is specified as a URI. Two types
+ * of connection are supported
- * If the port is not specified, it will
- * default to 1883 for
- * A client identifier A convenience method is provided to generate a random client id that
- * should satisfy this criteria - {@link #generateClientId()}. As the client identifier
- * is used by the server to identify a client when it reconnects, the client must use the
- * same identifier between connections if durable subscriptions or reliable
- * delivery of messages is required.
+ * A client identifier
+ * A convenience method is provided to generate a random client id that
+ * should satisfy this criteria - {@link #generateClientId()}. As the client
+ * identifier is used by the server to identify a client when it reconnects,
+ * the client must use the same identifier between connections if durable
+ * subscriptions or reliable delivery of messages is required.
*
* In Java SE, SSL can be configured in one of several ways, which the
* client will use in the following order:
* In Java ME, the platform settings are used for SSL connections.
+ * In Java ME, the platform settings are used for SSL connections.
+ * An instance of the default persistence mechanism {@link MqttDefaultFilePersistence}
- * is used by the client. To specify a different persistence mechanism or to turn
- * off persistence, use the {@link #MqttAsyncClient(String, String, MqttClientPersistence)}
+ *
+ * An instance of the default persistence mechanism
+ * {@link MqttDefaultFilePersistence} is used by the client. To specify a
+ * different persistence mechanism or to turn off persistence, use the
+ * {@link #MqttAsyncClient(String, String, MqttClientPersistence)}
* constructor.
*
- * @param serverURI the address of the server to connect to, specified as a URI. Can be overridden using
- * {@link MqttConnectOptions#setServerURIs(String[])}
- * @param clientId a client identifier that is unique on the server being connected to
- * @throws IllegalArgumentException if the URI does not start with
- * "tcp://", "ssl://" or "local://".
- * @throws IllegalArgumentException if the clientId is null or is greater than 65535 characters in length
- * @throws MqttException if any other problem was encountered
+ * @param serverURI
+ * the address of the server to connect to, specified as a URI.
+ * Can be overridden using
+ * {@link MqttConnectOptions#setServerURIs(String[])}
+ * @param clientId
+ * a client identifier that is unique on the server being
+ * connected to
+ * @throws IllegalArgumentException
+ * if the URI does not start with "tcp://", "ssl://" or
+ * "local://".
+ * @throws IllegalArgumentException
+ * if the clientId is null or is greater than 65535 characters
+ * in length
+ * @throws MqttException
+ * if any other problem was encountered
*/
public MqttAsyncClient(String serverURI, String clientId) throws MqttException {
this(serverURI, clientId, new MqttDefaultFilePersistence());
}
/**
- * Create an MqttAsyncClient that is used to communicate with an MQTT server.
+ * Create an MqttAsyncClient that is used to communicate with an MQTT
+ * server.
*
- * The address of a server can be specified on the constructor. Alternatively
- * a list containing one or more servers can be specified using the
- * {@link MqttConnectOptions#setServerURIs(String[]) setServerURIs} method
- * on MqttConnectOptions.
+ * The address of a server can be specified on the constructor.
+ * Alternatively a list containing one or more servers can be specified
+ * using the {@link MqttConnectOptions#setServerURIs(String[])
+ * setServerURIs} method on MqttConnectOptions.
*
- * The
+ * The The address of the server to connect to is specified as a URI. Two types of
- * connection are supported
+ * The address of the server to connect to is specified as a URI. Two types
+ * of connection are supported
- * If the port is not specified, it will
- * default to 1883 for
- * A client identifier A convenience method is provided to generate a random client id that
- * should satisfy this criteria - {@link #generateClientId()}. As the client identifier
- * is used by the server to identify a client when it reconnects, the client must use the
- * same identifier between connections if durable subscriptions or reliable
- * delivery of messages is required.
+ * A client identifier
+ * A convenience method is provided to generate a random client id that
+ * should satisfy this criteria - {@link #generateClientId()}. As the client
+ * identifier is used by the server to identify a client when it reconnects,
+ * the client must use the same identifier between connections if durable
+ * subscriptions or reliable delivery of messages is required.
*
* In Java SE, SSL can be configured in one of several ways, which the
* client will use in the following order:
* In Java ME, the platform settings are used for SSL connections.
- * A persistence mechanism is used to enable reliable messaging.
- * For messages sent at qualities of service (QoS) 1 or 2 to be reliably delivered,
- * messages must be stored (on both the client and server) until the delivery of the message
- * is complete. If messages are not safely stored when being delivered then
- * a failure in the client or server can result in lost messages. A pluggable
- * persistence mechanism is supported via the {@link MqttClientPersistence}
- * interface. An implementer of this interface that safely stores messages
- * must be specified in order for delivery of messages to be reliable. In
- * addition {@link MqttConnectOptions#setCleanSession(boolean)} must be set
- * to false. In the event that only QoS 0 messages are sent or received or
+ * In Java ME, the platform settings are used for SSL connections.
+ *
+ * A persistence mechanism is used to enable reliable messaging. For
+ * messages sent at qualities of service (QoS) 1 or 2 to be reliably
+ * delivered, messages must be stored (on both the client and server) until
+ * the delivery of the message is complete. If messages are not safely
+ * stored when being delivered then a failure in the client or server can
+ * result in lost messages. A pluggable persistence mechanism is supported
+ * via the {@link MqttClientPersistence} interface. An implementer of this
+ * interface that safely stores messages must be specified in order for
+ * delivery of messages to be reliable. In addition
+ * {@link MqttConnectOptions#setCleanSession(boolean)} must be set to false.
+ * In the event that only QoS 0 messages are sent or received or
* cleanSession is set to true then a safe store is not needed.
* An implementation of file-based persistence is provided in
- * class {@link MqttDefaultFilePersistence} which will work in all Java SE based
- * systems. If no persistence is needed, the persistence parameter
- * can be explicitly set to
+ * An implementation of file-based persistence is provided in class
+ * {@link MqttDefaultFilePersistence} which will work in all Java SE based
+ * systems. If no persistence is needed, the persistence parameter can be
+ * explicitly set to
- * The address of a server can be specified on the constructor. Alternatively
- * a list containing one or more servers can be specified using the
- * {@link MqttConnectOptions#setServerURIs(String[]) setServerURIs} method
- * on MqttConnectOptions.
+ * The address of a server can be specified on the constructor.
+ * Alternatively a list containing one or more servers can be specified
+ * using the {@link MqttConnectOptions#setServerURIs(String[])
+ * setServerURIs} method on MqttConnectOptions.
*
- * The
+ * The The address of the server to connect to is specified as a URI. Two types of
- * connection are supported
+ * The address of the server to connect to is specified as a URI. Two types
+ * of connection are supported
- * If the port is not specified, it will
- * default to 1883 for
- * A client identifier A convenience method is provided to generate a random client id that
- * should satisfy this criteria - {@link #generateClientId()}. As the client identifier
- * is used by the server to identify a client when it reconnects, the client must use the
- * same identifier between connections if durable subscriptions or reliable
- * delivery of messages is required.
+ * A client identifier
+ * A convenience method is provided to generate a random client id that
+ * should satisfy this criteria - {@link #generateClientId()}. As the client
+ * identifier is used by the server to identify a client when it reconnects,
+ * the client must use the same identifier between connections if durable
+ * subscriptions or reliable delivery of messages is required.
*
* In Java SE, SSL can be configured in one of several ways, which the
* client will use in the following order:
* In Java ME, the platform settings are used for SSL connections.
- * A persistence mechanism is used to enable reliable messaging.
- * For messages sent at qualities of service (QoS) 1 or 2 to be reliably delivered,
- * messages must be stored (on both the client and server) until the delivery of the message
- * is complete. If messages are not safely stored when being delivered then
- * a failure in the client or server can result in lost messages. A pluggable
- * persistence mechanism is supported via the {@link MqttClientPersistence}
- * interface. An implementer of this interface that safely stores messages
- * must be specified in order for delivery of messages to be reliable. In
- * addition {@link MqttConnectOptions#setCleanSession(boolean)} must be set
- * to false. In the event that only QoS 0 messages are sent or received or
+ * In Java ME, the platform settings are used for SSL connections.
+ *
+ * A persistence mechanism is used to enable reliable messaging. For
+ * messages sent at qualities of service (QoS) 1 or 2 to be reliably
+ * delivered, messages must be stored (on both the client and server) until
+ * the delivery of the message is complete. If messages are not safely
+ * stored when being delivered then a failure in the client or server can
+ * result in lost messages. A pluggable persistence mechanism is supported
+ * via the {@link MqttClientPersistence} interface. An implementer of this
+ * interface that safely stores messages must be specified in order for
+ * delivery of messages to be reliable. In addition
+ * {@link MqttConnectOptions#setCleanSession(boolean)} must be set to false.
+ * In the event that only QoS 0 messages are sent or received or
* cleanSession is set to true then a safe store is not needed.
* An implementation of file-based persistence is provided in
- * class {@link MqttDefaultFilePersistence} which will work in all Java SE based
- * systems. If no persistence is needed, the persistence parameter
- * can be explicitly set to
+ * An implementation of file-based persistence is provided in class
+ * {@link MqttDefaultFilePersistence} which will work in all Java SE based
+ * systems. If no persistence is needed, the persistence parameter can be
+ * explicitly set to
- * Because the client is able to establish the TCP/IP connection to a none MQTT server and it will certainly fail to
- * send the disconnect packet.
+ * Because the client is able to establish the TCP/IP connection to a none
+ * MQTT server and it will certainly fail to send the disconnect packet.
*
- * @param quiesceTimeout the amount of time in milliseconds to allow for existing work to finish before
- * disconnecting. A value of zero or less means the client will not quiesce.
- * @param disconnectTimeout the amount of time in milliseconds to allow send disconnect packet to server.
- * @param sendDisconnectPacket if true, will send the disconnect packet to the server
- * @throws MqttException if any unexpected error
+ * @param quiesceTimeout
+ * the amount of time in milliseconds to allow for existing work
+ * to finish before disconnecting. A value of zero or less means
+ * the client will not quiesce.
+ * @param disconnectTimeout
+ * the amount of time in milliseconds to allow send disconnect
+ * packet to server.
+ * @param sendDisconnectPacket
+ * if true, will send the disconnect packet to the server
+ * @throws MqttException
+ * if any unexpected error
*/
- public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, boolean sendDisconnectPacket) throws MqttException {
- comms.disconnectForcibly(quiesceTimeout, disconnectTimeout, sendDisconnectPacket);
- }
+ public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, boolean sendDisconnectPacket)
+ throws MqttException {
+ comms.disconnectForcibly(quiesceTimeout, disconnectTimeout, sendDisconnectPacket);
+ }
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see IMqttAsyncClient#isConnected()
*/
public boolean isConnected() {
return comms.isConnected();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see IMqttAsyncClient#getClientId()
*/
public String getClientId() {
return clientId;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see IMqttAsyncClient#getServerURI()
*/
public String getServerURI() {
@@ -768,118 +917,143 @@ public String getServerURI() {
}
/**
- * Returns the currently connected Server URI
- * Implemented due to: https://bugs.eclipse.org/bugs/show_bug.cgi?id=481097
+ * Returns the currently connected Server URI Implemented due to:
+ * https://bugs.eclipse.org/bugs/show_bug.cgi?id=481097
*
* Where getServerURI only returns the URI that was provided in
* MqttAsyncClient's constructor, getCurrentServerURI returns the URI of the
- * Server that the client is currently connected to. This would be different in scenarios
- * where multiple server URIs have been provided to the MqttConnectOptions.
+ * Server that the client is currently connected to. This would be different
+ * in scenarios where multiple server URIs have been provided to the
+ * MqttConnectOptions.
*
* @return the currently connected server URI
*/
- public String getCurrentServerURI(){
+ public String getCurrentServerURI() {
return comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
}
/**
* Get a topic object which can be used to publish messages.
- * There are two alternative methods that should be used in preference to this one when publishing a message:
+ * There are two alternative methods that should be used in preference to
+ * this one when publishing a message:
+ * When you build an application,
- * the design of the topic tree should take into account the following principles
- * of topic name syntax and semantics:
+ * When you build an application, the design of the topic tree should take
+ * into account the following principles of topic name syntax and semantics:
+ * The following principles apply to the construction and content of a topic
- * tree:
+ * The following principles apply to the construction and content of a topic
+ * tree:
+ * By default, client sends PingReq to server to keep the connection to
- * server. For some platforms which cannot use this mechanism, such as Android,
- * developer needs to handle the ping request manually with this method.
- * By default, client
+ * sends PingReq to server to keep the connection to server. For some
+ * platforms which cannot use this mechanism, such as Android, developer
+ * needs to handle the ping request manually with this method.
*
*
- * serverURI
parameter is typically used with the
- * the clientId
parameter to form a key. The key
- * is used to store and reference messages while they are being delivered.
- * Hence the serverURI specified on the constructor must still be specified even if a list
- * of servers is specified on an MqttConnectOptions object.
- * The serverURI on the constructor must remain the same across
- * restarts of the client for delivery of messages to be maintained from a given
- * client to a given server or set of servers.
+ * serverURI
parameter is typically used with the the
+ * clientId
parameter to form a key. The key is used to store
+ * and reference messages while they are being delivered. Hence the
+ * serverURI specified on the constructor must still be specified even if a
+ * list of servers is specified on an MqttConnectOptions object. The
+ * serverURI on the constructor must remain the same across restarts of the
+ * client for delivery of messages to be maintained from a given client to a
+ * given server or set of servers.
*
- * tcp://
for a TCP connection and
- * ssl://
for a TCP connection secured by SSL/TLS.
- * For example:tcp://
for a TCP connection and
+ * ssl://
for a TCP connection secured by SSL/TLS. For example:
+ *
- *
* tcp://localhost:1883
ssl://localhost:8883
tcp://localhost:1883
ssl://localhost:8883
tcp://
" URIs, and 8883 for ssl://
URIs.
+ * If the port is not specified, it will default to 1883 for
+ * tcp://
" URIs, and 8883 for ssl://
URIs.
* clientId
must be specified and be less that 65535 characters.
- * It must be unique across all clients connecting to the same
- * server. The clientId is used by the server to store data related to the client,
- * hence it is important that the clientId remain the same when connecting to a server
- * if durable subscriptions or reliable messaging are required.
- * clientId
must be specified and be less
+ * that 65535 characters. It must be unique across all clients connecting to
+ * the same server. The clientId is used by the server to store data related
+ * to the client, hence it is important that the clientId remain the same
+ * when connecting to a server if durable subscriptions or reliable
+ * messaging are required.
+ *
- *
*
- * SSLSocketFactory
- applications can
- * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
- * a factory with the appropriate SSL settings.SSLSocketFactory
-
+ * applications can use
+ * {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply a
+ * factory with the appropriate SSL settings.serverURI
parameter is typically used with the
- * the clientId
parameter to form a key. The key
- * is used to store and reference messages while they are being delivered.
- * Hence the serverURI specified on the constructor must still be specified even if a list
- * of servers is specified on an MqttConnectOptions object.
- * The serverURI on the constructor must remain the same across
- * restarts of the client for delivery of messages to be maintained from a given
- * client to a given server or set of servers.
+ * serverURI
parameter is typically used with the the
+ * clientId
parameter to form a key. The key is used to store
+ * and reference messages while they are being delivered. Hence the
+ * serverURI specified on the constructor must still be specified even if a
+ * list of servers is specified on an MqttConnectOptions object. The
+ * serverURI on the constructor must remain the same across restarts of the
+ * client for delivery of messages to be maintained from a given client to a
+ * given server or set of servers.
*
- * tcp://
for a TCP connection and
- * ssl://
for a TCP connection secured by SSL/TLS.
- * For example:tcp://
for a TCP connection and
+ * ssl://
for a TCP connection secured by SSL/TLS. For example:
+ *
- *
* tcp://localhost:1883
ssl://localhost:8883
tcp://localhost:1883
ssl://localhost:8883
tcp://
" URIs, and 8883 for ssl://
URIs.
+ * If the port is not specified, it will default to 1883 for
+ * tcp://
" URIs, and 8883 for ssl://
URIs.
* clientId
must be specified and be less that 65535 characters.
- * It must be unique across all clients connecting to the same
- * server. The clientId is used by the server to store data related to the client,
- * hence it is important that the clientId remain the same when connecting to a server
- * if durable subscriptions or reliable messaging are required.
- * clientId
must be specified and be less
+ * that 65535 characters. It must be unique across all clients connecting to
+ * the same server. The clientId is used by the server to store data related
+ * to the client, hence it is important that the clientId remain the same
+ * when connecting to a server if durable subscriptions or reliable
+ * messaging are required.
+ *
- *
*
- * SSLSocketFactory
- applications can
- * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
- * a factory with the appropriate SSL settings.SSLSocketFactory
-
+ * applications can use
+ * {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply a
+ * factory with the appropriate SSL settings.null
.null
.
+ * serverURI
parameter is typically used with the
- * the clientId
parameter to form a key. The key
- * is used to store and reference messages while they are being delivered.
- * Hence the serverURI specified on the constructor must still be specified even if a list
- * of servers is specified on an MqttConnectOptions object.
- * The serverURI on the constructor must remain the same across
- * restarts of the client for delivery of messages to be maintained from a given
- * client to a given server or set of servers.
+ * serverURI
parameter is typically used with the the
+ * clientId
parameter to form a key. The key is used to store
+ * and reference messages while they are being delivered. Hence the
+ * serverURI specified on the constructor must still be specified even if a
+ * list of servers is specified on an MqttConnectOptions object. The
+ * serverURI on the constructor must remain the same across restarts of the
+ * client for delivery of messages to be maintained from a given client to a
+ * given server or set of servers.
*
- * tcp://
for a TCP connection and
- * ssl://
for a TCP connection secured by SSL/TLS.
- * For example:tcp://
for a TCP connection and
+ * ssl://
for a TCP connection secured by SSL/TLS. For example:
+ *
- *
* tcp://localhost:1883
ssl://localhost:8883
tcp://localhost:1883
ssl://localhost:8883
tcp://
" URIs, and 8883 for ssl://
URIs.
+ * If the port is not specified, it will default to 1883 for
+ * tcp://
" URIs, and 8883 for ssl://
URIs.
* clientId
must be specified and be less that 65535 characters.
- * It must be unique across all clients connecting to the same
- * server. The clientId is used by the server to store data related to the client,
- * hence it is important that the clientId remain the same when connecting to a server
- * if durable subscriptions or reliable messaging are required.
- * clientId
must be specified and be less
+ * that 65535 characters. It must be unique across all clients connecting to
+ * the same server. The clientId is used by the server to store data related
+ * to the client, hence it is important that the clientId remain the same
+ * when connecting to a server if durable subscriptions or reliable
+ * messaging are required.
+ *
- *
*
- * SSLSocketFactory
- applications can
- * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
- * a factory with the appropriate SSL settings.SSLSocketFactory
-
+ * applications can use
+ * {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply a
+ * factory with the appropriate SSL settings.null
.null
.
+ *
- *
- *
- *
*
- *
- *
+ *
+ * When cleanSession is set to false, an application must ensure it uses the + * same client identifier when it reconnects to the server to resume state + * and maintain assured message delivery. + *
+ * * @return a generated client identifier * @see MqttConnectOptions#setCleanSession(boolean) */ public static String generateClientId() { - //length of nanoTime = 15, so total length = 19 < 65535(defined in spec) + // length of nanoTime = 15, so total length = 19 < 65535(defined in + // spec) return CLIENT_ID_PREFIX + System.nanoTime(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see IMqttAsyncClient#getPendingDeliveryTokens() */ public IMqttDeliveryToken[] getPendingDeliveryTokens() { return comms.getPendingDeliveryTokens(); } - /* (non-Javadoc) - * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, byte[], int, boolean, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener) + /* + * (non-Javadoc) + * + * @see + * org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, + * byte[], int, boolean, java.lang.Object, + * org.eclipse.paho.client.mqttv3.IMqttActionListener) */ - public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, - boolean retained, Object userContext, IMqttActionListener callback) throws MqttException, - MqttPersistenceException { + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, + IMqttActionListener callback) throws MqttException, MqttPersistenceException { MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retained); return this.publish(topic, message, userContext, callback); } - /* (non-Javadoc) - * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, byte[], int, boolean) + + /* + * (non-Javadoc) + * + * @see + * org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, + * byte[], int, boolean) */ - public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, - boolean retained) throws MqttException, MqttPersistenceException { + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained) + throws MqttException, MqttPersistenceException { return this.publish(topic, payload, qos, retained, null, null); } - /* (non-Javadoc) - * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage) + /* + * (non-Javadoc) + * + * @see + * org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, + * org.eclipse.paho.client.mqttv3.MqttMessage) */ - public IMqttDeliveryToken publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { + public IMqttDeliveryToken publish(String topic, MqttMessage message) + throws MqttException, MqttPersistenceException { return this.publish(topic, message, null, null); } - - /* (non-Javadoc) - * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener) + /* + * (non-Javadoc) + * + * @see + * org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, + * org.eclipse.paho.client.mqttv3.MqttMessage, java.lang.Object, + * org.eclipse.paho.client.mqttv3.IMqttActionListener) */ - public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException, - MqttPersistenceException { + public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, + IMqttActionListener callback) throws MqttException, MqttPersistenceException { final String methodName = "publish"; - //@TRACE 111=< topic={0} message={1}userContext={1} callback={2} - log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback}); + // @TRACE 111=< topic={0} message={1}userContext={1} callback={2} + log.fine(CLASS_NAME, methodName, "111", new Object[] { topic, userContext, callback }); - //Checks if a topic is valid when publishing a message. - MqttTopic.validate(topic, false/*wildcards NOT allowed*/); + // Checks if a topic is valid when publishing a message. + MqttTopic.validate(topic, false/* wildcards NOT allowed */); MqttDeliveryToken token = new MqttDeliveryToken(getClientId()); token.setActionCallback(callback); token.setUserContext(userContext); token.setMessage(message); - token.internalTok.setTopics(new String[] {topic}); + token.internalTok.setTopics(new String[] { topic }); MqttPublish pubMsg = new MqttPublish(topic, message); comms.sendNoWait(pubMsg, token); - //@TRACE 112=< - log.fine(CLASS_NAME,methodName,"112"); + // @TRACE 112=< + log.fine(CLASS_NAME, methodName, "112"); return token; } /** * User triggered attempt to reconnect - * @throws MqttException if there is an issue with reconnecting + * + * @throws MqttException + * if there is an issue with reconnecting */ public void reconnect() throws MqttException { final String methodName = "reconnect"; - //@Trace 500=Attempting to reconnect client: {0} - log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId}); - // Some checks to make sure that we're not attempting to reconnect an already connected client + // @Trace 500=Attempting to reconnect client: {0} + log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId }); + // Some checks to make sure that we're not attempting to reconnect an + // already connected client if (comms.isConnected()) { throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); } @@ -1151,50 +1390,46 @@ public void reconnect() throws MqttException { attemptReconnect(); } - /** - * Attempts to reconnect the client to the server. - * If successful it will make sure that there are no further - * reconnects scheduled. However if the connect fails, the delay will double - * up to 128 seconds and will re-schedule the reconnect for after the delay. + * Attempts to reconnect the client to the server. If successful it will + * make sure that there are no further reconnects scheduled. However if the + * connect fails, the delay will double up to 128 seconds and will + * re-schedule the reconnect for after the delay. * * Any thrown exceptions are logged but not acted upon as it is assumed that * they are being thrown due to the server being offline and so reconnect * attempts will continue. */ - private void attemptReconnect(){ + private void attemptReconnect() { final String methodName = "attemptReconnect"; - //@Trace 500=Attempting to reconnect client: {0} - log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId}); + // @Trace 500=Attempting to reconnect client: {0} + log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId }); try { - connect(this.connOpts, this.userContext,new MqttReconnectActionListener(methodName)); + connect(this.connOpts, this.userContext, new MqttReconnectActionListener(methodName)); } catch (MqttSecurityException ex) { - //@TRACE 804=exception - log.fine(CLASS_NAME,methodName,"804",null, ex); + // @TRACE 804=exception + log.fine(CLASS_NAME, methodName, "804", null, ex); } catch (MqttException ex) { - //@TRACE 804=exception - log.fine(CLASS_NAME,methodName,"804",null, ex); + // @TRACE 804=exception + log.fine(CLASS_NAME, methodName, "804", null, ex); } } - - - - private void startReconnectCycle(){ + private void startReconnectCycle() { String methodName = "startReconnectCycle"; - //@Trace 503=Start reconnect timer for client: {0}, delay: {1} - log.fine(CLASS_NAME, methodName, "503", new Object[]{this.clientId, new Long(reconnectDelay)}); + // @Trace 503=Start reconnect timer for client: {0}, delay: {1} + log.fine(CLASS_NAME, methodName, "503", new Object[] { this.clientId, new Long(reconnectDelay) }); reconnectTimer = new Timer("MQTT Reconnect: " + clientId); reconnectTimer.schedule(new ReconnectTask(), reconnectDelay); } - private void stopReconnectCycle(){ + private void stopReconnectCycle() { String methodName = "stopReconnectCycle"; - //@Trace 504=Stop reconnect timer for client: {0} - log.fine(CLASS_NAME, methodName, "504", new Object[]{this.clientId}); - synchronized(clientLock) { + // @Trace 504=Stop reconnect timer for client: {0} + log.fine(CLASS_NAME, methodName, "504", new Object[] { this.clientId }); + synchronized (clientLock) { if (this.connOpts.isAutomaticReconnect()) { - if(reconnectTimer != null){ + if (reconnectTimer != null) { reconnectTimer.cancel(); reconnectTimer = null; } @@ -1203,157 +1438,173 @@ private void stopReconnectCycle(){ } } - private class ReconnectTask extends TimerTask { private static final String methodName = "ReconnectTask.run"; + public void run() { - //@Trace 506=Triggering Automatic Reconnect attempt. + // @Trace 506=Triggering Automatic Reconnect attempt. log.fine(CLASS_NAME, methodName, "506"); attemptReconnect(); } } - - class MqttReconnectCallback implements MqttCallbackExtended{ + + class MqttReconnectCallback implements MqttCallbackExtended { final boolean automaticReconnect; - MqttReconnectCallback(boolean isAutomaticReconnect){ + MqttReconnectCallback(boolean isAutomaticReconnect) { automaticReconnect = isAutomaticReconnect; } - + public void connectionLost(Throwable cause) { - if(automaticReconnect){ - // Automatic reconnect is set so make sure comms is in resting state + if (automaticReconnect) { + // Automatic reconnect is set so make sure comms is in resting + // state comms.setRestingState(true); reconnecting = true; startReconnectCycle(); } } - public void messageArrived(String topic, MqttMessage message) throws Exception {} + public void messageArrived(String topic, MqttMessage message) throws Exception { + } - public void deliveryComplete(IMqttDeliveryToken token) {} + public void deliveryComplete(IMqttDeliveryToken token) { + } + + public void connectComplete(boolean reconnect, String serverURI) { + } - public void connectComplete(boolean reconnect, String serverURI) {} - } - - class MqttReconnectActionListener implements IMqttActionListener{ - - final String methodName; - - MqttReconnectActionListener(String methodName) { - this.methodName = methodName; - } - public void onSuccess(IMqttToken asyncActionToken) { - //@Trace 501=Automatic Reconnect Successful: {0} - log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()}); - comms.setRestingState(false); - stopReconnectCycle(); - } - - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - //@Trace 502=Automatic Reconnect failed, rescheduling: {0} - log.fine(CLASS_NAME, methodName, "502", new Object[]{asyncActionToken.getClient().getClientId()}); - if(reconnectDelay < 128000){ - reconnectDelay = reconnectDelay * 2; - } - rescheduleReconnectCycle(reconnectDelay); + class MqttReconnectActionListener implements IMqttActionListener { + + final String methodName; + + MqttReconnectActionListener(String methodName) { + this.methodName = methodName; + } + + public void onSuccess(IMqttToken asyncActionToken) { + // @Trace 501=Automatic Reconnect Successful: {0} + log.fine(CLASS_NAME, methodName, "501", new Object[] { asyncActionToken.getClient().getClientId() }); + comms.setRestingState(false); + stopReconnectCycle(); + } + + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + // @Trace 502=Automatic Reconnect failed, rescheduling: {0} + log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() }); + if (reconnectDelay < 128000) { + reconnectDelay = reconnectDelay * 2; } - - private void rescheduleReconnectCycle(int delay){ - String reschedulemethodName = methodName+ ":rescheduleReconnectCycle"; - //@Trace 505=Rescheduling reconnect timer for client: {0}, delay: {1} - log.fine(CLASS_NAME, reschedulemethodName, "505", new Object[]{MqttAsyncClient.this.clientId, String.valueOf(reconnectDelay)}); - synchronized(clientLock) { - if(MqttAsyncClient.this.connOpts.isAutomaticReconnect()) { - if (reconnectTimer != null) { - reconnectTimer.schedule(new ReconnectTask(), delay); - } else { - // The previous reconnect timer was cancelled - reconnectDelay = delay; - startReconnectCycle(); - } + rescheduleReconnectCycle(reconnectDelay); + } + + private void rescheduleReconnectCycle(int delay) { + String reschedulemethodName = methodName + ":rescheduleReconnectCycle"; + // @Trace 505=Rescheduling reconnect timer for client: {0}, delay: + // {1} + log.fine(CLASS_NAME, reschedulemethodName, "505", + new Object[] { MqttAsyncClient.this.clientId, String.valueOf(reconnectDelay) }); + synchronized (clientLock) { + if (MqttAsyncClient.this.connOpts.isAutomaticReconnect()) { + if (reconnectTimer != null) { + reconnectTimer.schedule(new ReconnectTask(), delay); + } else { + // The previous reconnect timer was cancelled + reconnectDelay = delay; + startReconnectCycle(); } } } - + } + } - + /** * Sets the DisconnectedBufferOptions for this client - * @param bufferOpts the {@link DisconnectedBufferOptions} + * + * @param bufferOpts + * the {@link DisconnectedBufferOptions} */ public void setBufferOpts(DisconnectedBufferOptions bufferOpts) { this.comms.setDisconnectedMessageBuffer(new DisconnectedMessageBuffer(bufferOpts)); } - /** * Returns the number of messages in the Disconnected Message Buffer + * * @return Count of messages in the buffer */ - public int getBufferedMessageCount(){ + public int getBufferedMessageCount() { return this.comms.getBufferedMessageCount(); } - /** * Returns a message from the Disconnected Message Buffer - * @param bufferIndex the index of the message to be retrieved. + * + * @param bufferIndex + * the index of the message to be retrieved. * @return the message located at the bufferIndex */ - public MqttMessage getBufferedMessage(int bufferIndex){ + public MqttMessage getBufferedMessage(int bufferIndex) { return this.comms.getBufferedMessage(bufferIndex); } /** * Deletes a message from the Disconnected Message Buffer - * @param bufferIndex the index of the message to be deleted. + * + * @param bufferIndex + * the index of the message to be deleted. */ - public void deleteBufferedMessage(int bufferIndex){ + public void deleteBufferedMessage(int bufferIndex) { this.comms.deleteBufferedMessage(bufferIndex); } /** - * Returns the current number of outgoing in-flight messages - * being sent by the client. Note that this number cannot be - * guaranteed to be 100% accurate as some messages may have - * been sent or queued in the time taken for this method to return. + * Returns the current number of outgoing in-flight messages being sent by + * the client. Note that this number cannot be guaranteed to be 100% + * accurate as some messages may have been sent or queued in the time taken + * for this method to return. + * * @return the current number of in-flight messages. */ - public int getInFlightMessageCount(){ + public int getInFlightMessageCount() { return this.comms.getActualInFlight(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#close() */ public void close() throws MqttException { close(false); } - - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#close() */ public void close(boolean force) throws MqttException { final String methodName = "close"; - //@TRACE 113=< - log.fine(CLASS_NAME,methodName,"113"); + // @TRACE 113=< + log.fine(CLASS_NAME, methodName, "113"); comms.close(force); - //@TRACE 114=> - log.fine(CLASS_NAME,methodName,"114"); + // @TRACE 114=> + log.fine(CLASS_NAME, methodName, "114"); } /** * Return a debug object that can be used to help solve problems. + * * @return the {@link Debug} object */ public Debug getDebug() { - return new Debug(clientId,comms); + return new Debug(clientId, comms); } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java index 412bf35fe..95b9cea9d 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java @@ -87,8 +87,8 @@ public void write(MqttWireMessage message) throws IOException, MqttException { clientState.notifySentBytes(length); } - // @TRACE 500= sent {0} - log.fine(CLASS_NAME, methodName, "500", new Object[]{message}); + // @TRACE 529= sent {0} + log.fine(CLASS_NAME, methodName, "529", new Object[]{message}); } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java index 33e03949c..e86e7e9d4 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java @@ -298,5 +298,6 @@ public void clear() throws MqttPersistenceException { for (int i=0; i