From 65891fa4b7c708657fe3f51b389af2109c4a66bd Mon Sep 17 00:00:00 2001 From: Sebastian Clausen Date: Wed, 20 Jun 2018 10:47:23 +0200 Subject: [PATCH] Added new method , which will send the last received message to all new subscribers. --- package.json | 2 +- src/mqtt.service.ts | 44 +++++++++++++++++++++------------- tests/mqtt.service.spec.ts | 49 +++++++++++++++++++++----------------- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/package.json b/package.json index cc230fd..7f539b3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ngx-mqtt", - "version": "6.2.0", + "version": "6.3.0", "description": "ngx mqtt client library", "main": "bundles/ngx-mqtt.min.js", "module": "./src/index.js", diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 8d667aa..d886a81 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -11,10 +11,13 @@ import { Subscription, Subject, Unsubscribable, - using + using, + UnaryFunction } from 'rxjs'; import { filter, + publish, + publishReplay, refCount } from 'rxjs/operators'; @@ -31,7 +34,6 @@ import { } from './mqtt.model'; import { MqttServiceConfig, MqttClientService } from './index'; -import { publishReplayConditionally } from './rxjs/operators/publishReplayConditionally'; /** * With an instance of MqttService, you can observe and subscribe to MQTT in multiple places, e.g. in different components, @@ -66,8 +68,6 @@ export class MqttService { * The constructor needs [connection options]{@link IMqttServiceOptions} regarding the broker and some * options to configure behavior of this service, like if the connection to the broker * should be established on creation of this service or not. - * @param options connection and creation options for MQTT.js and this service - * @param client an instance of IMqttClient */ constructor( @Inject(MqttServiceConfig) private options: IMqttServiceOptions, @@ -82,8 +82,6 @@ export class MqttService { /** * connect manually connects to the mqtt broker. - * @param opts the connection options - * @param client an optional IMqttClient */ public connect(opts?: IMqttServiceOptions, client?: IMqttClient) { const options = extend(this.options || {}, opts); @@ -142,10 +140,31 @@ export class MqttService { * The observable will only emit messages matching the filter. * The first one subscribing to the resulting observable executes a mqtt subscribe. * The last one unsubscribing this filter executes a mqtt unsubscribe. - * @param {string} filter - * @return {Observable} the observable you can subscribe to + * Every new subscriber gets the latest message. + */ + public observeRetained(filterString: string): Observable { + return this._generalObserve(filterString, () => publishReplay()); + } + + /** + * With this method, you can observe messages for a mqtt topic. + * The observable will only emit messages matching the filter. + * The first one subscribing to the resulting observable executes a mqtt subscribe. + * The last one unsubscribing this filter executes a mqtt unsubscribe. */ public observe(filterString: string): Observable { + return this._generalObserve(filterString, () => publish()); + } + + /** + * With this method, you can observe messages for a mqtt topic. + * The observable will only emit messages matching the filter. + * The first one subscribing to the resulting observable executes a mqtt subscribe. + * The last one unsubscribing this filter executes a mqtt unsubscribe. + * Depending on the publish function, the messages will either be replayed after new + * subscribers subscribe or the messages are just passed through + */ + private _generalObserve(filterString: string, publishFn: Function): Observable { if (!this.client) { throw new Error('mqtt client not connected'); } @@ -180,7 +199,7 @@ export class MqttService { (subscription: Unsubscribable | void) => merge(rejected, this.messages)) .pipe( filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)), - publishReplayConditionally(1, undefined, undefined, undefined, (msg: IMqttMessage) => msg.retain === true), + publishFn(), refCount() ) as Observable; } @@ -191,10 +210,6 @@ export class MqttService { * This method publishes a message for a topic with optional options. * The returned observable will complete, if publishing was successful * and will throw an error, if the publication fails - * @param {string} topic - * @param {any} message - * @param {PublishOptions} options - * @return {Observable} */ public publish(topic: string, message: any, options?: IPublishOptions): Observable { if (!this.client) { @@ -215,9 +230,6 @@ export class MqttService { /** * This method publishes a message for a topic with optional options. * If an error occurs, it will throw. - * @param {string} topic - * @param {any} message - * @param {PublishOptions} options */ public unsafePublish(topic: string, message: any, options?: IPublishOptions): void { if (!this.client) { diff --git a/tests/mqtt.service.spec.ts b/tests/mqtt.service.spec.ts index 88f668d..cc2dbc0 100644 --- a/tests/mqtt.service.spec.ts +++ b/tests/mqtt.service.spec.ts @@ -17,12 +17,14 @@ import { const config: IMqttServiceOptions = { connectOnCreate: true, - hostname: 'test.mosquitto.org', - port: 8080, + hostname: 'localhost', + port: 9001, + // hostname: 'test.mosquitto.org', + // port: 8080, path: '' }; -const uuid = generateUuid(); +const currentUuid = generateUuid(); let originalTimeout; let mqttService: MqttService; @@ -57,10 +59,10 @@ describe('MqttService', () => { it('#connect', (done) => { mqttService.disconnect(true); - mqttService.connect({ ...config, clientId: 'connect' + uuid }); + mqttService.connect({ ...config, clientId: 'connect' + currentUuid }); mqttService.state.pipe(skip(2)).subscribe(state => { expect(state).toBe(MqttConnectionState.CONNECTED); - expect(mqttService.clientId).toBe('connect' + uuid); + expect(mqttService.clientId).toBe('connect' + currentUuid); done(); }); }); @@ -84,17 +86,17 @@ describe('MqttService', () => { }); it('#publish', (done) => { - mqttService.observe('ngx-mqtt/tests/publish/' + uuid).subscribe((_: IMqttMessage) => { + mqttService.observe('ngx-mqtt/tests/publish/' + currentUuid).subscribe((_: IMqttMessage) => { done(); }); - mqttService.publish('ngx-mqtt/tests/publish/' + uuid, 'publish').subscribe(noop); + mqttService.publish('ngx-mqtt/tests/publish/' + currentUuid, 'publish').subscribe(noop); }); it('#unsafePublish', (done) => { - mqttService.observe('ngx-mqtt/tests/unsafePublish/' + uuid).subscribe((_: IMqttMessage) => { + mqttService.observe('ngx-mqtt/tests/unsafePublish/' + currentUuid).subscribe((_: IMqttMessage) => { done(); }); - mqttService.unsafePublish('ngx-mqtt/tests/unsafePublish/' + uuid, 'unsafePublish'); + mqttService.unsafePublish('ngx-mqtt/tests/unsafePublish/' + currentUuid, 'unsafePublish'); }); @@ -150,6 +152,7 @@ describe('MqttService', () => { describe('MqttService Retained Behavior', () => { it('emit the retained message for all current and new subscribers', (done) => { let counter = 0; + const topic = 'ngx-mqtt/tests/retained/' + currentUuid; const mqttSubscriptions: IMqttSubscription[] = []; function observe(): void { @@ -158,13 +161,15 @@ describe('MqttService Retained Behavior', () => { payload: null }; s.subscription = mqttService - .observe('topic') - .pipe( - map((v: IMqttMessage) => v.payload), - ) - .subscribe(msg => { s.payload = msg; }); + .observeRetained(topic) + .pipe(map((v: IMqttMessage) => v.payload)) + .subscribe(msg => { + s.payload = msg; + }); mqttSubscriptions.push(s); } + mqttService.unsafePublish(topic, 'foobar', { retain: true, qos: 0 }); + interface IMqttSubscription { subscription?: Subscription; id: number; @@ -172,30 +177,30 @@ describe('MqttService Retained Behavior', () => { } observe(); - for (let i = 0; i < 10; i++) { - setTimeout(() => observe(), i * 100); - } + setTimeout(() => observe(), 100); + setTimeout(() => observe(), 200); setTimeout(() => { mqttSubscriptions.map((s: IMqttSubscription) => { expect(s.payload).toBeTruthy(); }); done(); - }, 2000); + }, 300); }); it('do not emit not retained message on late subscribe', (done) => { + const topic = 'ngx-mqtt/tests/notRetained/' + currentUuid; let lateMessage: IMqttMessage; // this message should never occur - mqttService.observe('notretained').subscribe((msg1: IMqttMessage) => { + mqttService.observe(topic).subscribe((msg1: IMqttMessage) => { expect(msg1).toBeDefined(); - mqttService.observe('notretained').subscribe((msg2: IMqttMessage) => lateMessage = msg2); + mqttService.observe(topic).subscribe((msg2: IMqttMessage) => lateMessage = msg2); setTimeout(() => { expect(lateMessage).toBeUndefined(); done(); }, 1000); }); setTimeout(() => { - mqttService.unsafePublish('notretained', 'foobar'); + mqttService.unsafePublish(topic, 'foobar'); }, 1000); }); }); @@ -236,7 +241,7 @@ function generateUuid() { random = Math.random() * 16 | 0; if (i === 8 || i === 12 || i === 16 || i === 20) { - uuid += '-' + uuid += '-'; } uuid += (i === 12 ? 4 : (i === 16 ? (random & 3 | 8) : random)).toString(16); }