Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely committed Dec 20, 2024
1 parent 3e8f168 commit 3b78dad
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 64 deletions.
22 changes: 10 additions & 12 deletions src/hyperx/limiter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import std/asyncdispatch
import ./utils
import ./errors

template fut[T](f: FutureVar[T]): Future[T] = Future[T](f)

type LimiterAsyncClosedError* = QueueClosedError

func newLimiterAsyncClosedError(): ref LimiterAsyncClosedError {.raises: [].} =
Expand All @@ -13,25 +11,23 @@ func newLimiterAsyncClosedError(): ref LimiterAsyncClosedError {.raises: [].} =
type LimiterAsync* = ref object
## Async concurrency limiter.
used, size: int
waiter: FutureVar[void]
waiter: Future[void]
wakingUp: bool
isClosed: bool

func newLimiter*(size: int): LimiterAsync {.raises: [].} =
doAssert size > 0
{.cast(noSideEffect).}:
let waiter = newFutureVar[void]()
untrackExceptions:
waiter.complete()
LimiterAsync(
used: 0,
size: size,
waiter: waiter,
waiter: nil,
wakingUp: false,
isClosed: false
)

proc wakeup(lt: LimiterAsync) {.raises: [].} =
if lt.waiter == nil:
return
if lt.waiter.finished:
return
proc wakeup =
Expand Down Expand Up @@ -63,18 +59,20 @@ proc isEmpty*(lt: LimiterAsync): bool {.raises: [].} =
proc wait*(lt: LimiterAsync): Future[void] {.raises: [LimiterAsyncClosedError].} =
doAssert lt.used > 0
doAssert lt.used <= lt.size
doAssert lt.waiter.finished
doAssert lt.waiter == nil or lt.waiter.finished
check not lt.isClosed, newLimiterAsyncClosedError()
lt.waiter.clean()
return lt.waiter.fut
lt.waiter = newFuture[void]()
return lt.waiter

proc failSoon(lt: LimiterAsync) {.raises: [].} =
if lt.waiter == nil:
return
if lt.waiter.finished:
return
proc wakeup =
lt.wakingUp = false
if not lt.waiter.finished:
lt.waiter.fut.fail newLimiterAsyncClosedError()
lt.waiter.fail newLimiterAsyncClosedError()
if not lt.wakingUp:
lt.wakingUp = true
untrackExceptions:
Expand Down
62 changes: 31 additions & 31 deletions src/hyperx/queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import std/deques
import ./utils
import ./errors

template fut[T](f: FutureVar[T]): Future[T] = Future[T](f)

func newQueueClosedError(): ref QueueClosedError {.raises: [].} =
result = (ref QueueClosedError)(msg: "Queue is closed")

Expand All @@ -15,27 +13,21 @@ type
QueueAsync*[T] = ref object
s: Deque[T]
size: int
putWaiter, popWaiter: FutureVar[void]
putWaiter, popWaiter: Future[void]
wakingPut, wakingPop: bool
isClosed: bool

func newQueue*[T](size: int): QueueAsync[T] {.raises: [].} =
doAssert size > 0
{.cast(noSideEffect).}:
let putWaiter = newFutureVar[void]()
let popWaiter = newFutureVar[void]()
untrackExceptions:
putWaiter.complete()
popWaiter.complete()
QueueAsync[T](
s: initDeque[T](size),
size: size,
putWaiter: putWaiter,
popWaiter: popWaiter,
wakingPut: false,
wakingPop: false,
isClosed: false
)
QueueAsync[T](
s: initDeque[T](size),
size: size,
putWaiter: nil,
popWaiter: nil,
wakingPut: false,
wakingPop: false,
isClosed: false
)

iterator items*[T](q: QueueAsync[T]): T {.inline.} =
for elm in items q.s:
Expand All @@ -45,6 +37,8 @@ func used[T](q: QueueAsync[T]): int {.raises: [].} =
q.s.len

proc wakeupPop[T](q: QueueAsync[T]) {.raises: [].} =
if q.popWaiter == nil:
return
if q.popWaiter.finished:
return
proc wakeup =
Expand All @@ -59,16 +53,18 @@ proc wakeupPop[T](q: QueueAsync[T]) {.raises: [].} =
proc put*[T](q: QueueAsync[T], v: T) {.async.} =
doAssert q.used <= q.size
check not q.isClosed, newQueueClosedError()
doAssert q.putWaiter.finished
doAssert q.putWaiter == nil or q.putWaiter.finished
if q.used == q.size:
q.putWaiter.clean()
await q.putWaiter.fut
q.putWaiter = newFuture[void]()
await q.putWaiter
check not q.isClosed, newQueueClosedError()
doAssert q.used < q.size
q.s.addFirst v
q.wakeupPop()

proc wakeupPut[T](q: QueueAsync[T]) {.raises: [].} =
if q.putWaiter == nil:
return
if q.putWaiter.finished:
return
proc wakeup =
Expand All @@ -83,10 +79,10 @@ proc wakeupPut[T](q: QueueAsync[T]) {.raises: [].} =
proc pop*[T](q: QueueAsync[T]): Future[T] {.async.} =
doAssert q.used >= 0
check not q.isClosed, newQueueClosedError()
doAssert q.popWaiter.finished
doAssert q.popWaiter == nil or q.popWaiter.finished
if q.used == 0:
q.popWaiter.clean()
await q.popWaiter.fut
q.popWaiter = newFuture[void]()
await q.popWaiter
check not q.isClosed, newQueueClosedError()
doAssert q.used > 0
result = q.s.popLast()
Expand All @@ -95,17 +91,21 @@ proc pop*[T](q: QueueAsync[T]): Future[T] {.async.} =
func isClosed*[T](q: QueueAsync[T]): bool {.raises: [].} =
q.isClosed

proc failSoon(f: Future[void]) {.raises: [].} =
if f == nil:
return
proc wakeup =
if not f.finished:
f.fail newQueueClosedError()
untrackExceptions:
callSoon wakeup

proc close*[T](q: QueueAsync[T]) {.raises: [].} =
if q.isClosed:
return
q.isClosed = true
proc failWaiters =
if not q.putWaiter.finished:
q.putWaiter.fut.fail newQueueClosedError()
if not q.popWaiter.finished:
q.popWaiter.fut.fail newQueueClosedError()
untrackExceptions:
callSoon failWaiters
failSoon q.putWaiter
failSoon q.popWaiter

when isMainModule:
proc sleepCycle: Future[void] =
Expand Down
1 change: 0 additions & 1 deletion src/hyperx/signal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ func newSignalClosedError(): ref SignalClosedError {.raises: [].} =
type
SignalAsync* = ref object
## Wait for a signal. When triggers wakes everyone up
# XXX use/reuse FutureVars
waiters: Deque[Future[void]]
isClosed: bool

Expand Down
36 changes: 16 additions & 20 deletions src/hyperx/value.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import std/asyncdispatch
import ./utils
import ./errors

template fut[T](f: FutureVar[T]): Future[T] = Future[T](f)

type
ValueAsyncClosedError* = QueueClosedError

Expand All @@ -13,25 +11,21 @@ func newValueAsyncClosedError(): ref ValueAsyncClosedError {.raises: [].} =

type
ValueAsync*[T] = ref object
putWaiter, getWaiter: FutureVar[void]
putWaiter, getWaiter: Future[void]
val: T
isClosed: bool

func newValueAsync*[T](): ValueAsync[T] {.raises: [].} =
{.cast(noSideEffect).}:
let putWaiter = newFutureVar[void]()
let getWaiter = newFutureVar[void]()
untrackExceptions:
putWaiter.complete()
getWaiter.complete()
ValueAsync[T](
putWaiter: putWaiter,
getWaiter: getWaiter,
putWaiter: nil,
getWaiter: nil,
val: nil,
isClosed: false
)

proc wakeupSoon(f: Future[void]) {.raises: [].} =
if f == nil:
return
if f.finished:
return
proc wakeup =
Expand All @@ -45,24 +39,26 @@ proc put*[T](vala: ValueAsync[T], val: T) {.async.} =
doAssert val != nil
doAssert vala.val == nil
vala.val = val
wakeupSoon vala.getWaiter.fut
doAssert vala.putWaiter.finished
vala.putWaiter.clean()
await vala.putWaiter.fut
wakeupSoon vala.getWaiter
doAssert vala.putWaiter == nil or vala.putWaiter.finished
vala.putWaiter = newFuture[void]()
await vala.putWaiter
doAssert vala.val == nil

proc get*[T](vala: ValueAsync[T]): Future[T] {.async.} =
check not vala.isClosed, newValueAsyncClosedError()
doAssert vala.getWaiter.finished
doAssert vala.getWaiter == nil or vala.getWaiter.finished
if vala.val == nil:
vala.getWaiter.clean()
await vala.getWaiter.fut
vala.getWaiter = newFuture[void]()
await vala.getWaiter
doAssert vala.val != nil
result = vala.val
vala.val = nil
wakeupSoon vala.putWaiter.fut

proc failSoon(f: Future[void]) {.raises: [].} =
if f == nil:
return
if f.finished:
return
proc wakeup =
Expand All @@ -75,8 +71,8 @@ proc close*[T](vala: ValueAsync[T]) {.raises: [].} =
if vala.isClosed:
return
vala.isClosed = true
failSoon vala.putWaiter.fut
failSoon vala.getWaiter.fut
failSoon vala.putWaiter
failSoon vala.getWaiter

when isMainModule:
func newIntRef(n: int): ref int =
Expand Down

0 comments on commit 3b78dad

Please sign in to comment.