Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ':binding' support #55

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ pom.xml.asc
/.lumo_cache/
.idea/
*.iml
.clj-kondo/.cache/
.lsp/
39 changes: 34 additions & 5 deletions src/sieppari/async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@
java.util.concurrent.CompletionException
java.util.function.Function)))

#?(:clj
(defn -forward-bindings
([f]
(fn [ctx]
(with-bindings (or (:bindings ctx) {})
((bound-fn* f) ctx))))
([f ctx]
(with-bindings (or (:bindings ctx) {})
((bound-fn* f) ctx)))))


(defprotocol AsyncContext
(async? [t])
(continue [t f])
Expand All @@ -21,32 +32,50 @@
(extend-protocol AsyncContext
Object
(async? [_] false)
; Given the implementation of enter/leave,
; `continue` won't be called, and therefore,
; the function call does not need to be bound
; here.
(continue [t f] (f t))
(await [t] t)))

#?(:cljs
(extend-protocol AsyncContext
default
(async? [_] false)
; Given the implementation of enter/leave,
; `continue` won't be called, and therefore,
; the function call does not need to be bound
; here.
(continue [t f] (f t))))

#?(:clj
(extend-protocol AsyncContext
clojure.lang.IDeref
(async? [_] true)
(continue [c f] (future (f @c)))
(catch [c f] (future (let [c @c]
(if (exception? c) (f c) c))))
(continue [c f]
(future ((-forward-bindings f) @c)))
(catch [c f]
(future
(let [c @c]
(if (exception? c) ((-forward-bindings f) c) c))))
(await [c] @c)))

#?(:clj
(extend-protocol AsyncContext
CompletionStage
(async? [_] true)
(continue [this f]
; f may a callable value (i.e. a promise), which doesn't look to
; play nicely with with-binding/bound-fn*.
;
; Therefore, only forwarding bindings when value is
; a function and not some other type.
(.thenApply ^CompletionStage this
^Function (->FunctionWrapper f)))

^Function (->FunctionWrapper
(cond-> f
(fn? f)
(-forward-bindings)))))
(catch [this f]
(letfn [(handler [e]
(if (instance? CompletionException e)
Expand Down
6 changes: 5 additions & 1 deletion src/sieppari/async/core_async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
#?(:clj clojure.core.async.impl.protocols.Channel
:cljs cljs.core.async.impl.channels/ManyToManyChannel)
(async? [_] true)
(continue [c f] (go (f (cca/<! c))))
#?(:clj (continue [c f]
(let [f' (sa/-forward-bindings f)]
(go
(f' (cca/<! c)))))
:cljs (continue [c f] (go (f (cca/<! c)))))
(catch [c f] (go (let [c (cca/<! c)]
(if (exception? c) (f c) c))))
#?(:clj (await [c] (<!! c))))
24 changes: 20 additions & 4 deletions src/sieppari/async/manifold.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,31 @@
(:require [sieppari.async :as sa]
[manifold.deferred :as d]))

; chain'-, as is being used here, is chain'-/3
; chain'-/3, at the time of writing, has an arglist of:
; [d x f]
; where:
; - `d` is a non-realized manifold deferred value, or nil
; to signal a deferred should be returned/provided
; - `x` is either a deferred or a value. If it is a deferred,
; then the deferred is recurively realized until a non-deferred value
; is yeilded.
; - `f` is a function applied to the unwrapped value `x`, before being either realized
; into `d` or being returned as a sucess or error deferred, depending on the result
; of `(f x)`.
(extend-protocol sa/AsyncContext
manifold.deferred.Deferred
(async? [_] true)
(continue [d f] (d/chain'- nil d f))
(catch [d f] (d/catch' d f))
(continue [d f]
(d/chain'- nil d (sa/-forward-bindings f)))
(catch [d f]
(d/catch' d (sa/-forward-bindings f)))
(await [d] (deref d))

manifold.deferred.ErrorDeferred
(async? [_] true)
(continue [d f] (d/chain'- nil d f))
(catch [d f] (d/catch' d f))
(continue [d f]
(d/chain'- nil d (sa/-forward-bindings f)))
(catch [d f]
(d/catch' d (sa/-forward-bindings f)))
(await [d] (deref d)))
11 changes: 9 additions & 2 deletions src/sieppari/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@
c/Context
(context? [_] true))


(defn- -try [ctx f]
(if f
(try
(let [ctx* (f ctx)]
(let [ctx* #?(:clj (with-bindings (or (:bindings ctx) {})
; Given the various async
; executors may exec on different threads,
; the fn must be bound in order to preserve
; bindings
((bound-fn* f) ctx))
:cljs (f ctx))]
(if (a/async? ctx*)
(a/catch ctx* (fn [e] (assoc ctx :error e)))
ctx*))
Expand Down Expand Up @@ -79,7 +86,7 @@
(callback result))))

(defn- remove-context-keys [ctx]
(dissoc ctx :error :queue :stack))
(dissoc ctx :bindings :error :queue :stack))

;;
;; Public API:
Expand Down
33 changes: 33 additions & 0 deletions test/clj/sieppari/core_async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,36 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(go
*boundv*))

(def bindings-chain
[{:enter (fn [ctx]
(go
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(go
(is (= 42 *boundv*))
ctx))}
{:enter (fn [ctx]
(is (= 42 *boundv*)
"In interceptor failed")
(go
(update-in ctx [:bindings #'*boundv*] inc)))
:leave (fn [ctx]
(go
(update-in ctx [:bindings #'*boundv*] dec)))}

bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

20 changes: 20 additions & 0 deletions test/clj/sieppari/core_execute_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,26 @@
[:leave :x]
[:leave :a]]))


(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 42 *boundv*))
*boundv*)

(def bindings-chain
[{:enter (fn [ctx] (assoc ctx
:bindings
{#'*boundv* 42}))}
{:enter (fn [ctx]
(is (= 42 *boundv*))
ctx)}
bindings-handler])

(deftest use-bindings-test
(fact "bindings are conveyed across interceptor chain"
(s/execute bindings-chain {}) => 42))

; TODO: figure out how enqueue should work? Should enqueue add interceptors just
; before the handler?
#_(deftest enqueue-interceptor-test
Expand Down
42 changes: 42 additions & 0 deletions test/clj/sieppari/manifold_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,45 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(d/chain
nil
(fn [_]
*boundv*)))

(def bindings-chain
[{:enter (fn [ctx]
(d/future
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(d/chain
ctx
(fn [ctx']
(is (= 42 *boundv*))
ctx')))}
{:enter (fn [ctx]
(is (= 42 *boundv*)
"In interceptor failed")
(d/chain
ctx
#(update-in
%
[:bindings #'*boundv*] inc)))
:leave (fn [ctx]
(d/chain
ctx
#(update-in
%
[:bindings #'*boundv*] dec)))}
bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

35 changes: 35 additions & 0 deletions test/clj/sieppari/promesa_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,38 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(p/resolved
*boundv*))

(def bindings-chain
[{:enter (fn [ctx]
(p/resolved
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(is (= 42 *boundv*))
ctx)}
{:enter (fn [ctx]
(is (= 42 *boundv*))
(-> ctx
(update-in [:bindings #'*boundv*]
inc)
(p/resolved)))
:leave (fn [ctx]
(is (= 43 *boundv*))
(-> ctx
(update-in [:bindings #'*boundv*]
dec)
(p/resolved)))}
bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

4 changes: 2 additions & 2 deletions test/cljc/sieppari/async/promesa_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
(is (as/async? (p/promise 1))))

#?(:clj
(deftest core-async-continue-cljs-callback-test
(deftest core-async-continue-clj-callback-test
(let [respond (promise)
p (p/create
(fn [resolve _]
Expand All @@ -26,7 +26,7 @@
(done))))))))

#?(:clj
(deftest core-async-catch-cljs-callback-test
(deftest core-async-catch-clj-callback-test
(let [respond (promise)
p (p/create
(fn [_ reject]
Expand Down