diff --git a/.eslintrc.js b/.eslintrc.js index 78976d7a386..6453dd70202 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -127,7 +127,7 @@ module.exports = { '@typescript-eslint/no-restricted-imports': [ 'error', { - paths: ['lodash', 'rxjs', 'lodash/noop', 'rxjs/util/noop'], + paths: ['lodash', 'lodash/noop'], patterns: [ { group: ['@jupyterlab/*'], diff --git a/package-lock.json b/package-lock.json index faacd9b4ebc..7bced48774f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -59,8 +59,6 @@ "redux": "^4.0.4", "redux-logger": "^3.0.6", "reflect-metadata": "^0.1.12", - "rxjs": "^6.5.4", - "rxjs-compat": "^6.5.4", "safe-buffer": "^5.2.1", "sanitize-filename": "^1.6.3", "semver": "^5.7.2", @@ -19219,11 +19217,6 @@ "npm": ">=2.0.0" } }, - "node_modules/rxjs-compat": { - "version": "6.6.7", - "resolved": "https://registry.npmjs.org/rxjs-compat/-/rxjs-compat-6.6.7.tgz", - "integrity": "sha512-szN4fK+TqBPOFBcBcsR0g2cmTTUF/vaFEOZNuSdfU8/pGFnNmmn2u8SystYXG1QMrjOPBc6XTKHMVfENDf6hHw==" - }, "node_modules/rxjs/node_modules/tslib": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz", @@ -38352,11 +38345,6 @@ } } }, - "rxjs-compat": { - "version": "6.6.7", - "resolved": "https://registry.npmjs.org/rxjs-compat/-/rxjs-compat-6.6.7.tgz", - "integrity": "sha512-szN4fK+TqBPOFBcBcsR0g2cmTTUF/vaFEOZNuSdfU8/pGFnNmmn2u8SystYXG1QMrjOPBc6XTKHMVfENDf6hHw==" - }, "safe-array-concat": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/safe-array-concat/-/safe-array-concat-1.0.1.tgz", diff --git a/package.json b/package.json index c8389046f48..e6ea973cb12 100644 --- a/package.json +++ b/package.json @@ -2168,8 +2168,6 @@ "redux": "^4.0.4", "redux-logger": "^3.0.6", "reflect-metadata": "^0.1.12", - "rxjs": "^6.5.4", - "rxjs-compat": "^6.5.4", "safe-buffer": "^5.2.1", "sanitize-filename": "^1.6.3", "semver": "^5.7.2", diff --git a/src/kernels/jupyter/interpreter/jupyterInterpreterSubCommandExecutionService.unit.test.ts b/src/kernels/jupyter/interpreter/jupyterInterpreterSubCommandExecutionService.unit.test.ts index af9fac21137..ac0a53fb4b6 100644 --- a/src/kernels/jupyter/interpreter/jupyterInterpreterSubCommandExecutionService.unit.test.ts +++ b/src/kernels/jupyter/interpreter/jupyterInterpreterSubCommandExecutionService.unit.test.ts @@ -5,7 +5,6 @@ import { assert, expect, use } from 'chai'; import chaiPromise from 'chai-as-promised'; import * as path from '../../../platform/vscode-path/path'; import * as sinon from 'sinon'; -import { Subject } from 'rxjs/Subject'; import { anything, capture, deepEqual, instance, mock, when } from 'ts-mockito'; import { Uri } from 'vscode'; import { ObservableExecutionResult, Output } from '../../../platform/common/process/types.node'; @@ -26,6 +25,9 @@ import { JupyterInterpreterDependencyService } from './jupyterInterpreterDepende import { JupyterInterpreterService } from './jupyterInterpreterService.node'; import { JupyterInterpreterSubCommandExecutionService } from './jupyterInterpreterSubCommandExecutionService.node'; import { noop } from '../../../test/core'; +import { createObservable } from '../../../platform/common/process/proc.node'; +import { IDisposable } from '../../../platform/common/types'; +import { dispose } from '../../../platform/common/utils/lifecycle'; use(chaiPromise); /* eslint-disable */ @@ -39,6 +41,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => { const selectedJupyterInterpreter = createPythonInterpreter({ displayName: 'JupyterInterpreter' }); const activePythonInterpreter = createPythonInterpreter({ displayName: 'activePythonInterpreter' }); let notebookStartResult: ObservableExecutionResult; + const disposables: IDisposable[] = []; setup(() => { interpreterService = mock(); jupyterInterpreter = mock(JupyterInterpreterService); @@ -49,10 +52,12 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any (instance(execService) as any).then = undefined; const output = new MockOutputChannel(''); + const out = createObservable>(); + disposables.push(out); notebookStartResult = { dispose: noop, proc: undefined, - out: new Subject>().asObservable() + out }; const jupyterPaths = mock(); when(jupyterPaths.getKernelSpecTempRegistrationFolder()).thenResolve( @@ -78,6 +83,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => { when(interpreterService.getActiveInterpreter(undefined)).thenResolve(activePythonInterpreter); }); teardown(() => { + dispose(disposables); sinon.restore(); }); // eslint-disable-next-line diff --git a/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts b/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts index 469ef17183f..3cbde26af75 100644 --- a/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts +++ b/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -import { Subscription } from 'rxjs/Subscription'; import { Disposable, Uri } from 'vscode'; import { IConfigurationService, IDisposable } from '../../../platform/common/types'; import { traceError, traceWarning, traceVerbose } from '../../../platform/logging'; @@ -20,6 +19,7 @@ import { JupyterNotebookNotInstalled } from '../../../platform/errors/jupyterNot import { PythonEnvironment } from '../../../platform/pythonEnvironments/info'; import { JupyterCannotBeLaunchedWithRootError } from '../../../platform/errors/jupyterCannotBeLaunchedWithRootError'; import { createJupyterConnectionInfo } from '../jupyterUtils'; +import { dispose } from '../../../platform/common/utils/lifecycle'; const urlMatcher = new RegExp(RegExpValues.UrlPatternRegEx); @@ -30,7 +30,7 @@ export class JupyterConnectionWaiter implements IDisposable { private startPromise = createDeferred(); private launchTimeout: NodeJS.Timer | number; private output = ''; - private subscriptions: Subscription[] = []; + private subscriptions: IDisposable[] = []; public readonly ready = this.startPromise.promise; constructor( @@ -58,30 +58,29 @@ export class JupyterConnectionWaiter implements IDisposable { } // Listen on stderr for its connection information this.subscriptions.push( - launchResult.out.subscribe( - (output: Output) => { - traceVerbose(output.out); - this.output += output.out; - if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) { - // .then so that we can keep from pushing aync up to the subscribed observable function - this.getServerInfo() - .then((serverInfos) => this.getJupyterURL(serverInfos, this.output)) - .catch((ex) => traceWarning('Failed to get server info', ex)); - } + launchResult.out.onDidChange((output: Output) => { + traceVerbose(output.out); + this.output += output.out; + if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) { + // .then so that we can keep from pushing aync up to the subscribed observable function + this.getServerInfo() + .then((serverInfos) => this.getJupyterURL(serverInfos, this.output)) + .catch((ex) => traceWarning('Failed to get server info', ex)); + } - // Sometimes jupyter will return a 403 error. Not sure why. We used - // to fail on this, but it looks like jupyter works with this error in place. - }, - (e) => this.rejectStartPromise(e), - // If the process dies, we can't extract connection information. - () => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode)) - ) + // Sometimes jupyter will return a 403 error. Not sure why. We used + // to fail on this, but it looks like jupyter works with this error in place. + }) ); + // If the process dies, we can't extract connection information. + launchResult.out.done + .then(() => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode))) + .catch((e) => this.rejectStartPromise(e)); } public dispose() { // eslint-disable-next-line @typescript-eslint/no-explicit-any clearTimeout(this.launchTimeout as any); - this.subscriptions.forEach((d) => d.unsubscribe()); + dispose(this.subscriptions); } // From a list of jupyter server infos try to find the matching jupyter that we launched diff --git a/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.unit.test.ts b/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.unit.test.ts index d5314e07b30..13d78d4408f 100644 --- a/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.unit.test.ts +++ b/src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.unit.test.ts @@ -10,22 +10,23 @@ import { CancellationToken, Uri } from 'vscode'; import { IJupyterRequestAgentCreator, IJupyterRequestCreator, JupyterServerInfo } from '../types'; import chaiAsPromised from 'chai-as-promised'; import events from 'events'; -import { Subject } from 'rxjs/Subject'; import sinon from 'sinon'; import { JupyterSettings } from '../../../platform/common/configSettings'; import { ConfigurationService } from '../../../platform/common/configuration/service.node'; import { IFileSystemNode } from '../../../platform/common/platform/types.node'; import { Output, ObservableExecutionResult } from '../../../platform/common/process/types.node'; -import { IConfigurationService, IJupyterSettings } from '../../../platform/common/types'; +import { IConfigurationService, IDisposable, IJupyterSettings } from '../../../platform/common/types'; import { DataScience } from '../../../platform/common/utils/localize'; import { EXTENSION_ROOT_DIR } from '../../../platform/constants.node'; import { ServiceContainer } from '../../../platform/ioc/container'; import { IServiceContainer } from '../../../platform/ioc/types'; import { JupyterConnectionWaiter } from './jupyterConnectionWaiter.node'; import { noop } from '../../../test/core'; +import { createObservable } from '../../../platform/common/process/proc.node'; +import { dispose } from '../../../platform/common/utils/lifecycle'; use(chaiAsPromised); suite('Jupyter Connection Waiter', async () => { - let observableOutput: Subject>; + let observableOutput: ReturnType>>; let launchResult: ObservableExecutionResult; let getServerInfoStub: sinon.SinonStub<[CancellationToken | undefined], JupyterServerInfo[] | undefined>; let configService: IConfigurationService; @@ -35,6 +36,7 @@ suite('Jupyter Connection Waiter', async () => { const dsSettings: IJupyterSettings = { jupyterLaunchTimeout: 10_000 } as any; const childProc = new events.EventEmitter(); const notebookDir = Uri.file('someDir'); + const disposables: IDisposable[] = []; const dummyServerInfos: JupyterServerInfo[] = [ { base_url: 'http://localhost1:1', @@ -73,7 +75,8 @@ suite('Jupyter Connection Waiter', async () => { const expectedServerInfo = dummyServerInfos[1]; setup(() => { - observableOutput = new Subject>(); + observableOutput = createObservable>(); + disposables.push(observableOutput); launchResult = { dispose: noop, out: observableOutput, @@ -96,6 +99,7 @@ suite('Jupyter Connection Waiter', async () => { instance(mock()) ); }); + teardown(() => dispose(disposables)); function createConnectionWaiter() { return new JupyterConnectionWaiter( @@ -111,7 +115,7 @@ suite('Jupyter Connection Waiter', async () => { test('Successfully gets connection info', async () => { (dsSettings).jupyterLaunchTimeout = 10_000; const waiter = createConnectionWaiter(); - observableOutput.next({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' }); + observableOutput.fire({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' }); const connection = await waiter.ready; @@ -133,7 +137,7 @@ suite('Jupyter Connection Waiter', async () => { const promise = waiter.ready; childProc.emit('exit', exitCode); - observableOutput.complete(); + observableOutput.resolve(); await assert.isRejected(promise, DataScience.jupyterServerCrashed(exitCode)); }); diff --git a/src/kernels/jupyter/launcher/jupyterServerStarter.node.ts b/src/kernels/jupyter/launcher/jupyterServerStarter.node.ts index b0be3155b07..116345b6e7e 100644 --- a/src/kernels/jupyter/launcher/jupyterServerStarter.node.ts +++ b/src/kernels/jupyter/launcher/jupyterServerStarter.node.ts @@ -101,7 +101,7 @@ export class JupyterServerStarter implements IJupyterServerStarter { // Watch for premature exits if (launchResult.proc) { launchResult.proc.on('exit', (c: number | null) => (exitCode = c)); - launchResult.out.subscribe((out) => this.jupyterOutputChannel.append(out.out)); + launchResult.out.onDidChange((out) => this.jupyterOutputChannel.append(out.out)); } // Make sure this process gets cleaned up. We might be canceled before the connection finishes. diff --git a/src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts b/src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts index 3cbff147c94..1a6ecf5b2a5 100644 --- a/src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts +++ b/src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts @@ -139,7 +139,7 @@ export class PythonKernelInterruptDaemon { await new Promise((resolve, reject) => { let started = false; - const subscription = proc.out.subscribe((out) => { + const subscription = proc.out.onDidChange((out) => { traceInfoIfCI( `Output from interrupt daemon started = ${started}, output (${out.source}) = ${out.out} ('END)` ); @@ -208,7 +208,7 @@ export class PythonKernelInterruptDaemon { } } }); - this.disposableRegistry.push(new Disposable(() => subscription.unsubscribe())); + this.disposableRegistry.push(subscription); }); this.disposableRegistry.push(new Disposable(() => swallowExceptions(() => proc.proc?.kill()))); // Added for logging to see if this process dies. diff --git a/src/kernels/raw/launcher/kernelProcess.node.ts b/src/kernels/raw/launcher/kernelProcess.node.ts index 786de815b63..d771926e284 100644 --- a/src/kernels/raw/launcher/kernelProcess.node.ts +++ b/src/kernels/raw/launcher/kernelProcess.node.ts @@ -207,50 +207,48 @@ export class KernelProcess implements IKernelProcess { } let sawKernelConnectionFile = false; - exeObs.out.subscribe( - (output) => { - if (output.source === 'stderr') { - output.out = stripUnwantedMessages(output.out); - // Capture stderr, incase kernel doesn't start. - stderr += output.out; - - if (output.out.trim().length) { - traceWarning(`StdErr from Kernel Process ${output.out.trim()}`); - } - } else { - stdout += output.out; - // Strip unwanted stuff from the output, else it just chews up unnecessary space. - if (!sawKernelConnectionFile) { - stdout = stdout.replace(kernelOutputToNotLog, ''); - stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), ''); - // Strip the leading space, as we've removed some leading text. - stdout = stdout.trimStart(); - const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true }); - if ( - lines.length === 2 && - lines[0] === kernelOutputWithConnectionFile && - lines[1].startsWith('--existing') && - lines[1].endsWith('.json') - ) { - stdout = `${lines.join(' ')}${os.EOL}`; - } - } - if (stdout.includes(kernelOutputWithConnectionFile)) { - sawKernelConnectionFile = true; + exeObs.out.onDidChange((output) => { + if (output.source === 'stderr') { + output.out = stripUnwantedMessages(output.out); + // Capture stderr, incase kernel doesn't start. + stderr += output.out; + + if (output.out.trim().length) { + traceWarning(`StdErr from Kernel Process ${output.out.trim()}`); + } + } else { + stdout += output.out; + // Strip unwanted stuff from the output, else it just chews up unnecessary space. + if (!sawKernelConnectionFile) { + stdout = stdout.replace(kernelOutputToNotLog, ''); + stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), ''); + // Strip the leading space, as we've removed some leading text. + stdout = stdout.trimStart(); + const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true }); + if ( + lines.length === 2 && + lines[0] === kernelOutputWithConnectionFile && + lines[1].startsWith('--existing') && + lines[1].endsWith('.json') + ) { + stdout = `${lines.join(' ')}${os.EOL}`; } - traceVerbose(`Kernel Output: ${stdout}`); } - this.sendToOutput(output.out); - }, - (error) => { - if (this.disposed) { - traceWarning('Kernel died', error, stderr); - return; + if (stdout.includes(kernelOutputWithConnectionFile)) { + sawKernelConnectionFile = true; } - traceError('Kernel died', error, stderr); - deferred.reject(error); + traceVerbose(`Kernel Output: ${stdout}`); } - ); + this.sendToOutput(output.out); + }); + exeObs.out.done.catch((error) => { + if (this.disposed) { + traceWarning('Kernel died', error, stderr); + return; + } + traceError('Kernel died', error, stderr); + deferred.reject(error); + }); // Don't return until our heartbeat channel is open for connections or the kernel died or we timed out try { diff --git a/src/kernels/raw/launcher/kernelProcess.node.unit.test.ts b/src/kernels/raw/launcher/kernelProcess.node.unit.test.ts index 842a8a48ec9..1e671d2440a 100644 --- a/src/kernels/raw/launcher/kernelProcess.node.unit.test.ts +++ b/src/kernels/raw/launcher/kernelProcess.node.unit.test.ts @@ -25,8 +25,6 @@ import { IDisposable, IJupyterSettings, IOutputChannel } from '../../../platform import { CancellationTokenSource, Uri } from 'vscode'; import { dispose } from '../../../platform/common/helpers'; import { noop } from '../../../test/core'; -import { Observable } from 'rxjs/Observable'; -import { Subject } from 'rxjs/Subject'; import { ChildProcess } from 'child_process'; import { EventEmitter } from 'stream'; import { PythonKernelInterruptDaemon } from '../finder/pythonKernelInterruptDaemon.node'; @@ -37,6 +35,7 @@ import { IS_REMOTE_NATIVE_TEST } from '../../../test/constants'; import { traceInfo } from '../../../platform/logging'; import { IPlatformService } from '../../../platform/common/platform/types'; import { IPythonExecutionFactory, IPythonExecutionService } from '../../../platform/interpreter/types.node'; +import { createObservable } from '../../../platform/common/process/proc.node'; suite('kernel Process', () => { let kernelProcess: KernelProcess; @@ -64,7 +63,7 @@ suite('kernel Process', () => { let processService: IProcessService; let pythonProcess: IPythonExecutionService; const disposables: IDisposable[] = []; - let observableOutput: Observable>; + let observableOutput: ReturnType>>; let daemon: PythonKernelInterruptDaemon; let proc: ChildProcess; let jupyterPaths: JupyterPaths; @@ -84,7 +83,8 @@ suite('kernel Process', () => { jupyterSettings = mock(); pythonProcess = mock(); (instance(processService) as any).then = undefined; - observableOutput = new Subject>(); + observableOutput = createObservable>(); + disposables.push(observableOutput); proc = mock(); daemon = mock(); const eventEmitter = new EventEmitter(); @@ -457,9 +457,11 @@ suite('Kernel Process', () => { processService = mock(); const instanceOfExecutionService = instance(processService); (instanceOfExecutionService as any).then = undefined; + const out = createObservable>(); + disposables.push(out); const observableProc: ObservableExecutionResult = { dispose: noop, - out: { subscribe: noop } as any, + out, proc: { stdout: new EventEmitter(), stderr: new EventEmitter(), diff --git a/src/platform/common/process/proc.node.ts b/src/platform/common/process/proc.node.ts index c558a1ab9d3..7f6af1b2f83 100644 --- a/src/platform/common/process/proc.node.ts +++ b/src/platform/common/process/proc.node.ts @@ -2,11 +2,9 @@ // Licensed under the MIT License. import { exec, execSync, spawn } from 'child_process'; -import { Observable } from 'rxjs/Observable'; -import { CancellationError, Disposable } from 'vscode'; +import { CancellationError, Disposable, EventEmitter } from 'vscode'; import { ignoreLogging, traceDecoratorVerbose, traceInfoIfCI } from '../../logging'; import { TraceOptions } from '../../logging/types'; - import { IDisposable } from '../types'; import { createDeferred } from '../utils/async'; import { EnvironmentVariables } from '../variables/types'; @@ -22,6 +20,8 @@ import { StdErrError } from './types.node'; import { logProcess } from './logger.node'; +import { dispose } from '../utils/lifecycle'; +import { noop } from '../utils/misc'; export class BufferDecoder implements IBufferDecoder { public decode(buffers: Buffer[]): string { @@ -83,6 +83,7 @@ export class ProcessService implements IProcessService { const proc = spawn(file, args, spawnOptions); let procExited = false; traceInfoIfCI(`Exec observable ${file}, ${args.join(' ')}`); + const disposables: IDisposable[] = []; const disposable: IDisposable = { // eslint-disable-next-line dispose: function () { @@ -92,56 +93,56 @@ export class ProcessService implements IProcessService { if (proc) { proc.unref(); } + dispose(disposables); } }; this.processesToKill.add(disposable); - const output = new Observable>((subscriber) => { - const disposables: IDisposable[] = []; + const output = createObservable>(); + disposables.push(output); - const on = (ee: NodeJS.EventEmitter, name: string, fn: Function) => { - ee.on(name, fn as any); - disposables.push({ dispose: () => ee.removeListener(name, fn as any) as any }); - }; + const on = (ee: NodeJS.EventEmitter, name: string, fn: Function) => { + ee.on(name, fn as any); + disposables.push({ dispose: () => ee.removeListener(name, fn as any) as any }); + }; - if (options.token) { - disposables.push( - options.token.onCancellationRequested(() => { - if (!procExited && !proc.killed) { - ProcessService.kill(proc.pid); - procExited = true; - } - }) - ); - } + if (options.token) { + disposables.push( + options.token.onCancellationRequested(() => { + if (!procExited && !proc.killed) { + ProcessService.kill(proc.pid); + procExited = true; + } + }) + ); + } - const sendOutput = (source: 'stdout' | 'stderr', data: Buffer) => { - const out = this.decoder.decode([data]); - if (source === 'stderr' && options.throwOnStdErr) { - subscriber.error(new StdErrError(out)); - } else { - subscriber.next({ source, out: out }); - } - }; + const sendOutput = (source: 'stdout' | 'stderr', data: Buffer) => { + const out = this.decoder.decode([data]); + if (source === 'stderr' && options.throwOnStdErr) { + output.reject(new StdErrError(out)); + } else { + output.fire({ source, out: out }); + } + }; - on(proc.stdout!, 'data', (data: Buffer) => sendOutput('stdout', data)); - on(proc.stderr!, 'data', (data: Buffer) => sendOutput('stderr', data)); + on(proc.stdout!, 'data', (data: Buffer) => sendOutput('stdout', data)); + on(proc.stderr!, 'data', (data: Buffer) => sendOutput('stderr', data)); - proc.once('close', () => { - procExited = true; - subscriber.complete(); - disposables.forEach((d) => d.dispose()); - }); - proc.once('exit', () => { - procExited = true; - subscriber.complete(); - disposables.forEach((d) => d.dispose()); - }); - proc.once('error', (ex) => { - procExited = true; - subscriber.error(ex); - disposables.forEach((d) => d.dispose()); - }); + proc.once('close', () => { + procExited = true; + output.resolve(); + disposables.forEach((d) => d.dispose()); + }); + proc.once('exit', () => { + procExited = true; + output.resolve(); + disposables.forEach((d) => d.dispose()); + }); + proc.once('error', (ex) => { + procExited = true; + output.reject(ex); + disposables.forEach((d) => d.dispose()); }); logProcess(file, args, options); @@ -275,3 +276,24 @@ export class ProcessService implements IProcessService { return defaultOptions; } } + +export function createObservable() { + const onDidChange = new EventEmitter(); + const promise = createDeferred(); + // No dangling promises. + promise.promise.catch(noop); + return { + get onDidChange() { + return onDidChange.event; + }, + get done() { + return promise.promise; + }, + resolve: promise.resolve.bind(promise), + reject: promise.reject.bind(promise), + fire: onDidChange.fire.bind(onDidChange), + dispose: () => { + onDidChange.dispose(); + } + }; +} diff --git a/src/platform/common/process/types.node.ts b/src/platform/common/process/types.node.ts index 8a378335098..1b4838ee748 100644 --- a/src/platform/common/process/types.node.ts +++ b/src/platform/common/process/types.node.ts @@ -2,8 +2,7 @@ // Licensed under the MIT License. import { ChildProcess, ExecOptions, SpawnOptions as ChildProcessSpawnOptions } from 'child_process'; -import { Observable } from 'rxjs/Observable'; -import { CancellationToken } from 'vscode'; +import { CancellationToken, Event } from 'vscode'; import { BaseError } from '../../errors/types'; import { IDisposable, Resource } from '../types'; @@ -17,9 +16,15 @@ export type Output = { source: 'stdout' | 'stderr'; out: T; }; + +export interface ObservableOutput { + onDidChange: Event; + done: Promise; +} + export type ObservableExecutionResult = { proc: ChildProcess | undefined; - out: Observable>; + out: ObservableOutput>; dispose(): void; }; diff --git a/src/platform/interpreter/installer/moduleInstaller.node.ts b/src/platform/interpreter/installer/moduleInstaller.node.ts index 35e35f79765..10e55db6071 100644 --- a/src/platform/interpreter/installer/moduleInstaller.node.ts +++ b/src/platform/interpreter/installer/moduleInstaller.node.ts @@ -18,6 +18,8 @@ import { PackageNotInstalledWindowsLongPathNotEnabledError } from '../../errors/ import { splitLines } from '../../common/helpers'; import { IPythonExecutionFactory } from '../types.node'; import { Environment } from '@vscode/python-extension'; +import { IDisposable } from '../../common/types'; +import { dispose } from '../../common/utils/lifecycle'; export type ExecutionInstallArgs = { args: string[]; @@ -65,11 +67,16 @@ export abstract class ModuleInstaller implements IModuleInstaller { token: CancellationToken ) => { const deferred = createDeferred(); + const disposables: IDisposable[] = []; // When the progress is canceled notify caller - token.onCancellationRequested(() => { - cancelTokenSource.cancel(); - deferred.resolve(); - }); + token.onCancellationRequested( + () => { + cancelTokenSource.cancel(); + deferred.resolve(); + }, + this, + disposables + ); let observable: ObservableExecutionResult | undefined; @@ -118,8 +125,8 @@ export abstract class ModuleInstaller implements IModuleInstaller { const ticker = ['.', '..', '...']; let counter = 0; if (observable) { - observable.out.subscribe({ - next: (output) => { + observable.out.onDidChange( + (output) => { const suffix = ticker[counter % 3]; const trimmedOutput = output.out.trim(); counter += 1; @@ -145,40 +152,46 @@ export abstract class ModuleInstaller implements IModuleInstaller { lastStdErr = output.out; } }, - complete: () => { - if (observable?.proc?.exitCode !== 0) { - // https://github.com/microsoft/vscode-jupyter/issues/12703 - // `ERROR: Could not install packages due to an OSError: [Errno 2] No such file or directory: 'C:\\Users\\donjayamanne\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python310\\site-packages\\jedi\\third_party\\typeshed\\third_party\\2and3\\requests\\packages\\urllib3\\packages\\ssl_match_hostname\\_implementation.pyi' - // HINT: This error might have occurred since this system does not have Windows Long Path support enabled. You can find information on how to enable this at https://pip.pypa.io/warnings/enable-long-paths`; - // Remove the `[notice]` lines from the error messages - if ( - couldNotInstallErr && - couldNotInstallErr.includes('https://pip.pypa.io/warnings/enable-long-paths') - ) { - couldNotInstallErr = splitLines(couldNotInstallErr, { - trim: true, - removeEmptyEntries: true - }) - .filter((line) => !line.startsWith('[notice]')) - .join(EOL); - deferred.reject( - new PackageNotInstalledWindowsLongPathNotEnabledError( - productOrModuleName, - interpreter, - couldNotInstallErr - ) - ); + this, + disposables + ); + observable.out.done + .then( + () => { + if (observable?.proc?.exitCode !== 0) { + // https://github.com/microsoft/vscode-jupyter/issues/12703 + // `ERROR: Could not install packages due to an OSError: [Errno 2] No such file or directory: 'C:\\Users\\donjayamanne\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python310\\site-packages\\jedi\\third_party\\typeshed\\third_party\\2and3\\requests\\packages\\urllib3\\packages\\ssl_match_hostname\\_implementation.pyi' + // HINT: This error might have occurred since this system does not have Windows Long Path support enabled. You can find information on how to enable this at https://pip.pypa.io/warnings/enable-long-paths`; + // Remove the `[notice]` lines from the error messages + if ( + couldNotInstallErr && + couldNotInstallErr.includes('https://pip.pypa.io/warnings/enable-long-paths') + ) { + couldNotInstallErr = splitLines(couldNotInstallErr, { + trim: true, + removeEmptyEntries: true + }) + .filter((line) => !line.startsWith('[notice]')) + .join(EOL); + deferred.reject( + new PackageNotInstalledWindowsLongPathNotEnabledError( + productOrModuleName, + interpreter, + couldNotInstallErr + ) + ); + } else { + deferred.reject(lastStdErr || observable?.proc?.exitCode); + } } else { - deferred.reject(lastStdErr || observable?.proc?.exitCode); + deferred.resolve(); } - } else { - deferred.resolve(); + }, + (err: unknown) => { + deferred.reject(err); } - }, - error: (err: unknown) => { - deferred.reject(err); - } - }); + ) + .finally(() => dispose(disposables)); } return deferred.promise; }; diff --git a/src/platform/interpreter/installer/pipInstaller.unit.test.ts b/src/platform/interpreter/installer/pipInstaller.unit.test.ts index 6ded952720f..77c3c606830 100644 --- a/src/platform/interpreter/installer/pipInstaller.unit.test.ts +++ b/src/platform/interpreter/installer/pipInstaller.unit.test.ts @@ -14,10 +14,10 @@ import { Product } from '../../../platform/interpreter/installer/types'; import { IDisposable } from '../../../platform/common/types'; import { dispose } from '../../../platform/common/helpers'; import { IApplicationShell, IWorkspaceService } from '../../../platform/common/application/types'; -import { ReplaySubject } from 'rxjs/ReplaySubject'; import { ChildProcess } from 'child_process'; import { IPythonExecutionFactory, IPythonExecutionService } from '../../../platform/interpreter/types.node'; import { noop } from '../../../test/core'; +import { createObservable } from '../../common/process/proc.node'; suite('Pip installer', async () => { let serviceContainer: IServiceContainer; @@ -26,7 +26,7 @@ suite('Pip installer', async () => { let pythonExecutionService: IPythonExecutionService; let proc: ChildProcess; const disposables: IDisposable[] = []; - let subject: ReplaySubject>; + let subject: ReturnType>>; setup(() => { serviceContainer = mock(); pythonExecutionFactory = mock(); @@ -55,7 +55,8 @@ suite('Pip installer', async () => { when(workspaceConfig.get('proxy', '')).thenReturn(''); proc = mock(); - subject = new ReplaySubject>(); + subject = createObservable>(); + disposables.push(subject); when(pythonExecutionService.execObservable(anything(), anything())).thenReturn({ dispose: noop, out: subject, @@ -141,8 +142,8 @@ suite('Pip installer', async () => { new Error('Unable to check if module is installed') ); when(proc.exitCode).thenReturn(0); - subject.next({ out: '', source: 'stdout' }); - subject.complete(); + subject.fire({ out: '', source: 'stdout' }); + subject.resolve(); const cancellationToken = new CancellationTokenSource(); disposables.push(cancellationToken); diff --git a/src/platform/terminals/codeExecution/codeExecutionHelper.node.ts b/src/platform/terminals/codeExecution/codeExecutionHelper.node.ts index 3addb735cde..318935bc1e8 100644 --- a/src/platform/terminals/codeExecution/codeExecutionHelper.node.ts +++ b/src/platform/terminals/codeExecution/codeExecutionHelper.node.ts @@ -6,13 +6,14 @@ import { Uri } from 'vscode'; import { traceError } from '../../logging'; import * as internalScripts from '../../interpreter/internal/scripts/index.node'; -import { createDeferred } from '../../common/utils/async'; import { getFilePath } from '../../common/platform/fs-paths'; import { CodeExecutionHelperBase } from './codeExecutionHelper'; import { IProcessServiceFactory } from '../../common/process/types.node'; import { IInterpreterService } from '../../interpreter/contracts'; import { IServiceContainer } from '../../ioc/types'; import { splitLines } from '../../common/helpers'; +import { IDisposable } from '../../common/types'; +import { dispose } from '../../common/utils/lifecycle'; /** * Node version of the code execution helper. Node version is necessary because we can't create processes in the web version. @@ -29,6 +30,7 @@ export class CodeExecutionHelper extends CodeExecutionHelperBase { } public override async normalizeLines(code: string, resource?: Uri): Promise { + const disposables: IDisposable[] = []; try { const codeTrimmed = code.trim(); if (codeTrimmed.length === 0) { @@ -49,20 +51,18 @@ export class CodeExecutionHelper extends CodeExecutionHelperBase { const observable = processService.execObservable(getFilePath(interpreter?.uri) || 'python', args, { throwOnStdErr: true }); - const normalizeOutput = createDeferred(); // Read result from the normalization script from stdout, and resolve the promise when done. let normalized = ''; - observable.out.subscribe({ - next: (output) => { + observable.out.onDidChange( + (output) => { if (output.source === 'stdout') { normalized += output.out; } }, - complete: () => { - normalizeOutput.resolve(normalized); - } - }); + this, + disposables + ); // The normalization script expects a serialized JSON object, with the selection under the "code" key. // We're using a JSON object so that we don't have to worry about encoding, or escaping non-ASCII characters. @@ -71,7 +71,8 @@ export class CodeExecutionHelper extends CodeExecutionHelperBase { observable.proc?.stdin?.end(); // We expect a serialized JSON object back, with the normalized code under the "normalized" key. - const result = await normalizeOutput.promise; + await observable.out.done; + const result = normalized; const object = JSON.parse(result); const normalizedLines = parse(object.normalized); @@ -94,6 +95,8 @@ export class CodeExecutionHelper extends CodeExecutionHelperBase { } catch (ex) { traceError(ex, 'Python: Failed to normalize code for execution in Interactive Window'); return code; + } finally { + dispose(disposables); } } } diff --git a/src/test/datascience/jupyterServer.node.ts b/src/test/datascience/jupyterServer.node.ts index f97be649c72..96228bd2501 100644 --- a/src/test/datascience/jupyterServer.node.ts +++ b/src/test/datascience/jupyterServer.node.ts @@ -13,10 +13,10 @@ import uuid from 'uuid/v4'; import * as path from 'path'; import * as fs from 'fs-extra'; import * as child_process from 'child_process'; +import { Event, EventEmitter } from '@c4312/evt'; const uuidToHex = require('uuid-to-hex') as typeof import('uuid-to-hex'); import { EXTENSION_ROOT_DIR_FOR_TESTS } from '../constants.node'; import { dispose, splitLines } from '../../platform/common/helpers'; -import { Observable } from 'rxjs-compat/Observable'; const testFolder = path.join(EXTENSION_ROOT_DIR_FOR_TESTS, 'src', 'test', 'datascience'); import { sleep } from '../core'; import { EXTENSION_ROOT_DIR } from '../../platform/constants.node'; @@ -51,9 +51,14 @@ type Output = { out: T; }; -type ObservableExecutionResult = { +export interface ObservableOutput { + onDidChange: Event; + done: Promise; +} + +export type ObservableExecutionResult = { proc: child_process.ChildProcess | undefined; - out: Observable>; + out: ObservableOutput>; dispose(): void; }; @@ -286,7 +291,7 @@ export class JupyterServer { } }; let allOutput = ''; - const subscription = result.out.subscribe((output) => { + const subscription = result.out.onDidChange((output) => { allOutput += output.out; // When debugging Web Tests using VSCode dfebugger, we'd like to see this info. @@ -319,7 +324,7 @@ export class JupyterServer { } }); this._disposables.push(procDisposable); - this._disposables.push({ dispose: () => subscription.unsubscribe() }); + this._disposables.push(subscription); } catch (ex) { console.error(`Starting remote jupyter server failed`, ex); reject(ex); @@ -334,6 +339,7 @@ export class JupyterServer { ): ObservableExecutionResult { const proc = child_process.spawn(file, args, options); let procExited = false; + const disposables: IDisposable[] = []; const disposable: IDisposable = { // eslint-disable-next-line dispose: function () { @@ -343,40 +349,39 @@ export class JupyterServer { if (proc) { proc.unref(); } + disposables.forEach((d) => d.dispose()); } }; - const output = new Observable>((subscriber) => { - const disposables: IDisposable[] = []; - - const on = (ee: NodeJS.EventEmitter, name: string, fn: Function) => { - ee.on(name, fn as any); - disposables.push({ dispose: () => ee.removeListener(name, fn as any) as any }); - }; + const output = createObservable>(); + disposables.push(output); + const on = (ee: NodeJS.EventEmitter, name: string, fn: Function) => { + ee.on(name, fn as any); + disposables.push({ dispose: () => ee.removeListener(name, fn as any) as any }); + }; - const sendOutput = (source: 'stdout' | 'stderr', data: Buffer) => { - const out = this.decoder.decode([data]); - subscriber.next({ source, out: out }); - }; + const sendOutput = (source: 'stdout' | 'stderr', data: Buffer) => { + const out = this.decoder.decode([data]); + output.fire({ source, out: out }); + }; - on(proc.stdout!, 'data', (data: Buffer) => sendOutput('stdout', data)); - on(proc.stderr!, 'data', (data: Buffer) => sendOutput('stderr', data)); + on(proc.stdout!, 'data', (data: Buffer) => sendOutput('stdout', data)); + on(proc.stderr!, 'data', (data: Buffer) => sendOutput('stderr', data)); - proc.once('close', () => { - procExited = true; - subscriber.complete(); - disposables.forEach((d) => d.dispose()); - }); - proc.once('exit', () => { - procExited = true; - subscriber.complete(); - disposables.forEach((d) => d.dispose()); - }); - proc.once('error', (ex) => { - procExited = true; - subscriber.error(ex); - disposables.forEach((d) => d.dispose()); - }); + proc.once('close', () => { + procExited = true; + output.resolve(); + disposables.forEach((d) => d.dispose()); + }); + proc.once('exit', () => { + procExited = true; + output.reject(); + disposables.forEach((d) => d.dispose()); + }); + proc.once('error', (ex) => { + procExited = true; + output.reject(ex); + disposables.forEach((d) => d.dispose()); }); return { @@ -422,3 +427,29 @@ function genRandomString(length = 16) { .toString('hex') /** convert to hexadecimal format */ .slice(0, length); /** return required number of characters */ } + +function createObservable() { + const onDidChange = new EventEmitter(); + let resolve: () => void = noop; + let reject: (reason?: any) => void = noop; + const promise = new Promise((rs, rj) => { + resolve = rs; + reject = rj; + }); + // No dangling promises. + promise.catch(noop); + return { + get onDidChange() { + return onDidChange.event; + }, + get done() { + return promise; + }, + resolve: resolve.bind(promise), + reject: reject.bind(promise), + fire: onDidChange.fire.bind(onDidChange), + dispose: () => { + onDidChange.dispose(); + } + }; +} diff --git a/src/test/datascience/notebook/executionService.mock.vscode.test.ts b/src/test/datascience/notebook/executionService.mock.vscode.test.ts index f3ee4e6cfaf..029818da27b 100644 --- a/src/test/datascience/notebook/executionService.mock.vscode.test.ts +++ b/src/test/datascience/notebook/executionService.mock.vscode.test.ts @@ -30,7 +30,6 @@ // INotebookProvider // } from '../../../platform/datascience/types'; // import { instance, mock, when } from 'ts-mockito'; -// import { Subject } from 'rxjs-compat/Subject'; // import { EventEmitter, NotebookDocument } from 'vscode'; // import { ServerStatus } from '../../../webviews/webview-side/interactive-common/mainState'; // import { MockJupyterSession } from '../mockJupyterSession'; diff --git a/types/dom.fix.rx.compiler.d.ts b/types/dom.fix.rx.compiler.d.ts deleted file mode 100644 index c0eded6b890..00000000000 --- a/types/dom.fix.rx.compiler.d.ts +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -/** - * These are fake dom type definitions that rxjs depends on. - * Another solution is to add the 'dom' lib to tsconfig, but that's even worse. - * We don't need dom, as the extension does nothing with the dom (dom = HTML entities and the like). - */ -/* eslint-disable @typescript-eslint/naming-convention */ -interface EventTarget {} -interface NodeList {} -interface HTMLCollection {} -interface XMLHttpRequest {} -interface Event {} -interface MessageEvent {} -interface CloseEvent {} -interface WebSocket {}