Skip to content

Commit

Permalink
Added subscribe option to set the QoS in observe and observeRetained.
Browse files Browse the repository at this point in the history
  • Loading branch information
sclausen committed Jul 11, 2018
1 parent 9862452 commit fd89be8
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/mqtt.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter, Inject, Injectable } from '@angular/core';
import { ISubscriptionGrant } from './mqtt-types';
import { ISubscriptionGrant, IClientSubscribeOptions } from './mqtt-types';
import { connect } from '../vendor/mqtt.browserified.js';
import * as extend from 'xtend';

Expand Down Expand Up @@ -142,8 +142,8 @@ export class MqttService {
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* Every new subscriber gets the latest message.
*/
public observeRetained(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publishReplay(1));
public observeRetained(filterString: string, opts: IClientSubscribeOptions = { qos: 1 }): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publishReplay(1), opts);
}

/**
Expand All @@ -152,8 +152,8 @@ export class MqttService {
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
*/
public observe(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publish());
public observe(filterString: string, opts: IClientSubscribeOptions = { qos: 1 }): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publish(), opts);
}

/**
Expand All @@ -164,7 +164,7 @@ export class MqttService {
* Depending on the publish function, the messages will either be replayed after new
* subscribers subscribe or the messages are just passed through
*/
private _generalObserve(filterString: string, publishFn: Function): Observable<IMqttMessage> {
private _generalObserve(filterString: string, publishFn: Function, opts: IClientSubscribeOptions): Observable<IMqttMessage> {
if (!this.client) {
throw new Error('mqtt client not connected');
}
Expand All @@ -175,7 +175,7 @@ export class MqttService {
// refcount is decreased on unsubscribe.
() => {
const subscription: Subscription = new Subscription();
this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => {
this.client.subscribe(filterString, opts, (err, granted: ISubscriptionGrant[]) => {
if (granted) { // granted can be undefined when an error occurs when the client is disconnecting
granted.forEach((granted_: ISubscriptionGrant) => {
if (granted_.qos === 128) {
Expand Down

0 comments on commit fd89be8

Please sign in to comment.