diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 7c16e31..32e48b8 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -59,6 +59,7 @@ export class MqttService { private _onConnect: EventEmitter = new EventEmitter(); private _onClose: EventEmitter = new EventEmitter(); + private _onOffline: EventEmitter = new EventEmitter(); private _onError: EventEmitter = new EventEmitter(); private _onReconnect: EventEmitter = new EventEmitter(); private _onMessage: EventEmitter = new EventEmitter(); @@ -115,6 +116,7 @@ export class MqttService { this.client.stream.on('error', this._handleOnError); this.client.on('reconnect', this._handleOnReconnect); this.client.on('message', this._handleOnMessage); + this.client.on('offline', this._handleOnOffline); } /** @@ -285,6 +287,11 @@ export class MqttService { return this._onClose; } + /** An EventEmitter to listen to offline events */ + public get onOffline(): EventEmitter { + return this._onOffline; + } + /** An EventEmitter to listen to connect messages */ public get onConnect(): EventEmitter { return this._onConnect; @@ -315,6 +322,11 @@ export class MqttService { this._onClose.emit(); } + private _handleOnOffline = () => { + this.state.next(MqttConnectionState.CLOSED); + this._onOffline.emit(); + } + private _handleOnConnect = (e: IOnConnectEvent) => { if (this.options.connectOnCreate === true) { Object.keys(this.observables).forEach((filter: string) => {