Skip to content

Commit

Permalink
Remove rxjs (#14617)
Browse files Browse the repository at this point in the history
* Remove rxjs

* Add event emitter to jupyterServer.node.ts

* Remove rxjs

* Dispose everything

* Fix tests
  • Loading branch information
DonJayamanne authored Oct 30, 2023
1 parent a727344 commit 3511846
Show file tree
Hide file tree
Showing 18 changed files with 292 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ module.exports = {
'@typescript-eslint/no-restricted-imports': [
'error',
{
paths: ['lodash', 'rxjs', 'lodash/noop', 'rxjs/util/noop'],
paths: ['lodash', 'lodash/noop'],
patterns: [
{
group: ['@jupyterlab/*'],
Expand Down
12 changes: 0 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2168,8 +2168,6 @@
"redux": "^4.0.4",
"redux-logger": "^3.0.6",
"reflect-metadata": "^0.1.12",
"rxjs": "^6.5.4",
"rxjs-compat": "^6.5.4",
"safe-buffer": "^5.2.1",
"sanitize-filename": "^1.6.3",
"semver": "^5.7.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { assert, expect, use } from 'chai';
import chaiPromise from 'chai-as-promised';
import * as path from '../../../platform/vscode-path/path';
import * as sinon from 'sinon';
import { Subject } from 'rxjs/Subject';
import { anything, capture, deepEqual, instance, mock, when } from 'ts-mockito';
import { Uri } from 'vscode';
import { ObservableExecutionResult, Output } from '../../../platform/common/process/types.node';
Expand All @@ -26,6 +25,9 @@ import { JupyterInterpreterDependencyService } from './jupyterInterpreterDepende
import { JupyterInterpreterService } from './jupyterInterpreterService.node';
import { JupyterInterpreterSubCommandExecutionService } from './jupyterInterpreterSubCommandExecutionService.node';
import { noop } from '../../../test/core';
import { createObservable } from '../../../platform/common/process/proc.node';
import { IDisposable } from '../../../platform/common/types';
import { dispose } from '../../../platform/common/utils/lifecycle';
use(chaiPromise);

/* eslint-disable */
Expand All @@ -39,6 +41,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
const selectedJupyterInterpreter = createPythonInterpreter({ displayName: 'JupyterInterpreter' });
const activePythonInterpreter = createPythonInterpreter({ displayName: 'activePythonInterpreter' });
let notebookStartResult: ObservableExecutionResult<string>;
const disposables: IDisposable[] = [];
setup(() => {
interpreterService = mock<IInterpreterService>();
jupyterInterpreter = mock(JupyterInterpreterService);
Expand All @@ -49,10 +52,12 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(instance(execService) as any).then = undefined;
const output = new MockOutputChannel('');
const out = createObservable<Output<string>>();
disposables.push(out);
notebookStartResult = {
dispose: noop,
proc: undefined,
out: new Subject<Output<string>>().asObservable()
out
};
const jupyterPaths = mock<JupyterPaths>();
when(jupyterPaths.getKernelSpecTempRegistrationFolder()).thenResolve(
Expand All @@ -78,6 +83,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
when(interpreterService.getActiveInterpreter(undefined)).thenResolve(activePythonInterpreter);
});
teardown(() => {
dispose(disposables);
sinon.restore();
});
// eslint-disable-next-line
Expand Down
39 changes: 19 additions & 20 deletions src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { Subscription } from 'rxjs/Subscription';
import { Disposable, Uri } from 'vscode';
import { IConfigurationService, IDisposable } from '../../../platform/common/types';
import { traceError, traceWarning, traceVerbose } from '../../../platform/logging';
Expand All @@ -20,6 +19,7 @@ import { JupyterNotebookNotInstalled } from '../../../platform/errors/jupyterNot
import { PythonEnvironment } from '../../../platform/pythonEnvironments/info';
import { JupyterCannotBeLaunchedWithRootError } from '../../../platform/errors/jupyterCannotBeLaunchedWithRootError';
import { createJupyterConnectionInfo } from '../jupyterUtils';
import { dispose } from '../../../platform/common/utils/lifecycle';

const urlMatcher = new RegExp(RegExpValues.UrlPatternRegEx);

Expand All @@ -30,7 +30,7 @@ export class JupyterConnectionWaiter implements IDisposable {
private startPromise = createDeferred<IJupyterConnection>();
private launchTimeout: NodeJS.Timer | number;
private output = '';
private subscriptions: Subscription[] = [];
private subscriptions: IDisposable[] = [];
public readonly ready = this.startPromise.promise;

constructor(
Expand Down Expand Up @@ -58,30 +58,29 @@ export class JupyterConnectionWaiter implements IDisposable {
}
// Listen on stderr for its connection information
this.subscriptions.push(
launchResult.out.subscribe(
(output: Output<string>) => {
traceVerbose(output.out);
this.output += output.out;
if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) {
// .then so that we can keep from pushing aync up to the subscribed observable function
this.getServerInfo()
.then((serverInfos) => this.getJupyterURL(serverInfos, this.output))
.catch((ex) => traceWarning('Failed to get server info', ex));
}
launchResult.out.onDidChange((output: Output<string>) => {
traceVerbose(output.out);
this.output += output.out;
if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) {
// .then so that we can keep from pushing aync up to the subscribed observable function
this.getServerInfo()
.then((serverInfos) => this.getJupyterURL(serverInfos, this.output))
.catch((ex) => traceWarning('Failed to get server info', ex));
}

// Sometimes jupyter will return a 403 error. Not sure why. We used
// to fail on this, but it looks like jupyter works with this error in place.
},
(e) => this.rejectStartPromise(e),
// If the process dies, we can't extract connection information.
() => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode))
)
// Sometimes jupyter will return a 403 error. Not sure why. We used
// to fail on this, but it looks like jupyter works with this error in place.
})
);
// If the process dies, we can't extract connection information.
launchResult.out.done
.then(() => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode)))
.catch((e) => this.rejectStartPromise(e));
}
public dispose() {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
clearTimeout(this.launchTimeout as any);
this.subscriptions.forEach((d) => d.unsubscribe());
dispose(this.subscriptions);
}

// From a list of jupyter server infos try to find the matching jupyter that we launched
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ import { CancellationToken, Uri } from 'vscode';
import { IJupyterRequestAgentCreator, IJupyterRequestCreator, JupyterServerInfo } from '../types';
import chaiAsPromised from 'chai-as-promised';
import events from 'events';
import { Subject } from 'rxjs/Subject';
import sinon from 'sinon';
import { JupyterSettings } from '../../../platform/common/configSettings';
import { ConfigurationService } from '../../../platform/common/configuration/service.node';
import { IFileSystemNode } from '../../../platform/common/platform/types.node';
import { Output, ObservableExecutionResult } from '../../../platform/common/process/types.node';
import { IConfigurationService, IJupyterSettings } from '../../../platform/common/types';
import { IConfigurationService, IDisposable, IJupyterSettings } from '../../../platform/common/types';
import { DataScience } from '../../../platform/common/utils/localize';
import { EXTENSION_ROOT_DIR } from '../../../platform/constants.node';
import { ServiceContainer } from '../../../platform/ioc/container';
import { IServiceContainer } from '../../../platform/ioc/types';
import { JupyterConnectionWaiter } from './jupyterConnectionWaiter.node';
import { noop } from '../../../test/core';
import { createObservable } from '../../../platform/common/process/proc.node';
import { dispose } from '../../../platform/common/utils/lifecycle';
use(chaiAsPromised);
suite('Jupyter Connection Waiter', async () => {
let observableOutput: Subject<Output<string>>;
let observableOutput: ReturnType<typeof createObservable<Output<string>>>;
let launchResult: ObservableExecutionResult<string>;
let getServerInfoStub: sinon.SinonStub<[CancellationToken | undefined], JupyterServerInfo[] | undefined>;
let configService: IConfigurationService;
Expand All @@ -35,6 +36,7 @@ suite('Jupyter Connection Waiter', async () => {
const dsSettings: IJupyterSettings = { jupyterLaunchTimeout: 10_000 } as any;
const childProc = new events.EventEmitter();
const notebookDir = Uri.file('someDir');
const disposables: IDisposable[] = [];
const dummyServerInfos: JupyterServerInfo[] = [
{
base_url: 'http://localhost1:1',
Expand Down Expand Up @@ -73,7 +75,8 @@ suite('Jupyter Connection Waiter', async () => {
const expectedServerInfo = dummyServerInfos[1];

setup(() => {
observableOutput = new Subject<Output<string>>();
observableOutput = createObservable<Output<string>>();
disposables.push(observableOutput);
launchResult = {
dispose: noop,
out: observableOutput,
Expand All @@ -96,6 +99,7 @@ suite('Jupyter Connection Waiter', async () => {
instance(mock<IJupyterRequestAgentCreator>())
);
});
teardown(() => dispose(disposables));

function createConnectionWaiter() {
return new JupyterConnectionWaiter(
Expand All @@ -111,7 +115,7 @@ suite('Jupyter Connection Waiter', async () => {
test('Successfully gets connection info', async () => {
(<any>dsSettings).jupyterLaunchTimeout = 10_000;
const waiter = createConnectionWaiter();
observableOutput.next({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' });
observableOutput.fire({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' });

const connection = await waiter.ready;

Expand All @@ -133,7 +137,7 @@ suite('Jupyter Connection Waiter', async () => {

const promise = waiter.ready;
childProc.emit('exit', exitCode);
observableOutput.complete();
observableOutput.resolve();

await assert.isRejected(promise, DataScience.jupyterServerCrashed(exitCode));
});
Expand Down
2 changes: 1 addition & 1 deletion src/kernels/jupyter/launcher/jupyterServerStarter.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class JupyterServerStarter implements IJupyterServerStarter {
// Watch for premature exits
if (launchResult.proc) {
launchResult.proc.on('exit', (c: number | null) => (exitCode = c));
launchResult.out.subscribe((out) => this.jupyterOutputChannel.append(out.out));
launchResult.out.onDidChange((out) => this.jupyterOutputChannel.append(out.out));
}

// Make sure this process gets cleaned up. We might be canceled before the connection finishes.
Expand Down
4 changes: 2 additions & 2 deletions src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export class PythonKernelInterruptDaemon {

await new Promise<void>((resolve, reject) => {
let started = false;
const subscription = proc.out.subscribe((out) => {
const subscription = proc.out.onDidChange((out) => {
traceInfoIfCI(
`Output from interrupt daemon started = ${started}, output (${out.source}) = ${out.out} ('END)`
);
Expand Down Expand Up @@ -208,7 +208,7 @@ export class PythonKernelInterruptDaemon {
}
}
});
this.disposableRegistry.push(new Disposable(() => subscription.unsubscribe()));
this.disposableRegistry.push(subscription);
});
this.disposableRegistry.push(new Disposable(() => swallowExceptions(() => proc.proc?.kill())));
// Added for logging to see if this process dies.
Expand Down
78 changes: 38 additions & 40 deletions src/kernels/raw/launcher/kernelProcess.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,50 +207,48 @@ export class KernelProcess implements IKernelProcess {
}

let sawKernelConnectionFile = false;
exeObs.out.subscribe(
(output) => {
if (output.source === 'stderr') {
output.out = stripUnwantedMessages(output.out);
// Capture stderr, incase kernel doesn't start.
stderr += output.out;

if (output.out.trim().length) {
traceWarning(`StdErr from Kernel Process ${output.out.trim()}`);
}
} else {
stdout += output.out;
// Strip unwanted stuff from the output, else it just chews up unnecessary space.
if (!sawKernelConnectionFile) {
stdout = stdout.replace(kernelOutputToNotLog, '');
stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), '');
// Strip the leading space, as we've removed some leading text.
stdout = stdout.trimStart();
const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true });
if (
lines.length === 2 &&
lines[0] === kernelOutputWithConnectionFile &&
lines[1].startsWith('--existing') &&
lines[1].endsWith('.json')
) {
stdout = `${lines.join(' ')}${os.EOL}`;
}
}
if (stdout.includes(kernelOutputWithConnectionFile)) {
sawKernelConnectionFile = true;
exeObs.out.onDidChange((output) => {
if (output.source === 'stderr') {
output.out = stripUnwantedMessages(output.out);
// Capture stderr, incase kernel doesn't start.
stderr += output.out;

if (output.out.trim().length) {
traceWarning(`StdErr from Kernel Process ${output.out.trim()}`);
}
} else {
stdout += output.out;
// Strip unwanted stuff from the output, else it just chews up unnecessary space.
if (!sawKernelConnectionFile) {
stdout = stdout.replace(kernelOutputToNotLog, '');
stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), '');
// Strip the leading space, as we've removed some leading text.
stdout = stdout.trimStart();
const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true });
if (
lines.length === 2 &&
lines[0] === kernelOutputWithConnectionFile &&
lines[1].startsWith('--existing') &&
lines[1].endsWith('.json')
) {
stdout = `${lines.join(' ')}${os.EOL}`;
}
traceVerbose(`Kernel Output: ${stdout}`);
}
this.sendToOutput(output.out);
},
(error) => {
if (this.disposed) {
traceWarning('Kernel died', error, stderr);
return;
if (stdout.includes(kernelOutputWithConnectionFile)) {
sawKernelConnectionFile = true;
}
traceError('Kernel died', error, stderr);
deferred.reject(error);
traceVerbose(`Kernel Output: ${stdout}`);
}
);
this.sendToOutput(output.out);
});
exeObs.out.done.catch((error) => {
if (this.disposed) {
traceWarning('Kernel died', error, stderr);
return;
}
traceError('Kernel died', error, stderr);
deferred.reject(error);
});

// Don't return until our heartbeat channel is open for connections or the kernel died or we timed out
try {
Expand Down
Loading

0 comments on commit 3511846

Please sign in to comment.