Recipe: custom remote to offload work to a WebWorker #7

Open
holyjak opened this issue Sep 6, 2023 · 0 comments

holyjak commented Sep 6, 2023

Demonstrate how to easily leverage a custom remote to send work for processing in a Web Worker...

Notes:

  • The Worker registers for message events, then sends zero or more progress events, and finally a finish event.
  • The remote supports abort, so a worker can be killed.
  • All communication goes through JS-serializable messages, but the remote converts to and from EDN so that Fulcro's auto-merge facilities work, and therefore `returning` works too.
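The message flow in the notes above can be sketched in plain JavaScript. This is only an illustration under assumptions: `reply` and `runJob` are hypothetical helpers, `shout` is a made-up command name, and `post` stands in for `self.postMessage` so the flow can be exercised outside a real Worker.

```javascript
// Hypothetical helper: builds a message in the {cmd, requestId, payload} shape.
function reply(cmd, requestId, payload) {
  return { cmd, requestId, payload };
}

// Hypothetical job runner. `post` is injected in place of self.postMessage.
function runJob(request, post) {
  const { cmd, requestId, payload } = request;
  const id = payload.id;

  // Any cmd other than "done" is treated as a progress update.
  post(reply("progress", requestId, {
    "worker-result/id": id,
    "worker-result/status": "started",
  }));

  if (cmd !== "shout") {
    // Failures are also reported through the final "done" message.
    post(reply("done", requestId, {
      "worker-result/id": id,
      "worker-result/status": "failed",
    }));
    return;
  }

  post(reply("done", requestId, {
    "worker-result/id": id,
    "worker-result/status": "success",
    "worker-result/output": payload.text.toUpperCase(),
  }));
}

// Usage: collect the messages a single request produces.
const sent = [];
runJob(
  { cmd: "shout", requestId: "r1", payload: { id: 1, text: "hi" } },
  (message) => sent.push(message)
);
```

Every message carries the `requestId` of the request it answers, and the namespaced id in the payload is what lets the result be normalized on the Fulcro side.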

Remote

(defn web-worker-remote
  "Create a Fulcro remote that forwards (only) mutations to web workers. The web worker
   script is given by `js-file`.
   The worker will receive an event of the form: #js {:cmd (name mutation-symbol) :payload (clj->js params) :requestId unique-str}.
   The web worker MUST post messages of the form: #js {:cmd 'xxx' :requestId unique-str-from-event :payload {}},
   where the payload must always be a map, and the requestId must be the one it is responding to (the one it was sent in the
   event to process). The special command `done` means the worker has finished the given mutation, and the `payload` is the
   return value of the mutation. ALL other commands are sent as UPDATES to the same mutation (e.g. progress reports).
   The map returned from the worker will be converted to CLJ, and will be available for normalization via the normal mutation
   `returning` mechanism. It is therefore recommended that you include a namespaced ID in the map for this
   purpose (e.g. #js {'project/id': id-to-normalize ...})."
  [js-file]
  (log/debug "Starting web worker remote" js-file)
  (let [active-requests               (atom {})]
    {:transmit! (fn transmit! [_ {:keys [::txn/ast ::txn/result-handler ::txn/update-handler] :as send-node}]
                  (let [{:keys [dispatch-key params type] :as _mutation-node} (if (= :root (:type ast))
                                                                                (first (:children ast))
                                                                                ast)
                        request-id (str (random-uuid))
                        abort-id (or
                                   (-> send-node ::txn/options ::txn/abort-id)
                                   (-> send-node ::txn/options :abort-id)
                                   request-id)
                        edn (eql/ast->query ast)]
                    (let [worker (js/Worker. js-file)]
                      (try
                        (when (not= type :call) (throw (ex-info "This remote only handles mutations" {})))
                        (let [msg #js {:cmd (name dispatch-key)
                                       :requestId request-id
                                       :payload (clj->js params)}
                              listener (fn listener* [event]
                                         (let [{:keys [cmd payload]} (js->clj (.-data event) :keywordize-keys true)]
                                           (log/debug "Received worker event with " cmd payload)
                                           (if (= cmd "done")
                                             (do
                                               (.removeEventListener worker "message" listener*)
                                               (.terminate worker)
                                               (swap! active-requests dissoc request-id)
                                               (try
                                                 (result-handler {:status-code 200
                                                                  :transaction edn
                                                                  :body {dispatch-key payload}})
                                                 (catch :default e
                                                   (log/error e "Result handler for web worker remote failed."))))
                                             (try
                                               (update-handler {:transaction edn
                                                                :body {dispatch-key payload}})
                                               (catch :default e
                                                 (log/error e "Web worker update handler threw unexpected exception."))))))]
                          (swap! active-requests assoc request-id {:listener listener
                                                                   :abort-id abort-id
                                                                   :worker worker})
                          (.addEventListener worker "message" listener)
                          (.postMessage worker msg)
                          request-id)
                        (catch :default e
                          (log/error e "Unexpected internal exception")
                          (result-handler {:status-code 500
                                           :status-text "Internal Error"})
                          (.terminate worker))))))
     :abort! (fn [_ id]
               (when-let [{:keys [worker
                                  listener
                                  request-id]} (reduce-kv
                                                 (fn [_ k v]
                                                   (when (= id (:abort-id v))
                                                     (reduced (assoc v :request-id k))))
                                                 nil
                                                 @active-requests)]
                 (log/debug "Aborting request " request-id)
                 (.removeEventListener worker "message" listener)
                 (.terminate worker)
                 (swap! active-requests dissoc request-id)))}))

Note: This remote starts a new worker for each request. That way, you can also use comp/transact!'s :parallel? true option to submit multiple work items in parallel. Depending on your use case, you might prefer a single, reused worker or a pool of workers.
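If you go for a single, reused worker, replies can no longer be told apart by worker instance, so they have to be matched to their requests by `requestId`. The following plain JavaScript sketch shows only that bookkeeping. It is not Fulcro code: `WorkerRouter` and `FakeWorker` are hypothetical names, and `FakeWorker` merely imitates a worker so the example runs without a browser.

```javascript
// Hypothetical router: one shared worker, many in-flight requests.
class WorkerRouter {
  constructor(worker) {
    this.worker = worker;
    this.pending = new Map(); // requestId -> { onUpdate, onDone }
    worker.addEventListener("message", (event) => this.route(event.data));
  }

  send(cmd, requestId, payload, onUpdate, onDone) {
    this.pending.set(requestId, { onUpdate, onDone });
    this.worker.postMessage({ cmd, requestId, payload });
  }

  route({ cmd, requestId, payload }) {
    const entry = this.pending.get(requestId);
    if (!entry) return; // reply for an aborted or unknown request
    if (cmd === "done") {
      this.pending.delete(requestId);
      entry.onDone(payload);
    } else {
      entry.onUpdate(payload);
    }
  }

  // Only forgets the request: a shared worker cannot be terminated per request.
  abort(requestId) {
    return this.pending.delete(requestId);
  }
}

// Stand-in for a real Worker: replies synchronously with one update and a result.
class FakeWorker {
  constructor() { this.listeners = []; }
  addEventListener(type, fn) { if (type === "message") this.listeners.push(fn); }
  emit(data) { this.listeners.forEach((fn) => fn({ data })); }
  postMessage({ requestId, payload }) {
    this.emit({ cmd: "progress", requestId, payload: { stage: "working" } });
    this.emit({ cmd: "done", requestId, payload: { output: payload.text.toUpperCase() } });
  }
}

// Usage: two requests share one worker and are kept apart by requestId.
const routed = [];
const router = new WorkerRouter(new FakeWorker());
for (const [requestId, text] of [["a", "one"], ["b", "two"]]) {
  router.send("transform", requestId, { text },
    (p) => routed.push(["update", requestId, p.stage]),
    (p) => routed.push(["done", requestId, p.output]));
}
```

The trade-off is visible in `abort`: with a shared worker you can drop the callbacks, but stopping the work itself would need a cooperative cancel message, whereas the one-worker-per-request remote can simply terminate the worker.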

Worker

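// Report an intermediate stage. The remote treats any cmd other than "done" as an update.
function progress(requestId, id, stage) {
  self.postMessage({
    cmd: "progress",
    requestId, payload: {
      "worker-result/status": stage,
      "worker-result/id": id
    }
  })
}

// Report the final result. The id is included so that the result can be normalized.
function done(requestId, id, output) {
  self.postMessage({
    cmd: "done",
    requestId, payload: {
      "worker-result/id": id,
      "worker-result/status": "success",
      "worker-result/output": output,
    }
  })
}

// Report a failure. It also uses "done", because the mutation is finished.
function failed(requestId, id, error) {
  self.postMessage({
    cmd: "done",
    requestId, payload: {
      "worker-result/id": id,
      "worker-result/status": "failed",
      // Stringify so that Error objects arrive as plain, serializable data
      "worker-result/output": { errors: String(error) }
    }
  })
}

self.addEventListener('message', (e) => {
  const { data: { cmd, requestId, payload } } = e
  const { id, text } = payload
  const doTheWork = () => { try { done(requestId, id, text.toUpperCase()); } catch (err) { failed(requestId, id, err); } }

  progress(requestId, id, `Starting...`);

  // The remote sends (name mutation-symbol) as cmd, i.e. "do-in-worker" for the mutation in this recipe
  if (cmd === "do-in-worker") {
    setTimeout(() => progress(requestId, id, `Still working...`), 1000);
    setTimeout(doTheWork, 1000);
  } else { failed(requestId, id, `Unknown command: ${cmd}`); }
})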

UI and mutation

(ns ex-worker-remote
  (:require
    [com.fulcrologic.fulcro.algorithms.merge :as merge]
    [com.fulcrologic.fulcro.dom :as dom]
    [com.fulcrologic.fulcro.mutations :as m]
    [com.fulcrologic.fulcro.components :as comp :refer [defsc]]
    [com.fulcrologic.fulcro.algorithms.normalized-state :as fns]
    [com.fulcrologic.fulcro.algorithms.tx-processing :as txn]))

(defsc WorkerResult [_ {:worker-result/keys [status output]}]
  {:query [:worker-result/id :worker-result/output :worker-result/status]
   :ident :worker-result/id}
  (dom/div
    (str "Result status: " (or status "none."))
    (if (= status "failed")
      (dom/div :.ui.red.message (dom/p "Something went wrong: " (pr-str (:errors output))))
      (dom/div "Output: " (dom/kbd output)))))

(def ui-worker-result (comp/factory WorkerResult))

(m/defmutation do-in-worker
  "Send work to a Web Worker & return the result. Input:

    id - The ID of the request. You make it up (SHA of source is recommended).
    input - the input (text) to process
    result-key - (optional) Where the worker result should be joined on `ref`.

    The mutation returns a WorkerResult (normalized as `[:worker-result/id id]`).
    "
  [{:keys [id input result-key] :or {result-key ::result}}]
  (action [{:keys [state ref] :as _env}]
          (fns/swap!-> state
                       (assoc-in (conj ref :ui/working?) true)
                       (merge/merge-component WorkerResult {:worker-result/id id
                                                           :worker-result/output {}
                                                           :worker-result/status "working"}
                                              :replace (conj ref result-key))))
  (progress-action [{:keys [state progress] :as _env}]
                   (when-let [status (-> progress :body (get `do-in-worker) :worker-result/status)]
                     (swap! state assoc-in [:worker-result/id id :worker-result/status] status)))
  (ok-action [{:keys [state ref] ::txn/keys [options]}]
             (fns/swap!-> state (assoc-in (conj ref :ui/working?) false))
             (let [status (get-in @state [:worker-result/id id :worker-result/status])
                   {:keys [on-success]} options]
               (when (and (= "success" status) on-success) (on-success))))
  (error-action [{:keys [state ref]}]
                (fns/swap!-> state
                             (assoc-in (conj ref :ui/working?) false)
                             (assoc-in [:worker-result/id id :worker-result/status] "failed")))
  (web-worker [env]
              (-> env
                  (m/with-params {:id id :text input})
                  (m/returning WorkerResult))))

Putting it all together

Create a fulcro-app whose :remotes map contains :web-worker (web-worker-remote "path/to/my-worker.js"); the remote's key must match the web-worker section of the mutation. Then add a UI button that transacts the do-in-worker mutation, and display the progress and result.
