From 8b158042856093356bdd0e9b3aa978ff267ffaa4 Mon Sep 17 00:00:00 2001 From: Mark de Groot Date: Sun, 2 Dec 2018 15:53:08 +0100 Subject: [PATCH] Added new method , which will send the last received message to all new subscribers. --- src/mqtt.service.ts | 67 ++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index a7b7db5..0469135 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -9,7 +9,7 @@ import { UsingObservable } from 'rxjs/observable/UsingObservable'; import { Subject } from 'rxjs/Subject'; import { Subscription, AnonymousSubscription } from 'rxjs/Subscription'; import { merge } from 'rxjs/observable/merge'; -import { filter, publish, refCount } from 'rxjs/operators'; +import { filter, publish, publishReplay, refCount } from 'rxjs/operators'; import { IMqttClient, @@ -126,22 +126,44 @@ export class MqttService { * The observable will only emit messages matching the filter. * The first one subscribing to the resulting observable executes a mqtt subscribe. * The last one unsubscribing this filter executes a mqtt unsubscribe. - * @param {string} filter - * @return {Observable} the observable you can subscribe to + * Every new subscriber gets the latest message. + */ + public observeRetained(filterString: string): Observable { + return this._generalObserve(filterString, () => publishReplay()); + } + + /** + * With this method, you can observe messages for a mqtt topic. + * The observable will only emit messages matching the filter. + * The first one subscribing to the resulting observable executes a mqtt subscribe. + * The last one unsubscribing this filter executes a mqtt unsubscribe. */ public observe(filterString: string): Observable { + return this._generalObserve(filterString, () => publish()); + } + + /** + * With this method, you can observe messages for a mqtt topic. + * The observable will only emit messages matching the filter. + * The first one subscribing to the resulting observable executes a mqtt subscribe. + * The last one unsubscribing this filter executes a mqtt unsubscribe. + * Depending on the publish function, the messages will either be replayed after new + * subscribers subscribe or the messages are just passed through + */ + private _generalObserve(filterString: string, publishFn: Function): Observable { if (!this.client) { throw new Error('mqtt client not connected'); } if (!this.observables[filterString]) { - const rejected = new Subject(); + const rejected: Subject = new Subject(); this.observables[filterString] = >UsingObservable .create( - // resourceFactory: Do the actual ref-counting MQTT subscription. - // refcount is decreased on unsubscribe. - () => { - const subscription: Subscription = new Subscription(); - this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => { + // resourceFactory: Do the actual ref-counting MQTT subscription. + // refcount is decreased on unsubscribe. + () => { + const subscription: Subscription = new Subscription(); + this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => { + if (granted) { // granted can be undefined when an error occurs when the client is disconnecting granted.forEach((granted_: ISubscriptionGrant) => { if (granted_.qos === 128) { delete this.observables[granted_.topic]; @@ -150,22 +172,23 @@ export class MqttService { } this._onSuback.emit({ filter: filterString, granted: granted_.qos !== 128 }); }); - }); - subscription.add(() => { - delete this.observables[filterString]; - this.client.unsubscribe(filterString); - }); - return subscription; - }, - // observableFactory: Create the observable that is consumed from. - // This part is not executed until the Observable returned by - // `observe` gets actually subscribed. - (subscription: AnonymousSubscription) => merge(rejected, this.messages)) + } + }); + subscription.add(() => { + delete this.observables[filterString]; + this.client.unsubscribe(filterString); + }); + return subscription; + }, + // observableFactory: Create the observable that is consumed from. + // This part is not executed until the Observable returned by + // `observe` gets actually subscribed. + (subscription: AnonymousSubscription) => merge(rejected, this.messages)) .pipe( filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)), - publish(), + publishFn(), refCount() - ); + ) as Observable; } return this.observables[filterString]; }