diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 729ccb7..c8065cb 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -208,8 +208,8 @@ export class MqttService { /** * This method publishes a message for a topic with optional options. - * The returned observable will complete, if publishing was successful - * and will throw an error, if the publication fails + * The returned observable will emit empty value and complete, if publishing was successful + * and will throw an error, if the publication fails. */ public publish(topic: string, message: any, options?: IPublishOptions): Observable { if (!this.client) { @@ -220,6 +220,7 @@ export class MqttService { if (err) { obs.error(err); } else { + obs.next(null); obs.complete(); } }); diff --git a/tests/mqtt.service.spec.ts b/tests/mqtt.service.spec.ts index fc8ed12..d52e479 100644 --- a/tests/mqtt.service.spec.ts +++ b/tests/mqtt.service.spec.ts @@ -1,7 +1,7 @@ import { TestBed } from '@angular/core/testing'; -import { skip, map } from 'rxjs/operators'; -import { noop, Subscription } from 'rxjs'; +import { skip, map, mergeMap, scan } from 'rxjs/operators'; +import { noop, Subscription, of } from 'rxjs'; import { MqttService } from '../src/mqtt.service'; import { MqttServiceConfig, MqttClientService } from '../src/mqtt.module'; @@ -90,6 +90,21 @@ describe('MqttService', () => { mqttService.publish('ngx-mqtt/tests/publish/' + currentUuid, 'publish').subscribe(noop); }); + it('#pipeablePublish', (done) => { + mqttService.observe('ngx-mqtt/tests/pipeablePublish/' + currentUuid).pipe( + scan(acc => { return acc + 1; }, 0) + ).subscribe(count => { + if (count === 3) { + done(); + } + }); + of(null).pipe( + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish1')), + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish2')), + mergeMap(i => mqttService.publish('ngx-mqtt/tests/pipeablePublish/' + currentUuid, 'publish3')) + ).subscribe(); + }); + it('#unsafePublish', (done) => { mqttService.observe('ngx-mqtt/tests/unsafePublish/' + currentUuid).subscribe((_: IMqttMessage) => { done();