Skip to content

Commit

Permalink
去掉mqtt log
Browse files Browse the repository at this point in the history
  • Loading branch information
puyang1017 committed Mar 31, 2020
1 parent d5f1b32 commit 6653287
Show file tree
Hide file tree
Showing 43 changed files with 36 additions and 2,312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;

import javax.net.SocketFactory;
import org.eclipse.paho.client.mqttv3.internal.ClientComms;
Expand All @@ -39,12 +39,9 @@
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttSubscribe;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttUnsubscribe;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.eclipse.paho.client.mqttv3.util.Debug;
import org.eclipse.paho.client.mqttv3.IMqttToken;

/**
* Lightweight client for talking to an MQTT server using non-blocking methods
Expand Down Expand Up @@ -92,7 +89,6 @@
*/
public class MqttAsyncClient implements IMqttAsyncClient {
private static final String CLASS_NAME = MqttAsyncClient.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);

private static final String CLIENT_ID_PREFIX = "paho";
private static final long QUIESCE_TIMEOUT = 30000; // ms
Expand Down Expand Up @@ -435,7 +431,6 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
final String methodName = "MqttAsyncClient";

log.setResourceName(clientId);

if (clientId == null) { // Support empty client Id, 3.1.1 standard
throw new IllegalArgumentException("Null clientId");
Expand Down Expand Up @@ -464,7 +459,6 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
this.executorService = executorService;

// @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}
log.fine(CLASS_NAME, methodName, "101", new Object[] { clientId, serverURI, persistence });

this.persistence.open(clientId, serverURI);
this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService);
Expand Down Expand Up @@ -500,7 +494,6 @@ protected NetworkModule[] createNetworkModules(String address, MqttConnectOption
throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModules";
// @TRACE 116=URI={0}
log.fine(CLASS_NAME, methodName, "116", new Object[] { address });

NetworkModule[] networkModules = null;
String[] serverURIs = options.getServerURIs();
Expand All @@ -518,7 +511,6 @@ protected NetworkModule[] createNetworkModules(String address, MqttConnectOption
networkModules[i] = createNetworkModule(array[i], options);
}

log.fine(CLASS_NAME, methodName, "108");
return networkModules;
}

Expand All @@ -533,7 +525,6 @@ protected NetworkModule[] createNetworkModules(String address, MqttConnectOption
private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModule";
// @TRACE 115=URI={0}
log.fine(CLASS_NAME,methodName, "115", new Object[] {address});


NetworkModule netModule = NetworkModuleService.createInstance(address, options, clientId);
Expand Down Expand Up @@ -616,11 +607,6 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA

// @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2}
// userName={3} password={4} will={5} userContext={6} callback={7}
log.fine(CLASS_NAME, methodName, "103",
new Object[] { Boolean.valueOf(options.isCleanSession()), Integer.valueOf(options.getConnectionTimeout()),
Integer.valueOf(options.getKeepAliveInterval()), options.getUserName(),
((null == options.getPassword()) ? "[null]" : "[notnull]"),
((null == options.getWillMessage()) ? "[null]" : "[notnull]"), userContext, callback });
comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));

Expand Down Expand Up @@ -683,7 +669,6 @@ public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActio
throws MqttException {
final String methodName = "disconnect";
// @TRACE 104=> quiesceTimeout={0} userContext={1} callback={2}
log.fine(CLASS_NAME, methodName, "104", new Object[] { Long.valueOf(quiesceTimeout), userContext, callback });

MqttToken token = new MqttToken(getClientId());
token.setActionCallback(callback);
Expand All @@ -694,11 +679,9 @@ public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActio
comms.disconnect(disconnect, quiesceTimeout, token);
} catch (MqttException ex) {
// @TRACE 105=< exception
log.fine(CLASS_NAME, methodName, "105", null, ex);
throw ex;
}
// @TRACE 108=<
log.fine(CLASS_NAME, methodName, "108");

return token;
}
Expand Down Expand Up @@ -872,11 +855,9 @@ public IMqttToken checkPing(Object userContext, IMqttActionListener callback) th
final String methodName = "ping";
MqttToken token;
// @TRACE 117=>
log.fine(CLASS_NAME, methodName, "117");

token = comms.checkForActivity(callback);
// @TRACE 118=<
log.fine(CLASS_NAME, methodName, "118");

return token;
}
Expand Down Expand Up @@ -942,17 +923,6 @@ private IMqttToken subscribeBase(String[] topicFilters, int[] qos, Object userCo
final String methodName = "subscribe";

// Only Generate Log string if we are logging at FINE level
if (log.isLoggable(Logger.FINE)) {
StringBuffer subs = new StringBuffer();
for (int i = 0; i < topicFilters.length; i++) {
if (i > 0) {
subs.append(", ");
}
subs.append("topic=").append(topicFilters[i]).append(" qos=").append(qos[i]);
}
// @TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
log.fine(CLASS_NAME, methodName, "106", new Object[] { subs.toString(), userContext, callback });
}

MqttToken token = new MqttToken(getClientId());
token.setActionCallback(callback);
Expand All @@ -963,7 +933,6 @@ private IMqttToken subscribeBase(String[] topicFilters, int[] qos, Object userCo

comms.sendNoWait(register, token);
// @TRACE 109=<
log.fine(CLASS_NAME, methodName, "109");

return token;
}
Expand Down Expand Up @@ -1084,18 +1053,6 @@ public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttAc
final String methodName = "unsubscribe";

// Only Generate Log string if we are logging at FINE level
if (log.isLoggable(Logger.FINE)) {
String subs = "";
for (int i = 0; i < topicFilters.length; i++) {
if (i > 0) {
subs += ", ";
}
subs += topicFilters[i];
}

// @TRACE 107=Unsubscribe topic={0} userContext={1} callback={2}
log.fine(CLASS_NAME, methodName, "107", new Object[] { subs, userContext, callback });
}

for (String topicFilter : topicFilters) {
// Check if the topic filter is valid before unsubscribing
Expand All @@ -1119,7 +1076,6 @@ public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttAc

comms.sendNoWait(unregister, token);
// @TRACE 110=<
log.fine(CLASS_NAME, methodName, "110");

return token;
}
Expand Down Expand Up @@ -1236,7 +1192,6 @@ public IMqttDeliveryToken publish(String topic, MqttMessage message, Object user
IMqttActionListener callback) throws MqttException, MqttPersistenceException {
final String methodName = "publish";
// @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME, methodName, "111", new Object[] { topic, userContext, callback });

// Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/* wildcards NOT allowed */);
Expand All @@ -1251,7 +1206,6 @@ public IMqttDeliveryToken publish(String topic, MqttMessage message, Object user
comms.sendNoWait(pubMsg, token);

// @TRACE 112=<
log.fine(CLASS_NAME, methodName, "112");

return token;
}
Expand All @@ -1265,7 +1219,6 @@ public IMqttDeliveryToken publish(String topic, MqttMessage message, Object user
public void reconnect() throws MqttException {
final String methodName = "reconnect";
// @Trace 500=Attempting to reconnect client: {0}
log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId });
// Some checks to make sure that we're not attempting to reconnect an
// already connected client
if (comms.isConnected()) {
Expand Down Expand Up @@ -1299,30 +1252,25 @@ public void reconnect() throws MqttException {
private void attemptReconnect() {
final String methodName = "attemptReconnect";
// @Trace 500=Attempting to reconnect client: {0}
log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId });
try {
connect(this.connOpts, this.userContext, new MqttReconnectActionListener(methodName));
} catch (MqttSecurityException ex) {
// @TRACE 804=exception
log.fine(CLASS_NAME, methodName, "804", null, ex);
} catch (MqttException ex) {
// @TRACE 804=exception
log.fine(CLASS_NAME, methodName, "804", null, ex);
}
}

private void startReconnectCycle() {
String methodName = "startReconnectCycle";
// @Trace 503=Start reconnect timer for client: {0}, delay: {1}
log.fine(CLASS_NAME, methodName, "503", new Object[] { this.clientId, Long.valueOf(reconnectDelay) });
reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}

private void stopReconnectCycle() {
String methodName = "stopReconnectCycle";
// @Trace 504=Stop reconnect timer for client: {0}
log.fine(CLASS_NAME, methodName, "504", new Object[] { this.clientId });
synchronized (clientLock) {
if (this.connOpts.isAutomaticReconnect()) {
if (reconnectTimer != null) {
Expand All @@ -1339,7 +1287,6 @@ private class ReconnectTask extends TimerTask {

public void run() {
// @Trace 506=Triggering Automatic Reconnect attempt.
log.fine(CLASS_NAME, methodName, "506");
attemptReconnect();
}
}
Expand Down Expand Up @@ -1383,14 +1330,12 @@ class MqttReconnectActionListener implements IMqttActionListener {

public void onSuccess(IMqttToken asyncActionToken) {
// @Trace 501=Automatic Reconnect Successful: {0}
log.fine(CLASS_NAME, methodName, "501", new Object[] { asyncActionToken.getClient().getClientId() });
comms.setRestingState(false);
stopReconnectCycle();
}

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// @Trace 502=Automatic Reconnect failed, rescheduling: {0}
log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() });
if (reconnectDelay < connOpts.getMaxReconnectDelay()) {
reconnectDelay = reconnectDelay * 2;
}
Expand All @@ -1401,8 +1346,6 @@ private void rescheduleReconnectCycle(int delay) {
String reschedulemethodName = methodName + ":rescheduleReconnectCycle";
// @Trace 505=Rescheduling reconnect timer for client: {0}, delay:
// {1}
log.fine(CLASS_NAME, reschedulemethodName, "505",
new Object[] { MqttAsyncClient.this.clientId, String.valueOf(reconnectDelay) });
synchronized (clientLock) {
if (MqttAsyncClient.this.connOpts.isAutomaticReconnect()) {
if (reconnectTimer != null) {
Expand Down Expand Up @@ -1487,10 +1430,8 @@ public void close() throws MqttException {
public void close(boolean force) throws MqttException {
final String methodName = "close";
// @TRACE 113=<
log.fine(CLASS_NAME, methodName, "113");
comms.close(force);
// @TRACE 114=>
log.fine(CLASS_NAME, methodName, "114");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.internal.ClientComms;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

/**
* Default ping sender implementation
Expand All @@ -33,7 +31,6 @@
*/
public class ScheduledExecutorPingSender implements MqttPingSender {
private static final String CLASS_NAME = ScheduledExecutorPingSender.class.getName();
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);

private ClientComms comms;
private ScheduledExecutorService executorService;
Expand All @@ -59,15 +56,13 @@ public void start() {
final String methodName = "start";

//@Trace 659=start timer for client:{0}
log.fine(CLASS_NAME, methodName, "659", new Object[]{ clientid });
//Check ping after first keep alive interval.
schedule(comms.getKeepAlive());
}

public void stop() {
final String methodName = "stop";
//@Trace 661=stop
log.fine(CLASS_NAME, methodName, "661", null);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
Expand All @@ -84,7 +79,6 @@ public void run() {
String originalThreadName = Thread.currentThread().getName();
Thread.currentThread().setName("MQTT Ping: " + clientid);
//@Trace 660=Check schedule at {0}
log.fine(CLASS_NAME, methodName, "660", new Object[]{ Long.valueOf(System.nanoTime()) });
comms.checkForActivity();
Thread.currentThread().setName(originalThreadName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.TimerTask;

import org.eclipse.paho.client.mqttv3.internal.ClientComms;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

/**
* Default ping sender implementation
Expand All @@ -31,7 +29,6 @@
*/
public class TimerPingSender implements MqttPingSender {
private static final String CLASS_NAME = TimerPingSender.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);

private ClientComms comms;
private Timer timer;
Expand All @@ -43,15 +40,13 @@ public void init(ClientComms comms) {
}
this.comms = comms;
clientid = comms.getClient().getClientId();
log.setResourceName(clientid);
}

public void start() {
final String methodName = "start";

//@Trace 659=start timer for client:{0}
log.fine(CLASS_NAME, methodName, "659", new Object[]{clientid});


timer = new Timer("MQTT Ping: " + clientid);
//Check ping after first keep alive interval.
timer.schedule(new PingTask(), comms.getKeepAlive());
Expand All @@ -60,7 +55,6 @@ public void start() {
public void stop() {
final String methodName = "stop";
//@Trace 661=stop
log.fine(CLASS_NAME, methodName, "661", null);
if(timer != null){
timer.cancel();
}
Expand All @@ -75,8 +69,7 @@ private class PingTask extends TimerTask {

public void run() {
//@Trace 660=Check schedule at {0}
log.fine(CLASS_NAME, methodName, "660", new Object[]{Long.valueOf(System.nanoTime())});
comms.checkForActivity();
comms.checkForActivity();
}
}
}
Loading

0 comments on commit 6653287

Please sign in to comment.