From 30a07a2817a7a6599bfe09f7d4cb63fdf0d9a631 Mon Sep 17 00:00:00 2001 From: sclausen Date: Mon, 16 Apr 2018 07:28:33 +0200 Subject: [PATCH] End the client, if manually executing connect, but already have an internal client. --- README.md | 4 +- package.json | 2 +- src/index.ts | 4 +- src/mqtt.model.ts | 18 +++---- src/mqtt.service.ts | 116 +++++++++++++++++++++++--------------------- 5 files changed, 74 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 1038c25..62dcd58 100644 --- a/README.md +++ b/README.md @@ -57,10 +57,10 @@ import { MqttMessage, MqttModule, MqttService, - MqttServiceOptions + IMqttServiceOptions } from 'ngx-mqtt'; -export const MQTT_SERVICE_OPTIONS: MqttServiceOptions = { +export const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = { hostname: 'localhost', port: 9001, path: '/mqtt' diff --git a/package.json b/package.json index 302a5da..7a37160 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ngx-mqtt", - "version": "5.1.0", + "version": "5.2.0", "description": "ngx mqtt client library", "main": "bundles/ngx-mqtt.min.js", "module": "./src/index.js", diff --git a/src/index.ts b/src/index.ts index a4a17f5..43b3ea0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,12 +1,12 @@ import { NgModule, ModuleWithProviders } from '@angular/core'; import { MqttService } from './mqtt.service'; -import { MqttServiceOptions } from './mqtt.model'; +import { IMqttServiceOptions } from './mqtt.model'; export * from './mqtt.service'; export * from './mqtt.model'; -export const MQTT_SERVICE_OPTIONS: MqttServiceOptions = { +export const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = { connectOnCreate: true, hostname: 'localhost', port: 1884, diff --git a/src/mqtt.model.ts b/src/mqtt.model.ts index b376752..21bafde 100644 --- a/src/mqtt.model.ts +++ b/src/mqtt.model.ts @@ -1,4 +1,4 @@ -import * as MQTT from 'mqtt'; +import { IClientOptions, IClientPublishOptions, IPacket, MqttClient } from 'mqtt'; import { Stream } from 'stream'; export enum MqttConnectionState { @@ -7,7 +7,7 @@ export enum MqttConnectionState { CONNECTED } -export interface MqttServiceOptions extends MQTT.IClientOptions { +export interface IMqttServiceOptions extends IClientOptions { /** wether a new connection should be created * on creating an instance of the service */ connectOnCreate?: boolean; @@ -20,7 +20,7 @@ export interface MqttServiceOptions extends MQTT.IClientOptions { protocol?: 'wss' | 'ws'; } -export interface MqttMessage extends MQTT.IPacket { +export interface IMqttMessage extends IPacket { /** the mqtt topic to which this message was published to */ topic: string; /** the payload */ @@ -33,15 +33,15 @@ export interface MqttMessage extends MQTT.IPacket { dup: boolean; } -export interface PublishOptions extends MQTT.IClientPublishOptions { } -export interface OnConnectEvent extends MqttMessage { } -export interface OnErrorEvent extends Error { } -export interface OnMessageEvent extends MqttMessage { } -export interface OnSubackEvent { +export interface IPublishOptions extends IClientPublishOptions { } +export interface IOnConnectEvent extends IMqttMessage { } +export interface IOnErrorEvent extends Error { } +export interface IOnMessageEvent extends IMqttMessage { } +export interface IOnSubackEvent { granted: boolean; filter: string; } -export interface MqttClient extends MQTT.Client { +export interface IMqttClient extends MqttClient { stream: Stream; } \ No newline at end of file diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 31c522e..a448b87 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -1,5 +1,5 @@ import { EventEmitter } from '@angular/core'; -import * as MQTT from 'mqtt'; +import { connect, ISubscriptionGrant } from 'mqtt'; import * as extend from 'xtend'; import { BehaviorSubject } from 'rxjs/BehaviorSubject'; @@ -12,15 +12,15 @@ import { merge } from 'rxjs/observable/merge'; import { filter, publishReplay, refCount } from 'rxjs/operators'; import { - MqttClient, + IMqttClient, MqttConnectionState, - MqttMessage, - MqttServiceOptions, - OnConnectEvent, - OnErrorEvent, - OnMessageEvent, - OnSubackEvent, - PublishOptions + IMqttMessage, + IMqttServiceOptions, + IOnConnectEvent, + IOnErrorEvent, + IOnMessageEvent, + IOnSubackEvent, + IPublishOptions } from './mqtt.model'; /** @@ -30,11 +30,11 @@ import { */ export class MqttService { /** a map of all mqtt observables by filter */ - public observables: { [filter: string]: Observable } = {}; + public observables: { [filter: string]: Observable } = {}; /** the connection state */ public state: BehaviorSubject = new BehaviorSubject(MqttConnectionState.CLOSED); /** an observable of the last mqtt message */ - public messages: Subject = new Subject(); + public messages: Subject = new Subject(); private _clientId = this._generateClientId(); private _keepalive = 10; @@ -42,21 +42,21 @@ export class MqttService { private _reconnectPeriod = 10000; private _url: string; - private _onConnect: EventEmitter = new EventEmitter(); + private _onConnect: EventEmitter = new EventEmitter(); private _onClose: EventEmitter = new EventEmitter(); - private _onError: EventEmitter = new EventEmitter(); + private _onError: EventEmitter = new EventEmitter(); private _onReconnect: EventEmitter = new EventEmitter(); - private _onMessage: EventEmitter = new EventEmitter(); - private _onSuback: EventEmitter = new EventEmitter(); + private _onMessage: EventEmitter = new EventEmitter(); + private _onSuback: EventEmitter = new EventEmitter(); /** - * The constructor needs [connection options]{@link MqttServiceOptions} regarding the broker and some + * The constructor needs [connection options]{@link IMqttServiceOptions} regarding the broker and some * options to configure behavior of this service, like if the connection to the broker * should be established on creation of this service or not. * @param options connection and creation options for MQTT.js and this service - * @param client an instance of MqttClient + * @param client an instance of IMqttClient */ - constructor(private options: MqttServiceOptions, private client?: MqttClient) { + constructor(private options: IMqttServiceOptions, private client?: IMqttClient) { if (options.connectOnCreate !== false) { this.connect({}, client); } @@ -67,9 +67,9 @@ export class MqttService { /** * connect manually connects to the mqtt broker. * @param opts the connection options - * @param client an optional MqttClient + * @param client an optional IMqttClient */ - public connect(opts?: MqttServiceOptions, client?: MqttClient) { + public connect(opts?: IMqttServiceOptions, client?: IMqttClient) { const options = extend(this.options || {}, opts); const protocol = options.protocol || 'ws'; const hostname = options.hostname || 'localhost'; @@ -84,8 +84,12 @@ export class MqttService { connectTimeout: this._connectTimeout }, options); + if (this.client) { + this.client.end(true); + } + if (!client) { - this.client = MQTT.connect(this._url, mergedOptions); + this.client = connect(this._url, mergedOptions); } else { this.client = client; } @@ -123,42 +127,42 @@ export class MqttService { * The first one subscribing to the resulting observable executes a mqtt subscribe. * The last one unsubscribing this filter executes a mqtt unsubscribe. * @param {string} filter - * @return {Observable} the observable you can subscribe to + * @return {Observable} the observable you can subscribe to */ - public observe(filterString: string): Observable { + public observe(filterString: string): Observable { if (!this.client) { throw new Error('mqtt client not connected'); } if (!this.observables[filterString]) { const rejected = new Subject(); - this.observables[filterString] = >UsingObservable + this.observables[filterString] = >UsingObservable .create( - // resourceFactory: Do the actual ref-counting MQTT subscription. - // refcount is decreased on unsubscribe. - () => { - const subscription: Subscription = new Subscription(); - this.client.subscribe(filterString, (err, granted: MQTT.ISubscriptionGrant[]) => { - granted.forEach((granted_: MQTT.ISubscriptionGrant) => { - if (granted_.qos === 128) { - delete this.observables[granted_.topic]; - this.client.unsubscribe(granted_.topic); - rejected.error(`subscription for '${granted_.topic}' rejected!`); - } - this._onSuback.emit({filter: filterString, granted: granted_.qos !== 128}); + // resourceFactory: Do the actual ref-counting MQTT subscription. + // refcount is decreased on unsubscribe. + () => { + const subscription: Subscription = new Subscription(); + this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => { + granted.forEach((granted_: ISubscriptionGrant) => { + if (granted_.qos === 128) { + delete this.observables[granted_.topic]; + this.client.unsubscribe(granted_.topic); + rejected.error(`subscription for '${granted_.topic}' rejected!`); + } + this._onSuback.emit({ filter: filterString, granted: granted_.qos !== 128 }); + }); + }); + subscription.add(() => { + delete this.observables[filterString]; + this.client.unsubscribe(filterString); }); - }); - subscription.add(() => { - delete this.observables[filterString]; - this.client.unsubscribe(filterString); - }); - return subscription; - }, - // observableFactory: Create the observable that is consumed from. - // This part is not executed until the Observable returned by - // `observe` gets actually subscribed. - (subscription: AnonymousSubscription) => merge(rejected, this.messages)) + return subscription; + }, + // observableFactory: Create the observable that is consumed from. + // This part is not executed until the Observable returned by + // `observe` gets actually subscribed. + (subscription: AnonymousSubscription) => merge(rejected, this.messages)) .pipe( - filter((msg: MqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)), + filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)), publishReplay(1), refCount() ); @@ -175,7 +179,7 @@ export class MqttService { * @param {PublishOptions} options * @return {Observable} */ - public publish(topic: string, message: any, options?: PublishOptions): Observable { + public publish(topic: string, message: any, options?: IPublishOptions): Observable { if (!this.client) { throw new Error('mqtt client not connected'); } @@ -198,7 +202,7 @@ export class MqttService { * @param {any} message * @param {PublishOptions} options */ - public unsafePublish(topic: string, message: any, options?: PublishOptions): void { + public unsafePublish(topic: string, message: any, options?: IPublishOptions): void { if (!this.client) { throw new Error('mqtt client not connected'); } @@ -252,7 +256,7 @@ export class MqttService { } /** An EventEmitter to listen to connect messages */ - public get onConnect(): EventEmitter { + public get onConnect(): EventEmitter { return this._onConnect; } @@ -262,17 +266,17 @@ export class MqttService { } /** An EventEmitter to listen to message events */ - public get onMessage(): EventEmitter { + public get onMessage(): EventEmitter { return this._onMessage; } /** An EventEmitter to listen to suback events */ - public get onSuback(): EventEmitter { + public get onSuback(): EventEmitter { return this._onSuback; } /** An EventEmitter to listen to error events */ - public get onError(): EventEmitter { + public get onError(): EventEmitter { return this._onError; } @@ -281,7 +285,7 @@ export class MqttService { this._onClose.emit(); } - private _handleOnConnect = (e: OnConnectEvent) => { + private _handleOnConnect = (e: IOnConnectEvent) => { Object.keys(this.observables).forEach((filter: string) => { this.client.subscribe(filter); }); @@ -297,7 +301,7 @@ export class MqttService { this._onReconnect.emit(); } - private _handleOnError = (e: OnErrorEvent) => { + private _handleOnError = (e: IOnErrorEvent) => { this._onError.emit(e); console.error(e); }