From 5f0bd10c61c554c0ec408bd6fcdb9747115840bc Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 12 Mar 2024 13:46:06 -0700 Subject: [PATCH 01/28] Request batch when idle (#7526) See https://github.com/nasa/openmct/pull/7526 for details --- .../displayLayout/displayLayout.e2e.spec.js | 2 +- .../plugins/plot/previews.e2e.spec.js | 4 +- e2e/tests/functional/tooltips.e2e.spec.js | 2 +- src/api/telemetry/BatchingWebSocket.js | 98 ++++++++++++++++--- src/api/telemetry/TelemetryAPI.js | 7 +- src/api/telemetry/WebSocketWorker.js | 77 +++++++++++++-- src/plugins/LADTable/components/LadRow.vue | 2 + src/plugins/LADTable/components/LadTable.vue | 2 +- .../displayLayout/components/LayoutFrame.vue | 1 + .../components/TelemetryView.vue | 14 +-- .../gauge/components/GaugeComponent.vue | 1 + .../openInNewTabAction/openInNewTabAction.js | 2 +- .../components/TableComponent.vue | 82 ++++++++-------- src/plugins/timeConductor/ConductorAxis.vue | 2 - .../timeConductor/ConductorInputsRealtime.vue | 5 +- src/ui/components/ObjectFrame.vue | 2 + src/ui/components/ObjectLabel.vue | 4 +- src/ui/components/TimeSystemAxis.vue | 19 +++- 18 files changed, 242 insertions(+), 84 deletions(-) diff --git a/e2e/tests/functional/plugins/displayLayout/displayLayout.e2e.spec.js b/e2e/tests/functional/plugins/displayLayout/displayLayout.e2e.spec.js index cc678d3b5bf..a187c885229 100644 --- a/e2e/tests/functional/plugins/displayLayout/displayLayout.e2e.spec.js +++ b/e2e/tests/functional/plugins/displayLayout/displayLayout.e2e.spec.js @@ -272,7 +272,7 @@ test.describe('Display Layout', () => { expect(trimmedDisplayValue).toBe(formattedTelemetryValue); // ensure we can right click on the alpha-numeric widget and view historical data - await page.getByLabel('Sine', { exact: true }).click({ + await page.getByLabel(/Alpha-numeric telemetry value of.*/).click({ button: 'right' }); await page.getByLabel('View Historical Data').click(); diff --git a/e2e/tests/functional/plugins/plot/previews.e2e.spec.js b/e2e/tests/functional/plugins/plot/previews.e2e.spec.js index 5d6211c3df7..7a5b425fe46 100644 --- a/e2e/tests/functional/plugins/plot/previews.e2e.spec.js +++ b/e2e/tests/functional/plugins/plot/previews.e2e.spec.js @@ -54,7 +54,9 @@ test.describe('Plots work in Previews', () => { await page.getByRole('listitem', { name: 'Save and Finish Editing' }).click(); // right click on the plot and select view large - await page.getByLabel('Sine', { exact: true }).click({ button: 'right' }); + await page.getByLabel(/Alpha-numeric telemetry value of.*/).click({ + button: 'right' + }); await page.getByLabel('View Historical Data').click(); await expect(page.getByLabel('Preview Container').getByLabel('Plot Canvas')).toBeVisible(); await page.getByRole('button', { name: 'Close' }).click(); diff --git a/e2e/tests/functional/tooltips.e2e.spec.js b/e2e/tests/functional/tooltips.e2e.spec.js index 527593f2172..711abfd786b 100644 --- a/e2e/tests/functional/tooltips.e2e.spec.js +++ b/e2e/tests/functional/tooltips.e2e.spec.js @@ -109,7 +109,7 @@ test.describe('Verify tooltips', () => { async function getToolTip(object) { await page.locator('.c-create-button').hover(); - await page.getByRole('cell', { name: object.name }).hover(); + await page.getByLabel('lad name').getByText(object.name).hover(); let tooltipText = await page.locator('.c-tooltip').textContent(); return tooltipText.replace('\n', '').trim(); } diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js index 204e6e9e707..5ba0848e586 100644 --- a/src/api/telemetry/BatchingWebSocket.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -20,7 +20,6 @@ * at runtime from the About dialog for additional information. *****************************************************************************/ import installWorker from './WebSocketWorker.js'; -const DEFAULT_RATE_MS = 1000; /** * Describes the strategy to be used when batching WebSocket messages * @@ -51,11 +50,21 @@ const DEFAULT_RATE_MS = 1000; * * @memberof module:openmct.telemetry */ +// Shim for Internet Explorer, I mean Safari. It doesn't support requestIdleCallback, but it's in a tech preview, so it will be dropping soon. +const requestIdleCallback = + // eslint-disable-next-line compat/compat + window.requestIdleCallback ?? ((fn, { timeout }) => setTimeout(fn, timeout)); +const ONE_SECOND = 1000; +const FIVE_SECONDS = 5 * ONE_SECOND; + class BatchingWebSocket extends EventTarget { #worker; #openmct; #showingRateLimitNotification; - #rate; + #maxBatchSize; + #applicationIsInitializing; + #maxBatchWait; + #firstBatchReceived; constructor(openmct) { super(); @@ -66,7 +75,10 @@ class BatchingWebSocket extends EventTarget { this.#worker = new Worker(workerUrl); this.#openmct = openmct; this.#showingRateLimitNotification = false; - this.#rate = DEFAULT_RATE_MS; + this.#maxBatchSize = Number.POSITIVE_INFINITY; + this.#maxBatchWait = ONE_SECOND; + this.#applicationIsInitializing = true; + this.#firstBatchReceived = false; const routeMessageToHandler = this.#routeMessageToHandler.bind(this); this.#worker.addEventListener('message', routeMessageToHandler); @@ -78,6 +90,20 @@ class BatchingWebSocket extends EventTarget { }, { once: true } ); + + openmct.once('start', () => { + // An idle callback is a pretty good indication that a complex display is done loading. At that point set the batch size more conservatively. + // Force it after 5 seconds if it hasn't happened yet. + requestIdleCallback( + () => { + this.#applicationIsInitializing = false; + this.setMaxBatchSize(this.#maxBatchSize); + }, + { + timeout: FIVE_SECONDS + } + ); + }); } /** @@ -129,14 +155,6 @@ class BatchingWebSocket extends EventTarget { }); } - /** - * When using batching, sets the rate at which batches of messages are released. - * @param {Number} rate the amount of time to wait, in ms, between batches. - */ - setRate(rate) { - this.#rate = rate; - } - /** * @param {Number} maxBatchSize the maximum length of a batch of messages. For example, * the maximum number of telemetry values to batch before dropping them @@ -151,12 +169,29 @@ class BatchingWebSocket extends EventTarget { * 15 would probably be a better batch size. */ setMaxBatchSize(maxBatchSize) { + this.#maxBatchSize = maxBatchSize; + if (!this.#applicationIsInitializing) { + this.#sendMaxBatchSizeToWorker(this.#maxBatchSize); + } + } + setMaxBatchWait(wait) { + this.#maxBatchWait = wait; + this.#sendBatchWaitToWorker(this.#maxBatchWait); + } + #sendMaxBatchSizeToWorker(maxBatchSize) { this.#worker.postMessage({ type: 'setMaxBatchSize', maxBatchSize }); } + #sendBatchWaitToWorker(maxBatchWait) { + this.#worker.postMessage({ + type: 'setMaxBatchWait', + maxBatchWait + }); + } + /** * Disconnect the associated WebSocket. Generally speaking there is no need to call * this manually. @@ -169,7 +204,9 @@ class BatchingWebSocket extends EventTarget { #routeMessageToHandler(message) { if (message.data.type === 'batch') { - if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) { + this.start = Date.now(); + const batch = message.data.batch; + if (batch.dropped === true && !this.#showingRateLimitNotification) { const notification = this.#openmct.notifications.alert( 'Telemetry dropped due to client rate limiting.', { hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' } @@ -179,16 +216,45 @@ class BatchingWebSocket extends EventTarget { this.#showingRateLimitNotification = false; }); } - this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); - setTimeout(() => { - this.#readyForNextBatch(); - }, this.#rate); + + this.dispatchEvent(new CustomEvent('batch', { detail: batch })); + this.#waitUntilIdleAndRequestNextBatch(batch); } else if (message.data.type === 'message') { this.dispatchEvent(new CustomEvent('message', { detail: message.data.message })); + } else if (message.data.type === 'reconnected') { + this.dispatchEvent(new CustomEvent('reconnected')); } else { throw new Error(`Unknown message type: ${message.data.type}`); } } + + #waitUntilIdleAndRequestNextBatch(batch) { + requestIdleCallback( + (state) => { + if (this.#firstBatchReceived === false) { + this.#firstBatchReceived = true; + } + const now = Date.now(); + const waitedFor = now - this.start; + if (state.didTimeout === true) { + if (document.visibilityState === 'visible') { + console.warn(`Event loop is too busy to process batch.`); + this.#waitUntilIdleAndRequestNextBatch(batch); + } else { + // After ingesting a telemetry batch, wait until the event loop is idle again before + // informing the worker we are ready for another batch. + this.#readyForNextBatch(); + } + } else { + if (waitedFor > ONE_SECOND) { + console.warn(`Warning, batch processing took ${waitedFor}ms`); + } + this.#readyForNextBatch(); + } + }, + { timeout: ONE_SECOND } + ); + } } export default BatchingWebSocket; diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 143f3fc32ed..e6d49268163 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -85,6 +85,7 @@ const SUBSCRIBE_STRATEGY = { export default class TelemetryAPI { #isGreedyLAD; #subscribeCache; + #hasReturnedFirstData; get SUBSCRIBE_STRATEGY() { return SUBSCRIBE_STRATEGY; @@ -108,6 +109,7 @@ export default class TelemetryAPI { this.#isGreedyLAD = true; this.BatchingWebSocket = BatchingWebSocket; this.#subscribeCache = {}; + this.#hasReturnedFirstData = false; } abortAllRequests() { @@ -383,7 +385,10 @@ export default class TelemetryAPI { arguments[1] = await this.applyRequestInterceptors(domainObject, arguments[1]); try { const telemetry = await provider.request(...arguments); - + if (!this.#hasReturnedFirstData) { + this.#hasReturnedFirstData = true; + performance.mark('firstHistoricalDataReturned'); + } return telemetry; } catch (error) { if (error.name !== 'AbortError') { diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index f09e7d12f7f..3daa15f75d6 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -21,6 +21,7 @@ *****************************************************************************/ /* eslint-disable max-classes-per-file */ export default function installWorker() { + const ONE_SECOND = 1000; const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; /** @@ -44,6 +45,13 @@ export default function installWorker() { #currentWaitIndex = 0; #messageCallbacks = []; #wsUrl; + #reconnecting = false; + #worker; + + constructor(worker) { + super(); + this.#worker = worker; + } /** * Establish a new WebSocket connection to the given URL @@ -62,6 +70,9 @@ export default function installWorker() { this.#isConnecting = true; this.#webSocket = new WebSocket(url); + //Exposed to e2e tests so that the websocket can be manipulated during tests. Cannot find any other way to do this. + // Playwright does not support forcing websocket state changes. + this.#worker.currentWebSocket = this.#webSocket; const boundConnected = this.#connected.bind(this); this.#webSocket.addEventListener('open', boundConnected); @@ -100,12 +111,17 @@ export default function installWorker() { } #connected() { - console.debug('Websocket connected.'); + console.info('Websocket connected.'); this.#isConnected = true; this.#isConnecting = false; this.#currentWaitIndex = 0; - this.dispatchEvent(new Event('connected')); + if (this.#reconnecting) { + this.#worker.postMessage({ + type: 'reconnected' + }); + this.#reconnecting = false; + } this.#flushQueue(); } @@ -138,6 +154,7 @@ export default function installWorker() { if (this.#reconnectTimeoutHandle) { return; } + this.#reconnecting = true; this.#reconnectTimeoutHandle = setTimeout(() => { this.connect(this.#wsUrl); @@ -207,6 +224,9 @@ export default function installWorker() { case 'setMaxBatchSize': this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize); break; + case 'setMaxBatchWait': + this.#messageBatcher.setMaxBatchWait(message.data.maxBatchWait); + break; default: throw new Error(`Unknown message type: ${type}`); } @@ -245,7 +265,6 @@ export default function installWorker() { } routeMessageToHandler(data) { - //Implement batching here if (this.#messageBatcher.shouldBatchMessage(data)) { this.#messageBatcher.addMessageToBatch(data); } else { @@ -267,12 +286,15 @@ export default function installWorker() { #maxBatchSize; #readyForNextBatch; #worker; + #throttledSendNextBatch; constructor(worker) { - this.#maxBatchSize = 10; + // No dropping telemetry unless we're explicitly told to. + this.#maxBatchSize = Number.POSITIVE_INFINITY; this.#readyForNextBatch = false; this.#worker = worker; this.#resetBatch(); + this.setMaxBatchWait(ONE_SECOND); } #resetBatch() { this.#batch = {}; @@ -310,23 +332,29 @@ export default function installWorker() { const batchId = this.#batchingStrategy.getBatchIdFromMessage(message); let batch = this.#batch[batchId]; if (batch === undefined) { + this.#hasBatch = true; batch = this.#batch[batchId] = [message]; } else { batch.push(message); } if (batch.length > this.#maxBatchSize) { + console.warn( + `Exceeded max batch size of ${this.#maxBatchSize} for ${batchId}. Dropping value.` + ); batch.shift(); - this.#batch.dropped = this.#batch.dropped || true; + this.#batch.dropped = true; } + if (this.#readyForNextBatch) { - this.#sendNextBatch(); - } else { - this.#hasBatch = true; + this.#throttledSendNextBatch(); } } setMaxBatchSize(maxBatchSize) { this.#maxBatchSize = maxBatchSize; } + setMaxBatchWait(maxBatchWait) { + this.#throttledSendNextBatch = throttle(this.#sendNextBatch.bind(this), maxBatchWait); + } /** * Indicates that client code is ready to receive the next batch of * messages. If a batch is available, it will be immediately sent. @@ -335,7 +363,7 @@ export default function installWorker() { */ readyForNextBatch() { if (this.#hasBatch) { - this.#sendNextBatch(); + this.#throttledSendNextBatch(); } else { this.#readyForNextBatch = true; } @@ -352,7 +380,34 @@ export default function installWorker() { } } - const websocket = new ResilientWebSocket(); + function throttle(callback, wait) { + let last = 0; + let throttling = false; + + return function (...args) { + if (throttling) { + return; + } + + const now = performance.now(); + const timeSinceLast = now - last; + + if (timeSinceLast >= wait) { + last = now; + callback(...args); + } else if (!throttling) { + throttling = true; + + setTimeout(() => { + last = performance.now(); + throttling = false; + callback(...args); + }, wait - timeSinceLast); + } + }; + } + + const websocket = new ResilientWebSocket(self); const messageBatcher = new MessageBatcher(self); const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self); @@ -363,4 +418,6 @@ export default function installWorker() { websocket.registerMessageCallback((data) => { websocketBroker.routeMessageToHandler(data); }); + + self.websocketInstance = websocket; } diff --git a/src/plugins/LADTable/components/LadRow.vue b/src/plugins/LADTable/components/LadRow.vue index 2adf23743de..d6df9189689 100644 --- a/src/plugins/LADTable/components/LadRow.vue +++ b/src/plugins/LADTable/components/LadRow.vue @@ -24,11 +24,13 @@
- +
diff --git a/src/plugins/displayLayout/components/LayoutFrame.vue b/src/plugins/displayLayout/components/LayoutFrame.vue index f9f2a3a63a3..440fad62f5b 100644 --- a/src/plugins/displayLayout/components/LayoutFrame.vue +++ b/src/plugins/displayLayout/components/LayoutFrame.vue @@ -22,6 +22,7 @@
Name