Skip to content

Commit

Permalink
Added new method , which will send the last received message to all n…
Browse files Browse the repository at this point in the history
…ew subscribers.
  • Loading branch information
markdegrootnl committed Dec 2, 2018
1 parent 6081861 commit 8b15804
Showing 1 changed file with 45 additions and 22 deletions.
67 changes: 45 additions & 22 deletions src/mqtt.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { UsingObservable } from 'rxjs/observable/UsingObservable';
import { Subject } from 'rxjs/Subject';
import { Subscription, AnonymousSubscription } from 'rxjs/Subscription';
import { merge } from 'rxjs/observable/merge';
import { filter, publish, refCount } from 'rxjs/operators';
import { filter, publish, publishReplay, refCount } from 'rxjs/operators';

import {
IMqttClient,
Expand Down Expand Up @@ -126,22 +126,44 @@ export class MqttService {
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* @param {string} filter
* @return {Observable<IMqttMessage>} the observable you can subscribe to
* Every new subscriber gets the latest message.
*/
public observeRetained(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publishReplay());
}

/**
* With this method, you can observe messages for a mqtt topic.
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
*/
public observe(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publish());
}

/**
* With this method, you can observe messages for a mqtt topic.
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* Depending on the publish function, the messages will either be replayed after new
* subscribers subscribe or the messages are just passed through
*/
private _generalObserve(filterString: string, publishFn: Function): Observable<IMqttMessage> {
if (!this.client) {
throw new Error('mqtt client not connected');
}
if (!this.observables[filterString]) {
const rejected = new Subject();
const rejected: Subject<IMqttMessage> = new Subject();
this.observables[filterString] = <Observable<IMqttMessage>>UsingObservable
.create(
// resourceFactory: Do the actual ref-counting MQTT subscription.
// refcount is decreased on unsubscribe.
() => {
const subscription: Subscription = new Subscription();
this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => {
// resourceFactory: Do the actual ref-counting MQTT subscription.
// refcount is decreased on unsubscribe.
() => {
const subscription: Subscription = new Subscription();
this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => {
if (granted) { // granted can be undefined when an error occurs when the client is disconnecting
granted.forEach((granted_: ISubscriptionGrant) => {
if (granted_.qos === 128) {
delete this.observables[granted_.topic];
Expand All @@ -150,22 +172,23 @@ export class MqttService {
}
this._onSuback.emit({ filter: filterString, granted: granted_.qos !== 128 });
});
});
subscription.add(() => {
delete this.observables[filterString];
this.client.unsubscribe(filterString);
});
return subscription;
},
// observableFactory: Create the observable that is consumed from.
// This part is not executed until the Observable returned by
// `observe` gets actually subscribed.
(subscription: AnonymousSubscription) => merge(rejected, this.messages))
}
});
subscription.add(() => {
delete this.observables[filterString];
this.client.unsubscribe(filterString);
});
return subscription;
},
// observableFactory: Create the observable that is consumed from.
// This part is not executed until the Observable returned by
// `observe` gets actually subscribed.
(subscription: AnonymousSubscription) => merge(rejected, this.messages))
.pipe(
filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)),
publish(),
publishFn(),
refCount()
);
) as Observable<IMqttMessage>;
}
return this.observables[filterString];
}
Expand Down

0 comments on commit 8b15804

Please sign in to comment.