Skip to content

Commit

Permalink
Merge pull request #13 from zabertech/10270--handle-multiple-sub-on-s…
Browse files Browse the repository at this point in the history
…ame-id

10270 - handle multiple subscriptions on same subscription ID
  • Loading branch information
zaberSatnam authored Oct 20, 2022
2 parents 1cc7803 + e121dc7 commit f5d7125
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 34 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "swampyer",
"version": "1.4.3",
"version": "1.5.0",
"description": "A lightweight WAMP client implementing the WAMP v2 basic profile",
"main": "lib/index.js",
"module": "lib/index.js",
Expand Down
125 changes: 111 additions & 14 deletions src/swampyer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,9 @@ describe(`${Swampyer.prototype.subscribe.name}()`, () => {
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Subscribe, expect.any(Number), expect.any(Object), 'com.some.uri']);
transportProvider.sendToLib(MessageTypes.Subscribed, [request[1] as number, 1234]);
await promise;
const data = await promise;

expect(data).toEqual({ id: 1234, handler: subHandler });

const args = [2, 'args'];
const kwargs = { one: 'kwarg' };
Expand All @@ -806,15 +808,15 @@ describe(`${Swampyer.prototype.subscribe.name}()`, () => {
expect(subHandler).toBeCalledWith(args, kwargs, details);
});

it('multiple subscriptions can co-exist', async () => {
it('runs multiple subscription handlers for different subscription IDs', async () => {
const subHandler1 = jest.fn();
const promise1 = wamp.subscribe('com.some.uri', subHandler1);
const promise1 = wamp.subscribe('com.some.uri1', subHandler1);
const request1 = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request1[1] as number, 1234]);
await promise1;

const subHandler2 = jest.fn();
const promise2 = wamp.subscribe('com.some.uri', subHandler2);
const promise2 = wamp.subscribe('com.some.uri2', subHandler2);
const request2 = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request2[1] as number, 9876]);
await promise2;
Expand All @@ -826,6 +828,25 @@ describe(`${Swampyer.prototype.subscribe.name}()`, () => {
expect(subHandler2).toBeCalledWith(['for 2nd sub'], {}, {});
});

it('runs multiple subscription handlers for same subscription ID', async () => {
const subHandler1 = jest.fn();
const promise1 = wamp.subscribe('com.some.uri', subHandler1);
const request1 = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request1[1] as number, 1234]);
await promise1;

const subHandler2 = jest.fn();
const promise2 = wamp.subscribe('com.some.uri', subHandler2);
const request2 = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request2[1] as number, 1234]);
await promise2;

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, ['some event'], {}]);

expect(subHandler1).toBeCalledWith(['some event'], {}, {});
expect(subHandler2).toBeCalledWith(['some event'], {}, {});
});

it('provides reasonable defaults for args, kwargs, and details if they are undefined in the event data', async () => {
const subHandler = jest.fn();
const promise = wamp.subscribe('com.some.uri', subHandler);
Expand Down Expand Up @@ -859,7 +880,28 @@ describe(`${Swampyer.prototype.subscribe.name}()`, () => {
});

it('logs an error to console if subscription handler throws an error', async () => {
const subHandler = jest.fn(() => { throw Error('I never subscribed to this!') });
const errorObj = new Error('I never subscribed to this');
const subHandler = jest.fn(() => { throw errorObj });

const promise = wamp.subscribe('com.some.uri', subHandler);
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Subscribe, expect.any(Number), expect.any(Object), 'com.some.uri']);
transportProvider.sendToLib(MessageTypes.Subscribed, [request[1] as number, 1234]);
await promise;

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, ['args'], {}]);
expect(subHandler).toBeCalledTimes(1);

// eslint-disable-next-line no-console
await waitUntilPass(() => expect(console.error).toBeCalledTimes(1));
// eslint-disable-next-line no-console
expect(console.error).toBeCalledWith(expect.any(String), errorObj);
});

it('logs an error to console if an async subscription handler throws an error', async () => {
const errorObj = new Error('I never subscribed to this');
const subHandler = jest.fn(async () => { throw errorObj });

const promise = wamp.subscribe('com.some.uri', subHandler);
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Subscribe, expect.any(Number), expect.any(Object), 'com.some.uri']);
Expand All @@ -870,9 +912,9 @@ describe(`${Swampyer.prototype.subscribe.name}()`, () => {
expect(subHandler).toBeCalledTimes(1);

// eslint-disable-next-line no-console
expect(console.error).toBeCalledTimes(1);
await waitUntilPass(() => expect(console.error).toBeCalledTimes(1));
// eslint-disable-next-line no-console
expect(console.error).toBeCalledWith(expect.any(String), expect.any(Error));
expect(console.error).toBeCalledWith(expect.any(String), errorObj);
});
});

Expand Down Expand Up @@ -916,41 +958,96 @@ describe(`${Swampyer.prototype.publish.name}()`, () => {
describe(`${Swampyer.prototype.unsubscribe.name}()`, () => {
const subId = 1234;

let handler: jest.Mock;
let handler1: jest.Mock;

beforeEach(async () => {
await openWamp();

handler = jest.fn();
const promise = wamp.subscribe('com.some.uri', handler);
handler1 = jest.fn();
const promise = wamp.subscribe('com.some.uri', handler1);
const request = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request[1] as number, subId]);
await promise;
});

it('unsubscribes a subscription and the old subscription callback no longer responds to publish events', async () => {
const promise = wamp.unsubscribe(subId);
it('unsubscribes a subscription', async () => {
const promise = wamp.unsubscribe({ id: subId, handler: handler1 });
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Unsubscribe, expect.any(Number), subId]);
transportProvider.sendToLib(MessageTypes.Unsubscribed, [request[1] as number]);
await promise;

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, [97], {}]);
expect(handler1).toBeCalledTimes(0);
});

it('throws an error if the unsubscribe operation fails', async () => {
const promise = wamp.unsubscribe(subId);
const promise = wamp.unsubscribe({ id: subId, handler: handler1 });
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Unsubscribe, expect.any(Number), subId]);
transportProvider.sendToLib(MessageTypes.Error, [MessageTypes.Unsubscribe, request[1] as number, {}, 'something bad', [], {}]);
await expect(promise).rejects.toBeInstanceOf(SwampyerOperationError);
});

it('throws an error if a GOODBYE message is received before unsubscribe operation finishes', async () => {
const promise = wamp.unsubscribe(subId);
const promise = wamp.unsubscribe({ id: subId, handler: handler1 });
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Unsubscribe, expect.any(Number), subId]);
transportProvider.sendToLib(MessageTypes.Goodbye, [{}, 'com.some.reason']);
await expect(promise).rejects.toBeInstanceOf(ConnectionClosedError);
});

describe('multiple subscription for same subscription ID', () => {
let handler2: jest.Mock;

beforeEach(async () => {
handler2 = jest.fn();
const promise = wamp.subscribe('com.some.uri', handler2);
const request = await transportProvider.transport.read();
transportProvider.sendToLib(MessageTypes.Subscribed, [request[1] as number, subId]);
await promise;
});

it('unsubscribes the given handler but leaves the other handlers subscribed', async () => {
await wamp.unsubscribe({ id: subId, handler: handler1 });

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, [97], {}]);
expect(handler1).toBeCalledTimes(0);
expect(handler2).toBeCalledTimes(1);
expect(handler2).toBeCalledWith([97], {}, {});
});

it('unsubscribes fully once the last handler is unsubscribed', async () => {
await wamp.unsubscribe({ id: subId, handler: handler1 });

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, [97], {}]);
expect(handler1).toBeCalledTimes(0);
expect(handler2).toBeCalledTimes(1);
expect(handler2).toBeCalledWith([97], {}, {});

const promise = wamp.unsubscribe({ id: subId, handler: handler2 });
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Unsubscribe, expect.any(Number), subId]);
transportProvider.sendToLib(MessageTypes.Unsubscribed, [request[1] as number]);
await promise;

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, [97], {}]);
expect(handler1).toBeCalledTimes(0);
expect(handler2).toBeCalledTimes(1);
});

it('unsubscribes fully if the argument is provided', async () => {
const promise = wamp.unsubscribe({ id: subId, handler: handler2 }, true);
const request = await transportProvider.transport.read();
expect(request).toEqual([MessageTypes.Unsubscribe, expect.any(Number), subId]);
transportProvider.sendToLib(MessageTypes.Unsubscribed, [request[1] as number]);
await promise;

transportProvider.sendToLib(MessageTypes.Event, [1234, 5555, {}, [97], {}]);
expect(handler1).toBeCalledTimes(0);
expect(handler2).toBeCalledTimes(0);
});
});
});

describe('misc event handling', () => {
Expand Down
54 changes: 35 additions & 19 deletions src/swampyer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { AbortError, ConnectionOpenError, ConnectionClosedError, SwampyerError,
import type { Transport, TransportProvider } from './transports/transport';
import {
WampMessage, MessageData, MessageTypes, PublishOptions, RegistrationHandler, SubscriptionHandler, WelcomeDetails,
OpenOptions, RegisterOptions, CallOptions, SubscribeOptions, CloseReason, CloseDetails, CloseEventData, AutoReconnectionOpenOptions
OpenOptions, RegisterOptions, CallOptions, SubscribeOptions, CloseReason, CloseDetails, CloseEventData, AutoReconnectionOpenOptions,
SubscriptionIdentifier
} from './types';
import { generateRandomInt, deferredPromise, SimpleEventEmitter } from './utils';

Expand All @@ -26,7 +27,7 @@ export class Swampyer {
private registrationRequestId = 1;
private unregistrationRequestId = 1;

private subscriptionHandlers: { [subscriptionId: number]: { uri: string; handler: SubscriptionHandler } } = {};
private subscriptionHandlers: { [subscriptionId: number]: { uri: string; handler: SubscriptionHandler }[] } = {};
private registrationHandlers: { [registrationId: number]: { uri: string; handler: RegistrationHandler } } = {};

private onCloseCleanup: (() => void)[] = [];
Expand Down Expand Up @@ -292,7 +293,10 @@ export class Swampyer {
}

/**
* Subscribe to publish events on a given WAMP URI
* Subscribe to publish events on a given WAMP URI.
*
* If a subscription already exists for a given subscription ID then all subscription handlers will
* get called when an event occurs on the subscription ID.
*
* @param uri The URI to subscribe to
*
Expand All @@ -305,25 +309,37 @@ export class Swampyer {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async subscribe<A extends any[] = any, K = any>(
uri: string, handler: SubscriptionHandler<A, K>, options: SubscribeOptions = {}
): Promise<number> {
): Promise<SubscriptionIdentifier> {
this.throwIfNotOpen();
const fullUri = options.withoutUriBase ? uri : this.getFullUri(uri);
const requestId = generateRandomInt();
const [, subscriptionId] = await this.sendRequest(MessageTypes.Subscribe, [requestId, options, fullUri], MessageTypes.Subscribed);
this.subscriptionHandlers[subscriptionId] = { uri, handler };
return subscriptionId;
this.subscriptionHandlers[subscriptionId] = (this.subscriptionHandlers[subscriptionId] || []).concat({ uri, handler });
return {
id: subscriptionId,
handler
};
}

/**
* Unsubscribe from an existing subscription
* Unsubscribe existing subscriptions given a subscription ID and handler function.
*
* @param registrationId The subscription ID returned by {@link subscribe subscribe()}
* @param subscriptionData The subscription data returned by {@link subscribe subscribe()}
* @param unsubscribeAll Multiple subscriptions can have the same ID if the same client subscribes
* to the same URI. Set this to `true` if you would like to unsubscribe all existing subscriptions
* for this client for the given subscription ID.
*/
async unsubscribe(subscriptionId: number): Promise<void> {
async unsubscribe(subscriptionData: SubscriptionIdentifier, unsubscribeAll?: boolean): Promise<void> {
this.throwIfNotOpen();
const requestId = generateRandomInt();
await this.sendRequest(MessageTypes.Unsubscribe, [requestId, subscriptionId], MessageTypes.Unsubscribed);
delete this.subscriptionHandlers[subscriptionId];
if (unsubscribeAll || this.subscriptionHandlers[subscriptionData.id]?.length === 1) {
const requestId = generateRandomInt();
await this.sendRequest(MessageTypes.Unsubscribe, [requestId, subscriptionData.id], MessageTypes.Unsubscribed);
delete this.subscriptionHandlers[subscriptionData.id];
} else if (this.subscriptionHandlers[subscriptionData.id]) {
this.subscriptionHandlers[subscriptionData.id] = this.subscriptionHandlers[subscriptionData.id].filter(
({ handler }) => handler !== subscriptionData.handler
);
}
}

/**
Expand Down Expand Up @@ -389,13 +405,13 @@ export class Swampyer {
switch (messageType) {
case MessageTypes.Event: {
const [subscriptionId, , details, args, kwargs] = data as MessageData[MessageTypes.Event];
const { uri, handler } = this.subscriptionHandlers[subscriptionId] || {};
try {
handler?.(args ?? [], kwargs ?? {}, details ?? {});
} catch (e) {
// eslint-disable-next-line no-console
console.error(`An unhandled error occurred while running subscription handler for "${uri}"`, e);
}
(this.subscriptionHandlers[subscriptionId] || []).forEach(({ uri, handler }) => {
Promise.resolve((async () => handler(args ?? [], kwargs ?? {}, details ?? {}))())
.catch(e => {
// eslint-disable-next-line no-console
console.error(`An unhandled error occurred while running subscription handler for "${uri}"`, e);
});
});
break;
}
case MessageTypes.Invocation: {
Expand Down
5 changes: 5 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,8 @@ export interface PublishOptions extends CommonOptions {
export type RegistrationHandler<R = any, A extends any[] = any, K = any> = (args: A, kwargs: K, details: InvocationDetails) => R;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type SubscriptionHandler<A = any, K = any> = (args: A, kwargs: K, details: EventDetails) => void;

export interface SubscriptionIdentifier {
id: number;
handler: SubscriptionHandler;
}

0 comments on commit f5d7125

Please sign in to comment.