From 98624524eeddc623b7ea2ba1e03c84f4bd2dc67a Mon Sep 17 00:00:00 2001 From: Stepan Mracek Date: Fri, 6 Jul 2018 14:31:37 +0200 Subject: [PATCH] MqttService.publish() emits empty value if publishing was successful --- src/mqtt.service.ts | 5 +++-- tests/mqtt.service.spec.ts | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 729ccb7..c8065cb 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -208,8 +208,8 @@ export class MqttService { /** * This method publishes a message for a topic with optional options. - * The returned observable will complete, if publishing was successful - * and will throw an error, if the publication fails + * The returned observable will emit empty value and complete, if publishing was successful + * and will throw an error, if the publication fails. */ public publish(topic: string, message: any, options?: IPublishOptions): Observable { if (!this.client) { @@ -220,6 +220,7 @@ export class MqttService { if (err) { obs.error(err); } else { + obs.next(null); obs.complete(); } }); diff --git a/tests/mqtt.service.spec.ts b/tests/mqtt.service.spec.ts index fc8ed12..d52e479 100644 --- a/tests/mqtt.service.spec.ts +++ b/tests/mqtt.service.spec.ts @@ -1,7 +1,7 @@ import { TestBed } from '@angular/core/testing'; -import { skip, map } from 'rxjs/operators'; -import { noop, Subscription } from 'rxjs'; +import { skip, map, mergeMap, scan } from 'rxjs/operators'; +import { noop, Subscription, of } from 'rxjs'; import { MqttService } from '../src/mqtt.service'; import { MqttServiceConfig, MqttClientService } from '../src/mqtt.module'; @@ -90,6 +90,21 @@ describe('MqttService', () => { mqttService.publish('ngx-mqtt/tests/publish/' + currentUuid, 'publish').subscribe(noop); }); + it('#pipeablePublish', (done) => { + mqttService.observe('ngx-mqtt/tests/pipeablePublish/' + currentUuid).pipe( + scan(acc => { return acc + 1; }, 0) + ).subscribe(count => { + if (count === 3) { + done(); + } + }); + of(null).pipe( + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish1')), + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish2')), + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish3')) + ).subscribe(); + }); + it('#unsafePublish', (done) => { mqttService.observe('ngx-mqtt/tests/unsafePublish/' + currentUuid).subscribe((_: IMqttMessage) => { done();