Skip to content

Commit

Permalink
End the client, if manually executing connect, but already have an in…
Browse files Browse the repository at this point in the history
…ternal client.
  • Loading branch information
sclausen committed Apr 16, 2018
1 parent ae6f4c2 commit 30a07a2
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 70 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ import {
MqttMessage,
MqttModule,
MqttService,
MqttServiceOptions
IMqttServiceOptions
} from 'ngx-mqtt';

export const MQTT_SERVICE_OPTIONS: MqttServiceOptions = {
export const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = {
hostname: 'localhost',
port: 9001,
path: '/mqtt'
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ngx-mqtt",
"version": "5.1.0",
"version": "5.2.0",
"description": "ngx mqtt client library",
"main": "bundles/ngx-mqtt.min.js",
"module": "./src/index.js",
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { NgModule, ModuleWithProviders } from '@angular/core';

import { MqttService } from './mqtt.service';
import { MqttServiceOptions } from './mqtt.model';
import { IMqttServiceOptions } from './mqtt.model';

export * from './mqtt.service';
export * from './mqtt.model';

export const MQTT_SERVICE_OPTIONS: MqttServiceOptions = {
export const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = {
connectOnCreate: true,
hostname: 'localhost',
port: 1884,
Expand Down
18 changes: 9 additions & 9 deletions src/mqtt.model.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as MQTT from 'mqtt';
import { IClientOptions, IClientPublishOptions, IPacket, MqttClient } from 'mqtt';
import { Stream } from 'stream';

export enum MqttConnectionState {
Expand All @@ -7,7 +7,7 @@ export enum MqttConnectionState {
CONNECTED
}

export interface MqttServiceOptions extends MQTT.IClientOptions {
export interface IMqttServiceOptions extends IClientOptions {
/** wether a new connection should be created
* on creating an instance of the service */
connectOnCreate?: boolean;
Expand All @@ -20,7 +20,7 @@ export interface MqttServiceOptions extends MQTT.IClientOptions {
protocol?: 'wss' | 'ws';
}

export interface MqttMessage extends MQTT.IPacket {
export interface IMqttMessage extends IPacket {
/** the mqtt topic to which this message was published to */
topic: string;
/** the payload */
Expand All @@ -33,15 +33,15 @@ export interface MqttMessage extends MQTT.IPacket {
dup: boolean;
}

export interface PublishOptions extends MQTT.IClientPublishOptions { }
export interface OnConnectEvent extends MqttMessage { }
export interface OnErrorEvent extends Error { }
export interface OnMessageEvent extends MqttMessage { }
export interface OnSubackEvent {
export interface IPublishOptions extends IClientPublishOptions { }
export interface IOnConnectEvent extends IMqttMessage { }
export interface IOnErrorEvent extends Error { }
export interface IOnMessageEvent extends IMqttMessage { }
export interface IOnSubackEvent {
granted: boolean;
filter: string;
}

export interface MqttClient extends MQTT.Client {
export interface IMqttClient extends MqttClient {
stream: Stream;
}
116 changes: 60 additions & 56 deletions src/mqtt.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from '@angular/core';
import * as MQTT from 'mqtt';
import { connect, ISubscriptionGrant } from 'mqtt';
import * as extend from 'xtend';

import { BehaviorSubject } from 'rxjs/BehaviorSubject';
Expand All @@ -12,15 +12,15 @@ import { merge } from 'rxjs/observable/merge';
import { filter, publishReplay, refCount } from 'rxjs/operators';

import {
MqttClient,
IMqttClient,
MqttConnectionState,
MqttMessage,
MqttServiceOptions,
OnConnectEvent,
OnErrorEvent,
OnMessageEvent,
OnSubackEvent,
PublishOptions
IMqttMessage,
IMqttServiceOptions,
IOnConnectEvent,
IOnErrorEvent,
IOnMessageEvent,
IOnSubackEvent,
IPublishOptions
} from './mqtt.model';

/**
Expand All @@ -30,33 +30,33 @@ import {
*/
export class MqttService {
/** a map of all mqtt observables by filter */
public observables: { [filter: string]: Observable<MqttMessage> } = {};
public observables: { [filter: string]: Observable<IMqttMessage> } = {};
/** the connection state */
public state: BehaviorSubject<MqttConnectionState> = new BehaviorSubject(MqttConnectionState.CLOSED);
/** an observable of the last mqtt message */
public messages: Subject<MqttMessage> = new Subject<MqttMessage>();
public messages: Subject<IMqttMessage> = new Subject<IMqttMessage>();

private _clientId = this._generateClientId();
private _keepalive = 10;
private _connectTimeout = 10000;
private _reconnectPeriod = 10000;
private _url: string;

private _onConnect: EventEmitter<OnConnectEvent> = new EventEmitter<OnConnectEvent>();
private _onConnect: EventEmitter<IOnConnectEvent> = new EventEmitter<IOnConnectEvent>();
private _onClose: EventEmitter<void> = new EventEmitter<void>();
private _onError: EventEmitter<OnErrorEvent> = new EventEmitter<OnErrorEvent>();
private _onError: EventEmitter<IOnErrorEvent> = new EventEmitter<IOnErrorEvent>();
private _onReconnect: EventEmitter<void> = new EventEmitter<void>();
private _onMessage: EventEmitter<OnMessageEvent> = new EventEmitter<OnMessageEvent>();
private _onSuback: EventEmitter<OnSubackEvent> = new EventEmitter<OnSubackEvent>();
private _onMessage: EventEmitter<IOnMessageEvent> = new EventEmitter<IOnMessageEvent>();
private _onSuback: EventEmitter<IOnSubackEvent> = new EventEmitter<IOnSubackEvent>();

/**
* The constructor needs [connection options]{@link MqttServiceOptions} regarding the broker and some
* The constructor needs [connection options]{@link IMqttServiceOptions} regarding the broker and some
* options to configure behavior of this service, like if the connection to the broker
* should be established on creation of this service or not.
* @param options connection and creation options for MQTT.js and this service
* @param client an instance of MqttClient
* @param client an instance of IMqttClient
*/
constructor(private options: MqttServiceOptions, private client?: MqttClient) {
constructor(private options: IMqttServiceOptions, private client?: IMqttClient) {
if (options.connectOnCreate !== false) {
this.connect({}, client);
}
Expand All @@ -67,9 +67,9 @@ export class MqttService {
/**
* connect manually connects to the mqtt broker.
* @param opts the connection options
* @param client an optional MqttClient
* @param client an optional IMqttClient
*/
public connect(opts?: MqttServiceOptions, client?: MqttClient) {
public connect(opts?: IMqttServiceOptions, client?: IMqttClient) {
const options = extend(this.options || {}, opts);
const protocol = options.protocol || 'ws';
const hostname = options.hostname || 'localhost';
Expand All @@ -84,8 +84,12 @@ export class MqttService {
connectTimeout: this._connectTimeout
}, options);

if (this.client) {
this.client.end(true);
}

if (!client) {
this.client = <MqttClient>MQTT.connect(this._url, mergedOptions);
this.client = <IMqttClient>connect(this._url, mergedOptions);
} else {
this.client = client;
}
Expand Down Expand Up @@ -123,42 +127,42 @@ export class MqttService {
* The first one subscribing to the resulting observable executes a mqtt subscribe.
* The last one unsubscribing this filter executes a mqtt unsubscribe.
* @param {string} filter
* @return {Observable<MqttMessage>} the observable you can subscribe to
* @return {Observable<IMqttMessage>} the observable you can subscribe to
*/
public observe(filterString: string): Observable<MqttMessage> {
public observe(filterString: string): Observable<IMqttMessage> {
if (!this.client) {
throw new Error('mqtt client not connected');
}
if (!this.observables[filterString]) {
const rejected = new Subject();
this.observables[filterString] = <Observable<MqttMessage>>UsingObservable
this.observables[filterString] = <Observable<IMqttMessage>>UsingObservable
.create(
// resourceFactory: Do the actual ref-counting MQTT subscription.
// refcount is decreased on unsubscribe.
() => {
const subscription: Subscription = new Subscription();
this.client.subscribe(filterString, (err, granted: MQTT.ISubscriptionGrant[]) => {
granted.forEach((granted_: MQTT.ISubscriptionGrant) => {
if (granted_.qos === 128) {
delete this.observables[granted_.topic];
this.client.unsubscribe(granted_.topic);
rejected.error(`subscription for '${granted_.topic}' rejected!`);
}
this._onSuback.emit({filter: filterString, granted: granted_.qos !== 128});
// resourceFactory: Do the actual ref-counting MQTT subscription.
// refcount is decreased on unsubscribe.
() => {
const subscription: Subscription = new Subscription();
this.client.subscribe(filterString, (err, granted: ISubscriptionGrant[]) => {
granted.forEach((granted_: ISubscriptionGrant) => {
if (granted_.qos === 128) {
delete this.observables[granted_.topic];
this.client.unsubscribe(granted_.topic);
rejected.error(`subscription for '${granted_.topic}' rejected!`);
}
this._onSuback.emit({ filter: filterString, granted: granted_.qos !== 128 });
});
});
subscription.add(() => {
delete this.observables[filterString];
this.client.unsubscribe(filterString);
});
});
subscription.add(() => {
delete this.observables[filterString];
this.client.unsubscribe(filterString);
});
return subscription;
},
// observableFactory: Create the observable that is consumed from.
// This part is not executed until the Observable returned by
// `observe` gets actually subscribed.
(subscription: AnonymousSubscription) => merge(rejected, this.messages))
return subscription;
},
// observableFactory: Create the observable that is consumed from.
// This part is not executed until the Observable returned by
// `observe` gets actually subscribed.
(subscription: AnonymousSubscription) => merge(rejected, this.messages))
.pipe(
filter((msg: MqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)),
filter((msg: IMqttMessage) => MqttService.filterMatchesTopic(filterString, msg.topic)),
publishReplay(1),
refCount()
);
Expand All @@ -175,7 +179,7 @@ export class MqttService {
* @param {PublishOptions} options
* @return {Observable<void>}
*/
public publish(topic: string, message: any, options?: PublishOptions): Observable<void> {
public publish(topic: string, message: any, options?: IPublishOptions): Observable<void> {
if (!this.client) {
throw new Error('mqtt client not connected');
}
Expand All @@ -198,7 +202,7 @@ export class MqttService {
* @param {any} message
* @param {PublishOptions} options
*/
public unsafePublish(topic: string, message: any, options?: PublishOptions): void {
public unsafePublish(topic: string, message: any, options?: IPublishOptions): void {
if (!this.client) {
throw new Error('mqtt client not connected');
}
Expand Down Expand Up @@ -252,7 +256,7 @@ export class MqttService {
}

/** An EventEmitter to listen to connect messages */
public get onConnect(): EventEmitter<OnConnectEvent> {
public get onConnect(): EventEmitter<IOnConnectEvent> {
return this._onConnect;
}

Expand All @@ -262,17 +266,17 @@ export class MqttService {
}

/** An EventEmitter to listen to message events */
public get onMessage(): EventEmitter<OnMessageEvent> {
public get onMessage(): EventEmitter<IOnMessageEvent> {
return this._onMessage;
}

/** An EventEmitter to listen to suback events */
public get onSuback(): EventEmitter<OnSubackEvent> {
public get onSuback(): EventEmitter<IOnSubackEvent> {
return this._onSuback;
}

/** An EventEmitter to listen to error events */
public get onError(): EventEmitter<OnErrorEvent> {
public get onError(): EventEmitter<IOnErrorEvent> {
return this._onError;
}

Expand All @@ -281,7 +285,7 @@ export class MqttService {
this._onClose.emit();
}

private _handleOnConnect = (e: OnConnectEvent) => {
private _handleOnConnect = (e: IOnConnectEvent) => {
Object.keys(this.observables).forEach((filter: string) => {
this.client.subscribe(filter);
});
Expand All @@ -297,7 +301,7 @@ export class MqttService {
this._onReconnect.emit();
}

private _handleOnError = (e: OnErrorEvent) => {
private _handleOnError = (e: IOnErrorEvent) => {
this._onError.emit(e);
console.error(e);
}
Expand Down

0 comments on commit 30a07a2

Please sign in to comment.