diff --git a/package-lock.json b/package-lock.json index 8766cc44b6..9245bdf46f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1592,6 +1592,15 @@ } } }, + "@matrixai/async-locks": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@matrixai/async-locks/-/async-locks-1.0.0.tgz", + "integrity": "sha512-Tij4LzmSczrEobzOFE0iRSEgR+cr76hgVzqJcEpm9wZgZqa1jrj1TOTlQ+4a53Vhikj8LH4NEHd5d/L+PNV+dA==", + "requires": { + "@matrixai/resources": "^1.0.0", + "async-mutex": "^0.3.2" + } + }, "@matrixai/db": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/@matrixai/db/-/db-3.0.0.tgz", diff --git a/package.json b/package.json index 140c7d8642..266ed142fc 100644 --- a/package.json +++ b/package.json @@ -73,13 +73,13 @@ "dependencies": { "@grpc/grpc-js": "1.3.7", "@matrixai/async-init": "^1.6.0", + "@matrixai/async-locks": "^1.0.0", "@matrixai/db": "^3.0.0", "@matrixai/id": "^3.3.2", "@matrixai/logger": "^2.1.0", "@matrixai/resources": "^1.0.0", "@matrixai/workers": "^1.2.5", "ajv": "^7.0.4", - "async-mutex": "^0.3.2", "bip39": "^3.0.3", "canonicalize": "^1.0.5", "cheerio": "^1.0.0-rc.5", diff --git a/src/utils/context.ts b/src/utils/context.ts deleted file mode 100644 index d4102debc0..0000000000 --- a/src/utils/context.ts +++ /dev/null @@ -1,75 +0,0 @@ -type ResourceAcquire = () => Promise< - readonly [ResourceRelease, Resource?] ->; - -type ResourceRelease = () => Promise; - -type Resources[]> = { - [K in keyof T]: T[K] extends ResourceAcquire ? R : never; -}; - -/** - * Make sure to explicitly declare or cast `acquires` as a tuple using `[ResourceAcquire...]` or `as const` - */ -async function withF< - ResourceAcquires extends - | readonly [ResourceAcquire] - | readonly ResourceAcquire[], - T, ->( - acquires: ResourceAcquires, - f: (resources: Resources) => Promise, -): Promise { - const releases: Array = []; - const resources: Array = []; - try { - for (const acquire of acquires) { - const [release, resource] = await acquire(); - releases.push(release); - resources.push(resource); - } - return await f(resources as unknown as Resources); - } finally { - releases.reverse(); - for (const release of releases) { - await release(); - } - } -} - -/** - * Make sure to explicitly declare or cast `acquires` as a tuple using `[ResourceAcquire...]` or `as const` - */ -async function* withG< - ResourceAcquires extends - | readonly [ResourceAcquire] - | readonly ResourceAcquire[], - T = unknown, - TReturn = any, - TNext = unknown, ->( - acquires: ResourceAcquires, - g: ( - resources: Resources, - ) => AsyncGenerator, -): AsyncGenerator { - const releases: Array = []; - const resources: Array = []; - try { - for (const acquire of acquires) { - const [release, resource] = await acquire(); - releases.push(release); - resources.push(resource); - } - return yield* g(resources as unknown as Resources); - } finally { - releases.reverse(); - for (const release of releases) { - await release(); - } - } -} - -export { withF, withG }; - -export type { ResourceAcquire, ResourceRelease }; diff --git a/src/utils/index.ts b/src/utils/index.ts index cbb38a8bef..f50908acab 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,6 +1,4 @@ export { default as sysexits } from './sysexits'; -export * from './locks'; -export * from './context'; export * from './utils'; export * from './matchers'; export * from './binary'; diff --git a/src/utils/locks.ts b/src/utils/locks.ts deleted file mode 100644 index eb6f952450..0000000000 --- a/src/utils/locks.ts +++ /dev/null @@ -1,85 +0,0 @@ -import type { MutexInterface } from 'async-mutex'; -import { Mutex } from 'async-mutex'; - -/** - * Single threaded write-preferring read write lock - */ -class RWLock { - protected readersLock: Mutex = new Mutex(); - protected writersLock: Mutex = new Mutex(); - protected readersRelease: MutexInterface.Releaser; - protected readerCountBlocked: number = 0; - protected _readerCount: number = 0; - protected _writerCount: number = 0; - - public get readerCount(): number { - return this._readerCount + this.readerCountBlocked; - } - - public get writerCount(): number { - return this._writerCount; - } - - public async withRead(f: () => Promise): Promise { - const release = await this.acquireRead(); - try { - return await f(); - } finally { - release(); - } - } - - public async withWrite(f: () => Promise): Promise { - const release = await this.acquireWrite(); - try { - return await f(); - } finally { - release(); - } - } - - public async acquireRead(): Promise<() => void> { - if (this._writerCount > 0) { - ++this.readerCountBlocked; - await this.writersLock.waitForUnlock(); - --this.readerCountBlocked; - } - const readerCount = ++this._readerCount; - // The first reader locks - if (readerCount === 1) { - this.readersRelease = await this.readersLock.acquire(); - } - return () => { - const readerCount = --this._readerCount; - // The last reader unlocks - if (readerCount === 0) { - this.readersRelease(); - } - }; - } - - public async acquireWrite(): Promise<() => void> { - ++this._writerCount; - const writersRelease = await this.writersLock.acquire(); - this.readersRelease = await this.readersLock.acquire(); - return () => { - this.readersRelease(); - writersRelease(); - --this._writerCount; - }; - } - - public isLocked(): boolean { - return this.readersLock.isLocked() || this.writersLock.isLocked(); - } - - public async waitForUnlock(): Promise { - await Promise.all([ - this.readersLock.waitForUnlock(), - this.writersLock.waitForUnlock(), - ]); - return; - } -} - -export { RWLock }; diff --git a/tests/utils.test.ts b/tests/utils.test.ts index 5f6ee891e0..1896fbedcc 100644 --- a/tests/utils.test.ts +++ b/tests/utils.test.ts @@ -1,6 +1,4 @@ -import type { ResourceAcquire } from '@/utils'; import os from 'os'; -import { Mutex } from 'async-mutex'; import * as utils from '@/utils'; describe('utils', () => { @@ -15,247 +13,4 @@ describe('utils', () => { expect(p).toBe(`${homeDir}/AppData/Local/polykey`); } }); - test('withF resource context', async () => { - // No resources - const result1 = await utils.withF([], async () => { - return 'bar'; - }); - expect(result1).toBe('bar'); - // Noop resource - const result2 = await utils.withF( - [ - async () => { - return [async () => {}]; - }, - ], - async () => { - return 'foo'; - }, - ); - expect(result2).toBe('foo'); - // Counter resource - let counter = 1; - const result3 = await utils.withF( - [ - async () => { - counter++; - return [ - async () => { - counter--; - }, - counter, - ]; - }, - ], - async ([c]) => { - expect(c).toBe(2); - return c / 2; - }, - ); - expect(result3).toBe(1); - expect(counter).toBe(1); - // Multiple resources - const result4 = await utils.withF( - [ - async () => { - return [async () => {}, 123]; - }, - async () => { - return [async () => {}]; - }, - async () => { - return [async () => {}, 'hello world']; - }, - ], - async ([n, u, s]) => { - expect(u).toBe(undefined); - return [n, s]; - }, - ); - expect(result4).toStrictEqual([123, 'hello world']); - // Multiple resources, but only take the first - const result5 = await utils.withF( - [ - async () => { - return [async () => {}, 123]; - }, - async () => { - return [async () => {}]; - }, - async () => { - return [async () => {}, 'hello world']; - }, - ], - async ([n]) => { - return n; - }, - ); - expect(result5).toBe(123); - // Multiple resources outside requires type declaration - const resourceAcquires6: [ - ResourceAcquire, - ResourceAcquire, - ResourceAcquire, - ] = [ - async () => { - return [async () => {}, 123]; - }, - async () => { - return [async () => {}]; - }, - async () => { - return [async () => {}, 'hello world']; - }, - ]; - const result6 = await utils.withF(resourceAcquires6, async ([n, u, s]) => { - expect(u).toBe(undefined); - return [n, s]; - }); - expect(result6).toStrictEqual([123, 'hello world']); - // Multiple resources outside can also use const - const resourceAcquires7 = [ - async () => { - return [async () => {}, 123] as const; - }, - async () => { - return [async () => {}] as const; - }, - async () => { - return [async () => {}, 'hello world'] as const; - }, - ] as const; - const result7 = await utils.withF(resourceAcquires7, async ([n, u, s]) => { - expect(u).toBe(undefined); - return [n, s]; - }); - expect(result7).toStrictEqual([123, 'hello world']); - // It must be given a explicit type, or `as const` can be used internally - const acquire8: ResourceAcquire = async () => { - return [async () => {}, 123]; - }; - const result8 = await utils.withF([acquire8], async () => { - return 'done'; - }); - expect(result8).toBe('done'); - const acquire9 = async () => { - return [async () => {}, 123] as const; - }; - const result9 = await utils.withF([acquire9], async () => { - return 'done'; - }); - expect(result9).toBe('done'); - // Order of acquisition is left to right - // Order of release is right ot left - const lock1 = new Mutex(); - const lock2 = new Mutex(); - const acquireOrder: Array = []; - const releaseOrder: Array = []; - await utils.withF( - [ - async () => { - const release = await lock1.acquire(); - acquireOrder.push(lock1); - return [ - async () => { - releaseOrder.push(lock1); - release(); - }, - lock1, - ]; - }, - async () => { - const release = await lock2.acquire(); - acquireOrder.push(lock2); - return [ - async () => { - releaseOrder.push(lock2); - release(); - }, - lock2, - ]; - }, - ], - async ([l1, l2]) => { - expect(l1.isLocked()).toBe(true); - expect(l2.isLocked()).toBe(true); - }, - ); - expect(acquireOrder).toStrictEqual([lock1, lock2]); - expect(releaseOrder).toStrictEqual([lock2, lock1]); - }); - test('withG resource context', async () => { - // No resources - const g1 = utils.withG([], async function* () { - yield 'first'; - yield 'second'; - return 'last'; - }); - expect(await g1.next()).toStrictEqual({ value: 'first', done: false }); - expect(await g1.next()).toStrictEqual({ value: 'second', done: false }); - expect(await g1.next()).toStrictEqual({ value: 'last', done: true }); - // Noop resource - const g2 = await utils.withG( - [ - async () => { - return [async () => {}]; - }, - ], - async function* () { - yield 'first'; - return 'last'; - }, - ); - expect(await g2.next()).toStrictEqual({ value: 'first', done: false }); - expect(await g2.next()).toStrictEqual({ value: 'last', done: true }); - // Order of acquisition is left to right - // Order of release is right ot left - const lock1 = new Mutex(); - const lock2 = new Mutex(); - const acquireOrder: Array = []; - const releaseOrder: Array = []; - const g3 = utils.withG( - [ - async () => { - const release = await lock1.acquire(); - acquireOrder.push(lock1); - return [ - async () => { - releaseOrder.push(lock1); - release(); - }, - lock1, - ]; - }, - async () => { - const release = await lock2.acquire(); - acquireOrder.push(lock2); - return [ - async () => { - releaseOrder.push(lock2); - release(); - }, - lock2, - ]; - }, - ], - async function* ([l1, l2]) { - expect(l1.isLocked()).toBe(true); - expect(l2.isLocked()).toBe(true); - yield 'first'; - yield 'second'; - return 'last'; - }, - ); - expect(await g3.next()).toStrictEqual({ value: 'first', done: false }); - expect(lock1.isLocked()).toBe(true); - expect(lock2.isLocked()).toBe(true); - expect(await g3.next()).toStrictEqual({ value: 'second', done: false }); - expect(lock1.isLocked()).toBe(true); - expect(lock2.isLocked()).toBe(true); - expect(await g3.next()).toStrictEqual({ value: 'last', done: true }); - expect(lock1.isLocked()).toBe(false); - expect(lock2.isLocked()).toBe(false); - expect(acquireOrder).toStrictEqual([lock1, lock2]); - expect(releaseOrder).toStrictEqual([lock2, lock1]); - }); });