Skip to content

Commit

Permalink
Added new method , which will send the last received message to all n…
Browse files Browse the repository at this point in the history
…ew subscribers.
  • Loading branch information
sclausen committed Jun 20, 2018
1 parent 65533b3 commit 65891fa
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 39 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ngx-mqtt",
"version": "6.2.0",
"version": "6.3.0",
"description": "ngx mqtt client library",
"main": "bundles/ngx-mqtt.min.js",
"module": "./src/index.js",
Expand Down
44 changes: 28 additions & 16 deletions src/mqtt.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import {
Subscription,
Subject,
Unsubscribable,
using
using,
UnaryFunction
} from 'rxjs';
import {
filter,
publish,
publishReplay,
refCount
} from 'rxjs/operators';

Expand All @@ -31,7 +34,6 @@ import {
} from './mqtt.model';

import { MqttServiceConfig, MqttClientService } from './index';
import { publishReplayConditionally } from './rxjs/operators/publishReplayConditionally';

/**
* With an instance of MqttService, you can observe and subscribe to MQTT in multiple places, e.g. in different components,
Expand Down Expand Up @@ -66,8 +68,6 @@ export class MqttService {
* The constructor needs [connection options]{@link IMqttServiceOptions} regarding the broker and some
* options to configure behavior of this service, like if the connection to the broker
* should be established on creation of this service or not.
* @param options connection and creation options for MQTT.js and this service
* @param client an instance of IMqttClient
*/
constructor(
@Inject(MqttServiceConfig) private options: IMqttServiceOptions,
Expand All @@ -82,8 +82,6 @@ export class MqttService {

/**
* connect manually connects to the mqtt broker.
* @param opts the connection options
* @param client an optional IMqttClient
*/
public connect(opts?: IMqttServiceOptions, client?: IMqttClient) {
const options = extend(this.options || {}, opts);
Expand Down Expand Up @@ -142,10 +140,31 @@ export class MqttService {
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* @param {string} filter
* @return {Observable<IMqttMessage>} the observable you can subscribe to
* Every new subscriber gets the latest message.
*/
public observeRetained(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publishReplay());
}

/**
* With this method, you can observe messages for a mqtt topic.
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
*/
public observe(filterString: string): Observable<IMqttMessage> {
return this._generalObserve(filterString, () => publish());
}

/**
* With this method, you can observe messages for a mqtt topic.
* The observable will only emit messages matching the filter.
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* Depending on the publish function, the messages will either be replayed after new
* subscribers subscribe or the messages are just passed through
*/
private _generalObserve(filterString: string, publishFn: Function): Observable<IMqttMessage> {
if (!this.client) {
throw new Error('mqtt client not connected');
}
Expand Down Expand Up @@ -180,7 +199,7 @@ export class MqttService {
(subscription: Unsubscribable | void) => merge(rejected, this.messages))
.pipe(
filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)),
publishReplayConditionally(1, undefined, undefined, undefined, (msg: IMqttMessage) => msg.retain === true),
publishFn(),
refCount()
) as Observable<IMqttMessage>;
}
Expand All @@ -191,10 +210,6 @@ export class MqttService {
* This method publishes a message for a topic with optional options.
* The returned observable will complete, if publishing was successful
* and will throw an error, if the publication fails
* @param {string} topic
* @param {any} message
* @param {PublishOptions} options
* @return {Observable<void>}
*/
public publish(topic: string, message: any, options?: IPublishOptions): Observable<void> {
if (!this.client) {
Expand All @@ -215,9 +230,6 @@ export class MqttService {
/**
* This method publishes a message for a topic with optional options.
* If an error occurs, it will throw.
* @param {string} topic
* @param {any} message
* @param {PublishOptions} options
*/
public unsafePublish(topic: string, message: any, options?: IPublishOptions): void {
if (!this.client) {
Expand Down
49 changes: 27 additions & 22 deletions tests/mqtt.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import {

const config: IMqttServiceOptions = {
connectOnCreate: true,
hostname: 'test.mosquitto.org',
port: 8080,
hostname: 'localhost',
port: 9001,
// hostname: 'test.mosquitto.org',
// port: 8080,
path: ''
};

const uuid = generateUuid();
const currentUuid = generateUuid();
let originalTimeout;
let mqttService: MqttService;

Expand Down Expand Up @@ -57,10 +59,10 @@ describe('MqttService', () => {

it('#connect', (done) => {
mqttService.disconnect(true);
mqttService.connect({ ...config, clientId: 'connect' + uuid });
mqttService.connect({ ...config, clientId: 'connect' + currentUuid });
mqttService.state.pipe(skip(2)).subscribe(state => {
expect(state).toBe(MqttConnectionState.CONNECTED);
expect(mqttService.clientId).toBe('connect' + uuid);
expect(mqttService.clientId).toBe('connect' + currentUuid);
done();
});
});
Expand All @@ -84,17 +86,17 @@ describe('MqttService', () => {
});

it('#publish', (done) => {
mqttService.observe('ngx-mqtt/tests/publish/' + uuid).subscribe((_: IMqttMessage) => {
mqttService.observe('ngx-mqtt/tests/publish/' + currentUuid).subscribe((_: IMqttMessage) => {
done();
});
mqttService.publish('ngx-mqtt/tests/publish/' + uuid, 'publish').subscribe(noop);
mqttService.publish('ngx-mqtt/tests/publish/' + currentUuid, 'publish').subscribe(noop);
});

it('#unsafePublish', (done) => {
mqttService.observe('ngx-mqtt/tests/unsafePublish/' + uuid).subscribe((_: IMqttMessage) => {
mqttService.observe('ngx-mqtt/tests/unsafePublish/' + currentUuid).subscribe((_: IMqttMessage) => {
done();
});
mqttService.unsafePublish('ngx-mqtt/tests/unsafePublish/' + uuid, 'unsafePublish');
mqttService.unsafePublish('ngx-mqtt/tests/unsafePublish/' + currentUuid, 'unsafePublish');
});


Expand Down Expand Up @@ -150,6 +152,7 @@ describe('MqttService', () => {
describe('MqttService Retained Behavior', () => {
it('emit the retained message for all current and new subscribers', (done) => {
let counter = 0;
const topic = 'ngx-mqtt/tests/retained/' + currentUuid;
const mqttSubscriptions: IMqttSubscription[] = [];

function observe(): void {
Expand All @@ -158,44 +161,46 @@ describe('MqttService Retained Behavior', () => {
payload: null
};
s.subscription = mqttService
.observe('topic')
.pipe(
map((v: IMqttMessage) => v.payload),
)
.subscribe(msg => { s.payload = msg; });
.observeRetained(topic)
.pipe(map((v: IMqttMessage) => v.payload))
.subscribe(msg => {
s.payload = msg;
});
mqttSubscriptions.push(s);
}
mqttService.unsafePublish(topic, 'foobar', { retain: true, qos: 0 });

interface IMqttSubscription {
subscription?: Subscription;
id: number;
payload: any;
}

observe();
for (let i = 0; i < 10; i++) {
setTimeout(() => observe(), i * 100);
}
setTimeout(() => observe(), 100);
setTimeout(() => observe(), 200);

setTimeout(() => {
mqttSubscriptions.map((s: IMqttSubscription) => {
expect(s.payload).toBeTruthy();
});
done();
}, 2000);
}, 300);
});

it('do not emit not retained message on late subscribe', (done) => {
const topic = 'ngx-mqtt/tests/notRetained/' + currentUuid;
let lateMessage: IMqttMessage; // this message should never occur
mqttService.observe('notretained').subscribe((msg1: IMqttMessage) => {
mqttService.observe(topic).subscribe((msg1: IMqttMessage) => {
expect(msg1).toBeDefined();
mqttService.observe('notretained').subscribe((msg2: IMqttMessage) => lateMessage = msg2);
mqttService.observe(topic).subscribe((msg2: IMqttMessage) => lateMessage = msg2);
setTimeout(() => {
expect(lateMessage).toBeUndefined();
done();
}, 1000);
});
setTimeout(() => {
mqttService.unsafePublish('notretained', 'foobar');
mqttService.unsafePublish(topic, 'foobar');
}, 1000);
});
});
Expand Down Expand Up @@ -236,7 +241,7 @@ function generateUuid() {
random = Math.random() * 16 | 0;

if (i === 8 || i === 12 || i === 16 || i === 20) {
uuid += '-'
uuid += '-';
}
uuid += (i === 12 ? 4 : (i === 16 ? (random & 3 | 8) : random)).toString(16);
}
Expand Down

0 comments on commit 65891fa

Please sign in to comment.