diff --git a/package-lock.json b/package-lock.json index aa064e6..ac1efb2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,12 +13,13 @@ "@nestjs/common": "^10.4.7", "@nestjs/config": "^3.3.0", "@nestjs/core": "^10.4.7", + "@nestjs/event-emitter": "^2.1.1", "@nestjs/testing": "^10.4.7", "@release-it/conventional-changelog": "^9.0.3", "@types/express": "^5.0.0", "@types/jest": "29.5.14", "@types/node": "^22.9.0", - "aedes": "^0.51.2", + "aedes": "0.51.3", "auto-changelog": "^2.5.0", "prettier": "3.3.3", "reflect-metadata": "^0.2.2", @@ -1638,6 +1639,20 @@ } } }, + "node_modules/@nestjs/event-emitter": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/@nestjs/event-emitter/-/event-emitter-2.1.1.tgz", + "integrity": "sha512-6L6fBOZTyfFlL7Ih/JDdqlCzZeCW0RjCX28wnzGyg/ncv5F/EOeT1dfopQr1loBRQ3LTgu8OWM7n4zLN4xigsg==", + "dev": true, + "license": "MIT", + "dependencies": { + "eventemitter2": "6.4.9" + }, + "peerDependencies": { + "@nestjs/common": "^8.0.0 || ^9.0.0 || ^10.0.0", + "@nestjs/core": "^8.0.0 || ^9.0.0 || ^10.0.0" + } + }, "node_modules/@nestjs/schematics": { "version": "10.2.3", "resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz", @@ -4828,6 +4843,13 @@ "node": ">=6" } }, + "node_modules/eventemitter2": { + "version": "6.4.9", + "resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz", + "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==", + "dev": true, + "license": "MIT" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", diff --git a/package.json b/package.json index d29a38d..dd899a4 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "test:watch": "jest --watch", "test:cov": "jest --coverage", "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", - "test:e2e": "jest --config ./test/jest-e2e.json", + "test:e2e": "jest --config ./test/jest-e2e.json --detectOpenHandles", "prepack": "npx nest build" }, "peerDependencies": { @@ -44,13 +44,15 @@ "devDependencies": { "@nestjs/cli": "^10.4.8", "@nestjs/common": "^10.4.7", - "@nestjs/core": "^10.4.7", "@nestjs/config": "^3.3.0", + "@nestjs/core": "^10.4.7", + "@nestjs/event-emitter": "^2.1.1", "@nestjs/testing": "^10.4.7", "@release-it/conventional-changelog": "^9.0.3", "@types/express": "^5.0.0", "@types/jest": "29.5.14", "@types/node": "^22.9.0", + "aedes": "0.51.3", "auto-changelog": "^2.5.0", "prettier": "3.3.3", "reflect-metadata": "^0.2.2", @@ -64,8 +66,7 @@ "tsc-watch": "6.2.1", "tsconfig-paths": "4.2.0", "tslint": "6.1.3", - "typescript": "^5.6.3", - "aedes": "^0.51.2" + "typescript": "^5.6.3" }, "jest": { "moduleFileExtensions": [ diff --git a/src/mqtt.constants.ts b/src/mqtt.constants.ts index 6746997..8714f61 100644 --- a/src/mqtt.constants.ts +++ b/src/mqtt.constants.ts @@ -2,4 +2,3 @@ export const MQTT_SUBSCRIBE_OPTIONS = '__mqtt_subscribe_options'; export const MQTT_SUBSCRIBER_PARAMS = '__mqtt_subscriber_params'; export const MQTT_CLIENT_INSTANCE = 'MQTT_CLIENT_INSTANCE'; export const MQTT_OPTION_PROVIDER = 'MQTT_OPTION_PROVIDER'; -export const MQTT_LOGGER_PROVIDER = 'MQTT_LOGGER_PROVIDER'; diff --git a/src/mqtt.explorer.ts b/src/mqtt.explorer.ts deleted file mode 100644 index 9ba60f3..0000000 --- a/src/mqtt.explorer.ts +++ /dev/null @@ -1,221 +0,0 @@ -import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { DiscoveryService, MetadataScanner, Reflector } from '@nestjs/core'; -import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; -import { - MQTT_CLIENT_INSTANCE, - MQTT_LOGGER_PROVIDER, - MQTT_OPTION_PROVIDER, - MQTT_SUBSCRIBE_OPTIONS, - MQTT_SUBSCRIBER_PARAMS, -} from './mqtt.constants'; -import { MqttClient } from 'mqtt'; -import { Packet } from 'mqtt-packet'; -import { getTransform } from './mqtt.transform'; -import { - MqttModuleOptions, - MqttSubscribeOptions, - MqttSubscriber, - MqttSubscriberParameter, -} from './mqtt.interface'; - -@Injectable() -export class MqttExplorer implements OnModuleInit { - private readonly reflector = new Reflector(); - subscribers: MqttSubscriber[]; - - constructor( - private readonly discoveryService: DiscoveryService, - private readonly metadataScanner: MetadataScanner, - @Inject(MQTT_LOGGER_PROVIDER) private readonly logger: Logger, - @Inject(MQTT_CLIENT_INSTANCE) private readonly client: MqttClient, - @Inject(MQTT_OPTION_PROVIDER) private readonly options: MqttModuleOptions, - ) { - this.subscribers = []; - } - - onModuleInit() { - this.logger.log('MqttModule dependencies initialized'); - this.explore(); - } - - preprocess(options: MqttSubscribeOptions): string | string[] { - const processTopic = (topic) => { - const queue = - typeof options.queue === 'boolean' ? options.queue : this.options.queue; - const share = - typeof options.share === 'string' ? options.share : this.options.share; - topic = topic - .replace('$queue/', '') - .replace(/^\$share\/([A-Za-z0-9]+)\//, ''); - if (queue) { - return `$queue/${topic}`; - } - - if (share) { - return `$share/${share}/${topic}`; - } - - return topic; - }; - if (Array.isArray(options.topic)) { - return options.topic.map(processTopic); - } else { - // this.logger.log(options.topic); - return processTopic(options.topic); - } - } - - subscribe( - options: MqttSubscribeOptions, - parameters: MqttSubscriberParameter[], - handle, - provider, - ) { - const topic = this.preprocess(options); - this.client.subscribe(topic, (err) => { - if (!err) { - // put it into this.subscribers; - if (!Array.isArray(options.topic)) { - const topics = new Array(); - topics.push(options.topic); - options.topic = topics; - } - options.topic.forEach((aTopic) => { - this.subscribers.push({ - topic: aTopic, - route: aTopic - .replace('$queue/', '') - .replace(/^\$share\/([A-Za-z0-9]+)\//, ''), - regexp: MqttExplorer.topicToRegexp(aTopic), - provider, - handle, - options, - parameters, - }); - }); - } else { - this.logger.error(`subscribe topic [${options.topic} failed]`); - } - - }); - } - - explore() { - const providers: InstanceWrapper[] = this.discoveryService.getProviders(); - providers.forEach((wrapper: InstanceWrapper) => { - const { instance } = wrapper; - if (!instance) { - return; - } - this.metadataScanner.scanFromPrototype( - instance, - Object.getPrototypeOf(instance), - (key) => { - const subscribeOptions: MqttSubscribeOptions = this.reflector.get( - MQTT_SUBSCRIBE_OPTIONS, - instance[key], - ); - const parameters = this.reflector.get( - MQTT_SUBSCRIBER_PARAMS, - instance[key], - ); - if (subscribeOptions) { - this.subscribe( - subscribeOptions, - parameters, - instance[key], - instance, - ); - } - }, - ); - }); - this.client.on( - 'message', - (topic: string, payload: Buffer, packet: Packet) => { - const subscriber = this.getSubscriber(topic); - if (subscriber) { - const parameters = subscriber.parameters || []; - const scatterParameters: MqttSubscriberParameter[] = []; - for (const parameter of parameters) { - scatterParameters[parameter.index] = parameter; - } - try { - const transform = getTransform(subscriber.options.transform); - - // add a option to do something before handle message. - if (this.options.beforeHandle) { - this.options.beforeHandle(topic, payload, packet); - } - - subscriber.handle.bind(subscriber.provider)( - ...scatterParameters.map((parameter) => { - switch (parameter?.type) { - case 'payload': - return transform(payload); - case 'topic': - return topic; - case 'packet': - return packet; - case 'params': - return MqttExplorer.matchGroups(topic, subscriber.regexp); - default: - return null; - } - }), - ); - } catch (err) { - this.logger.error(err); - } - } - }, - ); - } - - private getSubscriber(topic: string): MqttSubscriber | null { - for (const subscriber of this.subscribers) { - subscriber.regexp.lastIndex = 0; - if (subscriber.regexp.test(topic)) { - return subscriber; - } - } - return null; - } - - private static topicToRegexp(topic: string) { - // compatible with emqtt - return new RegExp( - '^' + - topic - .replace('$queue/', '') - .replace(/^\$share\/([A-Za-z0-9]+)\//, '') - .replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g, '\\$1') - .replace(/\+/g, '([^/]+)') - .replace(/\/#$/, '(/.*)?') + - '$', - 'y', - ); - } - - private static matchGroups(str: string, regex: RegExp) { - regex.lastIndex = 0; - let m = regex.exec(str); - const matches: string[] = []; - - while (m !== null) { - // This is necessary to avoid infinite loops with zero-width matches - if (m.index === regex.lastIndex) { - regex.lastIndex++; - } - - // The result can be accessed through the `m`-variable. - m.forEach((match, groupIndex) => { - if (groupIndex !== 0) { - matches.push(match); - } - }); - m = regex.exec(str); - } - return matches; - } -} diff --git a/src/mqtt.interface.ts b/src/mqtt.interface.ts index d2d064a..402b363 100644 --- a/src/mqtt.interface.ts +++ b/src/mqtt.interface.ts @@ -46,6 +46,7 @@ export interface MqttModuleOptions extends IClientOptions { */ share?: string; + topic?: string; beforeHandle?: (topic: string, payload: Buffer, packet: Packet) => any; } diff --git a/src/mqtt.module-definition.ts b/src/mqtt.module-definition.ts index a336738..266ac9e 100644 --- a/src/mqtt.module-definition.ts +++ b/src/mqtt.module-definition.ts @@ -1,5 +1,7 @@ import { ConfigurableModuleBuilder } from '@nestjs/common'; import { MqttModuleOptions } from './mqtt.interface'; -export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN } = - new ConfigurableModuleBuilder().setClassMethodName('forRoot').build(); \ No newline at end of file +export const { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN, ASYNC_OPTIONS_TYPE } = + new ConfigurableModuleBuilder({ 'moduleName': 'MqttModule' }) + .setClassMethodName('forRoot') + .build(); \ No newline at end of file diff --git a/src/mqtt.module.ts b/src/mqtt.module.ts index ec87b1c..0cc6c6d 100644 --- a/src/mqtt.module.ts +++ b/src/mqtt.module.ts @@ -4,10 +4,12 @@ import { } from '@nestjs/common'; import { MqttService } from './mqtt.service'; import { ConfigurableModuleClass, MODULE_OPTIONS_TOKEN } from './mqtt.module-definition'; +import { DiscoveryModule, DiscoveryService } from '@nestjs/core'; @Global() @Module({ providers: [MqttService], + imports: [DiscoveryModule], exports: [MqttService, MODULE_OPTIONS_TOKEN], }) export class MqttModule extends ConfigurableModuleClass { diff --git a/src/mqtt.service.ts b/src/mqtt.service.ts index 6b3108b..025f0fa 100644 --- a/src/mqtt.service.ts +++ b/src/mqtt.service.ts @@ -1,23 +1,35 @@ import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; -import { MQTT_CLIENT_INSTANCE } from './mqtt.constants'; import { MqttClient, Packet, IClientPublishOptions, IClientSubscribeOptions, ISubscriptionGrant, + connectAsync, } from 'mqtt'; import { MODULE_OPTIONS_TOKEN } from './mqtt.module-definition'; -import { MqttModuleOptions } from './mqtt.interface'; -import { connect } from 'mqtt'; +import { MqttModuleOptions, MqttSubscribeOptions, MqttSubscriber, MqttSubscriberParameter } from './mqtt.interface'; +import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; +import { DiscoveryService, MetadataScanner, Reflector } from '@nestjs/core'; +import { MQTT_SUBSCRIBE_OPTIONS, MQTT_SUBSCRIBER_PARAMS } from './mqtt.constants'; +import { getTransform } from './mqtt.transform'; @Injectable() export class MqttService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger('MqttService'); private client: MqttClient; + private readonly reflector = new Reflector(); + private subscribers: MqttSubscriber[]; constructor( + private readonly discoveryService: DiscoveryService, + private readonly metadataScanner: MetadataScanner, @Inject(MODULE_OPTIONS_TOKEN) private readonly options: MqttModuleOptions, ) { + this.subscribers = []; + } + + getClient(): MqttClient { + return this.client; } async connect() { @@ -25,7 +37,7 @@ export class MqttService implements OnModuleInit, OnModuleDestroy { if (this.options.passwordProvider) { this.options.password = await this.options.passwordProvider(); } - this.client = connect(this.options); + this.client = await connectAsync(this.options); this.client.on('connect', () => { this.logger.log('MQTT connected'); }); @@ -53,12 +65,15 @@ export class MqttService implements OnModuleInit, OnModuleDestroy { this.client.on('offline', () => { this.logger.log('MQTT offline'); }); + + this.logger.log('MqttService::connected'); } subscribe( topic: string | string[], opts?: IClientSubscribeOptions, ): Promise { + this.logger.log(`subscribe ${topic}`) return new Promise((resolve, reject) => { this.client.subscribe(topic, opts || null, (err, granted) => { if (err) { @@ -101,15 +116,200 @@ export class MqttService implements OnModuleInit, OnModuleDestroy { }); } + /** + * Explorer parts + */ + preprocess(subscribeOptions: MqttSubscribeOptions): string | string[] { + const processTopic = (topic) => { + const queue = + typeof subscribeOptions.queue === 'boolean' ? subscribeOptions.queue : this.options.queue; + const share = + typeof subscribeOptions.share === 'string' ? subscribeOptions.share : this.options.share; + topic = topic + .replace('$queue/', '') + .replace(/^\$share\/([A-Za-z0-9]+)\//, ''); + if (queue) { + return `$queue/${topic}`; + } + + if (share) { + return `$share/${share}/${topic}`; + } + + return topic; + }; + if (Array.isArray(subscribeOptions.topic)) { + return subscribeOptions.topic.map(processTopic); + } else { + this.logger.log(this.options.topic); + return processTopic(subscribeOptions.topic); + } + }//preprocess + + _subscribe( + options: MqttSubscribeOptions, + parameters: MqttSubscriberParameter[], + handle, + provider, + ) { + const topic = this.preprocess(options); + this.client.subscribe(topic, (err) => { + if (!err) { + // put it into this.subscribers; + if (!Array.isArray(options.topic)) { + const topics = new Array(); + topics.push(options.topic); + options.topic = topics; + } + options.topic.forEach((aTopic) => { + this.subscribers.push({ + topic: aTopic, + route: aTopic + .replace('$queue/', '') + .replace(/^\$share\/([A-Za-z0-9]+)\//, ''), + regexp: MqttService.topicToRegexp(aTopic), + provider, + handle, + options, + parameters, + }); + }); + } else { + this.logger.error(`subscribe topic [${options.topic} failed]`); + } + + }); + }//_subscribe + + explore() { + const providers: InstanceWrapper[] = this.discoveryService.getProviders(); + providers.forEach((wrapper: InstanceWrapper) => { + const { instance } = wrapper; + if (!instance) { + return; + } + this.metadataScanner.scanFromPrototype( + instance, + Object.getPrototypeOf(instance), + (key) => { + const subscribeOptions: MqttSubscribeOptions = this.reflector.get( + MQTT_SUBSCRIBE_OPTIONS, + instance[key], + ); + const parameters = this.reflector.get( + MQTT_SUBSCRIBER_PARAMS, + instance[key], + ); + if (subscribeOptions) { + this._subscribe( + subscribeOptions, + parameters, + instance[key], + instance, + ); + } + }, + ); + }); + this.client.on( + 'message', + (topic: string, payload: Buffer, packet: Packet) => { + const subscriber = this.getSubscriber(topic); + if (subscriber) { + const parameters = subscriber.parameters || []; + const scatterParameters: MqttSubscriberParameter[] = []; + for (const parameter of parameters) { + scatterParameters[parameter.index] = parameter; + } + try { + const transform = getTransform(subscriber.options.transform); + + // add a option to do something before handle message. + if (this.options.beforeHandle) { + this.options.beforeHandle(topic, payload, packet); + } + + subscriber.handle.bind(subscriber.provider)( + ...scatterParameters.map((parameter) => { + switch (parameter?.type) { + case 'payload': + return transform(payload); + case 'topic': + return topic; + case 'packet': + return packet; + case 'params': + return MqttService.matchGroups(topic, subscriber.regexp); + default: + return null; + } + }), + ); + } catch (err) { + this.logger.error(err); + } + } + }, + ); + } + + private getSubscriber(topic: string): MqttSubscriber | null { + for (const subscriber of this.subscribers) { + subscriber.regexp.lastIndex = 0; + if (subscriber.regexp.test(topic)) { + return subscriber; + } + } + return null; + } + + private static topicToRegexp(topic: string) { + // compatible with emqtt + return new RegExp( + '^' + + topic + .replace('$queue/', '') + .replace(/^\$share\/([A-Za-z0-9]+)\//, '') + .replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g, '\\$1') + .replace(/\+/g, '([^/]+)') + .replace(/\/#$/, '(/.*)?') + + '$', + 'y', + ); + } + + private static matchGroups(str: string, regex: RegExp) { + regex.lastIndex = 0; + let m = regex.exec(str); + const matches: string[] = []; + + while (m !== null) { + // This is necessary to avoid infinite loops with zero-width matches + if (m.index === regex.lastIndex) { + regex.lastIndex++; + } + + // The result can be accessed through the `m`-variable. + m.forEach((match, groupIndex) => { + if (groupIndex !== 0) { + matches.push(match); + } + }); + m = regex.exec(str); + } + return matches; + } + + async onModuleInit() { await this.connect(); + this.logger.log('onModuleInit::connected::explore'); + this.explore(); } - onModuleDestroy() { + async onModuleDestroy() { try { - if (this.client && this.client.connected) { - this.client.end(true); - } + await this.client.endAsync(true); } catch (e) { this.logger.error(e); } diff --git a/test/app.e2e-spec.ts b/test/app.e2e-spec.ts deleted file mode 100644 index 5df807d..0000000 --- a/test/app.e2e-spec.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { INestApplication } from '@nestjs/common'; -import * as request from 'supertest'; -import { AppModule } from './../src/app.module'; - -describe('AppController (e2e)', () => { - let app: INestApplication; - - beforeEach(async () => { - const moduleFixture: TestingModule = await Test.createTestingModule({ - imports: [AppModule], - }).compile(); - - app = moduleFixture.createNestApplication(); - await app.init(); - }); - it('/ (GET)', () => { - return request(app.getHttpServer()) - .get('/') - .expect(200) - .expect('Hello World!'); - }); -}); diff --git a/test/mqtt.e2e-spec.ts b/test/mqtt.e2e-spec.ts new file mode 100644 index 0000000..5dd0cd4 --- /dev/null +++ b/test/mqtt.e2e-spec.ts @@ -0,0 +1,73 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { MqttModule } from '../src/mqtt.module'; +import Aedes, { AedesOptions } from 'aedes'; +import { createServer } from 'net'; +import { MqttModuleOptions } from '../src/mqtt.interface'; + +import { EventEmitter2, EventEmitterModule, OnEvent } from '@nestjs/event-emitter'; +import { BrokerService } from './test.broker.service'; +import { MQTT_RECEIVE_EVENT, MQTT_SEND_EVENT, TEST_TOPIC } from './test.constants'; +import { DiscoveryModule } from '@nestjs/core'; + +describe('MQTT Module (e2e)', () => { + const aedesServer = new Aedes({} as AedesOptions); + const server = createServer(aedesServer.handle); + const mqttHost = '127.0.0.1'; + const mqttProtocol = 'mqtt'; + const mqttPort = 1883; + const mqttUser = 'test'; + const mqttPassword = 'test' + let moduleFixture: TestingModule; + let brokerService, eventEmitter; + + aedesServer.authenticate = function (client, username, password, callback) { + callback(null, username === mqttUser && password.equals(Buffer.from(mqttPassword, 'utf8'))); + }//authenticate + + function providePassword(): Promise { + return Promise.resolve(mqttPassword); + } + + beforeAll(async () => { + server.listen(mqttPort); + moduleFixture = await Test.createTestingModule({ + imports: [ + DiscoveryModule, + EventEmitterModule.forRoot(), + MqttModule.forRootAsync({ + useFactory: async () => + ({ + host: mqttHost, + port: mqttPort, + protocol: mqttProtocol, + username: mqttUser, + passwordProvider: providePassword, + }) as MqttModuleOptions, + }) + ], + providers: [BrokerService] + }).compile(); + await moduleFixture.init(); + eventEmitter = await moduleFixture.resolve(EventEmitter2); + brokerService = await moduleFixture.resolve(BrokerService); + }); + + afterAll(async () => { + moduleFixture.close(); + server.close(); + aedesServer.close(); + }); + + it('test publish/subscribe', async () => { + const testMsg = {'blubb':'blubb'}; + let asyncEvent = new Promise<{topic: string, payload: any}>((resolve, reject) => { + eventEmitter.on(MQTT_RECEIVE_EVENT, function (topic, payload) { + resolve({topic, payload} as {topic: string, payload: any}); + }.bind(this)); + }); + eventEmitter.emit(MQTT_SEND_EVENT, testMsg) + const event = await asyncEvent; + expect(event.topic).toEqual(TEST_TOPIC); + expect(event.payload).toEqual(testMsg); + }); +}); diff --git a/test/test.broker.service.ts b/test/test.broker.service.ts new file mode 100644 index 0000000..c27a0e0 --- /dev/null +++ b/test/test.broker.service.ts @@ -0,0 +1,36 @@ +import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; +import { MqttService, Payload, Subscribe, Topic } from "../src"; +import { EventEmitter2, OnEvent } from "@nestjs/event-emitter"; +import { MQTT_RECEIVE_EVENT, MQTT_SEND_EVENT, TEST_TOPIC } from "./test.constants"; + +@Injectable() +export class BrokerService { + constructor( + @Inject(MqttService) private readonly mqttService: MqttService, + @Inject(EventEmitter2) private readonly eventEmitter: EventEmitter2, + ) { } + + /** + * Subscribes to the incoming request topic and emits internal events. + * @param topic {string} - the topic + * @param payload {any} - the payload object in the request + */ + @Subscribe(TEST_TOPIC) + async handleSubscribed( + @Topic() topic: string, + @Payload() + payload: any, + ) { + this.eventEmitter.emit(MQTT_RECEIVE_EVENT, topic, payload); + } + + /** + * Publishes to the broker + * @param notification {MobileRegistrationRequested} - the notification. + */ + @OnEvent(MQTT_SEND_EVENT) + async handlePublish(notification: any) { + await this.mqttService.publish(TEST_TOPIC, JSON.stringify(notification), { 'qos': 2 }); + } + +} \ No newline at end of file diff --git a/test/test.constants.ts b/test/test.constants.ts new file mode 100644 index 0000000..bae0914 --- /dev/null +++ b/test/test.constants.ts @@ -0,0 +1,4 @@ + +export const TEST_TOPIC = 'test/1/interact'; +export const MQTT_SEND_EVENT = 'mqtt.broker.send'; +export const MQTT_RECEIVE_EVENT = 'mqtt.broker.receive'; \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index 21b99e8..05d4ec5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -18,6 +18,7 @@ "strictBindCallApply": false, "forceConsistentCasingInFileNames": false, "noFallthroughCasesInSwitch": false, - "allowJs": true + "allowJs": true, + "esModuleInterop": true } }