Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose ProcessPromise stage #1077

Merged
merged 14 commits into from
Jan 11, 2025
Merged
4 changes: 2 additions & 2 deletions .size-limit.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"name": "dts libdefs",
"path": "build/*.d.ts",
"limit": "38.1 kB",
"limit": "38.7 kB",
"brotli": false,
"gzip": false
},
Expand All @@ -30,7 +30,7 @@
{
"name": "all",
"path": "build/*",
"limit": "847.5 kB",
"limit": "849 kB",
"brotli": false,
"gzip": false
}
Expand Down
12 changes: 12 additions & 0 deletions docs/process-promise.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ const p = $({halt: true})`command`
const o = await p.run()
```

## `stage`

Shows the current process stage: `initial` | `halted` | `running` | `fulfilled` | `rejected`

```ts
const p = $`echo foo`
p.stage // 'running'
await p
p.stage // 'fulfilled'
```


## `stdin`

Returns a writable stream of the stdin process. Accessing
Expand Down
45 changes: 30 additions & 15 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ export const $: Shell & Options = new Proxy<Shell & Options>(
},
}
)
/**
* State machine stages
*/
type ProcessStage = 'initial' | 'halted' | 'running' | 'fulfilled' | 'rejected'

type Resolve = (out: ProcessOutput) => void

Expand All @@ -214,6 +218,7 @@ type PipeMethod = {
}

export class ProcessPromise extends Promise<ProcessOutput> {
private _stage: ProcessStage = 'initial'
private _id = randomId()
private _command = ''
private _from = ''
Expand All @@ -225,11 +230,8 @@ export class ProcessPromise extends Promise<ProcessOutput> {
private _timeout?: number
private _timeoutSignal?: NodeJS.Signals
private _timeoutId?: NodeJS.Timeout
private _resolved = false
private _halted?: boolean
private _piped = false
private _pipedFrom?: ProcessPromise
private _run = false
private _ee = new EventEmitter()
private _stdin = new VoidStream()
private _zurk: ReturnType<typeof exec> | null = null
Expand All @@ -249,12 +251,12 @@ export class ProcessPromise extends Promise<ProcessOutput> {
this._resolve = resolve
this._reject = reject
this._snapshot = { ac: new AbortController(), ...options }
if (this._snapshot.halt) this._stage = 'halted'
}

run(): ProcessPromise {
if (this._run) return this // The _run() can be called from a few places.
this._halted = false
this._run = true
if (this.isRunning() || this.isSettled()) return this // The _run() can be called from a few places.
this._stage = 'running'
this._pipedFrom?.run()

const self = this
Expand Down Expand Up @@ -310,7 +312,6 @@ export class ProcessPromise extends Promise<ProcessOutput> {
$.log({ kind: 'stderr', data, verbose: !self.isQuiet(), id })
},
end: (data, c) => {
self._resolved = true
const { error, status, signal, duration, ctx } = data
const { stdout, stderr, stdall } = ctx.store
const dto: ProcessOutputLazyDto = {
Expand Down Expand Up @@ -341,8 +342,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
const output = self._output = new ProcessOutput(dto)

if (error || status !== 0 && !self.isNothrow()) {
self._stage = 'rejected'
self._reject(output)
} else {
self._stage = 'fulfilled'
self._resolve(output)
}
},
Expand Down Expand Up @@ -388,9 +391,9 @@ export class ProcessPromise extends Promise<ProcessOutput> {
for (const chunk of this._zurk!.store[source]) from.write(chunk)
return true
}
const fillEnd = () => this._resolved && fill() && from.end()
const fillEnd = () => this.isSettled() && fill() && from.end()

if (!this._resolved) {
if (!this.isSettled()) {
const onData = (chunk: string | Buffer) => from.write(chunk)
ee.once(source, () => {
fill()
Expand Down Expand Up @@ -495,6 +498,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
return this._output
}

get stage(): ProcessStage {
return this._stage
}

// Configurators
stdio(
stdin: IOType,
Expand Down Expand Up @@ -524,13 +531,13 @@ export class ProcessPromise extends Promise<ProcessOutput> {
d: Duration,
signal = this._timeoutSignal || $.timeoutSignal
): ProcessPromise {
if (this._resolved) return this
if (this.isSettled()) return this

this._timeout = parseDuration(d)
this._timeoutSignal = signal

if (this._timeoutId) clearTimeout(this._timeoutId)
if (this._timeout && this._run) {
if (this._timeout && this.isRunning()) {
this._timeoutId = setTimeout(
() => this.kill(this._timeoutSignal),
this._timeout
Expand Down Expand Up @@ -562,10 +569,6 @@ export class ProcessPromise extends Promise<ProcessOutput> {
}

// Status checkers
isHalted(): boolean {
return this._halted ?? this._snapshot.halt ?? false
}

isQuiet(): boolean {
return this._quiet ?? this._snapshot.quiet
}
Expand All @@ -578,6 +581,18 @@ export class ProcessPromise extends Promise<ProcessOutput> {
return this._nothrow ?? this._snapshot.nothrow
}

isHalted(): boolean {
return this.stage === 'halted'
}

private isSettled(): boolean {
return !!this.output
}

private isRunning(): boolean {
return this.stage === 'running'
}

// Promise API
then<R = ProcessOutput, E = ProcessOutput>(
onfulfilled?:
Expand Down
74 changes: 68 additions & 6 deletions test/core.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { basename } from 'node:path'
import { WriteStream } from 'node:fs'
import { Readable, Transform, Writable } from 'node:stream'
import { Socket } from 'node:net'
import { ChildProcess } from 'node:child_process'
import {
$,
ProcessPromise,
Expand All @@ -42,6 +43,7 @@ import {
which,
nothrow,
} from '../build/index.js'
import { noop } from '../build/util.js'

describe('core', () => {
describe('resolveDefaults()', () => {
Expand Down Expand Up @@ -392,6 +394,72 @@ describe('core', () => {
})

describe('ProcessPromise', () => {
test('getters', async () => {
const p = $`echo foo`
assert.ok(p.pid > 0)
assert.ok(typeof p.id === 'string')
assert.ok(typeof p.cmd === 'string')
assert.ok(typeof p.fullCmd === 'string')
assert.ok(typeof p.stage === 'string')
assert.ok(p.child instanceof ChildProcess)
assert.ok(p.stdout instanceof Socket)
assert.ok(p.stderr instanceof Socket)
assert.ok(p.exitCode instanceof Promise)
assert.ok(p.signal instanceof AbortSignal)
assert.equal(p.output, null)

await p
assert.ok(p.output instanceof ProcessOutput)
})

describe('state machine transitions', () => {
it('running > fulfilled', async () => {
const p = $`echo foo`
assert.equal(p.stage, 'running')
await p
assert.equal(p.stage, 'fulfilled')
})

it('running > rejected', async () => {
const p = $`foo`
assert.equal(p.stage, 'running')

try {
await p
} catch {}
assert.equal(p.stage, 'rejected')
})

it('halted > running > fulfilled', async () => {
const p = $({ halt: true })`echo foo`
assert.equal(p.stage, 'halted')
p.run()
assert.equal(p.stage, 'running')
await p
assert.equal(p.stage, 'fulfilled')
})

it('all transition', async () => {
const { promise, resolve, reject } = Promise.withResolvers()
const process = new ProcessPromise(noop, noop)

assert.equal(process.stage, 'initial')
process._bind('echo foo', 'test', resolve, reject, {
...resolveDefaults(),
halt: true,
})

assert.equal(process.stage, 'halted')
process.run()

assert.equal(process.stage, 'running')
await promise

assert.equal(process.stage, 'fulfilled')
assert.equal(process.output?.stdout, 'foo\n')
})
})

test('inherits native Promise', async () => {
const p1 = $`echo 1`
const p2 = p1.then((v) => v)
Expand Down Expand Up @@ -424,12 +492,6 @@ describe('core', () => {
assert.equal(p.fullCmd, "set -euo pipefail;echo $'#bar' --t 1")
})

test('exposes pid & id', () => {
const p = $`echo foo`
assert.ok(p.pid > 0)
assert.ok(typeof p.id === 'string')
})

test('stdio() works', async () => {
const p1 = $`printf foo`
await p1
Expand Down
Loading