Skip to content

Commit

Permalink
Add ':binding' support
Browse files Browse the repository at this point in the history
Per #54 in metosin/seippari, this commit
could be considered a starting point for supporting
bindings support in clojure.

The initial implementation used the starting
point @robert-stuttaford posted in the referenced
issue.

A tradeoff here, is that in order to support sync/async
w/ the various runtimes, using (bound-fn*) seems to be necessary
given the different thread pool implemenations. Counsequently,
there is a a perf hit needing to push/pop thread local vars. It
doesn't seem to be too much in the general case, but with manifold,
it seems to a an order of magnitude slower, though still in the
microseconds.
  • Loading branch information
bnert committed Mar 31, 2024
1 parent bfc76d4 commit 6728643
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ pom.xml.asc
/.lumo_cache/
.idea/
*.iml
.clj-kondo/.cache/
.lsp/
41 changes: 36 additions & 5 deletions src/sieppari/async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@
java.util.concurrent.CompletionException
java.util.function.Function)))

#?(:clj
(defn -forward-bindings
([f]
(fn [ctx]
(println "fwd" ctx)
(with-bindings (or (:bindings ctx) {})
((bound-fn* f) ctx))))
([f ctx]
(with-bindings (or (:bindings ctx) {})
((bound-fn* f) ctx)))))


(defprotocol AsyncContext
(async? [t])
(continue [t f])
Expand All @@ -15,38 +27,57 @@
(deftype FunctionWrapper [f]
Function
(apply [_ v]
(println "V>" f v)
(f v))))

#?(:clj
(extend-protocol AsyncContext
Object
(async? [_] false)
; Given the implementation of enter/leave,
; `continue` won't be called, and therefore,
; the function call does not need to be bound
; here.
(continue [t f] (f t))
(await [t] t)))

#?(:cljs
(extend-protocol AsyncContext
default
(async? [_] false)
; Given the implementation of enter/leave,
; `continue` won't be called, and therefore,
; the function call does not need to be bound
; here.
(continue [t f] (f t))))

#?(:clj
(extend-protocol AsyncContext
clojure.lang.IDeref
(async? [_] true)
(continue [c f] (future (f @c)))
(catch [c f] (future (let [c @c]
(if (exception? c) (f c) c))))
(continue [c f]
(future ((-forward-bindings f) @c)))
(catch [c f]
(future
(let [c @c]
(if (exception? c) ((-forward-bindings f) c) c))))
(await [c] @c)))

#?(:clj
(extend-protocol AsyncContext
CompletionStage
(async? [_] true)
(continue [this f]
; f may a callable value (i.e. a promise), which doesn't look to
; play nicely with with-binding/bound-fn*.
;
; Therefore, only forwarding bindings when value is
; a function and not some other type.
(.thenApply ^CompletionStage this
^Function (->FunctionWrapper f)))

^Function (->FunctionWrapper
(cond-> f
(fn? f)
(-forward-bindings)))))
(catch [this f]
(letfn [(handler [e]
(if (instance? CompletionException e)
Expand Down
6 changes: 5 additions & 1 deletion src/sieppari/async/core_async.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
#?(:clj clojure.core.async.impl.protocols.Channel
:cljs cljs.core.async.impl.channels/ManyToManyChannel)
(async? [_] true)
(continue [c f] (go (f (cca/<! c))))
#?(:clj (continue [c f]
(let [f' (sa/-forward-bindings f)]
(go
(f' (cca/<! c)))))
:cljs (continue [c f] (go (f (cca/<! c)))))
(catch [c f] (go (let [c (cca/<! c)]
(if (exception? c) (f c) c))))
#?(:clj (await [c] (<!! c))))
24 changes: 20 additions & 4 deletions src/sieppari/async/manifold.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,31 @@
(:require [sieppari.async :as sa]
[manifold.deferred :as d]))

; chain'-, as is being used here, is chain'-/3
; chain'-/3, at the time of writing, has an arglist of:
; [d x f]
; where:
; - `d` is a non-realized manifold deferred value, or nil
; to signal a deferred should be returned/provided
; - `x` is either a deferred or a value. If it is a deferred,
; then the deferred is recurively realized until a non-deferred value
; is yeilded.
; - `f` is a function applied to the unwrapped value `x`, before being either realized
; into `d` or being returned as a sucess or error deferred, depending on the result
; of `(f x)`.
(extend-protocol sa/AsyncContext
manifold.deferred.Deferred
(async? [_] true)
(continue [d f] (d/chain'- nil d f))
(catch [d f] (d/catch' d f))
(continue [d f]
(d/chain'- nil d (sa/-forward-bindings f)))
(catch [d f]
(d/catch' d (sa/-forward-bindings f)))
(await [d] (deref d))

manifold.deferred.ErrorDeferred
(async? [_] true)
(continue [d f] (d/chain'- nil d f))
(catch [d f] (d/catch' d f))
(continue [d f]
(d/chain'- nil d (sa/-forward-bindings f)))
(catch [d f]
(d/catch' d (sa/-forward-bindings f)))
(await [d] (deref d)))
11 changes: 9 additions & 2 deletions src/sieppari/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@
c/Context
(context? [_] true))


(defn- -try [ctx f]
(if f
(try
(let [ctx* (f ctx)]
(let [ctx* #?(:clj (with-bindings (or (:bindings ctx) {})
; Given the various async
; executors may exec on different threads,
; the fn must be bound in order to preserve
; bindings
((bound-fn* f) ctx))
:cljs (f ctx))]
(if (a/async? ctx*)
(a/catch ctx* (fn [e] (assoc ctx :error e)))
ctx*))
Expand Down Expand Up @@ -79,7 +86,7 @@
(callback result))))

(defn- remove-context-keys [ctx]
(dissoc ctx :error :queue :stack))
(dissoc ctx :bindings :error :queue :stack))

;;
;; Public API:
Expand Down
33 changes: 33 additions & 0 deletions test/clj/sieppari/core_async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,36 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(go
*boundv*))

(def bindings-chain
[{:enter (fn [ctx]
(go
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(go
(is (= 42 *boundv*))
ctx))}
{:enter (fn [ctx]
(is (= 42 *boundv*)
"In interceptor failed")
(go
(update-in ctx [:bindings #'*boundv*] inc)))
:leave (fn [ctx]
(go
(update-in ctx [:bindings #'*boundv*] dec)))}

bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

20 changes: 20 additions & 0 deletions test/clj/sieppari/core_execute_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,26 @@
[:leave :x]
[:leave :a]]))


(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 42 *boundv*))
*boundv*)

(def bindings-chain
[{:enter (fn [ctx] (assoc ctx
:bindings
{#'*boundv* 42}))}
{:enter (fn [ctx]
(is (= 42 *boundv*))
ctx)}
bindings-handler])

(deftest use-bindings-test
(fact "bindings are conveyed across interceptor chain"
(s/execute bindings-chain {}) => 42))

; TODO: figure out how enqueue should work? Should enqueue add interceptors just
; before the handler?
#_(deftest enqueue-interceptor-test
Expand Down
42 changes: 42 additions & 0 deletions test/clj/sieppari/manifold_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,45 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(d/chain
nil
(fn [_]
*boundv*)))

(def bindings-chain
[{:enter (fn [ctx]
(d/future
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(d/chain
ctx
(fn [ctx']
(is (= 42 *boundv*))
ctx')))}
{:enter (fn [ctx]
(is (= 42 *boundv*)
"In interceptor failed")
(d/chain
ctx
#(update-in
%
[:bindings #'*boundv*] inc)))
:leave (fn [ctx]
(d/chain
ctx
#(update-in
%
[:bindings #'*boundv*] dec)))}
bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

35 changes: 35 additions & 0 deletions test/clj/sieppari/promesa_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,38 @@
[:error :c]
[:error :b]
[:leave :a]])))

(def ^:dynamic *boundv* 41)

(defn bindings-handler [_]
(is (= 43 *boundv*))
(p/resolved
*boundv*))

(def bindings-chain
[{:enter (fn [ctx]
(p/resolved
(assoc ctx
:bindings
{#'*boundv* 42})))
:leave (fn [ctx]
(is (= 42 *boundv*))
ctx)}
{:enter (fn [ctx]
(is (= 42 *boundv*))
(-> ctx
(update-in [:bindings #'*boundv*]
inc)
(p/resolved)))
:leave (fn [ctx]
(is (= 43 *boundv*))
(-> ctx
(update-in [:bindings #'*boundv*]
dec)
(p/resolved)))}
bindings-handler])

(deftest async-bindings-test
(fact "bindings are conveyed across interceptor chain"
(sc/execute bindings-chain {}) => 43))

4 changes: 2 additions & 2 deletions test/cljc/sieppari/async/promesa_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
(is (as/async? (p/promise 1))))

#?(:clj
(deftest core-async-continue-cljs-callback-test
(deftest core-async-continue-clj-callback-test
(let [respond (promise)
p (p/create
(fn [resolve _]
Expand All @@ -26,7 +26,7 @@
(done))))))))

#?(:clj
(deftest core-async-catch-cljs-callback-test
(deftest core-async-catch-clj-callback-test
(let [respond (promise)
p (p/create
(fn [_ reject]
Expand Down

0 comments on commit 6728643

Please sign in to comment.