From d53af84d0b46fc0a4f8ad98df750b3221f50f83e Mon Sep 17 00:00:00 2001 From: Lenny Date: Thu, 18 Apr 2024 18:15:57 +0200 Subject: [PATCH] Add optional sctpStreamId to SCTP-based data consumers --- node/src/DataConsumer.ts | 6 ++++ node/src/Transport.ts | 44 +++++++++++++++++------------- node/src/test/test-DataConsumer.ts | 14 ++++++++++ 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/node/src/DataConsumer.ts b/node/src/DataConsumer.ts index d0ff2ace7c..d3cc50d398 100644 --- a/node/src/DataConsumer.ts +++ b/node/src/DataConsumer.ts @@ -45,6 +45,12 @@ export type DataConsumerOptions = */ maxRetransmits?: number; + /** + * Just if consuming over SCTP. + * SCTP stream id. If not provided defaults to an arbitrary available stream ID. + */ + sctpStreamId?: number; + /** * Whether the data consumer must start in paused mode. Default false. */ diff --git a/node/src/Transport.ts b/node/src/Transport.ts index 30812e6dfa..b44dc33394 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -1091,6 +1091,7 @@ export class Transport< ordered, maxPacketLifeTime, maxRetransmits, + sctpStreamId, paused = false, subchannels, appData, @@ -1113,7 +1114,6 @@ export class Transport< let type: DataConsumerType; let sctpStreamParameters: SctpStreamParameters | undefined; - let sctpStreamId: number; // If this is not a DirectTransport, use sctpStreamParameters from the // DataProducer (if type 'sctp') unless they are given in method parameters. @@ -1139,10 +1139,9 @@ export class Transport< } // This may throw. - sctpStreamId = this.getNextSctpStreamId(); - - this.#sctpStreamIds![sctpStreamId] = 1; - sctpStreamParameters.streamId = sctpStreamId; + sctpStreamParameters.streamId = this.getNextSctpStreamId(sctpStreamId); + ortc.validateSctpStreamParameters(sctpStreamParameters); + this.#sctpStreamIds![sctpStreamParameters.streamId] = 1; } // If this is a DirectTransport, sctpStreamParameters must not be used. else { @@ -1151,10 +1150,11 @@ export class Transport< if ( ordered !== undefined || maxPacketLifeTime !== undefined || - maxRetransmits !== undefined + maxRetransmits !== undefined || + sctpStreamId !== undefined ) { logger.warn( - 'consumeData() | ordered, maxPacketLifeTime and maxRetransmits are ignored when consuming data on a DirectTransport' + 'consumeData() | ordered, maxPacketLifeTime, maxRetransmits and sctpStreamId are ignored when consuming data on a DirectTransport' ); } } @@ -1213,14 +1213,14 @@ export class Transport< this.dataConsumers.delete(dataConsumer.id); if (this.#sctpStreamIds) { - this.#sctpStreamIds[sctpStreamId] = 0; + this.#sctpStreamIds[sctpStreamParameters!.streamId] = 0; } }); dataConsumer.on('@dataproducerclose', () => { this.dataConsumers.delete(dataConsumer.id); if (this.#sctpStreamIds) { - this.#sctpStreamIds[sctpStreamId] = 0; + this.#sctpStreamIds[sctpStreamParameters!.streamId] = 0; } }); @@ -1266,7 +1266,7 @@ export class Transport< ); } - private getNextSctpStreamId(): number { + private getNextSctpStreamId(sctpStreamId?: number): number { if ( !this.#data.sctpParameters || typeof this.#data.sctpParameters.MIS !== 'number' @@ -1280,20 +1280,26 @@ export class Transport< this.#sctpStreamIds = Buffer.alloc(numStreams, 0); } - let sctpStreamId; - - for (let idx = 0; idx < this.#sctpStreamIds.length; ++idx) { - sctpStreamId = - (this.#nextSctpStreamId + idx) % this.#sctpStreamIds.length; + if (sctpStreamId === undefined) { + for (let idx = 0; idx < this.#sctpStreamIds.length; ++idx) { + sctpStreamId = + (this.#nextSctpStreamId + idx) % this.#sctpStreamIds.length; - if (!this.#sctpStreamIds[sctpStreamId]) { - this.#nextSctpStreamId = sctpStreamId + 1; + if (!this.#sctpStreamIds[sctpStreamId]) { + this.#nextSctpStreamId = sctpStreamId + 1; - return sctpStreamId; + return sctpStreamId; + } } + + throw new Error('no sctpStreamId available'); + } else if (sctpStreamId >= this.#sctpStreamIds.length) { + throw new Error('invalid sctpStreamId'); + } else if (this.#sctpStreamIds[sctpStreamId]) { + throw new Error('sctpStreamId already assigned'); } - throw new Error('no sctpStreamId available'); + return sctpStreamId; } } diff --git a/node/src/test/test-DataConsumer.ts b/node/src/test/test-DataConsumer.ts index e51a718095..418ace4ac9 100644 --- a/node/src/test/test-DataConsumer.ts +++ b/node/src/test/test-DataConsumer.ts @@ -107,6 +107,20 @@ test('transport.consumeData() succeeds', async () => { }); }, 2000); +test('transport.consumeData() with already used sctpStreamId rejects with Error', async () => { + await ctx.webRtcTransport2!.consumeData({ + dataProducerId: ctx.dataProducer!.id, + sctpStreamId: 123, + }); + + await expect( + ctx.webRtcTransport2!.consumeData({ + dataProducerId: ctx.dataProducer!.id, + sctpStreamId: 123, + }) + ).rejects.toThrow(Error); +}, 2000); + test('dataConsumer.dump() succeeds', async () => { const dataConsumer = await ctx.webRtcTransport2!.consumeData({ dataProducerId: ctx.dataProducer!.id,