diff --git a/.github/workflows/mediasoup-worker-prebuild.yaml b/.github/workflows/mediasoup-worker-prebuild.yaml index 762361367b..203810a3d6 100644 --- a/.github/workflows/mediasoup-worker-prebuild.yaml +++ b/.github/workflows/mediasoup-worker-prebuild.yaml @@ -67,4 +67,4 @@ jobs: - name: Upload mediasoup-worker prebuilt binary uses: softprops/action-gh-release@v1 with: - files: worker/prebuild/mediasoup-worker-*.tgz + files: worker/prebuild/libmediasoup-worker-*.tgz diff --git a/.gitignore b/.gitignore index e2f65fca29..3ff697e6e7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ # flatc generated files. /node/src/fbs +## Node WorkerChannel addon. +/node/src/workerChannel/build + ## Rust. /rust/examples-frontend/*/node_modules /rust/examples-frontend/*/package-lock.json diff --git a/node/src/Channel.ts b/node/src/Channel.ts index a2fa6b3a6c..9eee23b0fe 100644 --- a/node/src/Channel.ts +++ b/node/src/Channel.ts @@ -1,7 +1,7 @@ import * as os from 'node:os'; -import { Duplex } from 'node:stream'; import { info, warn } from 'node:console'; import * as flatbuffers from 'flatbuffers'; +import { WorkerChannel } from './workerChannel/src'; import { Logger } from './Logger'; import { EnhancedEventEmitter } from './enhancedEvents'; import { InvalidStateError } from './errors'; @@ -35,11 +35,8 @@ export class Channel extends EnhancedEventEmitter { // Closed flag. #closed = false; - // Unix Socket instance for sending messages to the worker process. - readonly #producerSocket: Duplex; - - // Unix Socket instance for receiving messages to the worker process. - readonly #consumerSocket: Duplex; + // WorkerChannel for sending and receiving messages to/from worker. + readonly #workerChannel: WorkerChannel; // Next id for messages sent to the worker process. #nextId = 0; @@ -57,23 +54,20 @@ export class Channel extends EnhancedEventEmitter { * @private */ constructor({ - producerSocket, - consumerSocket, + workerChannel, pid, }: { - producerSocket: any; - consumerSocket: any; + workerChannel: WorkerChannel; pid: number; }) { super(); logger.debug('constructor()'); - this.#producerSocket = producerSocket as Duplex; - this.#consumerSocket = consumerSocket as Duplex; + this.#workerChannel = workerChannel; // Read Channel responses/notifications from the worker. - this.#consumerSocket.on('data', (buffer: Buffer) => { + this.#workerChannel.on('data', (buffer: Buffer) => { if (!this.#recvBuffer.length) { this.#recvBuffer = buffer; } else { @@ -176,22 +170,6 @@ export class Channel extends EnhancedEventEmitter { this.#recvBuffer = this.#recvBuffer.slice(msgStart); } }); - - this.#consumerSocket.on('end', () => - logger.debug('Consumer Channel ended by the worker process') - ); - - this.#consumerSocket.on('error', error => - logger.error(`Consumer Channel error: ${error}`) - ); - - this.#producerSocket.on('end', () => - logger.debug('Producer Channel ended by the worker process') - ); - - this.#producerSocket.on('error', error => - logger.error(`Producer Channel error: ${error}`) - ); } /** @@ -217,24 +195,6 @@ export class Channel extends EnhancedEventEmitter { for (const sent of this.#sents.values()) { sent.close(); } - - // Remove event listeners but leave a fake 'error' hander to avoid - // propagation. - this.#consumerSocket.removeAllListeners('end'); - this.#consumerSocket.removeAllListeners('error'); - this.#consumerSocket.on('error', () => {}); - - this.#producerSocket.removeAllListeners('end'); - this.#producerSocket.removeAllListeners('error'); - this.#producerSocket.on('error', () => {}); - - // Destroy the sockets. - try { - this.#producerSocket.destroy(); - } catch (error) {} - try { - this.#consumerSocket.destroy(); - } catch (error) {} } /** @@ -282,14 +242,14 @@ export class Channel extends EnhancedEventEmitter { notificationOffset ); - // Finalizes the buffer and adds a 4 byte prefix with the size of the buffer. - this.#bufferBuilder.finishSizePrefixed(messageOffset); + // Finalizes the buffer. + this.#bufferBuilder.finish(messageOffset); - // Create a new buffer with this data so multiple contiguous flatbuffers - // do not point to the builder buffer overriding others info. - const buffer = new Uint8Array(this.#bufferBuilder.asUint8Array()); + // Take a reference of the inner buffer. + const buffer = this.#bufferBuilder.asUint8Array(); // Clear the buffer builder so it's reused for the next request. + // NOTE: This merely resets the buffer offset, it does not clear the content. this.#bufferBuilder.clear(); if (buffer.byteLength > MESSAGE_MAX_LEN) { @@ -298,7 +258,7 @@ export class Channel extends EnhancedEventEmitter { try { // This may throw if closed or remote side ended. - this.#producerSocket.write(buffer, 'binary'); + this.#workerChannel.send(buffer); } catch (error) { logger.warn(`notify() | sending notification failed: ${error}`); @@ -354,14 +314,14 @@ export class Channel extends EnhancedEventEmitter { requestOffset ); - // Finalizes the buffer and adds a 4 byte prefix with the size of the buffer. - this.#bufferBuilder.finishSizePrefixed(messageOffset); + // Finalizes the buffer. + this.#bufferBuilder.finish(messageOffset); - // Create a new buffer with this data so multiple contiguous flatbuffers - // do not point to the builder buffer overriding others info. - const buffer = new Uint8Array(this.#bufferBuilder.asUint8Array()); + // Take a reference of the inner buffer. + const buffer = this.#bufferBuilder.asUint8Array(); // Clear the buffer builder so it's reused for the next request. + // NOTE: This merely resets the buffer offset, it does not clear the content. this.#bufferBuilder.clear(); if (buffer.byteLength > MESSAGE_MAX_LEN) { @@ -369,7 +329,7 @@ export class Channel extends EnhancedEventEmitter { } // This may throw if closed or remote side ended. - this.#producerSocket.write(buffer, 'binary'); + this.#workerChannel.send(buffer); return new Promise((pResolve, pReject) => { const sent: Sent = { diff --git a/node/src/Worker.ts b/node/src/Worker.ts index b69d2c9c29..4a42323532 100644 --- a/node/src/Worker.ts +++ b/node/src/Worker.ts @@ -1,6 +1,6 @@ import * as process from 'node:process'; import * as path from 'node:path'; -import { spawn, ChildProcess } from 'node:child_process'; +import { WorkerChannel } from './workerChannel/src'; import { version } from './'; import { Logger } from './Logger'; import { EnhancedEventEmitter } from './enhancedEvents'; @@ -202,8 +202,6 @@ export type WorkerDump = { }; export type WorkerEvents = { - died: [Error]; - subprocessclose: []; listenererror: [string, Error]; // Private events. '@success': []; @@ -242,16 +240,17 @@ export const workerBin = process.env.MEDIASOUP_WORKER_BIN ); const logger = new Logger('Worker'); -const workerLogger = new Logger('Worker'); export class Worker< WorkerAppData extends AppData = AppData, > extends EnhancedEventEmitter { - // mediasoup-worker child process. - #child: ChildProcess; + /** + * WorkerChannel instance. + */ + #workerChannel: WorkerChannel; // Worker process PID. - readonly #pid: number; + readonly #pid: number = process.pid; // Channel instance. readonly #channel: Channel; @@ -259,12 +258,6 @@ export class Worker< // Closed flag. #closed = false; - // Died dlag. - #died = false; - - // Worker subprocess closed flag. - #subprocessClosed = false; - // Custom app data. #appData: WorkerAppData; @@ -345,157 +338,39 @@ export class Worker< spawnArgs.join(' ') ); - this.#child = spawn( - // command - spawnBin, - // args - spawnArgs, - // options - { - env: { - MEDIASOUP_VERSION: version, - // Let the worker process inherit all environment variables, useful - // if a custom and not in the path GCC is used so the user can set - // LD_LIBRARY_PATH environment variable for runtime. - ...process.env, - }, - - detached: false, - - // fd 0 (stdin) : Just ignore it. - // fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff. - // fd 2 (stderr) : Same as stdout. - // fd 3 (channel) : Producer Channel fd. - // fd 4 (channel) : Consumer Channel fd. - stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'], - windowsHide: true, - } - ); - - this.#pid = this.#child.pid!; - - this.#channel = new Channel({ - producerSocket: this.#child.stdio[3], - consumerSocket: this.#child.stdio[4], - pid: this.#pid, - }); - - this.#appData = appData || ({} as WorkerAppData); - - let spawnDone = false; + this.#workerChannel = new WorkerChannel(version, spawnArgs); - // Listen for 'running' notification. - this.#channel.once(String(this.#pid), (event: Event) => { - if (!spawnDone && event === Event.WORKER_RUNNING) { - spawnDone = true; - - logger.debug('worker process running [pid:%s]', this.#pid); + this.#workerChannel.on('error', (code: number) => { + if (code === 42) { + logger.error('worker failed due to wrong settings [pid:%s]', this.#pid); - this.emit('@success'); - } - }); - - this.#child.on('exit', (code, signal) => { - // If killed by ourselves, do nothing. - if (this.#child.killed) { - return; - } - - if (!spawnDone) { - spawnDone = true; - - if (code === 42) { - logger.error( - 'worker process failed due to wrong settings [pid:%s]', - this.#pid - ); - - this.close(); - this.emit('@failure', new TypeError('wrong settings')); - } else { - logger.error( - 'worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', - this.#pid, - code, - signal - ); - - this.close(); - this.emit( - '@failure', - new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`) - ); - } + this.emit('@failure', new TypeError('wrong settings')); } else { logger.error( - 'worker process died unexpectedly [pid:%s, code:%s, signal:%s]', + 'worker failed unexpectedly [pid:%s, code:%s]', this.#pid, - code, - signal - ); - - this.workerDied( - new Error(`[pid:${this.#pid}, code:${code}, signal:${signal}]`) + code ); - } - }); - this.#child.on('error', error => { - // If killed by ourselves, do nothing. - if (this.#child.killed) { - return; + this.emit('@failure', new Error(`[pid:${this.#pid}, code:${code}]`)); } - if (!spawnDone) { - spawnDone = true; - - logger.error( - 'worker process failed [pid:%s]: %s', - this.#pid, - error.message - ); - - this.close(); - this.emit('@failure', error); - } else { - logger.error( - 'worker process error [pid:%s]: %s', - this.#pid, - error.message - ); - - this.workerDied(error); - } + this.close(); }); - this.#child.on('close', (code, signal) => { - logger.debug( - 'worker subprocess closed [pid:%s, code:%s, signal:%s]', - this.#pid, - code, - signal - ); - - this.#subprocessClosed = true; - - this.safeEmit('subprocessclose'); + this.#channel = new Channel({ + workerChannel: this.#workerChannel, + pid: process.pid, }); - // Be ready for 3rd party worker libraries logging to stdout. - this.#child.stdout!.on('data', buffer => { - for (const line of buffer.toString('utf8').split('\n')) { - if (line) { - workerLogger.debug(`(stdout) ${line}`); - } - } - }); + this.#appData = appData || ({} as WorkerAppData); + + // Listen for 'running' notification. + this.#channel.once(String(process.pid), (event: Event) => { + if (event === Event.WORKER_RUNNING) { + logger.debug('worker process running [pid:%s]', this.#pid); - // In case of a worker bug, mediasoup will log to stderr. - this.#child.stderr!.on('data', buffer => { - for (const line of buffer.toString('utf8').split('\n')) { - if (line) { - workerLogger.error(`(stderr) ${line}`); - } + this.emit('@success'); } }); } @@ -514,20 +389,6 @@ export class Worker< return this.#closed; } - /** - * Whether the Worker died. - */ - get died(): boolean { - return this.#died; - } - - /** - * Whether the Worker subprocess is closed. - */ - get subprocessClosed(): boolean { - return this.#subprocessClosed; - } - /** * App custom data. */ @@ -577,12 +438,14 @@ export class Worker< this.#closed = true; - // Kill the worker process. - this.#child.kill('SIGTERM'); + this.#channel.request(FbsRequest.Method.WORKER_CLOSE).catch(() => {}); // Close the Channel instance. this.#channel.close(); + // Close the WorkerChannel instance. + this.#workerChannel.close(); + // Close every Router. for (const router of this.#routers) { router.workerClosed(); @@ -794,37 +657,6 @@ export class Worker< return router; } - - private workerDied(error: Error): void { - if (this.#closed) { - return; - } - - logger.debug(`died() [error:${error}]`); - - this.#closed = true; - this.#died = true; - - // Close the Channel instance. - this.#channel.close(); - - // Close every Router. - for (const router of this.#routers) { - router.workerClosed(); - } - this.#routers.clear(); - - // Close every WebRtcServer. - for (const webRtcServer of this.#webRtcServers) { - webRtcServer.workerClosed(); - } - this.#webRtcServers.clear(); - - this.safeEmit('died', error); - - // Emit observer event. - this.#observer.safeEmit('close'); - } } export function parseWorkerDumpResponse( diff --git a/node/src/test/test-ActiveSpeakerObserver.ts b/node/src/test/test-ActiveSpeakerObserver.ts index f957a727c2..973457f3cc 100644 --- a/node/src/test/test-ActiveSpeakerObserver.ts +++ b/node/src/test/test-ActiveSpeakerObserver.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, ActiveSpeakerObserverEvents } from '../types'; +import { ActiveSpeakerObserverEvents } from '../types'; import * as utils from '../utils'; type TestContext = { @@ -31,10 +31,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('router.createActiveSpeakerObserver() succeeds', async () => { diff --git a/node/src/test/test-AudioLevelObserver.ts b/node/src/test/test-AudioLevelObserver.ts index f6d6f61723..7f9df88665 100644 --- a/node/src/test/test-AudioLevelObserver.ts +++ b/node/src/test/test-AudioLevelObserver.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, AudioLevelObserverEvents } from '../types'; +import { AudioLevelObserverEvents } from '../types'; import * as utils from '../utils'; type TestContext = { @@ -31,10 +31,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('router.createAudioLevelObserver() succeeds', async () => { diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index 436145fbc2..2c11710586 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -1,7 +1,7 @@ import * as flatbuffers from 'flatbuffers'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, ConsumerEvents } from '../types'; +import { ConsumerEvents } from '../types'; import { UnsupportedError } from '../errors'; import * as utils from '../utils'; import { @@ -247,10 +247,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('transport.consume() succeeds', async () => { diff --git a/node/src/test/test-DataConsumer.ts b/node/src/test/test-DataConsumer.ts index e51a718095..a85341ecf3 100644 --- a/node/src/test/test-DataConsumer.ts +++ b/node/src/test/test-DataConsumer.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, DataConsumerEvents } from '../types'; +import { DataConsumerEvents } from '../types'; import * as utils from '../utils'; type TestContext = { @@ -44,10 +44,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('transport.consumeData() succeeds', async () => { diff --git a/node/src/test/test-DataProducer.ts b/node/src/test/test-DataProducer.ts index ad1457caea..ae151e389c 100644 --- a/node/src/test/test-DataProducer.ts +++ b/node/src/test/test-DataProducer.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, DataProducerEvents } from '../types'; +import { DataProducerEvents } from '../types'; import * as utils from '../utils'; type TestContext = { @@ -48,10 +48,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('webRtcTransport1.produceData() succeeds', async () => { diff --git a/node/src/test/test-DirectTransport.ts b/node/src/test/test-DirectTransport.ts index 3b7689acc1..9e2a48fd7d 100644 --- a/node/src/test/test-DirectTransport.ts +++ b/node/src/test/test-DirectTransport.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, DirectTransportEvents } from '../types'; +import { DirectTransportEvents } from '../types'; type TestContext = { worker?: mediasoup.types.Worker; @@ -16,10 +16,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('router.createDirectTransport() succeeds', async () => { diff --git a/node/src/test/test-PipeTransport.ts b/node/src/test/test-PipeTransport.ts index 74809b320b..559e16bd46 100644 --- a/node/src/test/test-PipeTransport.ts +++ b/node/src/test/test-PipeTransport.ts @@ -1,7 +1,7 @@ import { pickPort } from 'pick-port'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, ConsumerEvents, DataConsumerEvents } from '../types'; +import { ConsumerEvents, DataConsumerEvents } from '../types'; import * as utils from '../utils'; type TestContext = { @@ -201,14 +201,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker1?.close(); ctx.worker2?.close(); - - if (ctx.worker1?.subprocessClosed === false) { - await enhancedOnce(ctx.worker1, 'subprocessclose'); - } - - if (ctx.worker2?.subprocessClosed === false) { - await enhancedOnce(ctx.worker2, 'subprocessclose'); - } }); test('router.pipeToRouter() succeeds with audio', async () => { diff --git a/node/src/test/test-PlainTransport.ts b/node/src/test/test-PlainTransport.ts index 31508bbddf..4f82af85d2 100644 --- a/node/src/test/test-PlainTransport.ts +++ b/node/src/test/test-PlainTransport.ts @@ -2,7 +2,7 @@ import * as os from 'node:os'; import { pickPort } from 'pick-port'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, PlainTransportEvents } from '../types'; +import { PlainTransportEvents } from '../types'; import * as utils from '../utils'; const IS_WINDOWS = os.platform() === 'win32'; @@ -52,10 +52,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('router.createPlainTransport() succeeds', async () => { diff --git a/node/src/test/test-Producer.ts b/node/src/test/test-Producer.ts index fbf2cef5c1..dde7d1730d 100644 --- a/node/src/test/test-Producer.ts +++ b/node/src/test/test-Producer.ts @@ -1,7 +1,7 @@ import * as flatbuffers from 'flatbuffers'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, ProducerEvents } from '../types'; +import { ProducerEvents } from '../types'; import { UnsupportedError } from '../errors'; import * as utils from '../utils'; import { @@ -148,10 +148,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('webRtcTransport1.produce() succeeds', async () => { diff --git a/node/src/test/test-Router.ts b/node/src/test/test-Router.ts index 50602aec32..701cd3b6c3 100644 --- a/node/src/test/test-Router.ts +++ b/node/src/test/test-Router.ts @@ -1,6 +1,6 @@ import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, RouterEvents } from '../types'; +import { RouterEvents } from '../types'; import { InvalidStateError } from '../errors'; import * as utils from '../utils'; @@ -46,10 +46,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('worker.createRouter() succeeds', async () => { diff --git a/node/src/test/test-WebRtcServer.ts b/node/src/test/test-WebRtcServer.ts index 8297941723..76563d5905 100644 --- a/node/src/test/test-WebRtcServer.ts +++ b/node/src/test/test-WebRtcServer.ts @@ -1,7 +1,7 @@ import { pickPort } from 'pick-port'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, WebRtcServerEvents } from '../types'; +import { WebRtcServerEvents } from '../types'; import { InvalidStateError } from '../errors'; type TestContext = { @@ -16,10 +16,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('worker.createWebRtcServer() succeeds', async () => { diff --git a/node/src/test/test-WebRtcTransport.ts b/node/src/test/test-WebRtcTransport.ts index 601073d209..13ba4a68d5 100644 --- a/node/src/test/test-WebRtcTransport.ts +++ b/node/src/test/test-WebRtcTransport.ts @@ -2,7 +2,7 @@ import { pickPort } from 'pick-port'; import * as flatbuffers from 'flatbuffers'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents, WebRtcTransportEvents } from '../types'; +import { WebRtcTransportEvents } from '../types'; import * as utils from '../utils'; import { serializeProtocol, TransportTuple } from '../Transport'; import { @@ -57,10 +57,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('router.createWebRtcTransport() succeeds', async () => { diff --git a/node/src/test/test-Worker.ts b/node/src/test/test-Worker.ts index 6f1b28dedc..04d3cded48 100644 --- a/node/src/test/test-Worker.ts +++ b/node/src/test/test-Worker.ts @@ -1,9 +1,6 @@ -import * as os from 'node:os'; import * as process from 'node:process'; import * as path from 'node:path'; import * as mediasoup from '../'; -import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents } from '../types'; import { InvalidStateError } from '../errors'; test('Worker.workerBin matches mediasoup-worker absolute path', () => { @@ -46,14 +43,10 @@ test('createWorker() succeeds', async () => { expect(worker1.constructor.name).toBe('Worker'); expect(typeof worker1.pid).toBe('number'); expect(worker1.closed).toBe(false); - expect(worker1.died).toBe(false); worker1.close(); - await enhancedOnce(worker1, 'subprocessclose'); - expect(worker1.closed).toBe(true); - expect(worker1.died).toBe(false); const worker2 = await mediasoup.createWorker<{ foo: number; bar?: string }>({ logLevel: 'debug', @@ -69,15 +62,11 @@ test('createWorker() succeeds', async () => { expect(worker2.constructor.name).toBe('Worker'); expect(typeof worker2.pid).toBe('number'); expect(worker2.closed).toBe(false); - expect(worker2.died).toBe(false); expect(worker2.appData).toEqual({ foo: 456 }); worker2.close(); - await enhancedOnce(worker2, 'subprocessclose'); - expect(worker2.closed).toBe(true); - expect(worker2.died).toBe(false); }, 2000); test('createWorker() with wrong settings rejects with TypeError', async () => { @@ -117,8 +106,6 @@ test('worker.updateSettings() succeeds', async () => { ).resolves.toBeUndefined(); worker.close(); - - await enhancedOnce(worker, 'subprocessclose'); }, 2000); test('worker.updateSettings() with wrong settings rejects with TypeError', async () => { @@ -130,8 +117,6 @@ test('worker.updateSettings() with wrong settings rejects with TypeError', async ); worker.close(); - - await enhancedOnce(worker, 'subprocessclose'); }, 2000); test('worker.updateSettings() rejects with InvalidStateError if closed', async () => { @@ -139,8 +124,6 @@ test('worker.updateSettings() rejects with InvalidStateError if closed', async ( worker.close(); - await enhancedOnce(worker, 'subprocessclose'); - await expect(worker.updateSettings({ logLevel: 'error' })).rejects.toThrow( InvalidStateError ); @@ -167,8 +150,6 @@ test('worker.dump() rejects with InvalidStateError if closed', async () => { worker.close(); - await enhancedOnce(worker, 'subprocessclose'); - await expect(worker.dump()).rejects.toThrow(InvalidStateError); }, 2000); @@ -178,8 +159,6 @@ test('worker.getResourceUsage() succeeds', async () => { await expect(worker.getResourceUsage()).resolves.toMatchObject({}); worker.close(); - - await enhancedOnce(worker, 'subprocessclose'); }, 2000); test('worker.close() succeeds', async () => { @@ -189,141 +168,6 @@ test('worker.close() succeeds', async () => { worker.observer.once('close', onObserverClose); worker.close(); - await enhancedOnce(worker, 'subprocessclose'); - expect(onObserverClose).toHaveBeenCalledTimes(1); expect(worker.closed).toBe(true); - expect(worker.died).toBe(false); }, 2000); - -test('Worker emits "died" if worker process died unexpectedly', async () => { - let onDied: ReturnType; - let onObserverClose: ReturnType; - - const worker1 = await mediasoup.createWorker({ logLevel: 'warn' }); - - onDied = jest.fn(); - onObserverClose = jest.fn(); - - worker1.observer.once('close', onObserverClose); - - await new Promise((resolve, reject) => { - worker1.on('died', () => { - onDied(); - - if (onObserverClose.mock.calls.length > 0) { - reject( - new Error('observer "close" event emitted before worker "died" event') - ); - } else if (worker1.closed) { - resolve(); - } else { - reject(new Error('worker.closed is false')); - } - }); - - process.kill(worker1.pid, 'SIGINT'); - }); - - if (!worker1.subprocessClosed) { - await enhancedOnce(worker1, 'subprocessclose'); - } - - expect(onDied).toHaveBeenCalledTimes(1); - expect(onObserverClose).toHaveBeenCalledTimes(1); - expect(worker1.closed).toBe(true); - expect(worker1.died).toBe(true); - - const worker2 = await mediasoup.createWorker({ logLevel: 'warn' }); - - onDied = jest.fn(); - onObserverClose = jest.fn(); - - worker2.observer.once('close', onObserverClose); - - await new Promise((resolve, reject) => { - worker2.on('died', () => { - onDied(); - - if (onObserverClose.mock.calls.length > 0) { - reject( - new Error('observer "close" event emitted before worker "died" event') - ); - } else if (worker2.closed) { - resolve(); - } else { - reject(new Error('worker.closed is false')); - } - }); - - process.kill(worker2.pid, 'SIGTERM'); - }); - - if (!worker2.subprocessClosed) { - await enhancedOnce(worker2, 'subprocessclose'); - } - - expect(onDied).toHaveBeenCalledTimes(1); - expect(onObserverClose).toHaveBeenCalledTimes(1); - expect(worker2.closed).toBe(true); - expect(worker2.died).toBe(true); - - const worker3 = await mediasoup.createWorker({ logLevel: 'warn' }); - - onDied = jest.fn(); - onObserverClose = jest.fn(); - - worker3.observer.once('close', onObserverClose); - - await new Promise((resolve, reject) => { - worker3.on('died', () => { - onDied(); - - if (onObserverClose.mock.calls.length > 0) { - reject( - new Error('observer "close" event emitted before worker "died" event') - ); - } else if (worker3.closed) { - resolve(); - } else { - reject(new Error('worker.closed is false')); - } - }); - - process.kill(worker3.pid, 'SIGKILL'); - }); - - if (!worker3.subprocessClosed) { - await enhancedOnce(worker3, 'subprocessclose'); - } - - expect(onDied).toHaveBeenCalledTimes(1); - expect(onObserverClose).toHaveBeenCalledTimes(1); - expect(worker3.closed).toBe(true); - expect(worker3.died).toBe(true); -}, 5000); - -// Windows doesn't have some signals such as SIGPIPE, SIGALRM, SIGUSR1, SIGUSR2 -// so we just skip this test in Windows. -if (os.platform() !== 'win32') { - test('worker process ignores PIPE, HUP, ALRM, USR1 and USR2 signals', async () => { - const worker = await mediasoup.createWorker({ logLevel: 'warn' }); - - await new Promise((resolve, reject) => { - worker.on('died', reject); - - process.kill(worker.pid, 'SIGPIPE'); - process.kill(worker.pid, 'SIGHUP'); - process.kill(worker.pid, 'SIGALRM'); - process.kill(worker.pid, 'SIGUSR1'); - process.kill(worker.pid, 'SIGUSR2'); - - setTimeout(() => { - expect(worker.closed).toBe(false); - - worker.close(); - worker.on('subprocessclose', resolve); - }, 2000); - }); - }, 3000); -} diff --git a/node/src/test/test-multiopus.ts b/node/src/test/test-multiopus.ts index d6542fb391..ecade44bc6 100644 --- a/node/src/test/test-multiopus.ts +++ b/node/src/test/test-multiopus.ts @@ -1,6 +1,4 @@ import * as mediasoup from '../'; -import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents } from '../types'; import { UnsupportedError } from '../errors'; import * as utils from '../utils'; @@ -109,10 +107,6 @@ beforeEach(async () => { afterEach(async () => { ctx.worker?.close(); - - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } }); test('produce() and consume() succeed', async () => { diff --git a/node/src/test/test-node-sctp.ts b/node/src/test/test-node-sctp.ts index 46b2f93036..94be5732e5 100644 --- a/node/src/test/test-node-sctp.ts +++ b/node/src/test/test-node-sctp.ts @@ -3,7 +3,6 @@ import * as dgram from 'node:dgram'; import * as sctp from 'sctp'; import * as mediasoup from '../'; import { enhancedOnce } from '../enhancedEvents'; -import { WorkerEvents } from '../types'; type TestContext = { worker?: mediasoup.types.Worker; @@ -101,10 +100,6 @@ afterEach(async () => { ctx.sctpSocket?.end(); ctx.worker?.close(); - if (ctx.worker?.subprocessClosed === false) { - await enhancedOnce(ctx.worker, 'subprocessclose'); - } - // NOTE: For some reason we have to wait a bit for the SCTP stuff to release // internal things, otherwise Jest reports open handles. We don't care much // honestly. diff --git a/node/src/workerChannel/binding.gyp b/node/src/workerChannel/binding.gyp new file mode 100644 index 0000000000..3f7b7ab452 --- /dev/null +++ b/node/src/workerChannel/binding.gyp @@ -0,0 +1,71 @@ +{ + 'variables': { + 'mediasoup_build_type%': 'Release', + 'mediasoup_worker_lib%': '' + }, + "targets": [ + { + 'target_name': 'worker-channel', + 'sources': [ + 'src/binding.cpp', + 'src/workerChannel.cpp' + ], + 'cflags!': [ '-fno-exceptions' ], + 'cflags_cc!': [ '-fno-exceptions' ], + 'include_dirs': [ + " +#include +#include +#include + +class WorkerChannel : public Napi::ObjectWrap +{ +public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + WorkerChannel(const Napi::CallbackInfo& info); + ~WorkerChannel(); + + // Called when the wrapped native instance is freed. + void Finalize(Napi::Env env); + ChannelReadFreeFn OnChannelRead(uint8_t** message, uint32_t* messageLen, const uv_async_t* handle); + void OnChannelWrite(const uint8_t* message, uint32_t messageLen); + void OnError(const uint8_t code); + +private: + static Napi::FunctionReference constructor; + std::thread thread; + Napi::ThreadSafeFunction emit; + + const uv_async_t* handle{ nullptr }; + std::mutex mutex; + std::deque messages; + + void Send(const Napi::CallbackInfo& info); +}; diff --git a/node/src/workerChannel/scripts.mjs b/node/src/workerChannel/scripts.mjs new file mode 100644 index 0000000000..a0e0ae7435 --- /dev/null +++ b/node/src/workerChannel/scripts.mjs @@ -0,0 +1,90 @@ +import * as process from 'node:process'; +import { execSync } from 'node:child_process'; + +const task = process.argv[2]; +const args = process.argv.slice(3).join(' '); + +run(); + +async function run() { + logInfo(args ? `[args:"${args}"]` : ''); + + switch (task) { + case 'binding:build': { + logInfo(`binding:build: [args:"${args}"]`); + buildBinding(); + + break; + } + + case 'test': { + testNode(); + + break; + } + + default: { + logError('unknown task'); + + exitWithError(); + } + } +} + +function buildBinding() { + logInfo('buildBinding()'); + + const buildType = process.env.MEDIASOUP_BUILDTYPE || 'Release'; + + process.env.GYP_DEFINES = `mediasoup_build_type=${buildType}`; + + if (process.env.MEDIASOUP_WORKER_BIN) { + process.env.GYP_DEFINES += ` mediasoup_worker_lib=${process.env.MEDIASOUP_WORKER_LIB}`; + } + + executeCmd(`node-gyp rebuild --${buildType.toLowerCase()} --verbose`); +} + +function testNode() { + logInfo('testNode()'); + + executeCmd('node lib/test.js'); +} + +function executeCmd(command, exitOnError = true) { + logInfo(`executeCmd(): ${command}`); + + try { + execSync(command, { + stdio: ['ignore', process.stdout, process.stderr], + }); + } catch (error) { + if (exitOnError) { + logError(`executeCmd() failed, exiting: ${error}`); + + exitWithError(); + } else { + logInfo(`executeCmd() failed, ignoring: ${error}`); + } + } +} + +function logInfo(message) { + // eslint-disable-next-line no-console + console.log(`npm-scripts.mjs \x1b[36m[INFO] [${task}]\x1b[0m`, message); +} + +// eslint-disable-next-line no-unused-vars +function logWarn(message) { + // eslint-disable-next-line no-console + console.warn(`npm-scripts.mjs \x1b[33m[WARN] [${task}]\x1b\0m`, message); +} + +function logError(message) { + // eslint-disable-next-line no-console + console.error(`npm-scripts.mjs \x1b[31m[ERROR] [${task}]\x1b[0m`, message); +} + +function exitWithError() { + process.exit(1); +} diff --git a/node/src/workerChannel/src/binding.cpp b/node/src/workerChannel/src/binding.cpp new file mode 100644 index 0000000000..ea1f46724d --- /dev/null +++ b/node/src/workerChannel/src/binding.cpp @@ -0,0 +1,13 @@ +#include + +#include "../include/workerChannel.hpp" + +// Initialize native add-on. +Napi::Object Init(Napi::Env env, Napi::Object exports) +{ + WorkerChannel::Init(env, exports); + return exports; +} + +// Register and initialize native add-on. +NODE_API_MODULE(NODE_GYP_MODULE_NAME, Init); diff --git a/node/src/workerChannel/src/index.ts b/node/src/workerChannel/src/index.ts new file mode 100644 index 0000000000..f894f8992a --- /dev/null +++ b/node/src/workerChannel/src/index.ts @@ -0,0 +1,54 @@ +import { EventEmitter } from 'events'; +import { EnhancedEventEmitter } from '../../enhancedEvents'; + +const buildType = process.env.MEDIASOUP_BUILDTYPE ?? 'Release'; + +/** + * NOTE: The following path is reachable from: + * - current file: node/src/workerChannel/src/index.ts + * - transpiled JS file: node/lib/workerChannel/src/index.js + */ +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { WorkerChannel: NativeWorkerChannel } = require( + `../../../src/workerChannel/build/${buildType}/worker-channel.node` +); + +export type WorkerChannelEvents = { + data: [Buffer]; + error: [number]; +}; + +export class WorkerChannel extends EnhancedEventEmitter { + private emitter: EventEmitter; + private workerChannel: typeof NativeWorkerChannel; + + constructor(version: string, args: string[]) { + super(); + + this.emitter = new EventEmitter(); + this.workerChannel = new NativeWorkerChannel( + this.emitter.emit.bind(this.emitter), + version, + args + ); + + this.emitter.on('data', data => { + this.safeEmit('data', data); + }); + + this.emitter.on('error', code => { + this.safeEmit('error', code); + }); + } + + close() { + // By setting the instance to `undefined`, the garbage collector will clean up + // the native instance, calling its `Finalize()` method accordingly. + // Without it, the tests will never finish due to remaining open handles. + this.workerChannel = undefined; + } + + send(data: Uint8Array): void { + this.workerChannel.send(data); + } +} diff --git a/node/src/workerChannel/src/test.ts b/node/src/workerChannel/src/test.ts new file mode 100644 index 0000000000..ee0bbbd70c --- /dev/null +++ b/node/src/workerChannel/src/test.ts @@ -0,0 +1,110 @@ +/** + * NOTE: These tests are to be run locally only, if needed. + * When running them within a testing environment (jest|node) the native addon + * handle will remain open and the test process won't terminate, making any + * CI process fail. + */ + +import assert = require('node:assert'); +import { beforeEach, describe, it } from 'node:test'; + +const buildType = process.env.MEDIASOUP_BUILDTYPE ?? 'Release'; + +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { WorkerChannel: NativeWorkerChannel } = require( + `../build/${buildType}/worker-channel.node` +); + +describe('NativeWorkerChannel constructor', () => { + it('fails if no argument is passed', () => { + assert.throws(() => new NativeWorkerChannel(), TypeError); + }); + + it('fails if a single argument is passed', () => { + const func = () => {}; + + assert.throws(() => new NativeWorkerChannel(func), TypeError); + }); + + [true, 1, 'one'].forEach(func => { + it(`fails if the first argument ("${func}") is not a Function`, () => { + const version = 'X'; + + assert.throws(() => new NativeWorkerChannel(func, version), TypeError); + }); + }); + + [true, 1, () => {}].forEach(version => { + it(`fails if the second argument ("${version}") is not a String`, () => { + const func = () => {}; + + assert.throws(() => new NativeWorkerChannel(func, version), TypeError); + }); + }); + + [true, 1, 'one', () => {}].forEach(args => { + it(`fails if the third argument is present ("${args}") and is not an Array`, () => { + const func = () => {}; + const version = 'X'; + + assert.throws( + () => new NativeWorkerChannel(func, version, args), + TypeError + ); + }); + }); + + [true, 1, () => {}].forEach(item => { + it(`fails if the third argument is present and contains a non String item (${item})`, () => { + const func = () => {}; + const version = 'X'; + const args = ['one', 'two', 'three']; + + // @ts-ignore. + args.push(item); + + assert.throws( + () => new NativeWorkerChannel(func, version, args), + TypeError + ); + }); + }); + + it('succeeds if the given arguments are a Function and a String respectively', () => { + const func = () => {}; + const version = 'X'; + + assert.doesNotThrow(() => new NativeWorkerChannel(func, version)); + }); + + it('succeeds if the third argument is an Array of Strings', () => { + const func = () => {}; + const version = 'X'; + const args = ['one', 'two', 'three']; + + assert.doesNotThrow(() => new NativeWorkerChannel(func, version, args)); + }); +}); + +describe('NativeWorkerChannel send', () => { + const version = 'X'; + const func = () => {}; + + let workerChannel: any; + + beforeEach(() => { + workerChannel = new NativeWorkerChannel(func, version); + }); + + [true, 1, 'one', () => {}].forEach(data => { + it(`fails if send() is called with a value (${data}) different than an Uint8Array`, () => { + assert.throws(() => workerChannel.send(data), TypeError); + }); + }); + + it(`succeeds if send() is called with a Uint8Array`, () => { + const data = new Uint8Array(2); + + workerChannel.send(data); + }); +}); diff --git a/node/src/workerChannel/src/workerChannel.cpp b/node/src/workerChannel/src/workerChannel.cpp new file mode 100644 index 0000000000..bb04b947dd --- /dev/null +++ b/node/src/workerChannel/src/workerChannel.cpp @@ -0,0 +1,238 @@ +#include "../include/workerChannel.hpp" +#include "napi.h" +#include +#include + +void deleteMessage(uint8_t* message, uint32_t messageLen, size_t ctx) +{ + delete[] message; +} + +ChannelReadFreeFn channelReadFn( + uint8_t** message, + uint32_t* messageLen, + size_t* messageCtx, + const void* handle, + ChannelReadCtx channelReadCtx) +{ + auto workerChannel = static_cast(channelReadCtx); + const uv_async_t* uvAsyncT = static_cast(handle); + + return workerChannel->OnChannelRead(message, messageLen, uvAsyncT); +} + +void channelWriteFn(const uint8_t* message, uint32_t messageLen, ChannelWriteCtx channelWriteCtx) +{ + auto workerChannel = static_cast(channelWriteCtx); + + return workerChannel->OnChannelWrite(message, messageLen); +} + +void libmediasoup(WorkerChannel* workerChannel, std::string version, std::vector args) +{ + std::vector argv; + + for (auto& arg : args) + { + argv.push_back(arg.data()); + } + + auto result = mediasoup_worker_run( + argv.size(), + argv.data(), + version.data(), + 0, + 0, + channelReadFn, + workerChannel, + channelWriteFn, + workerChannel); + + if (result != 0) + { + workerChannel->OnError(result); + } +} + +Napi::FunctionReference WorkerChannel::constructor; + +Napi::Object WorkerChannel::Init(Napi::Env env, Napi::Object exports) +{ + Napi::Function func = + DefineClass(env, "WorkerChannel", { InstanceMethod("send", &WorkerChannel::Send) }); + + Napi::FunctionReference* constructor = new Napi::FunctionReference(); + + // Create a persistent reference to the class constructor. This will allow + // a function called on a class prototype and a function + // called on instance of a class to be distinguished from each other. + *constructor = Napi::Persistent(func); + + exports.Set("WorkerChannel", func); + + // Store the constructor as the add-on instance data. This will allow this + // add-on to support multiple instances of itself running on multiple worker + // threads, as well as multiple instances of itself running in different + // contexts on the same thread. + // + // By default, the value set on the environment here will be destroyed when + // the add-on is unloaded using the `delete` operator, but it is also + // possible to supply a custom deleter. + env.SetInstanceData(constructor); + + return exports; +} + +WorkerChannel::WorkerChannel(const Napi::CallbackInfo& info) : Napi::ObjectWrap(info) +{ + auto env = info.Env(); + + if (info.Length() < 2) + { + throw Napi::TypeError::New(env, "Expected at least two arguments"); + } + else if (!info[0].IsFunction()) + { + throw Napi::TypeError::New(env, "Expected first arg to be function"); + } + else if (!info[1].IsString()) + { + throw Napi::TypeError::New(env, "Expected second arg to be string"); + } + + auto cb = info[0].As(); + auto version = info[1].As(); + + this->emit = Napi::ThreadSafeFunction::New(env, cb, "WorkerChannel", 0, 1); + + std::vector args = { "" }; + + if (info.Length() == 3) + { + if (!info[2].IsArray()) + { + throw Napi::TypeError::New(env, "Expected third arg to be array"); + } + + auto params = info[2].As(); + + for (uint32_t i = 0; i < params.Length(); i++) + { + Napi::Value v = params[i]; + + if (!v.IsString()) + { + throw Napi::TypeError::New(env, "Expected array item to be string"); + } + + auto value = v.As(); + + args.push_back(value.Utf8Value()); + } + } + + this->thread = std::thread(libmediasoup, this, version.Utf8Value(), args); +} + +WorkerChannel::~WorkerChannel() +{ + std::lock_guard guard(this->mutex); + + for (const auto* message : this->messages) + { + delete[] message; + } +} + +void WorkerChannel::Finalize(Napi::Env env) +{ + this->emit.Release(); + this->thread.join(); +} + +ChannelReadFreeFn WorkerChannel::OnChannelRead( + uint8_t** message, uint32_t* messageLen, const uv_async_t* handle) +{ + if (!this->handle) + { + this->handle = handle; + } + + std::lock_guard guard(this->mutex); + + if (this->messages.size() == 0) + { + return nullptr; + } + + auto* msg = this->messages.front(); + this->messages.pop_front(); + + *message = msg; + + return deleteMessage; +} + +void WorkerChannel::OnChannelWrite(const uint8_t* message, uint32_t messageLen) +{ + auto copy = new uint8_t[messageLen]; + + std::memcpy(copy, message, messageLen); + + auto callback = [copy, messageLen](Napi::Env env, Napi::Function cb) + { + auto data = Napi::Buffer::New( + env, + const_cast(copy), + messageLen, + [](Napi::Env env, void* data) { delete[] (Napi::Buffer*)data; }); + + cb.Call({ Napi::String::New(env, "data"), data }); + }; + + this->emit.NonBlockingCall(callback); +} + +void WorkerChannel::OnError(const uint8_t code) +{ + auto callback = [code](Napi::Env env, Napi::Function cb) + { + auto value = Napi::Number::New(env, code); + + cb.Call({ Napi::String::New(env, "error"), value }); + }; + + this->emit.NonBlockingCall(callback); +} + +void WorkerChannel::Send(const Napi::CallbackInfo& info) +{ + if (info.Length() != 1) + { + throw Napi::TypeError::New(info.Env(), "Expected one argument"); + } + + if (!info[0].IsTypedArray()) + { + throw Napi::TypeError::New(info.Env(), "Expected arg to be a Uint8Array"); + } + + // Copy the message into its own memory. + auto message = info[0].As(); + auto data = new uint8_t[message.ByteLength()]; + + std::memcpy(data, message.Data(), message.ByteLength()); + + { + std::lock_guard guard(this->mutex); + this->messages.push_back(data); + } + + if (!this->handle) + { + return; + } + + // Notify mediasoup about the new message. + uv_async_send(const_cast(this->handle)); +} diff --git a/npm-scripts.mjs b/npm-scripts.mjs index 2bb7081e84..9daac3348c 100644 --- a/npm-scripts.mjs +++ b/npm-scripts.mjs @@ -15,13 +15,12 @@ const MAYOR_VERSION = PKG.version.split('.')[0]; const PYTHON = getPython(); const PIP_INVOKE_DIR = path.resolve('worker/pip_invoke'); const WORKER_RELEASE_DIR = 'worker/out/Release'; -const WORKER_RELEASE_BIN = IS_WINDOWS - ? 'mediasoup-worker.exe' - : 'mediasoup-worker'; +const WORKER_RELEASE_BIN = 'libmediasoup-worker.a'; const WORKER_RELEASE_BIN_PATH = `${WORKER_RELEASE_DIR}/${WORKER_RELEASE_BIN}`; const WORKER_PREBUILD_DIR = 'worker/prebuild'; const WORKER_PREBUILD_TAR = getWorkerPrebuildTarName(); const WORKER_PREBUILD_TAR_PATH = `${WORKER_PREBUILD_DIR}/${WORKER_PREBUILD_TAR}`; +const WORKER_CHANNEL_ADDON_PATH = 'node/src/workerChannel'; const GH_OWNER = 'versatica'; const GH_REPO = 'mediasoup'; @@ -109,7 +108,8 @@ async function run() { 'skipping mediasoup-worker prebuilt download, building it locally' ); - buildWorker(); + buildWorkerLib(); + buildAddon(); if (!process.env.MEDIASOUP_LOCAL_DEV) { cleanWorkerArtifacts(); @@ -121,7 +121,8 @@ async function run() { `couldn't fetch any mediasoup-worker prebuilt binary, building it locally` ); - buildWorker(); + buildWorkerLib(); + buildAddon(); if (!process.env.MEDIASOUP_LOCAL_DEV) { cleanWorkerArtifacts(); @@ -146,7 +147,7 @@ async function run() { } case 'worker:build': { - buildWorker(); + buildWorkerLib(); break; } @@ -281,7 +282,7 @@ function getPython() { } function getWorkerPrebuildTarName() { - let name = `mediasoup-worker-${PKG.version}-${os.platform()}-${os.arch()}`; + let name = `libmediasoup-worker-${PKG.version}-${os.platform()}-${os.arch()}`; // In Linux we want to know about kernel version since kernel >= 6 supports // io-uring. @@ -330,12 +331,22 @@ function buildTypescript({ force = false } = { force: false }) { executeCmd('tsc --project node'); } -function buildWorker() { - logInfo('buildWorker()'); +function buildWorkerLib() { + logInfo('buildWorkerLib()'); installInvoke(); - executeCmd(`"${PYTHON}" -m invoke -r worker mediasoup-worker`); + executeCmd(`"${PYTHON}" -m invoke -r worker libmediasoup-worker`); +} + +function buildAddon() { + logInfo('buildAddon()'); + + installInvoke(); + + executeCmd( + `cd ${WORKER_CHANNEL_ADDON_PATH} && node scripts.mjs binding:build` + ); } function cleanWorkerArtifacts() { @@ -467,7 +478,8 @@ function checkRelease() { installNodeDeps(); flatcNode(); buildTypescript({ force: true }); - buildWorker(); + buildWorkerLib(); + buildAddon(); lintNode(); lintWorker(); testNode(); @@ -569,46 +581,6 @@ async function downloadPrebuiltWorker() { `downloadPrebuiltWorker() | failed to give execution permissions to the mediasoup-worker prebuilt binary: ${error}` ); } - - // Let's confirm that the fetched mediasoup-worker prebuit binary does - // run in current host. This is to prevent weird issues related to - // different versions of libc in the system and so on. - // So run mediasoup-worker without the required MEDIASOUP_VERSION env and - // expect exit code 41 (see main.cpp). - - logInfo( - 'downloadPrebuiltWorker() | checking fetched mediasoup-worker prebuilt binary in current host' - ); - - try { - const resolvedBinPath = path.resolve(WORKER_RELEASE_BIN_PATH); - - // This will always fail on purpose, but if status code is 41 then - // it's good. - execSync(`"${resolvedBinPath}"`, { - stdio: ['ignore', 'ignore', 'ignore'], - // Ensure no env is passed to avoid accidents. - env: {}, - }); - } catch (error) { - if (error.status === 41) { - logInfo( - 'downloadPrebuiltWorker() | fetched mediasoup-worker prebuilt binary is valid for current host' - ); - - resolve(true); - } else { - logError( - `downloadPrebuiltWorker() | fetched mediasoup-worker prebuilt binary fails to run in this host [status:${error.status}]` - ); - - try { - fs.unlinkSync(WORKER_RELEASE_BIN_PATH); - } catch (error2) {} - - resolve(false); - } - } }) .on('error', error => { logError( diff --git a/package-lock.json b/package-lock.json index aad83f706d..37c633feb8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "flatbuffers": "^24.3.25", "h264-profile-level-id": "^2.0.0", "ini": "^4.1.2", + "node-addon-api": "^8.0.0", "node-fetch": "^3.3.2", "supports-color": "^9.4.0", "tar": "^7.1.0" @@ -40,7 +41,8 @@ "typescript": "^5.4.5" }, "engines": { - "node": ">=18" + "node": ">=18", + "npm": ">=10.3.0" }, "funding": { "type": "opencollective", @@ -4889,6 +4891,14 @@ "integrity": "sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==", "dev": true }, + "node_modules/node-addon-api": { + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-8.0.0.tgz", + "integrity": "sha512-ipO7rsHEBqa9STO5C5T10fj732ml+5kLN1cAG8/jdHd56ldQeGj3Q7+scUS+VHK/qy1zLEwC4wMK5+yM0btPvw==", + "engines": { + "node": "^18 || ^20 || >= 21" + } + }, "node_modules/node-domexception": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", @@ -6338,6 +6348,18 @@ "funding": { "url": "https://github.com/sponsors/sindresorhus" } + }, + "node/workerChannel": { + "name": "worker-channel", + "version": "1.0.0", + "extraneous": true, + "dependencies": { + "debug": "^4.3.4", + "node-addon-api": "*" + }, + "devDependencies": { + "typescript": "^5.4.5" + } } }, "dependencies": { @@ -9856,6 +9878,11 @@ "integrity": "sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==", "dev": true }, + "node-addon-api": { + "version": "8.0.0", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-8.0.0.tgz", + "integrity": "sha512-ipO7rsHEBqa9STO5C5T10fj732ml+5kLN1cAG8/jdHd56ldQeGj3Q7+scUS+VHK/qy1zLEwC4wMK5+yM0btPvw==" + }, "node-domexception": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz", diff --git a/package.json b/package.json index 0eb048158f..f96b9a7572 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,10 @@ "types": "node/lib/index.d.ts", "files": [ "node/lib", + "node/src/workerChannel/binding.gyp", + "node/src/workerChannel/include", + "node/src/workerChannel/scripts.mjs", + "node/src/workerChannel/src", "worker/deps/libwebrtc", "worker/fbs", "worker/fuzzer/include", @@ -40,7 +44,8 @@ "npm-scripts.mjs" ], "engines": { - "node": ">=18" + "node": ">=18", + "npm": ">=10.3.0" }, "keywords": [ "webrtc", @@ -104,6 +109,7 @@ "flatbuffers": "^24.3.25", "h264-profile-level-id": "^2.0.0", "ini": "^4.1.2", + "node-addon-api": "^8.0.0", "node-fetch": "^3.3.2", "supports-color": "^9.4.0", "tar": "^7.1.0" diff --git a/worker/meson.build b/worker/meson.build index 1be338815b..de985a0351 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -5,6 +5,7 @@ project( 'warning_level=1', 'cpp_std=c++17', 'default_library=static', + 'b_vscrt=static_from_buildtype', ], meson_version: '>= 1.1.0', ) diff --git a/worker/scripts/clang-format.mjs b/worker/scripts/clang-format.mjs index c17152998a..4cbd6907aa 100644 --- a/worker/scripts/clang-format.mjs +++ b/worker/scripts/clang-format.mjs @@ -15,6 +15,8 @@ async function run() { '../test/include/helpers.hpp', '../fuzzer/src/**/*.cpp', '../fuzzer/include/**/*.hpp', + '../../node/src/workerChannel/include/**/*.hpp', + '../../node/src/workerChannel/src/**/*.cpp', ]); switch (task) {