Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve API #27

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ examples/multipleGets
examples/streamClient
examples/localServer
examples/dataStream
examples/tlsServer
out
87 changes: 22 additions & 65 deletions examples/localServer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
{.define: ssl.}

from std/os import getEnv
import std/strutils
import std/asyncdispatch
import ../src/hyperx/server

Expand All @@ -14,72 +13,30 @@ const keyFile = getEnv "HYPERX_TEST_KEYFILE"

proc processStream(strm: ClientStream) {.async.} =
## Full-duplex echo stream
with strm:
let data = new string
await strm.recvHeaders(data)
await strm.sendHeaders(
@[(":status", "200")], finish = false
)
if strm.recvEnded:
data[] = "Hello world!"
await strm.sendBody(data, finish = true)
while not strm.recvEnded:
data[].setLen 0
await strm.recvBody(data)
await strm.sendBody(data, finish = strm.recvEnded)
if not strm.sendEnded:
# recv ended while sending; trailer headers or empty data recv
data[].setLen 0
await strm.sendBody(data, finish = true)

proc processStreamHandler(strm: ClientStream, propagateErr: bool) {.async.} =
try:
await processStream(strm)
except HyperxStrmError as err:
if propagateErr:
raise err
debugEcho err.msg
except HyperxConnError as err:
if propagateErr:
raise err
debugEcho err.msg

proc processClient(client: ClientContext, propagateErr: bool) {.async.} =
with client:
while client.isConnected:
let strm = await client.recvStream()
asyncCheck processStreamHandler(strm, propagateErr)

proc processClientHandler(client: ClientContext, propagateErr: bool) {.async.} =
try:
await processClient(client, propagateErr)
except HyperxConnError as err:
if propagateErr:
raise err
debugEcho err.msg

# xxx propagateErr = false
proc serve*(server: ServerContext, propagateErr = true) {.async.} =
with server:
while server.isConnected:
let client = await server.recvClient()
asyncCheck processClientHandler(client, propagateErr)

proc newServer*(): ServerContext =
newServer(
let data = new string
await strm.recvHeaders(data)
await strm.sendHeaders(
@[(":status", "200")], finish = false
)
if strm.recvEnded:
data[] = "Hello world!"
await strm.sendBody(data, finish = true)
while not strm.recvEnded:
data[].setLen 0
await strm.recvBody(data)
await strm.sendBody(data, finish = strm.recvEnded)
if not strm.sendEnded:
# recv ended while sending; trailer headers or empty data recv
data[].setLen 0
await strm.sendBody(data, finish = true)

proc main() {.async.} =
echo "Serving forever"
let server = newServer(
localHost, localPort, certFile, keyFile
)
await server.serve(processStream)

when isMainModule:
proc main() {.async.} =
echo "Serving forever"
var server = newServer()
await server.serve(propagateErr = false)
when true:
waitFor main()
else: # this is better for profiling/benchmarking but uses 100% CPU
var fut = main()
while not fut.finished:
poll(0)
fut.read()
waitFor main()
echo "ok"
38 changes: 38 additions & 0 deletions src/hyperx/server.nim
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,41 @@ proc sendHeaders*(
client.hpackEncode(headers, "content-length", $contentLen)
let finish = contentLen <= 0
result = strm.sendHeadersImpl(headers, finish)

type StreamCallback* =
proc (stream: ClientStream): Future[void] {.closure, gcsafe.}

proc processStreamHandler(
strm: ClientStream,
callback: StreamCallback
) {.async.} =
try:
with strm:
await callback(strm)
except HyperxError:
debugInfo getCurrentException().getStackTrace()
debugInfo getCurrentException().msg

proc processClientHandler(
client: ClientContext,
callback: StreamCallback
) {.async.} =
try:
with client:
while client.isConnected:
let strm = await client.recvStream()
asyncCheck processStreamHandler(strm, callback)
except HyperxError:
debugInfo getCurrentException().getStackTrace()
debugInfo getCurrentException().msg
when defined(hyperxStats):
echoStats client

proc serve*(
server: ServerContext,
callback: StreamCallback
) {.async.} =
with server:
while server.isConnected:
let client = await server.recvClient()
asyncCheck processClientHandler(client, callback)
65 changes: 18 additions & 47 deletions tests/functional/tserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,33 @@ const certFile = getEnv "HYPERX_TEST_CERTFILE"
const keyFile = getEnv "HYPERX_TEST_KEYFILE"

proc processStream(strm: ClientStream) {.async.} =
with strm:
let data = new string
await strm.recvHeaders(data)
if "x-flow-control-check" in data[]:
# let recv buff for a bit
#debugEcho "sleeping"
await sleepAsync(10_000)
await strm.sendHeaders(
@[(":status", "200")], finish = false
)
let data = new string
await strm.recvHeaders(data)
if "x-flow-control-check" in data[]:
# let recv buff for a bit
#debugEcho "sleeping"
await sleepAsync(10_000)
await strm.sendHeaders(
@[(":status", "200")], finish = false
)
await strm.sendBody(data, finish = strm.recvEnded)
while not strm.recvEnded:
data[].setLen 0
await strm.recvBody(data)
await strm.sendBody(data, finish = strm.recvEnded)
while not strm.recvEnded:
data[].setLen 0
await strm.recvBody(data)
await strm.sendBody(data, finish = strm.recvEnded)
#GC_fullCollect()

proc processStreamHandler(strm: ClientStream) {.async.} =
try:
await processStream(strm)
except HyperxStrmError as err:
debugEcho err.msg
except HyperxConnError as err:
debugEcho err.msg

proc processClient(client: ClientContext) {.async.} =
with client:
while client.isConnected:
let strm = await client.recvStream()
asyncCheck processStreamHandler(strm)

proc processClientHandler(client: ClientContext) {.async.} =
try:
await processClient(client)
except HyperxConnError as err:
debugEcho err.msg
when defined(hyperxStats):
echoStats client
#GC_fullCollect()

proc serve*(server: ServerContext) {.async.} =
with server:
while server.isConnected:
let client = await server.recvClient()
asyncCheck processClientHandler(client)
await server.serve(processStream)

proc newServer(): ServerContext =
newServer(
proc main() {.async.} =
echo "Serving forever"
var server = newServer(
localHost, localPort, certFile, keyFile
)
await server.serve()

when isMainModule:
proc main() {.async.} =
echo "Serving forever"
var server = newServer()
await server.serve()
waitFor main()
doAssert not hasPendingOperations()
echo "ok"
Loading