Skip to content

Commit

Permalink
wip: testing
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 14, 2025
1 parent 6af545c commit d1b4562
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 44 deletions.
12 changes: 10 additions & 2 deletions src/client-side-encryption/auto_encrypter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { kDecorateResult } from '../constants';
import { getMongoDBClientEncryption } from '../deps';
import { MongoRuntimeError } from '../error';
import { MongoClient, type MongoClientOptions } from '../mongo_client';
import { type Abortable } from '../mongo_types';
import { MongoDBCollectionNamespace } from '../utils';
import { autoSelectSocketOptions } from './client_encryption';
import * as cryptoCallbacks from './crypto_callbacks';
Expand Down Expand Up @@ -372,8 +373,10 @@ export class AutoEncrypter {
async encrypt(
ns: string,
cmd: Document,
options: CommandOptions = {}
options: CommandOptions & Abortable = {}
): Promise<Document | Uint8Array> {
options.signal?.throwIfAborted();

if (this._bypassEncryption) {
// If `bypassAutoEncryption` has been specified, don't encrypt
return cmd;
Expand Down Expand Up @@ -407,7 +410,12 @@ export class AutoEncrypter {
/**
* Decrypt a command response
*/
async decrypt(response: Uint8Array, options: CommandOptions = {}): Promise<Uint8Array> {
async decrypt(
response: Uint8Array,
options: CommandOptions & Abortable = {}
): Promise<Uint8Array> {
options.signal?.throwIfAborted();

const context = this._mongocrypt.makeDecryptionContext(response);

context.id = this._contextCounter++;
Expand Down
1 change: 1 addition & 0 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ export class StateMachine {
let result: Uint8Array | null = null;

while (context.state !== MONGOCRYPT_CTX_DONE && context.state !== MONGOCRYPT_CTX_ERROR) {
options.signal?.throwIfAborted();
debug(`[context#${context.id}] ${stateToString.get(context.state) || context.state}`);

switch (context.state) {
Expand Down
5 changes: 4 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
for await (const response of this.readMany({
timeoutContext: options.timeoutContext,
signal: options.signal
})) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down
2 changes: 2 additions & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export function onData(
emitter: EventEmitter,
{ timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
) {
signal?.throwIfAborted();

// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down
2 changes: 2 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,8 @@ export const randomBytes = promisify(crypto.randomBytes);
* @param name - An event name to wait for
*/
export async function once<T>(ee: EventEmitter, name: string, options?: Abortable): Promise<T> {
options?.signal?.throwIfAborted();

const { promise, resolve, reject } = promiseWithResolvers<T>();
const onEvent = (data: T) => resolve(data);
const onError = (error: Error) => reject(error);
Expand Down
Loading

0 comments on commit d1b4562

Please sign in to comment.