Skip to content

Commit

Permalink
MqttService.publish() emits empty value if publishing was successful
Browse files Browse the repository at this point in the history
  • Loading branch information
stepanmracek authored and sclausen committed Jul 6, 2018
1 parent d05ac86 commit 9862452
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
5 changes: 3 additions & 2 deletions src/mqtt.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ export class MqttService {

/**
* This method publishes a message for a topic with optional options.
* The returned observable will complete, if publishing was successful
* and will throw an error, if the publication fails
* The returned observable will emit empty value and complete, if publishing was successful
* and will throw an error, if the publication fails.
*/
public publish(topic: string, message: any, options?: IPublishOptions): Observable<void> {
if (!this.client) {
Expand All @@ -220,6 +220,7 @@ export class MqttService {
if (err) {
obs.error(err);
} else {
obs.next(null);
obs.complete();
}
});
Expand Down
19 changes: 17 additions & 2 deletions tests/mqtt.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TestBed } from '@angular/core/testing';

import { skip, map } from 'rxjs/operators';
import { noop, Subscription } from 'rxjs';
import { skip, map, mergeMap, scan } from 'rxjs/operators';
import { noop, Subscription, of } from 'rxjs';

import { MqttService } from '../src/mqtt.service';
import { MqttServiceConfig, MqttClientService } from '../src/mqtt.module';
Expand Down Expand Up @@ -90,6 +90,21 @@ describe('MqttService', () => {
mqttService.publish('ngx-mqtt/tests/publish/' + currentUuid, 'publish').subscribe(noop);
});

it('#pipeablePublish', (done) => {
mqttService.observe('ngx-mqtt/tests/pipeablePublish/' + currentUuid).pipe(
scan<IMqttMessage, number>(acc => { return acc + 1; }, 0)
).subscribe(count => {
if (count === 3) {
done();
}
});
of(null).pipe(
mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish1')),
mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish2')),
mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish3'))
).subscribe();
});

it('#unsafePublish', (done) => {
mqttService.observe('ngx-mqtt/tests/unsafePublish/' + currentUuid).subscribe((_: IMqttMessage) => {
done();
Expand Down

0 comments on commit 9862452

Please sign in to comment.