Skip to content

Commit

Permalink
update verb specs (#29)
Browse files Browse the repository at this point in the history
* update verb specs

* add support for session:redirect event

* 0.1.67

* added tts:user_interrupt event and other improvements
  • Loading branch information
davehorton authored Jan 3, 2025
1 parent 4d43b1a commit 8a2cf6d
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 61 deletions.
20 changes: 20 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class Client extends Emitter {
case 'session:adulting':
this._onSessionAdulting(ws, path, msg);
break;
case 'session:redirect':
this._onSessionRedirect(ws, path, msg);
break;
default:
debug(`Client: discarding msg type ${msg.type}`);
}
Expand Down Expand Up @@ -61,6 +64,23 @@ class Client extends Emitter {
ws.off('message', this._boundHandler);
this.emit('session:new', session, path, ws.req);
}

_onSessionRedirect(ws, path, msg) {
const logger = typeof this.logger.child === 'function' ?
this.logger.child({call_sid: msg.call_sid}) :
this.logger;
const session = new Session({logger, ws, msg});
/* Note: all further messaging after session:new will be handled by the session */
ws.off('message', this._boundHandler);
session.reply();

// Check if there are listeners for "session:redirect", otherwise emit "session:new"
if (this.listenerCount('session:redirect') > 0) {
this.emit('session:redirect', session, path, ws.req);
} else {
this.emit('session:new', session, path, ws.req);
}
}
}

module.exports = Client;
147 changes: 97 additions & 50 deletions lib/session.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const assert = require('assert');
const Emitter = require('events');
const {validateInjectCommand} = require('./utils/validate-inject-command');
const {validateVerb, specs} = require('@jambonz/verb-specifications');
Expand All @@ -17,6 +18,7 @@ class Session extends Emitter {
this.locals = {};
this._ttsCounter = 0;
this._ttsPaused = false;
this.commandQueue = []; // Single queue for all commands

// eslint-disable-next-line no-unused-vars
const {sip_status, sip_reason, call_status, ...rest} = msg.data;
Expand All @@ -26,9 +28,6 @@ class Session extends Emitter {
this[key] = value;
}

this.pendingTtsTokens = [];
this.queuedTtsTokens = [];

ws.on('message', this._onMessage.bind(this));
ws.on('close', this._onClose.bind(this));
}
Expand All @@ -37,35 +36,6 @@ class Session extends Emitter {
return this._ttsPaused;
}

processTtsTokensResult(data) {
const { id, status, reason } = data;
const index = this.pendingTtsTokens.findIndex((obj) => obj.id === id);
if (index === -1) {
debug({id, data}, 'processTtsTokensResult: Received unexpected tts:tokens-result');
this.logger.info({ id, data }, 'Received unexpected tts:tokens-result message');
return;
}
const obj = this.pendingTtsTokens[index];

if (status === 'ok') {
debug(`processTtsTokensResult: received ok for id ${id}`);
obj.resolver({status: 'ok'});
} else if (reason === 'full') {
debug(`processTtsTokensResult: received reason=full for id ${id}`);
if (!this._ttsPaused) {
this.logger.info({ id, status, reason }, 'Received tts:tokens-result message with reason=full');
}
this._ttsPaused = true;
this.queuedTtsTokens.push(obj.tokens);
obj.resolver({status: 'full'});
} else {
debug(`processTtsTokensResult: received error for id ${id}: status: ${status}, reason: ${reason}`);
this.logger.info({ id, status, reason }, 'Error processing tts:tokens-result message');
obj.rejector(new Error(`tts-token error: ${reason}`));
}
this.pendingTtsTokens.splice(index, 1);
debug(`processTtsTokensResult: we now have ${this.pendingTtsTokens.length} pending tts token requests`);
}
send({execImmediate = true, reply = false} = {}) {
const queueCommand = !execImmediate;
const ack = reply || !this._acked ;
Expand Down Expand Up @@ -129,38 +99,112 @@ class Session extends Emitter {
return this.sendCommand('llm:update', data);
}

async sendTtsTokens(text) {
sendTtsTokens(text) {
if (!text || typeof text !== 'string') throw new Error('Invalid tokens provided');

const id = ++this._ttsCounter;
const obj = {id, tokens: text};
debug(`sendTtsTokens: sending ${text.length} with id ${id}`);
this.sendCommand('tts:tokens', obj);
const p = new Promise((resolve, reject) => {
obj.resolver = resolve;
obj.rejector = reject;
const promise = new Promise((resolve, reject) => {
this.commandQueue.push({ type: 'tokens', id, tokens: text, resolve, reject });
});
this.pendingTtsTokens.push(obj);
return p;

/* if the queue was empty, process the queue and send this message out immediately */
if (this.commandQueue.length === 1 && !this._ttsPaused) this._processQueue();

return promise;
}

handleTtsTokensResult(data) {
const { id, status, reason } = data;
assert.ok(this.commandQueue.length > 0, 'matching request should always be at the head of the queue');
assert.ok(id === this.commandQueue[0].id, 'matching request should always be at the head of the queue');

/* remove the first command from the queue */
const command = this.commandQueue[0];

if (status === 'ok') {
debug(`handleTtsTokensResult: received ok for id ${id}`);
this.commandQueue.shift();
command.resolve();
} else if (reason === 'full') {
debug(`handleTtsTokensResult: received reason=full for id ${id}`);
this._ttsPaused = true;
command.resolve();
} else {
debug(`handleTtsTokensResult: received error for id ${id}: status=${status}, reason=${reason}`);
this.commandQueue.shift();
command.reject(new Error(`tts-token error: ${reason}`));
}

if (this.commandQueue.length > 0 && !this._ttsPaused) {
this._processQueue();
}
}

clearTtsTokens() {
this.queuedTtsTokens = [];
/* resolve all of the commands in the queue and clear it */
for (const command of this.commandQueue) {
if (command.type === 'tokens') {
command.resolve();
}
}
this.commandQueue = [];
this._ttsPaused = false;
debug('clearTtsTokens: sending tts:clear');
return this.sendCommand('tts:clear');
}

flushTtsTokens() {
debug('sendTtsFlush: sending tts:flush');
return this.sendCommand('tts:flush');
debug('flushTtsTokens: queuing tts:flush command');

if (this.commandQueue.length === 0) {
this.sendCommand('tts:flush');
}
else {
this.commandQueue.push({ type: 'flush' });
}
}

async resumeTtsStreaming() {
const arr = [...this.queuedTtsTokens];
this.queuedTtsTokens = [];
resumeTtsStreaming() {
debug('resumeTtsStreaming: resuming TTS streaming');
this._ttsPaused = false;
for (const text of arr) {
this.sendTtsTokens(text);
this._processQueue();
}

handleUserInterruption() {
debug('handleUserInterruption: received user interruption');

this.emit('tts:user_interrupt');


/* resolve all of the commands in the queue and clear it */
for (const command of this.commandQueue) {
if (command.type === 'tokens') {
command.resolve();
}
}
this.commandQueue = [];
this._ttsPaused = false;
}

async _processQueue() {
assert(this.commandQueue.length > 0, 'queue should not be empty');
assert(this._ttsPaused === false, 'queue should not be processed when paused');

/* process any flush commands at the head of the queue */
while (this.commandQueue.length > 0 && this.commandQueue[0].type === 'flush') {
debug('_processQueue: sending tts:flush command');
this.sendCommand('tts:flush');
this.commandQueue.shift();
}

if (this.commandQueue.length > 0 && !this._ttsPaused) {
assert(this.commandQueue[0].type === 'tokens', 'next command should be tokens');
const command = this.commandQueue[0];
debug(`_processQueue: sending tts:tokens id ${command.id}`);
this.sendCommand('tts:tokens', { id: command.id, tokens: command.tokens });
}

debug('_processQueue: queue processing completed');
}

toJSON() {
Expand Down Expand Up @@ -206,10 +250,13 @@ class Session extends Emitter {
if (msg.data?.event_type === 'stream_resumed') {
this.resumeTtsStreaming();
}
else if (msg.data?.event_type === 'user_interruption') {
this.handleUserInterruption();
}
this.emit(msg.type, msg.data);
break;
case 'tts:tokens-result':
this.processTtsTokensResult(msg.data);
this.handleTtsTokensResult(msg.data);
break;
case 'call:status':
case 'verb:status':
Expand Down
18 changes: 9 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@jambonz/node-client-ws",
"version": "0.1.66",
"version": "0.1.67",
"description": "",
"main": "index.js",
"scripts": {
Expand All @@ -10,7 +10,7 @@
"author": "Dave Horton",
"license": "MIT",
"dependencies": {
"@jambonz/verb-specifications": "^0.0.89",
"@jambonz/verb-specifications": "^0.0.91",
"debug": "^4.3.4",
"parseurl": "^1.3.3",
"ws": "^8.18.0"
Expand Down

0 comments on commit 8a2cf6d

Please sign in to comment.