diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index c8065cb..a252432 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -1,5 +1,5 @@ import { EventEmitter, Inject, Injectable } from '@angular/core'; -import { ISubscriptionGrant } from './mqtt-types'; +import { ISubscriptionGrant, IClientSubscribeOptions } from './mqtt-types'; import { connect } from '../vendor/mqtt.browserified.js'; import * as extend from 'xtend'; @@ -142,8 +142,8 @@ export class MqttService { * The last one unsubscribing this filter executes a mqtt unsubscribe. * Every new subscriber gets the latest message. */ - public observeRetained(filterString: string): Observable { - return this._generalObserve(filterString, () => publishReplay(1)); + public observeRetained(filterString: string, opts: IClientSubscribeOptions = { qos: 1 }): Observable { + return this._generalObserve(filterString, () => publishReplay(1), opts); } /** @@ -152,8 +152,8 @@ export class MqttService { * The first one subscribing to the resulting observable executes a mqtt subscribe. * The last one unsubscribing this filter executes a mqtt unsubscribe. */ - public observe(filterString: string): Observable { - return this._generalObserve(filterString, () => publish()); + public observe(filterString: string, opts: IClientSubscribeOptions = { qos: 1 }): Observable { + return this._generalObserve(filterString, () => publish(), opts); } /** @@ -164,7 +164,7 @@ export class MqttService { * Depending on the publish function, the messages will either be replayed after new * subscribers subscribe or the messages are just passed through */ - private _generalObserve(filterString: string, publishFn: Function): Observable { + private _generalObserve(filterString: string, publishFn: Function, opts: IClientSubscribeOptions): Observable { if (!this.client) { throw new Error('mqtt client not connected'); } @@ -175,7 +175,7 @@ export class MqttService { // refcount is decreased on unsubscribe. () => { const subscription: Subscription = new Subscription(); - this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => { + this.client.subscribe(filterString, opts, (err, granted: ISubscriptionGrant[]) => { if (granted) { // granted can be undefined when an error occurs when the client is disconnecting granted.forEach((granted_: ISubscriptionGrant) => { if (granted_.qos === 128) {