diff --git a/.gitignore b/.gitignore index 77f50ae..cacfae6 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,5 @@ examples/multipleGets examples/streamClient examples/localServer examples/dataStream +examples/tlsServer out diff --git a/examples/localServer.nim b/examples/localServer.nim index 70e2e17..706c9e0 100644 --- a/examples/localServer.nim +++ b/examples/localServer.nim @@ -3,7 +3,6 @@ {.define: ssl.} from std/os import getEnv -import std/strutils import std/asyncdispatch import ../src/hyperx/server @@ -14,72 +13,30 @@ const keyFile = getEnv "HYPERX_TEST_KEYFILE" proc processStream(strm: ClientStream) {.async.} = ## Full-duplex echo stream - with strm: - let data = new string - await strm.recvHeaders(data) - await strm.sendHeaders( - @[(":status", "200")], finish = false - ) - if strm.recvEnded: - data[] = "Hello world!" - await strm.sendBody(data, finish = true) - while not strm.recvEnded: - data[].setLen 0 - await strm.recvBody(data) - await strm.sendBody(data, finish = strm.recvEnded) - if not strm.sendEnded: - # recv ended while sending; trailer headers or empty data recv - data[].setLen 0 - await strm.sendBody(data, finish = true) - -proc processStreamHandler(strm: ClientStream, propagateErr: bool) {.async.} = - try: - await processStream(strm) - except HyperxStrmError as err: - if propagateErr: - raise err - debugEcho err.msg - except HyperxConnError as err: - if propagateErr: - raise err - debugEcho err.msg - -proc processClient(client: ClientContext, propagateErr: bool) {.async.} = - with client: - while client.isConnected: - let strm = await client.recvStream() - asyncCheck processStreamHandler(strm, propagateErr) - -proc processClientHandler(client: ClientContext, propagateErr: bool) {.async.} = - try: - await processClient(client, propagateErr) - except HyperxConnError as err: - if propagateErr: - raise err - debugEcho err.msg - -# xxx propagateErr = false -proc serve*(server: ServerContext, propagateErr = true) {.async.} = - with server: - while server.isConnected: - let client = await server.recvClient() - asyncCheck processClientHandler(client, propagateErr) - -proc newServer*(): ServerContext = - newServer( + let data = new string + await strm.recvHeaders(data) + await strm.sendHeaders( + @[(":status", "200")], finish = false + ) + if strm.recvEnded: + data[] = "Hello world!" + await strm.sendBody(data, finish = true) + while not strm.recvEnded: + data[].setLen 0 + await strm.recvBody(data) + await strm.sendBody(data, finish = strm.recvEnded) + if not strm.sendEnded: + # recv ended while sending; trailer headers or empty data recv + data[].setLen 0 + await strm.sendBody(data, finish = true) + +proc main() {.async.} = + echo "Serving forever" + let server = newServer( localHost, localPort, certFile, keyFile ) + await server.serve(processStream) when isMainModule: - proc main() {.async.} = - echo "Serving forever" - var server = newServer() - await server.serve(propagateErr = false) - when true: - waitFor main() - else: # this is better for profiling/benchmarking but uses 100% CPU - var fut = main() - while not fut.finished: - poll(0) - fut.read() + waitFor main() echo "ok" diff --git a/src/hyperx/server.nim b/src/hyperx/server.nim index 501744c..59f6b5c 100644 --- a/src/hyperx/server.nim +++ b/src/hyperx/server.nim @@ -179,3 +179,41 @@ proc sendHeaders*( client.hpackEncode(headers, "content-length", $contentLen) let finish = contentLen <= 0 result = strm.sendHeadersImpl(headers, finish) + +type StreamCallback* = + proc (stream: ClientStream): Future[void] {.closure, gcsafe.} + +proc processStreamHandler( + strm: ClientStream, + callback: StreamCallback +) {.async.} = + try: + with strm: + await callback(strm) + except HyperxError: + debugInfo getCurrentException().getStackTrace() + debugInfo getCurrentException().msg + +proc processClientHandler( + client: ClientContext, + callback: StreamCallback +) {.async.} = + try: + with client: + while client.isConnected: + let strm = await client.recvStream() + asyncCheck processStreamHandler(strm, callback) + except HyperxError: + debugInfo getCurrentException().getStackTrace() + debugInfo getCurrentException().msg + when defined(hyperxStats): + echoStats client + +proc serve*( + server: ServerContext, + callback: StreamCallback +) {.async.} = + with server: + while server.isConnected: + let client = await server.recvClient() + asyncCheck processClientHandler(client, callback) diff --git a/tests/functional/tserver.nim b/tests/functional/tserver.nim index 597abdb..eaca1ca 100644 --- a/tests/functional/tserver.nim +++ b/tests/functional/tserver.nim @@ -11,62 +11,33 @@ const certFile = getEnv "HYPERX_TEST_CERTFILE" const keyFile = getEnv "HYPERX_TEST_KEYFILE" proc processStream(strm: ClientStream) {.async.} = - with strm: - let data = new string - await strm.recvHeaders(data) - if "x-flow-control-check" in data[]: - # let recv buff for a bit - #debugEcho "sleeping" - await sleepAsync(10_000) - await strm.sendHeaders( - @[(":status", "200")], finish = false - ) + let data = new string + await strm.recvHeaders(data) + if "x-flow-control-check" in data[]: + # let recv buff for a bit + #debugEcho "sleeping" + await sleepAsync(10_000) + await strm.sendHeaders( + @[(":status", "200")], finish = false + ) + await strm.sendBody(data, finish = strm.recvEnded) + while not strm.recvEnded: + data[].setLen 0 + await strm.recvBody(data) await strm.sendBody(data, finish = strm.recvEnded) - while not strm.recvEnded: - data[].setLen 0 - await strm.recvBody(data) - await strm.sendBody(data, finish = strm.recvEnded) - #GC_fullCollect() - -proc processStreamHandler(strm: ClientStream) {.async.} = - try: - await processStream(strm) - except HyperxStrmError as err: - debugEcho err.msg - except HyperxConnError as err: - debugEcho err.msg - -proc processClient(client: ClientContext) {.async.} = - with client: - while client.isConnected: - let strm = await client.recvStream() - asyncCheck processStreamHandler(strm) - -proc processClientHandler(client: ClientContext) {.async.} = - try: - await processClient(client) - except HyperxConnError as err: - debugEcho err.msg - when defined(hyperxStats): - echoStats client #GC_fullCollect() proc serve*(server: ServerContext) {.async.} = - with server: - while server.isConnected: - let client = await server.recvClient() - asyncCheck processClientHandler(client) + await server.serve(processStream) -proc newServer(): ServerContext = - newServer( +proc main() {.async.} = + echo "Serving forever" + var server = newServer( localHost, localPort, certFile, keyFile ) + await server.serve() when isMainModule: - proc main() {.async.} = - echo "Serving forever" - var server = newServer() - await server.serve() waitFor main() doAssert not hasPendingOperations() echo "ok"