From 52b67d8a3836e3f10809c37e4ceab3987e13eac9 Mon Sep 17 00:00:00 2001 From: Eli Knebel Date: Mon, 31 Jul 2023 21:18:10 -0400 Subject: [PATCH] abstract sprocket service and remove mist dependency from core lib --- src/docs.gleam | 44 +++++++- src/sprocket/cassette.gleam | 197 ++++++++++++++++++++---------------- src/sprocket/context.gleam | 9 +- src/sprocket/render.gleam | 2 +- src/sprocket/sprocket.gleam | 25 ++--- 5 files changed, 164 insertions(+), 113 deletions(-) diff --git a/src/docs.gleam b/src/docs.gleam index 427ea77..1942db4 100644 --- a/src/docs.gleam +++ b/src/docs.gleam @@ -1,7 +1,8 @@ import gleam/int import gleam/string -import gleam/result import gleam/option.{None} +import gleam/result +import gleam/dynamic import gleam/erlang/os import gleam/erlang/process import gleam/http/service.{Service} @@ -10,7 +11,9 @@ import gleam/http/response.{Response} import gleam/http.{Get} import gleam/bit_builder.{BitBuilder} import mist -import sprocket/cassette +import mist/websocket +import mist/internal/websocket.{TextMessage} as internal_websocket +import sprocket/cassette.{Cassette, LiveService} import docs/routes import docs/app_context.{AppContext} import docs/utils/logger @@ -27,7 +30,7 @@ pub fn main() { port, mist.handler_func(fn(req) { case req.method, request.path_segments(req) { - Get, ["live"] -> cassette.live_service(req, ca) + Get, ["live"] -> live_service(req, ca) _, _ -> http_service(req, router) } }), @@ -39,6 +42,41 @@ pub fn main() { process.sleep_forever() } +fn live_service(_req: Request(mist.Body), ca: Cassette) { + let LiveService(on_msg, on_init, on_close) = cassette.live_service(ca) + + websocket.with_handler(fn(msg, ws) { + case msg { + TextMessage(msg) -> + on_msg( + msg, + dynamic.from(ws), + fn(msg) { + websocket.send(ws, TextMessage(msg)) + Ok(Nil) + }, + ) + + internal_websocket.BinaryMessage(_) -> { + logger.info("Received binary message") + + Ok(Nil) + } + } + }) + |> websocket.on_init(fn(ws) { + let _ = on_init(dynamic.from(ws)) + + Nil + }) + |> websocket.on_close(fn(ws) { + let _ = on_close(dynamic.from(ws)) + + Nil + }) + |> mist.upgrade +} + fn http_service( req: Request(mist.Body), router: Service(BitString, BitBuilder), diff --git a/src/sprocket/cassette.gleam b/src/sprocket/cassette.gleam index b40cc28..ef63558 100644 --- a/src/sprocket/cassette.gleam +++ b/src/sprocket/cassette.gleam @@ -1,17 +1,13 @@ -import gleam/otp/actor +import gleam/io import gleam/list import gleam/dynamic.{Dynamic, field, optional_field} -import gleam/io import gleam/option.{Option, Some} +import gleam/json import gleam/erlang import gleam/erlang/process.{Subject} -import gleam/json -import gleam/http/request.{Request} -import mist -import mist/websocket -import mist/internal/websocket.{TextMessage} as internal_websocket +import gleam/otp/actor import sprocket/sprocket.{Sprocket} -import sprocket/context.{Element, Updater, WebSocket} +import sprocket/context.{Element, Updater} import sprocket/render.{RenderedElement} import sprocket/internal/render/json as json_renderer import sprocket/internal/patch.{Patch} @@ -45,8 +41,8 @@ pub type Message { StartPreflightCleanupJob(cleanup_preflights: fn() -> Nil) PopPreflight(reply_with: Subject(Result(Preflight, Nil)), id: String) PushSprocket(sprocket: Sprocket) - GetSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: WebSocket) - PopSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: WebSocket) + GetSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: Dynamic) + PopSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: Dynamic) } fn handle_message(message: Message, state: State) -> actor.Next(State) { @@ -145,6 +141,11 @@ pub type CassetteOpts { CassetteOpts(debug: Bool) } +/// Start the cassette. This is intended to only be called once during web server +/// initiliazation. +/// +/// The cassette is a long running process that manages the state of +/// all sprockets and preflights. pub fn start(opts: Option(CassetteOpts)) -> Cassette { let assert Ok(ca) = actor.start( @@ -163,10 +164,12 @@ pub fn start(opts: Option(CassetteOpts)) -> Cassette { ca } +/// Stop the cassette pub fn stop(ca: Cassette) { process.send(ca, Shutdown) } +/// Get the current state of the cassette. Mostly intended for unit tests and debugging. pub fn get_state(ca: Cassette) { process.call(ca, GetState(_), call_timeout()) } @@ -182,63 +185,82 @@ fn start_preflight_cleanup_job( process.send(ca, StartPreflightCleanupJob(fn() { cleanup_preflights(ca) })) } +/// Pushes a preflight to the cassette. pub fn push_preflight(ca: Cassette, preflight: Preflight) -> Preflight { process.send(ca, PushPreflight(preflight)) preflight } +/// Pops a preflight from the cassette. pub fn pop_preflight(ca: Cassette, id: String) -> Result(Preflight, Nil) { process.call(ca, PopPreflight(_, id), call_timeout()) } -pub fn live_service(_req: Request(mist.Body), ca: Cassette) { - websocket.with_handler(fn(msg, ws) { - handle_ws_message(ca, ws, msg) - - Ok(Nil) - }) - |> websocket.on_init(fn(_ws) { Nil }) - |> websocket.on_close(fn(ws) { - let spkt = pop_sprocket(ca, ws) - - case spkt { - Ok(sprocket) -> sprocket.stop(sprocket) - Error(_) -> { - logger.error("failed to pop sprocket for websoket:") - io.debug(ws) - Nil +pub type LiveService { + LiveService( + on_msg: fn(String, Dynamic, fn(String) -> Result(Nil, Nil)) -> + Result(Nil, Nil), + on_init: fn(Dynamic) -> Result(Nil, Nil), + on_close: fn(Dynamic) -> Result(Nil, Nil), + ) +} + +/// Returns a live service specification for a sprocket websocket. +/// +/// This is a generic interface for handling websocket messages that can be +/// used to implement a live service for a sprocket with a variety of different +/// web servers. +/// +/// Refer to the example docs repository for an example of how to use this. +pub fn live_service(ca: Cassette) { + LiveService( + on_msg: fn(msg, ws, ws_send) { handle_ws_message(ca, msg, ws, ws_send) }, + on_init: fn(_ws) { Ok(Nil) }, + on_close: fn(ws) { + let spkt = pop_sprocket(ca, ws) + + case spkt { + Ok(sprocket) -> { + sprocket.stop(sprocket) + Ok(Nil) + } + Error(_) -> { + logger.error("failed to pop sprocket for websoket:") + io.debug(ws) + Ok(Nil) + } } - } - }) - |> mist.upgrade + }, + ) } +/// Pushes a sprocket to the cassette. fn push_sprocket(ca: Cassette, sprocket: Sprocket) { process.send(ca, PushSprocket(sprocket)) } -fn get_sprocket(ca: Cassette, ws: WebSocket) { +/// Gets a sprocket from the cassette. +fn get_sprocket(ca: Cassette, ws: Dynamic) { process.call(ca, GetSprocket(_, ws), call_timeout()) } -fn pop_sprocket(ca: Cassette, ws: WebSocket) { +/// Pops a sprocket from the cassette. +fn pop_sprocket(ca: Cassette, ws: Dynamic) { process.call(ca, PopSprocket(_, ws), call_timeout()) } +/// Handles client websocket connection initialization. fn connect( ca: Cassette, - ws: WebSocket, + ws: Dynamic, + ws_send: fn(String) -> Result(Nil, Nil), preflight_id: String, preflight_csrf: String, ) { let updater = Updater(send: fn(update) { - let _ = - websocket.send( - ws, - TextMessage(update_to_json(update, get_state(ca).debug)), - ) + let _ = ws_send(update_to_json(update, get_state(ca).debug)) Ok(Nil) }) @@ -249,25 +271,21 @@ fn connect( let sprocket = sprocket.start(Some(ws), view, Some(updater)) push_sprocket(ca, sprocket) - // intitial live render - let rendered = sprocket.render(sprocket) - websocket.send(ws, TextMessage(rendered_to_json(rendered))) - logger.info("Sprocket connected!") - Nil + // intitial live render + let rendered = sprocket.render(sprocket) + ws_send(rendered_to_json(rendered)) } Error(_) -> { logger.error("CSRF token mismatch for preflight id:" <> preflight_id) - websocket.send(ws, TextMessage(error_to_json(InvalidCSRFToken))) + ws_send(error_to_json(InvalidCSRFToken)) } } } Error(Nil) -> { logger.error("Error no sprocket found for preflight id:" <> preflight_id) - websocket.send(ws, TextMessage(error_to_json(PreflightNotFound))) - - Nil + ws_send(error_to_json(PreflightNotFound)) } } } @@ -313,60 +331,59 @@ fn decode_empty(data: Dynamic) { fn handle_ws_message( ca: Cassette, - ws: WebSocket, - msg: internal_websocket.Message, -) { - case msg { - TextMessage(msg) -> { - case - json.decode(msg, dynamic.any([decode_join, decode_event, decode_empty])) - { - Ok(#("join", JoinPayload(id, csrf))) -> { - logger.info("New client joined with preflight id: " <> id) - - connect(ca, ws, id, csrf) - } - Ok(#("event", EventPayload(kind, id, value))) -> { - logger.info("Event: " <> kind <> " " <> id) - - case get_sprocket(ca, ws) { - Ok(sprocket) -> { - case sprocket.get_handler(sprocket, id) { - Ok(context.EventHandler(_, handler)) -> { - // call the event handler - case handler { - CallbackFn(cb) -> { - cb() + msg: String, + ws: Dynamic, + ws_send: fn(String) -> Result(Nil, Nil), +) -> Result(Nil, Nil) { + case + json.decode(msg, dynamic.any([decode_join, decode_event, decode_empty])) + { + Ok(#("join", JoinPayload(id, csrf))) -> { + logger.info("New client joined with preflight id: " <> id) + + connect(ca, ws, ws_send, id, csrf) + } + Ok(#("event", EventPayload(kind, id, value))) -> { + logger.info("Event: " <> kind <> " " <> id) + + case get_sprocket(ca, ws) { + Ok(sprocket) -> { + case sprocket.get_handler(sprocket, id) { + Ok(context.EventHandler(_, handler)) -> { + // call the event handler + case handler { + CallbackFn(cb) -> { + cb() + + Ok(Nil) + } + CallbackWithValueFn(cb) -> { + case value { + Some(value) -> { + cb(value) + + Ok(Nil) } - CallbackWithValueFn(cb) -> { - case value { - Some(value) -> cb(value) - _ -> { - logger.error("Error: expected a value but got None") - io.debug(value) - panic - } - } + _ -> { + logger.error("Error: expected a value but got None") + io.debug(value) + panic } } } - _ -> Nil } } - _ -> Nil + _ -> Ok(Nil) } } - Error(e) -> { - logger.error("Error decoding message") - io.debug(e) - - Nil - } + _ -> Error(Nil) } } - internal_websocket.BinaryMessage(_) -> { - logger.info("Received binary message") - Nil + Error(e) -> { + logger.error("Error decoding message") + io.debug(e) + + Error(Nil) } } } diff --git a/src/sprocket/context.gleam b/src/sprocket/context.gleam index aa579dd..3eb7a8d 100644 --- a/src/sprocket/context.gleam +++ b/src/sprocket/context.gleam @@ -3,8 +3,6 @@ import gleam/list import gleam/option.{Option} import gleam/dynamic.{Dynamic} import sprocket/html/attributes.{Attribute} -import gleam/erlang/process.{Subject} -import glisten/handler.{HandlerMessage} import sprocket/internal/identifiable_callback.{CallbackFn, IdentifiableCallback} import sprocket/hooks.{Hook} @@ -32,9 +30,6 @@ pub type EventHandler { EventHandler(id: Unique, handler: CallbackFn) } -pub type WebSocket = - Subject(HandlerMessage) - pub type Updater(r) { Updater(send: fn(r) -> Result(Nil, Nil)) } @@ -51,13 +46,13 @@ pub type Context { view: Element, wip: ComponentWip, handlers: List(EventHandler), - ws: Option(WebSocket), + ws: Option(Dynamic), render_update: fn() -> Nil, update_hook: fn(Unique, fn(Hook) -> Hook) -> Nil, ) } -pub fn new(ws: Option(WebSocket), view: Element) -> Context { +pub fn new(ws: Option(Dynamic), view: Element) -> Context { Context( view: view, wip: ComponentWip(hooks: ordered_map.new(), index: 0, is_first_render: True), diff --git a/src/sprocket/render.gleam b/src/sprocket/render.gleam index 56a3188..6bade6f 100644 --- a/src/sprocket/render.gleam +++ b/src/sprocket/render.gleam @@ -1,5 +1,5 @@ import gleam/io -import gleam/list.{Continue, Stop} +import gleam/list import gleam/option.{None, Option, Some} import gleam/dynamic.{Dynamic} import sprocket/html/attributes.{Attribute, Event} diff --git a/src/sprocket/sprocket.gleam b/src/sprocket/sprocket.gleam index 2f4eaab..ec5ceb6 100644 --- a/src/sprocket/sprocket.gleam +++ b/src/sprocket/sprocket.gleam @@ -1,12 +1,13 @@ import gleam/list +import gleam/dynamic.{Dynamic} import gleam/map.{Map} import gleam/otp/actor import gleam/erlang/process.{Subject} import gleam/option.{None, Option, Some} import sprocket/internal/logger -import sprocket/context.{ - ComponentHooks, Context, Element, EventHandler, Updater, WebSocket, -} +import sprocket/internal/constants.{call_timeout} +import sprocket/context.{ComponentHooks, + Context, Element, EventHandler, Updater} import sprocket/hooks.{ Callback, Changed, Effect, EffectCleanup, EffectResult, Hook, HookDependencies, HookTrigger, OnMount, OnUpdate, Reducer, Unchanged, WithDeps, compare_deps, @@ -39,7 +40,7 @@ pub type Message { BeginSelfDestruct(Int) CancelSelfDestruct GetRendered(reply_with: Subject(Option(RenderedElement))) - HasWebSocket(reply_with: Subject(Bool), websocket: WebSocket) + HasWebSocket(reply_with: Subject(Bool), ws: Dynamic) SetRenderUpdate(fn() -> Nil) Render(reply_with: Subject(RenderedElement)) RenderUpdate @@ -85,10 +86,10 @@ fn handle_message(message: Message, state: State) -> actor.Next(State) { actor.Continue(state) } - HasWebSocket(reply_with, websocket) -> { + HasWebSocket(reply_with, ws) -> { case state.ctx { - Context(ws: Some(ws), ..) -> { - actor.send(reply_with, ws == websocket) + Context(ws: Some(context_ws), ..) -> { + actor.send(reply_with, context_ws == ws) } _ -> { actor.send(reply_with, False) @@ -217,7 +218,7 @@ fn handle_message(message: Message, state: State) -> actor.Next(State) { /// Start a new sprocket actor pub fn start( - ws: Option(WebSocket), + ws: Option(Dynamic), view: Element, updater: Option(Updater(Patch)), ) { @@ -250,22 +251,22 @@ pub fn stop(actor) { /// Returns true if the actor matches a given websocket connection pub fn has_websocket(actor, websocket) -> Bool { - actor.call(actor, HasWebSocket(_, websocket), 100) + actor.call(actor, HasWebSocket(_, websocket), call_timeout()) } /// Get the previously rendered view from the actor pub fn get_rendered(actor) { - actor.call(actor, GetRendered(_), 100) + actor.call(actor, GetRendered(_), call_timeout()) } /// Get the event handler for a given id pub fn get_handler(actor, id: String) { - actor.call(actor, GetEventHandler(_, unique.from_string(id)), 100) + actor.call(actor, GetEventHandler(_, unique.from_string(id)), call_timeout()) } /// Render the view pub fn render(actor) -> RenderedElement { - actor.call(actor, Render(_), 100) + actor.call(actor, Render(_), call_timeout()) } /// Render the view and send an update Patch to the updater