Skip to content

Commit

Permalink
abstract sprocket service and remove mist dependency from core lib
Browse files Browse the repository at this point in the history
  • Loading branch information
eliknebel committed Aug 1, 2023
1 parent cd0f1c1 commit 52b67d8
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 113 deletions.
44 changes: 41 additions & 3 deletions src/docs.gleam
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import gleam/int
import gleam/string
import gleam/result
import gleam/option.{None}
import gleam/result
import gleam/dynamic
import gleam/erlang/os
import gleam/erlang/process
import gleam/http/service.{Service}
Expand All @@ -10,7 +11,9 @@ import gleam/http/response.{Response}
import gleam/http.{Get}
import gleam/bit_builder.{BitBuilder}
import mist
import sprocket/cassette
import mist/websocket
import mist/internal/websocket.{TextMessage} as internal_websocket
import sprocket/cassette.{Cassette, LiveService}
import docs/routes
import docs/app_context.{AppContext}
import docs/utils/logger
Expand All @@ -27,7 +30,7 @@ pub fn main() {
port,
mist.handler_func(fn(req) {
case req.method, request.path_segments(req) {
Get, ["live"] -> cassette.live_service(req, ca)
Get, ["live"] -> live_service(req, ca)
_, _ -> http_service(req, router)
}
}),
Expand All @@ -39,6 +42,41 @@ pub fn main() {
process.sleep_forever()
}

fn live_service(_req: Request(mist.Body), ca: Cassette) {
let LiveService(on_msg, on_init, on_close) = cassette.live_service(ca)

websocket.with_handler(fn(msg, ws) {
case msg {
TextMessage(msg) ->
on_msg(
msg,
dynamic.from(ws),
fn(msg) {
websocket.send(ws, TextMessage(msg))
Ok(Nil)
},
)

internal_websocket.BinaryMessage(_) -> {
logger.info("Received binary message")

Ok(Nil)
}
}
})
|> websocket.on_init(fn(ws) {
let _ = on_init(dynamic.from(ws))

Nil
})
|> websocket.on_close(fn(ws) {
let _ = on_close(dynamic.from(ws))

Nil
})
|> mist.upgrade
}

fn http_service(
req: Request(mist.Body),
router: Service(BitString, BitBuilder),
Expand Down
197 changes: 107 additions & 90 deletions src/sprocket/cassette.gleam
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import gleam/otp/actor
import gleam/io
import gleam/list
import gleam/dynamic.{Dynamic, field, optional_field}
import gleam/io
import gleam/option.{Option, Some}
import gleam/json
import gleam/erlang
import gleam/erlang/process.{Subject}
import gleam/json
import gleam/http/request.{Request}
import mist
import mist/websocket
import mist/internal/websocket.{TextMessage} as internal_websocket
import gleam/otp/actor
import sprocket/sprocket.{Sprocket}
import sprocket/context.{Element, Updater, WebSocket}
import sprocket/context.{Element, Updater}
import sprocket/render.{RenderedElement}
import sprocket/internal/render/json as json_renderer
import sprocket/internal/patch.{Patch}
Expand Down Expand Up @@ -45,8 +41,8 @@ pub type Message {
StartPreflightCleanupJob(cleanup_preflights: fn() -> Nil)
PopPreflight(reply_with: Subject(Result(Preflight, Nil)), id: String)
PushSprocket(sprocket: Sprocket)
GetSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: WebSocket)
PopSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: WebSocket)
GetSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: Dynamic)
PopSprocket(reply_with: Subject(Result(Sprocket, Nil)), ws: Dynamic)
}

fn handle_message(message: Message, state: State) -> actor.Next(State) {
Expand Down Expand Up @@ -145,6 +141,11 @@ pub type CassetteOpts {
CassetteOpts(debug: Bool)
}

/// Start the cassette. This is intended to only be called once during web server
/// initiliazation.
///
/// The cassette is a long running process that manages the state of
/// all sprockets and preflights.
pub fn start(opts: Option(CassetteOpts)) -> Cassette {
let assert Ok(ca) =
actor.start(
Expand All @@ -163,10 +164,12 @@ pub fn start(opts: Option(CassetteOpts)) -> Cassette {
ca
}

/// Stop the cassette
pub fn stop(ca: Cassette) {
process.send(ca, Shutdown)
}

/// Get the current state of the cassette. Mostly intended for unit tests and debugging.
pub fn get_state(ca: Cassette) {
process.call(ca, GetState(_), call_timeout())
}
Expand All @@ -182,63 +185,82 @@ fn start_preflight_cleanup_job(
process.send(ca, StartPreflightCleanupJob(fn() { cleanup_preflights(ca) }))
}

/// Pushes a preflight to the cassette.
pub fn push_preflight(ca: Cassette, preflight: Preflight) -> Preflight {
process.send(ca, PushPreflight(preflight))

preflight
}

/// Pops a preflight from the cassette.
pub fn pop_preflight(ca: Cassette, id: String) -> Result(Preflight, Nil) {
process.call(ca, PopPreflight(_, id), call_timeout())
}

pub fn live_service(_req: Request(mist.Body), ca: Cassette) {
websocket.with_handler(fn(msg, ws) {
handle_ws_message(ca, ws, msg)

Ok(Nil)
})
|> websocket.on_init(fn(_ws) { Nil })
|> websocket.on_close(fn(ws) {
let spkt = pop_sprocket(ca, ws)

case spkt {
Ok(sprocket) -> sprocket.stop(sprocket)
Error(_) -> {
logger.error("failed to pop sprocket for websoket:")
io.debug(ws)
Nil
pub type LiveService {
LiveService(
on_msg: fn(String, Dynamic, fn(String) -> Result(Nil, Nil)) ->
Result(Nil, Nil),
on_init: fn(Dynamic) -> Result(Nil, Nil),
on_close: fn(Dynamic) -> Result(Nil, Nil),
)
}

/// Returns a live service specification for a sprocket websocket.
///
/// This is a generic interface for handling websocket messages that can be
/// used to implement a live service for a sprocket with a variety of different
/// web servers.
///
/// Refer to the example docs repository for an example of how to use this.
pub fn live_service(ca: Cassette) {
LiveService(
on_msg: fn(msg, ws, ws_send) { handle_ws_message(ca, msg, ws, ws_send) },
on_init: fn(_ws) { Ok(Nil) },
on_close: fn(ws) {
let spkt = pop_sprocket(ca, ws)

case spkt {
Ok(sprocket) -> {
sprocket.stop(sprocket)
Ok(Nil)
}
Error(_) -> {
logger.error("failed to pop sprocket for websoket:")
io.debug(ws)
Ok(Nil)
}
}
}
})
|> mist.upgrade
},
)
}

/// Pushes a sprocket to the cassette.
fn push_sprocket(ca: Cassette, sprocket: Sprocket) {
process.send(ca, PushSprocket(sprocket))
}

fn get_sprocket(ca: Cassette, ws: WebSocket) {
/// Gets a sprocket from the cassette.
fn get_sprocket(ca: Cassette, ws: Dynamic) {
process.call(ca, GetSprocket(_, ws), call_timeout())
}

fn pop_sprocket(ca: Cassette, ws: WebSocket) {
/// Pops a sprocket from the cassette.
fn pop_sprocket(ca: Cassette, ws: Dynamic) {
process.call(ca, PopSprocket(_, ws), call_timeout())
}

/// Handles client websocket connection initialization.
fn connect(
ca: Cassette,
ws: WebSocket,
ws: Dynamic,
ws_send: fn(String) -> Result(Nil, Nil),
preflight_id: String,
preflight_csrf: String,
) {
let updater =
Updater(send: fn(update) {
let _ =
websocket.send(
ws,
TextMessage(update_to_json(update, get_state(ca).debug)),
)
let _ = ws_send(update_to_json(update, get_state(ca).debug))
Ok(Nil)
})

Expand All @@ -249,25 +271,21 @@ fn connect(
let sprocket = sprocket.start(Some(ws), view, Some(updater))
push_sprocket(ca, sprocket)

// intitial live render
let rendered = sprocket.render(sprocket)
websocket.send(ws, TextMessage(rendered_to_json(rendered)))

logger.info("Sprocket connected!")

Nil
// intitial live render
let rendered = sprocket.render(sprocket)
ws_send(rendered_to_json(rendered))
}
Error(_) -> {
logger.error("CSRF token mismatch for preflight id:" <> preflight_id)
websocket.send(ws, TextMessage(error_to_json(InvalidCSRFToken)))
ws_send(error_to_json(InvalidCSRFToken))
}
}
}
Error(Nil) -> {
logger.error("Error no sprocket found for preflight id:" <> preflight_id)
websocket.send(ws, TextMessage(error_to_json(PreflightNotFound)))

Nil
ws_send(error_to_json(PreflightNotFound))
}
}
}
Expand Down Expand Up @@ -313,60 +331,59 @@ fn decode_empty(data: Dynamic) {

fn handle_ws_message(
ca: Cassette,
ws: WebSocket,
msg: internal_websocket.Message,
) {
case msg {
TextMessage(msg) -> {
case
json.decode(msg, dynamic.any([decode_join, decode_event, decode_empty]))
{
Ok(#("join", JoinPayload(id, csrf))) -> {
logger.info("New client joined with preflight id: " <> id)

connect(ca, ws, id, csrf)
}
Ok(#("event", EventPayload(kind, id, value))) -> {
logger.info("Event: " <> kind <> " " <> id)

case get_sprocket(ca, ws) {
Ok(sprocket) -> {
case sprocket.get_handler(sprocket, id) {
Ok(context.EventHandler(_, handler)) -> {
// call the event handler
case handler {
CallbackFn(cb) -> {
cb()
msg: String,
ws: Dynamic,
ws_send: fn(String) -> Result(Nil, Nil),
) -> Result(Nil, Nil) {
case
json.decode(msg, dynamic.any([decode_join, decode_event, decode_empty]))
{
Ok(#("join", JoinPayload(id, csrf))) -> {
logger.info("New client joined with preflight id: " <> id)

connect(ca, ws, ws_send, id, csrf)
}
Ok(#("event", EventPayload(kind, id, value))) -> {
logger.info("Event: " <> kind <> " " <> id)

case get_sprocket(ca, ws) {
Ok(sprocket) -> {
case sprocket.get_handler(sprocket, id) {
Ok(context.EventHandler(_, handler)) -> {
// call the event handler
case handler {
CallbackFn(cb) -> {
cb()

Ok(Nil)
}
CallbackWithValueFn(cb) -> {
case value {
Some(value) -> {
cb(value)

Ok(Nil)
}
CallbackWithValueFn(cb) -> {
case value {
Some(value) -> cb(value)
_ -> {
logger.error("Error: expected a value but got None")
io.debug(value)
panic
}
}
_ -> {
logger.error("Error: expected a value but got None")
io.debug(value)
panic
}
}
}
_ -> Nil
}
}
_ -> Nil
_ -> Ok(Nil)
}
}
Error(e) -> {
logger.error("Error decoding message")
io.debug(e)

Nil
}
_ -> Error(Nil)
}
}
internal_websocket.BinaryMessage(_) -> {
logger.info("Received binary message")
Nil
Error(e) -> {
logger.error("Error decoding message")
io.debug(e)

Error(Nil)
}
}
}
Expand Down
Loading

0 comments on commit 52b67d8

Please sign in to comment.