diff --git a/.github/SECURITY.md b/.github/SECURITY.md new file mode 100644 index 0000000..b6f0ebe --- /dev/null +++ b/.github/SECURITY.md @@ -0,0 +1,15 @@ +# Security Policy + +## Supported Versions + +The following version of the project are currently being supported with security updates. + +| Version | Supported | +| ------- | ------------------ | +| 0.1.x | :white_check_mark: | + + +## Reporting a Vulnerability + +To report a vulnerability, please open an issue with the label "security". + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..064381b --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,206 @@ +on: + pull_request: + types: [review_requested, opened] + branches: + - 'releases/**' + push: + branches: + - 'main' + - 'releases/**' + - 'testing/**' + - 'feat/**' + - 'fix/**' + - 'dev/**' +concurrency: + group: ${{ github.workflow }}-${{ !contains(github.event.pull_request.labels.*.name, 'test-flaky-ci') && github.head_ref || github.run_id }} + cancel-in-progress: true + +name: Webtransport CI +jobs: + build: + if: | + github.event_name == 'push' || !startsWith(github.event.pull_request.head.label, 'hironichu:') + strategy: + matrix: + os: [ 'ubuntu-latest', 'self-hosted', "macos-latest", "windows-latest"] + job: [build] + profile: [debug, release] + include: + - os: 'ubuntu-latest' + job: lint + profile: debug + + name: ${{ matrix.job }} ${{ matrix.profile }} ${{ matrix.os }} + runs-on: ${{ matrix.os }} + timeout-minutes: 10 + env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: full + steps: + - name: Clone repository + uses: actions/checkout@v3 + with: + fetch-depth: 2 + - name: Create source tarballs (release, linux) + if: | + startsWith(matrix.os, 'ubuntu') && + matrix.profile == 'release' && + matrix.job == 'build' && + github.repository == 'hironichu/webtransport' && + startsWith(github.ref, 'refs/tags/') + run: | + mkdir -p target/release + tar --exclude=".git*" --exclude=target --exclude=third_party/prebuilt \ + -czvf target/release/webtransport.tar.gz -C .. webtransport + + - name: Setting Up Rust + uses: dtolnay/rust-toolchain@master + if: | + matrix.job != 'lint' + with: + toolchain: stable + - name: Install Deno from .land + if: matrix.os != 'self-hosted' + uses: denoland/setup-deno@v1 + with: + deno-version: v1.x + + - name: Install Deno from source + if: matrix.os == 'self-hosted' + run: | + echo "Check if Deno is already installed" + if ! type deno > /dev/null; then + echo "Deno is not installed, installing..." + curl -s https://gist.githubusercontent.com/LukeChannings/09d53f5c364391042186518c8598b85e/raw/ac8cd8c675b985edd4b3e16df63ffef14d1f0e24/deno_install.sh | sh + else + echo "Deno is already installed" + fi + - name: Error on warning + run: echo "RUSTFLAGS=-D warnings" >> $GITHUB_ENV + + - name: Deno Format + if: matrix.job == 'lint' + run: deno task util:fmt + + - name: Deno lint + if: matrix.job == 'lint' + run: deno task util:lint + + - name: Build Debug + if: | + (matrix.job == 'build' && matrix.profile == 'debug') + run: deno task build:${{matrix.profile}} + + - name: Build release + if: | + (matrix.job == 'build' && matrix.profile == 'release') && + (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) + run: deno task build:${{matrix.profile}} + + - name: Move arm file (release) + if: startsWith(matrix.os, 'self-hosted') && matrix.job == 'build' && matrix.profile == 'release' && (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) + run: | + mv target/${{matrix.profile}}/libwebtransport.so target/${{matrix.profile}}/libwebtransport_aarch64.so + + - name: Move arm file (debug) + if: startsWith(matrix.os, 'self-hosted') && matrix.job == 'build' && matrix.profile == 'debug' + run: | + mv target/${{matrix.profile}}/libwebtransport.so target/${{matrix.profile}}/libwebtransport_aarch64.so + - name: Upload artifact (release) + uses: actions/upload-artifact@master + if: | + (matrix.job == 'build' && matrix.profile == 'release') && + ((github.ref == 'refs/heads/main' && !startsWith(github.ref, 'refs/tags/'))) + with: + name: release + path: | + target/${{matrix.profile}}/webtransport.dll + target/${{matrix.profile}}/libwebtransport.so + target/${{matrix.profile}}/libwebtransport_aarch64.so + target/${{matrix.profile}}/libwebtransport.dylib + - name: Upload artifact (debug) + uses: actions/upload-artifact@master + if: | + matrix.job == 'build' && matrix.profile == 'debug' && !startsWith(github.ref, 'refs/tags/') + with: + name: debug + path: | + target/${{matrix.profile}}/webtransport.dll + target/${{matrix.profile}}/libwebtransport.so + target/${{matrix.profile}}/libwebtransport_aarch64.so + target/${{matrix.profile}}/libwebtransport.dylib + - name: Upload release to GitHub + uses: softprops/action-gh-release@59c3b4891632ff9a897f99a91d7bc557467a3a22 + if: | + (matrix.job == 'build' && matrix.profile == 'release') && + github.repository == 'hironichu/webtransport' && + github.ref == 'refs/heads/main' + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + files: | + target/${{matrix.profile}}/webtransport.dll + target/${{matrix.profile}}/libwebtransport.so + target/${{matrix.profile}}/libwebtransport_aarch64.so + target/${{matrix.profile}}/libwebtransport.dylib + draft: true + test: + needs: build + if: | + github.event_name == 'push' || !startsWith(github.event.pull_request.head.label, 'hironichu:') + strategy: + matrix: + os: ['ubuntu-latest', 'self-hosted', "macos-latest", "windows-latest"] + job: [test] + profile: [debug, release] + name: ${{ matrix.job }} ${{ matrix.profile }} ${{ matrix.os }} + runs-on: ${{ matrix.os }} + timeout-minutes: 5 + + steps: + - uses: actions/checkout@master + - name: Creating target structure + run: | + mkdir -p target + + - name: Download artifact + uses: actions/download-artifact@master + with: + path: target + + - name: Display structure of downloaded files + run: ls -R + - name: Install Deno from .land + if: matrix.os != 'self-hosted' + uses: denoland/setup-deno@v1 + with: + deno-version: v1.x + + - name: Install Deno from source + if: matrix.os == 'self-hosted' + run: | + echo "Check if Deno is already installed" + if ! type deno > /dev/null; then + echo "Deno is not installed, installing..." + curl -s https://gist.githubusercontent.com/LukeChannings/09d53f5c364391042186518c8598b85e/raw/ac8cd8c675b985edd4b3e16df63ffef14d1f0e24/deno_install.sh | sh + else + echo "Deno is already installed" + fi + - name: Run deno test (debug) + if: | + matrix.job == 'test' && matrix.profile == 'debug' && !startsWith(github.ref, 'refs/tags/') + env: + BUILD_TARGET: debug + CI_BUILD: true + run: | + deno task test + + - name: Run deno test (release) + if: | + (matrix.job == 'test' && matrix.profile == 'release') && + ((github.ref == 'refs/heads/main' && !startsWith(github.ref, 'refs/tags/'))) + env: + BUILD_TARGET: release + CI_BUILD: true + run: | + deno task test \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7a51535..fee9251 100644 --- a/.gitignore +++ b/.gitignore @@ -2,11 +2,12 @@ # will have compiled files and executables debug/ target/ - +certs/ +dist/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html Cargo.lock - +*.lock # These are backup files generated by rustfmt **/*.rs.bk @@ -18,3 +19,9 @@ Cargo.lock /target /Cargo.lock + + +.env +*.env + +.VSCodeCounter/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..bfac3ff --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,38 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "request": "launch", + "name": "server-uni-recv", + "type": "node", + "program": "${workspaceFolder}/examples/deno/wt_server_uni_recv.ts", + "cwd": "${workspaceFolder}", + "runtimeExecutable": "C:\\Users\\zenze\\.deno\\bin\\deno.EXE", + "runtimeArgs": [ + "run", + "--unstable", + "--inspect-wait", + "--allow-all" + ], + "attachSimplePort": 9229 + }, + { + "request": "launch", + "name": "client-uni-send", + "type": "node", + "program": "${workspaceFolder}/examples/deno/wt_client_uni_send.ts", + "cwd": "${workspaceFolder}", + "runtimeExecutable": "C:\\Users\\zenze\\.deno\\bin\\deno.EXE", + "runtimeArgs": [ + "run", + "--unstable", + "--inspect-wait", + "--allow-all" + ], + "attachSimplePort": 9229 + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 2c7ddc5..712aeed 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,9 @@ { - "deno.enable": true, - "deno.lint": true, - "deno.unstable": true -} \ No newline at end of file + "deno.enable": true, + "deno.lint": true, + "deno.unstable": true, + "editor.formatOnSave": true, + "rust-analyzer.linkedProjects": [ + ".\\Cargo.toml" + ] +} diff --git a/Cargo.toml b/Cargo.toml index 227b5a8..48e5192 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,19 +1,54 @@ [package] -name = "FTL-2" +name = "Webtransport" +description = "Deno WebTransport FFI library for Deno" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] -name = "ftlt" +name = "webtransport" crate-type = ["cdylib"] +test = false +bench = false + +[profile.release] +strip = true +opt-level = 3 +debug = false +debug-assertions = false +overflow-checks = true +lto = "fat" +panic = "unwind" +incremental = false +codegen-units = 16 +rpath = false + +[profile.dev] +opt-level = 0 +lto = false +debug = true +debug-assertions = true +overflow-checks = true +panic = "abort" +incremental = true +codegen-units = 1 [dependencies] -flume = "0.11.0" -futures-util = "0.3.28" -num_cpus = "1.16.0" -once_cell = "1.18.0" -smol = "1.3.0" -tokio = { version = "1.28.1", features = ["rt", "rt-multi-thread", "macros"] } -wtransport = "0.1.4" -serde = { version = "1", features = ["derive"] } \ No newline at end of file +once_cell = "=1.18.0" +tokio = { version = "=1.32.0", default-features = false, features = [ + "rt", + "rt-multi-thread", + "macros", +] } +# wtransport = "0.1.4" # TODO: Replace this once the fix for arm is merged. +wtransport = { git = "https://github.com/hironichu/wtransport", branch = "master", features = [ + "dangerous-configuration", + "quinn", +] } +wtransport-proto = { git = "https://github.com/hironichu/wtransport", branch = "master" } +rcgen = "=0.11.2" +ring = "=0.16.20" +time = "=0.3.29" +anyhow = "=1.0.75" +serde = { version = "=1.0", features = ["derive"] } +serde_json = "1.0.107" diff --git a/README.md b/README.md new file mode 100644 index 0000000..f0cedb6 --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# Deno Webtransport + +This library implements a very WIP version of the +[WebTransport](https://w3c.github.io/webtransport/) API for Deno. + +> This does not follow perfectly the web standards and is not production ready. +> basically just a PoC + +## Usage + +### Client example : + +This client follows the standards of the API, but the server does not (since it +has not been defined). + +```ts +import "https://deno.land/x/webtransport/mod.ts"; + +//Client +const transport = new WebTransport("https://localhost:4433"); +/** + * Please note that this example above will not work if you have self validated certificates + * use the following to disable certificate verification.. (Warning this is not secure) + * const transport = new WebTransport("https://localhost:4433", { + * maxTimeout: 10, + * keepAlive: 3, + * validateCertificate: false, + * }); + */ +await transport.ready; +// Send Datagram packet + +const encoder = new TextEncoder(); +const data = encoder.encode("Hello World"); +const writer = await transport.datagrams.writable.getWriter(); +await writer.write(data); +``` + +### Server example : + +This server tries to be as close to the client api but has some differences. + +```ts +import "https://deno.land/x/webtransport/mod.ts"; + +//Client +const transport = new WebTransportServer(4433, { + keyFile: "./certs/key.pem", + certFile: "./certs/cert.pem", + maxTimeout: 10, + keepAlive: 3, +}); + +transport.on("connection", async (conn) => { + //To get the datagrams from the client you can just look over the datagram stream + for await (const datagram of conn.datagrams.readable) { + const decoder = new TextDecoder(); + console.log(decoder.decode(datagram)); + } + //The server can also send datagrams to the client (using the same API for consistency) + const encoder = new TextEncoder(); + const data = encoder.encode("Hello World"); + const writer = await conn.datagrams.writable.getWriter(); + await writer.write(data); +}); +``` + +# IMPORTANT + +The module add declaration to the global namespace, so you can use the API +without importing the classes + +This might not be practical for some people, but for now wont be changed. diff --git a/bindings.json b/bindings.json deleted file mode 100644 index e69de29..0000000 diff --git a/cert.crt b/cert.crt deleted file mode 100644 index 96457eb..0000000 --- a/cert.crt +++ /dev/null @@ -1,9 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIBNjCB3KADAgECAggr3hcUukuKrDAKBggqhkjOPQQDAjAUMRIwEAYDVQQDDAls -b2NhbGhvc3QwHhcNMjMwOTEyMTgwMTA0WhcNMjMwOTE2MTgwMTA0WjAUMRIwEAYD -VQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASOlej65PFh -PThJi0R8GKRVqBNbITNkFWppDi8+ROQ/rJCNJjb4jytT960lxpRElIrxGkesv8aZ -yIIhSDKdeu77oxgwFjAUBgNVHREEDTALgglsb2NhbGhvc3QwCgYIKoZIzj0EAwID -SQAwRgIhAL5P+Ks84VtQBjUubta6y1EPqnVczMeIgl1+oUF/vqkpAiEAm1YFhD9r -SezQf4HsVgnDNGcGzifzJu9vjxldFryzTVw= ------END CERTIFICATE----- diff --git a/cert.key b/cert.key deleted file mode 100644 index 69ee83d..0000000 --- a/cert.key +++ /dev/null @@ -1,5 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgsn0KihQyihVxJImg -kTZbpYhkDgNbx67KTym2gXcqIoShRANCAASOlej65PFhPThJi0R8GKRVqBNbITNk -FWppDi8+ROQ/rJCNJjb4jytT960lxpRElIrxGkesv8aZyIIhSDKdeu77 ------END PRIVATE KEY----- diff --git a/cert.pem b/cert.pem deleted file mode 100644 index 84dbed5..0000000 --- a/cert.pem +++ /dev/null @@ -1,9 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIBNTCB3KADAgECAggikfe7vbxE3DAKBggqhkjOPQQDAjAUMRIwEAYDVQQDDAls -b2NhbGhvc3QwHhcNMjMwOTEyMTMwMjU2WhcNMjMwOTE2MTMwMjU2WjAUMRIwEAYD -VQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASXNjGpiW3z -8G5HaaQ0Qn8uZUbckzpyAHqnnJtpPAFDxgrTAVTLOwmLQhRafzdUPLzuIMiFzzkn -ydayU/P8HNWmoxgwFjAUBgNVHREEDTALgglsb2NhbGhvc3QwCgYIKoZIzj0EAwID -SAAwRQIhAI4TlJ3uuoEsf3QlUgWIz3i8WZsVkHwm0GF1sbSfiwXdAiAdL3CiopNt -6+Z9/dbAk1Dxh4iRrUbmKiC/Q4JYTOhTpQ== ------END CERTIFICATE----- diff --git a/certificate.key b/certificate.key deleted file mode 100644 index 5d333c6..0000000 --- a/certificate.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCVyXZN1oL+6D1j -VuzjSyExD5P6SOrDwC3hviZmhjP9jeOwG9gb7bi+lza40SxvtDMNB0QzZQFPjMKT -Jca0idpLV3+v2hhficwbwfq2T5EialOfiQBOglXUsQBnuK2VbGOOoedO115BOLdo -Z0jd0HzWtgH1nkd8scX8YsGxqc+dGnv2nzkTSnr25hL6Go+kPiPH87HFDgUoQnO4 -bPAtHMljJVQv5+vuC7DMEbNC3VprQp4DCbXjc2OPFO1RIrlMrOQufDOGtKcPaQSu -yJBh4BXs0PL6sMEuRKlinEkeGYSDRe0jG084KBz82NpcHcl2GdChCGzeUCB5utxt -VIdf0kPvAgMBAAECggEAP1Z71KlHJkQNJVMOA4Ty6ZyFPdoUj6bVn/X4pfTMykR0 -CbWUeibZLpqhlKA60pdX2QQAFl15IurKIk/giNob+SzsPO+Ty95odTpe6jWWEP/2 -EWlsvEYHxlL8cV+Z4yv8o0UaGvAeFqVFhPvbx5QQHfjDtllyMCu6JSGbdyVvuaXh -8p5Tveu9rk0icz6c5XRQE/W+zIWcnB1hojGXcadalYLMfPnPAMQ0cojyhiFz5u0h -CgZgEKLLa2bOTerSSFX8CLu9VZXYQSmzPw08FN0r9wukBKJBaFz5mCAWhRDLUwLN -tzYK9jTrRp7BI/QJv+uBFh6/Wvy40WWkJuj+NOpivQKBgQDPOmmw/BiYNI0bq/2v -JSyKpuoovtBBEXnPRW3plYpCDUf//mt34/TfRUK15gJFNDuqObHSVPn8N/skfFHf -kXsdZYPNLMNgWfWkvswNNOzZbD3zLvTz51Mi/JQ9nBdvzhPLvlkVHieOarZm6+ai -qUihGJpSp+KLXpYpv5QVPt3FzQKBgQC5CjD5XZ+RmwKsnWBaPCFgOOUv5YDvzxlm -l8KcIA6IkLW44uvbCgb0kVATr1bp/1i5jPF/Tw8EX/ZUCXJOKp4BtwmSdCqr6/Lv -Ov8qwYXB2LguLjIglhfd0sRNsdHRyqiQpKdOgo4/6n9ibgdxCODcrJVZWaVnMYph -r8cwMge0qwKBgE7I0m3rKh6TvCINHYF9DJYaJ2QeR2a1ki3vI35u0AWUrw7wV0NZ -czt/RYGKVMqitRxemvBwRipRzjVs3mO1F61xbs6OeikjinR75XAP8wwmTtcpvw8L -n1vp0yIpOe/T1Urcr1mMAVXv1DEE9sZYvbghdmp+UW7TIxv0LgR1xjLxAoGABJrj -UyFoWjhQXblg7331mq5vzbxZdB5kVHBMcJQ/qFhpnVtQYVgjiiyfoI3JjPNE4wFF -9VQjXImC2N6PULCw0/wZKeLoOPUbS6bdONQuHv/kvYl6x+LUXznilshpH156yXa/ -jy2imqacWvfACakd54AIC3w2qJXMCthUDbgBBxcCgYAPJ57OJLtbzcK0wE6vY5n3 -+s8TG8yf6Oe6ikDFyGFe/Elfh2Jqpi9MFTUGOUFkNygEqUlLuYAcyRhUA3NkjvGe -JmERaLswAVS01JcdA2yYfDlRgxJftijGW56AvyVmko6ZPe8dtY5FbfgIAxAtWrrm -ss9N3MS0jII1FxerNEB6Qg== ------END PRIVATE KEY----- diff --git a/certificate.pem b/certificate.pem deleted file mode 100644 index a5a264f..0000000 --- a/certificate.pem +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDLTCCAhWgAwIBAgIUchaBMPWLkCvnsAtp9qohJfGWr7kwDQYJKoZIhvcNAQEL -BQAwGzEZMBcGA1UEAwwQVGVzdCBDZXJ0aWZpY2F0ZTAeFw0yMzA5MTQxNDE3MTZa -Fw0yMzEwMTQxNDE3MTZaMBsxGTAXBgNVBAMMEFRlc3QgQ2VydGlmaWNhdGUwggEi -MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCVyXZN1oL+6D1jVuzjSyExD5P6 -SOrDwC3hviZmhjP9jeOwG9gb7bi+lza40SxvtDMNB0QzZQFPjMKTJca0idpLV3+v -2hhficwbwfq2T5EialOfiQBOglXUsQBnuK2VbGOOoedO115BOLdoZ0jd0HzWtgH1 -nkd8scX8YsGxqc+dGnv2nzkTSnr25hL6Go+kPiPH87HFDgUoQnO4bPAtHMljJVQv -5+vuC7DMEbNC3VprQp4DCbXjc2OPFO1RIrlMrOQufDOGtKcPaQSuyJBh4BXs0PL6 -sMEuRKlinEkeGYSDRe0jG084KBz82NpcHcl2GdChCGzeUCB5utxtVIdf0kPvAgMB -AAGjaTBnMB0GA1UdDgQWBBTUe5gB7JjjNnvY/1qhvcm3DlDGUTAfBgNVHSMEGDAW -gBTUe5gB7JjjNnvY/1qhvcm3DlDGUTAPBgNVHRMBAf8EBTADAQH/MBQGA1UdEQQN -MAuCCWxvY2FsaG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAOnkFLGRpJMx2FkQSt0Pi -PzXdPUx5PiR3gutu9TpJv18hhtNeuyto/oGefrcp8lX4CHwogc6ZXQISxpSoJiKQ -4b04qydfFEbWkPvGkgjN/p0TgRftNglGgT78sc2xKgVXZ2BVg8uq4U8gfsOxHtpM -eA4WGwXGeB2+K+RddkfuLkff9wYhsrH2n5VavdHVfFHaZhF1nivSM+soEaz9TZ1o -ht3JuCxQ8Q8vXR851/Uvlxjli7TX3fDZO2dMGtttpT8yKBjjcQ62JiHDb6dGh/dW -AtlJy/f7f8XCPK8CfK/fAlEIh+P6YByO88J1pBNbbJSOfP4ncB7XqbCbYq8PMPdp -Cw== ------END CERTIFICATE----- diff --git a/deno.jsonc b/deno.jsonc new file mode 100644 index 0000000..d092f0e --- /dev/null +++ b/deno.jsonc @@ -0,0 +1,50 @@ +{ + "test": { + }, + "tasks": { + //Examples + "demo:web": "deno run -A ./examples/web_server/web.js", + "demo:server": "deno run -A --unstable ./examples/deno/wt_server.ts", + "demo:server-uni-recv": "deno run -A --unstable ./examples/deno/wt_server_uni_recv.ts", + "demo:server-uni-send": "deno run -A --unstable ./examples/deno/wt_server_uni_send.ts", + "demo:server-bidi-send": "deno run -A --unstable ./examples/deno/wt_server_bidi_send.ts", + "demo:server-bidi-recv": "deno run -A --unstable ./examples/deno/wt_server_bidi_recv.ts", + "demo:client": "deno run -A --unstable ./examples/deno/wt_client.ts", + "demo:bidi": "deno run -A --unstable ./examples/deno/wt_client_bidi.ts", + "demo:uni-send": "deno run -A --unstable ./examples/deno/wt_client_uni_send.ts", + "demo:uni-recv": "deno run -A --unstable ./examples/deno/wt_client_uni_recv.ts", + "demo:gencert": "deno run -A --unstable ./examples/deno/wt_gencert.ts", + // CI Build task + "build:release": "cargo clean && cargo build --release", + "build:debug": "cargo clean && cargo build", + //Utils + "util:fmt": "deno fmt --unstable", + "util:lint": "deno lint --unstable", + //Tests + "test": "deno task demo:gencert && deno test -A --unstable ", + //Non CI build + "build": "cargo build" + }, + "compilerOptions": { + "checkJs": true, + "strict": true + }, + "fmt": { + "exclude": [ + "./target", + "./.git", + "./.github", + "./.vscode" + ], + "lineWidth": 80, + "indentWidth": 4 + }, + "lint": { + "exclude": [ + "./target", + "./.git", + "./.github", + "./.vscode" + ] + } +} diff --git a/deno.ts b/deno.ts deleted file mode 100644 index b55add4..0000000 --- a/deno.ts +++ /dev/null @@ -1,100 +0,0 @@ -const lib = Deno.dlopen("./target/release/ftlt.dll", { - start: { - parameters: ["function", "buffer", "pointer"], - result: "pointer", - callback: true, - }, - handle_session: { - parameters: ["pointer", "pointer"], - result: "pointer", - nonblocking: true, - }, - init_runtime: { - parameters: [], - result: "pointer", - }, - proc_rec: { - parameters: ["pointer"], - result: "pointer", - nonblocking: true, - }, - proc_rec_streams: { - parameters: ["pointer", "pointer", "pointer"], - result: "void", - nonblocking: true, - }, - proc_recv_ch_datagram: { - parameters: ["pointer", "pointer", "buffer"], - result: "usize", - nonblocking: true, - }, - test_proc: { - parameters: ["pointer"], - result: "void", - nonblocking: true, - }, -}) - -const ptrstate = new Uint32Array(1); - -const sender = new Deno.UnsafeCallback( - { - parameters: ["u32", "pointer", "u32"], - result: "void", - }, - (_code: unknown | number, buffer, buflen) => { - const code = _code as typeof ptrstate[0]; - if (buflen < 0) { - return; - } - const pointer = Deno.UnsafePointerView.getArrayBuffer( - buffer as unknown as NonNullable, - buflen, - ); - }, - ); - const runtime = lib.symbols.init_runtime(); - const resptr = lib.symbols.start(sender.pointer, ptrstate, runtime); - await lib.symbols.handle_session(resptr, runtime); - - -Promise.all([(async () => { - let client = await lib.symbols.proc_rec(resptr); - - while(client !== null) { - console.log("New connection"); - await lib.symbols.proc_rec_streams(resptr, runtime, client) - // //start a new thread to handle the connection - // lib.symbols.proc_rec_streams(resptr, client); - // // - // console.log("Connection handled"); - - // // let mut buffer = vec![0; 65536].into_boxed_slice(); - Promise.all([(async () => { - let buffer = new Uint8Array(65536); - let res = await lib.symbols.proc_recv_ch_datagram(resptr, client, buffer); - while (res > 0) { - const ress = buffer.subarray(0, res as number); - console.log(ress); - buffer = buffer.fill(0); - res = await lib.symbols.proc_recv_ch_datagram(resptr, client, buffer); - } - })()]); - - // // const buffview = new Deno.UnsafePointerView(res!); - // // console.log(buffview.getBigInt64(0)); - client = await lib.symbols.proc_rec(resptr); - } -})()]); - -// setInterval(() => { -// console.log(ptrstate[0]); -// }, 5000); - -Deno.serve((_req: Request) => { - return new Response(Deno.readTextFileSync("./index.html"), { - headers: { - "content-type": "text/html" - } - }); -}) \ No newline at end of file diff --git a/examples/deno/wt_client.ts b/examples/deno/wt_client.ts new file mode 100644 index 0000000..977a71f --- /dev/null +++ b/examples/deno/wt_client.ts @@ -0,0 +1,26 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get the connect address from args 1 +const connectAddr = Deno.args[0] ?? "https://localhost:4433"; + +const client = new WebTransport(connectAddr, { + maxTimeout: 10, + keepAlive: 3, +}); + +// console.log(client); + +await client.ready; +console.log("Client connected"); +const writer = client.datagrams.writable.getWriter(); +writer?.write(new Uint8Array([1, 2, 3, 4, 5])); +writer?.write(new Uint8Array([1, 2, 3, 4, 5])); +writer?.write(new Uint8Array([1, 2, 3, 4, 5])); +writer?.write(new Uint8Array([1, 2, 3, 4, 5])); +writer?.write(new Uint8Array([1, 2, 3, 4, 5])); + +// // //await messages +for await (const read of client.datagrams.readable) { + console.log(read); +} diff --git a/examples/deno/wt_client_bidi.ts b/examples/deno/wt_client_bidi.ts new file mode 100644 index 0000000..b814480 --- /dev/null +++ b/examples/deno/wt_client_bidi.ts @@ -0,0 +1,26 @@ +import "../../mod/mod.ts"; + +//get the connect address from args 1 +const connectAddr = Deno.args[0] ?? "https://localhost:4433"; + +const transport = new WebTransport(connectAddr, { + maxTimeout: 10, + keepAlive: 3, +}); + +await transport.ready; +console.log("Client connected"); +const stream = await transport.createBidirectionalStream(); +const decoder = new TextDecoder(); +const stdin = Deno.stdin.readable; +//get the last oppened stream +await Promise.all([ + (async () => { + for await (const data of stream!.readable) { + console.log("Recevied : ", decoder.decode(data)); + } + })(), + (() => { + stdin.pipeTo(stream!.writable); + })(), +]); diff --git a/examples/deno/wt_client_test.ts b/examples/deno/wt_client_test.ts new file mode 100644 index 0000000..a5c961c --- /dev/null +++ b/examples/deno/wt_client_test.ts @@ -0,0 +1,60 @@ +import "../../mod/mod.ts"; +// import puppeteer from "https://deno.land/x/puppeteer@16.2.0/mod.ts"; +// import { getAnyEdgeLatest } from "npm:edge-paths"; + +// Deno.test( +// { name: "Browser test", ignore: Deno.build.os != "windows" }, +// async () => { +// const browser = await puppeteer.launch({ +// headless: true, +// defaultViewport: null, +// executablePath: getAnyEdgeLatest(), +// // no extension +// args: [ +// "--enable-automation", +// "--disable-gpu", +// "--disable-extensions", +// ], +// }); +// //TODO(hironichu): Setup valid testing context for client +// const page = await browser.newPage(); + +// await page.goto("https://google.com", { +// waitUntil: "networkidle2", +// }); + +// // await page.pdf({ path: "hn.pdf", format: "A4" }); + +// await browser.close(); +// }, +// ); +// Deno.test( +// { +// name: "Client connect/close (unsafe)", +// sanitizeOps: false, +// sanitizeResources: false, +// }, +// async () => { +// //THis causes panic??????? +// const server = new WebTransportServer("https://localhost:4433", { +// certFile: "./certs/localhost.crt", +// keyFile: "./certs/localhost.key", +// maxTimeout: 10, +// keepAlive: 3, +// }); +// await server.ready; +// server.on("connection", (client) => { +// setTimeout(async () => { +// client.close(); +// await server.close(); +// }, 2000); +// }); +// const client = new WebTransport("https://localhost:4433", { +// maxTimeout: 50, +// keepAlive: 3, +// }); +// await client.ready; + +// console.log("Closing"); +// }, +// ); diff --git a/examples/deno/wt_client_uni_recv.ts b/examples/deno/wt_client_uni_recv.ts new file mode 100644 index 0000000..0a23952 --- /dev/null +++ b/examples/deno/wt_client_uni_recv.ts @@ -0,0 +1,34 @@ +import "../../mod/mod.ts"; + +//get the connect address from args 1 +const connectAddr = Deno.args[0] ?? "https://localhost:4433"; + +const client = new WebTransport(connectAddr, { + maxTimeout: 10, + keepAlive: 3, +}); + +const transport = await client.ready; +console.log("Client connected"); + +const currentTime = performance.now(); +console.log("Waiting for a unidirectional stream to open"); + +// const uds = transport.incomingUnidirectionalStreams; +const streams = transport.incomingUnidirectionalStreams; + +for await (const stream of streams.values()) { + const buff = new Uint8Array(100); + const reader = stream.getReader({ mode: "byob" }); + const first = await reader.read(buff); + console.log(first.value); + + // ({ value, done } = await reader.read(buff)); + // console.log(value); + // ({ value, done } = await reader.read(buff)); + // console.log(value); + // } +} + +//stream should close after 20 messages +console.log("Stream closed after " + (performance.now() - currentTime)); diff --git a/examples/deno/wt_client_uni_send.ts b/examples/deno/wt_client_uni_send.ts new file mode 100644 index 0000000..e0d0375 --- /dev/null +++ b/examples/deno/wt_client_uni_send.ts @@ -0,0 +1,28 @@ +import "../../mod/mod.ts"; + +//get the connect address from args 1 +const connectAddr = Deno.args[0] ?? "https://localhost:4433"; + +const client = new WebTransport(connectAddr, { + maxTimeout: 10, + keepAlive: 3, +}); + +const transport = await client.ready; +console.log("Client connected"); + +const _currentTime = performance.now(); +console.log("Waiting for a unidirectional stream to open"); + +// const uds = transport.incomingUnidirectionalStreams; +const stream = await transport.createUnidirectionalStream(); +console.log("created stream"); +const writer = stream.getWriter(); +await writer.write( + new TextEncoder().encode("Hello from client"), +); +console.log("Stream opened"); + +Deno.serve({ port: 9999 }, (req) => { + return new Response("Hello " + req.url); +}); diff --git a/examples/deno/wt_gencert.ts b/examples/deno/wt_gencert.ts new file mode 100644 index 0000000..6849f8a --- /dev/null +++ b/examples/deno/wt_gencert.ts @@ -0,0 +1,11 @@ +import "../../mod/mod.ts"; +import { GenerateCertKeyFile } from "../../mod/crypto.ts"; + +const hostname = Deno.args[0] ?? "localhost"; + +if (typeof hostname !== "string" || hostname.length == 0) { + console.error("Invalid hostname"); + Deno.exit(1); +} + +GenerateCertKeyFile(hostname, 0, 10); diff --git a/examples/deno/wt_server.ts b/examples/deno/wt_server.ts new file mode 100644 index 0000000..fabf878 --- /dev/null +++ b/examples/deno/wt_server.ts @@ -0,0 +1,41 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get cert path from args 1 and 2 (cert and key) or use default +const certFile = Deno.args[0] ?? "./certs/localhost.crt"; +const keyFile = Deno.args[1] ?? "./certs/localhost.key"; +//check if certFile and keyFile are valid non-empty strings +if (typeof certFile !== "string" || typeof keyFile !== "string") { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +//check path +try { + Deno.statSync(certFile); + Deno.statSync(keyFile); +} catch { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} + +const server = new WebTransportServer("https://localhost:4433", { + certFile: "./certs/localhost.crt", + keyFile: "./certs/localhost.key", + maxTimeout: 10, + keepAlive: 3, +}); + +await server.ready; + +console.log("Server listening"); +server.on("connection", async (conn) => { + console.log("New client"); + const bidiStream = conn.datagrams; + const writer = bidiStream.writable.getWriter(); + writer.write(new Uint8Array([1, 2, 3, 4, 5])); + + //read incoming datagrams + for await (const read of bidiStream.readable) { + console.log(read); + } +}); diff --git a/examples/deno/wt_server_bidi_recv.ts b/examples/deno/wt_server_bidi_recv.ts new file mode 100644 index 0000000..c95f984 --- /dev/null +++ b/examples/deno/wt_server_bidi_recv.ts @@ -0,0 +1,57 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get cert path from args 1 and 2 (cert and key) or use default +const certFile = Deno.args[0] ?? "./certs/localhost.crt"; +const keyFile = Deno.args[1] ?? "./certs/localhost.key"; +//check if certFile and keyFile are valid non-empty strings +if (typeof certFile !== "string" || typeof keyFile !== "string") { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +//check path +try { + Deno.statSync(certFile); + Deno.statSync(keyFile); +} catch { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} + +const server = new WebTransportServer("https://localhost:4433", { + certFile: "./certs/localhost.crt", + keyFile: "./certs/localhost.key", + maxTimeout: 10, + keepAlive: 3, +}); + +await server.ready; +console.log("Server listening"); +const decodeer = new TextDecoder(); +const stdin = Deno.stdin.readable; +server.on("connection", async (conn) => { + console.log("New connection"); + const bidiStream = conn.incomingBidirectionalStreams; + const BIDireader = bidiStream.getReader(); + const { value: firststream } = await BIDireader.read(); + + console.log("READING STD IN"); + await Promise.all([ + (async () => { + for await (const data of firststream!.readable) { + const decoded = decodeer.decode(data).trim().replaceAll( + "\t", + " ", + ); + if (decoded === "exit") { + await server.close(); + return; + } + console.log("Recevied : ", decoded); + } + })(), + (() => { + stdin.pipeTo(firststream!.writable); + })(), + ]); +}); diff --git a/examples/deno/wt_server_bidi_send.ts b/examples/deno/wt_server_bidi_send.ts new file mode 100644 index 0000000..64ed7a4 --- /dev/null +++ b/examples/deno/wt_server_bidi_send.ts @@ -0,0 +1,53 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get cert path from args 1 and 2 (cert and key) or use default +const certFile = Deno.args[0] ?? "./certs/localhost.crt"; +const keyFile = Deno.args[1] ?? "./certs/localhost.key"; +//check if certFile and keyFile are valid non-empty strings +if (typeof certFile !== "string" || typeof keyFile !== "string") { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +//check path +try { + Deno.statSync(certFile); + Deno.statSync(keyFile); +} catch { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} + +const server = new WebTransportServer("https://localhost:4433", { + certFile: "./certs/localhost.crt", + keyFile: "./certs/localhost.key", + maxTimeout: 10, + keepAlive: 3, +}); + +await server.ready; +console.log("Server listening"); +const decoder = new TextDecoder(); +const stdin = Deno.stdin.readable; +server.on("connection", async (conn) => { + console.log("New connection"); + const bidiStream = await conn.createBidirectionalStream(); + await Promise.all([ + (async () => { + for await (const data of bidiStream!.readable) { + const decoded = decoder.decode(data).trim().replaceAll( + "\t", + " ", + ); + if (decoded.trim() === "exit") { + console.log("Closing server"); + await server.close(); + } + console.log("Recevied : ", decoded); + } + })(), + (() => { + stdin.pipeTo(bidiStream!.writable); + })(), + ]); +}); diff --git a/examples/deno/wt_server_test.ts b/examples/deno/wt_server_test.ts new file mode 100644 index 0000000..73e1be2 --- /dev/null +++ b/examples/deno/wt_server_test.ts @@ -0,0 +1,25 @@ +// import { GenerateCertKeyFile } from "../../mod/crypto.ts"; +import "../../mod/mod.ts"; +// import { assert } from "https://deno.land/std@0.202.0/assert/mod.ts"; + +// import { WebTransportServer } from "../../mod/server.ts"; +// //add certs cleanup methods after tests +// const certPath = join(Deno.cwd(), "./certs/"); +async function _sleep(msec: number) { + await new Promise((res, _rej) => setTimeout(res, msec)); +} + +// Deno.test({ name: "Server startup/close" }, async () => { +// //generate a certificate + +// const server = new WebTransportServer("https://localhost:4433", { +// certFile: "./certs/localhost.crt", +// keyFile: "./certs/localhost.key", +// maxTimeout: 10, +// keepAlive: 3, +// }); +// await server.ready; +// await server.close().catch((e) => { +// console.log(e); +// }); +// }); diff --git a/examples/deno/wt_server_uni_recv.ts b/examples/deno/wt_server_uni_recv.ts new file mode 100644 index 0000000..1124dc3 --- /dev/null +++ b/examples/deno/wt_server_uni_recv.ts @@ -0,0 +1,50 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get cert path from args 1 and 2 (cert and key) or use default +const certFile = Deno.args[0] ?? "./certs/localhost.crt"; +const keyFile = Deno.args[1] ?? "./certs/localhost.key"; +//check if certFile and keyFile are valid non-empty strings +if (typeof certFile !== "string" || typeof keyFile !== "string") { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +//check path +try { + Deno.statSync(certFile); + Deno.statSync(keyFile); +} catch { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +Deno.serve(() => new Response("Welcome to Deno 🦕")); +const client = new WebTransport("https://localhost:4433", { + maxTimeout: 5, + keepAlive: 0, +}); +const server = new WebTransportServer("https://localhost:4433", { + certFile: "./certs/localhost.crt", + keyFile: "./certs/localhost.key", + maxTimeout: 10, + keepAlive: 0, +}); +server.on("listening", () => { + console.log("Server listening"); +}); + +server.on("connection", async (transport) => { + console.log("New client"); + + const streams = transport.incomingUnidirectionalStreams; + const reader = streams.getReader(); + const firststream = await reader.read(); + const _incoming = firststream.value!; + // for await (const data of incoming) { + // console.log(data); + // } +}); + +await server.ready; +//after 5 seconds call closed on client + +await client.ready; diff --git a/examples/deno/wt_server_uni_send.ts b/examples/deno/wt_server_uni_send.ts new file mode 100644 index 0000000..e31b785 --- /dev/null +++ b/examples/deno/wt_server_uni_send.ts @@ -0,0 +1,52 @@ +//TO BE IMPLEMENTED +import "../../mod/mod.ts"; + +//get cert path from args 1 and 2 (cert and key) or use default +const certFile = Deno.args[0] ?? "./certs/localhost.crt"; +const keyFile = Deno.args[1] ?? "./certs/localhost.key"; +//check if certFile and keyFile are valid non-empty strings +if (typeof certFile !== "string" || typeof keyFile !== "string") { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} +//check path +try { + Deno.statSync(certFile); + Deno.statSync(keyFile); +} catch { + console.error("Invalid certFile or keyFile"); + Deno.exit(1); +} + +const server = new WebTransportServer("https://localhost:4433", { + certFile: "./certs/localhost.crt", + keyFile: "./certs/localhost.key", + maxTimeout: 10, + keepAlive: 3, +}); + +await server.ready; +console.log("Server listening"); +server.on("connection", async (conn) => { + console.log("New client"); + const sendStream = await conn.createUnidirectionalStream(); + console.log("Stream open"); + const writer = sendStream.getWriter(); + // //wait 5 seeconds and send 2 message every 2 secondes + // await new Promise((resolve) => setTimeout(resolve, 2000)); + // console.log("starting sends"); + await writer.write(new TextEncoder().encode("Hello from server")); + await writer.write(new TextEncoder().encode("Hello from server 2")); + await writer.write(new TextEncoder().encode("Hello from server 3")); + // let sent = 0; + // const inter = setInterval(() => { + // writer.write(new Uint8Array([1, 2, 3, 4, 5])); + // writer.write(new Uint8Array([1, 2, 3, 4, 5])); + // if (sent === 10) { + // clearInterval(inter); + // //close the stream (this will also finish the stream on rust side) + // writer.close(); + // } + // sent++; + // }, 2000); +}); diff --git a/examples/web_server/index.html b/examples/web_server/index.html new file mode 100644 index 0000000..8809e12 --- /dev/null +++ b/examples/web_server/index.html @@ -0,0 +1,67 @@ + + + + + + + DEMO WEBTRANSPORT + + + + +

DEMO WEBTRANSPORT

+

Server IP : HOST_IP

+ + + + + \ No newline at end of file diff --git a/examples/web_server/web.js b/examples/web_server/web.js new file mode 100644 index 0000000..e1daa0e --- /dev/null +++ b/examples/web_server/web.js @@ -0,0 +1,23 @@ +import { readCertFile } from "../../mod/crypto.ts"; + +//grab the first argument as the IP to connect to (defaults to localhost) +const ip = Deno.args[0] || "localhost"; + +const certFile = readCertFile("./certs/localhost.crt"); +const crtdata = new Uint8Array(await crypto.subtle.digest("SHA-256", certFile)); +const certHash = Array.from(crtdata); + +let indexHTML = Deno.readTextFileSync("./examples/web_server/index.html"); + +indexHTML = indexHTML.replace("CERT_HASH", "[" + certHash + "]").replaceAll( + "HOST_IP", + ip, +); + +Deno.serve((_) => { + return new Response(indexHTML, { + headers: { + "content-type": "text/html", + }, + }); +}); diff --git a/fingerprint.hex b/fingerprint.hex deleted file mode 100644 index 0ce454c..0000000 --- a/fingerprint.hex +++ /dev/null @@ -1 +0,0 @@ -SHA2-256(stdin)= 76055c1a369c1eb43d62225a087bea480d2e6c2f4c7a67d8d85f19f83a9ddf84 diff --git a/index.html b/index.html deleted file mode 100644 index 9386332..0000000 --- a/index.html +++ /dev/null @@ -1,160 +0,0 @@ - - - - - - - Document - - - - - - - \ No newline at end of file diff --git a/key.pem b/key.pem deleted file mode 100644 index 754bd72..0000000 --- a/key.pem +++ /dev/null @@ -1,5 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgf/I2d7oP44Q1UKSr -mAr3PDoq4ez61Yhsct0yiEbFBkOhRANCAASXNjGpiW3z8G5HaaQ0Qn8uZUbckzpy -AHqnnJtpPAFDxgrTAVTLOwmLQhRafzdUPLzuIMiFzzknydayU/P8HNWm ------END PRIVATE KEY----- diff --git a/mod/client.ts b/mod/client.ts new file mode 100644 index 0000000..2d76ac8 --- /dev/null +++ b/mod/client.ts @@ -0,0 +1,235 @@ +if (import.meta.main) { + throw new Error("This module is not meant to be imported."); +} +import { WebTransportSendStreamOptions } from "./connection.ts"; +import { + WebTransportBidirectionalStream, + WebTransportDatagramDuplexStream, + WebTransportReceiveStream, + WebTransportSendStream, +} from "./streams.ts"; +import { + type WebTransportOptions, + WebTransportOptions as ServerOpts, +} from "./interface.ts"; +import { encodeBuf } from "./utils.ts"; + +export class WebTransport { + #CLIENT_PTR: Deno.PointerValue | undefined; + + private conn?: Deno.PointerValue; + protected remote: URL; + private buffer!: Uint8Array; + constructor( + _client: URL | string, + _options: WebTransportOptions = ServerOpts, + ) { + /// if _client is string convert it to an URL + if (typeof _client === "string") { + _client = new URL(_client); + } + + this.#CLIENT_PTR = window.WTLIB.symbols.proc_client_init( + _options.keepAlive, + _options.maxTimeout, + ); + + if (!this.#CLIENT_PTR) { + throw new Error("Failed to initialize client"); + } + this.remote = _client; + } + get incomingBidirectionalStreams() { + const pointer = this.conn; + const errorPTR = this.error.pointer; + return new ReadableStream< + WebTransportBidirectionalStream + >({ + async start(controller) { + try { + if ( + (!pointer || pointer === null) || + (!errorPTR || errorPTR === null) + ) { + controller.close(); + return; + } + const stream = await window.WTLIB.symbols + .proc_accept_bi( + pointer, + errorPTR, + ); + if (!stream || stream === null) { + console.error("[incoming BIDI] Stream not accepted"); + controller.close(); + return; + } + controller.enqueue( + new WebTransportBidirectionalStream( + stream, + errorPTR, + ), + ); + } catch (e) { + controller.error(e); + } + }, + cancel() { + console.info("[incoming BIDI] Cancelled"); + }, + }); + } + get incomingUnidirectionalStreams() { + const pointer = this.conn!; + const errorPTR = this.error.pointer; + return new ReadableStream< + ReadableStream + >({ + async start(controller) { + if ( + (!pointer || pointer === null) || + (!errorPTR || errorPTR === null) + ) { + controller.close(); + return; + } + try { + const stream = await window.WTLIB.symbols + .proc_accept_uni( + pointer, + errorPTR, + ); + if (!stream || stream === null) { + console.error("[incoming UNI] Stream not accepted"); + controller.close(); + return; + } + + controller.enqueue(WebTransportReceiveStream.from( + stream, + undefined, + errorPTR, + )); + } catch (e) { + controller.error(e); + } + }, + cancel() { + console.info("[incoming UNI] Cancelled"); + }, + }); + } + get datagrams() { + return new WebTransportDatagramDuplexStream( + this.conn!, + this.buffer, + this.error.pointer, + ); + } + + public async createBidirectionalStream( + _options?: WebTransportSendStreamOptions, + ): Promise { + if (!this.conn) throw new Error("Connection is closed"); + if (!this.conn || this.conn === null) { + throw new Error("Connection is closed"); + } + const _streams = await window.WTLIB.symbols.proc_open_bi( + this.conn, + this.error.pointer, + ); + if (!_streams || _streams === null) { + throw new Error("Failed to create bi stream"); + } + return new WebTransportBidirectionalStream( + _streams, + this.error.pointer, + ); + } + + public async createUnidirectionalStream( + _options?: WebTransportSendStreamOptions, + ): Promise { + if (!this.conn || this.conn === null) { + throw new Error("Connection is closed"); + } + //The following operation block the thread until the stream is created. + const _streams = await window.WTLIB.symbols.proc_open_uni( + this.conn, + this.error.pointer, + ); + + if (!_streams || _streams === null) { + throw new Error("Failed to create uni stream"); + } + const stream = WebTransportSendStream.from( + _streams, + this.error.pointer, + ); + + return stream; + } + error = new Deno.UnsafeCallback({ + parameters: ["u32", "buffer", "u32"], + result: "void", + }, async (code, _pointer, _buflen) => { + if (code === 154) { + console.log("Timed out, closing connection"); + await this.closed; + } + }); + get closed() { + return new Promise(() => { + //close the datagrams + if (this.datagrams) { + this.datagrams.close(); + } + //close all the streams + + if (this.#CLIENT_PTR && this.conn) { + window.WTLIB.symbols.proc_client_close( + this.conn!, + ); + } + if (this.#CLIENT_PTR && !this.conn) { + window.WTLIB.symbols.free_client( + this.#CLIENT_PTR, + ); + } + // this.#NOTIFY_PTR.unref(); + // this.#CONNECTION_CB.unref(); + // this.#NOTIFY_PTR.close(); + // this.#CONNECTION_CB.close(); + }); + } + get ready() { + return new Promise( + (resolve, reject) => { + if (!this.#CLIENT_PTR) { + reject("Client is not initialized"); + return; + } + const encoded = encodeBuf(this.remote.href); + window.WTLIB.symbols.proc_client_connect( + this.#CLIENT_PTR, + encoded[0], + encoded[1], + ).then((conn) => { + if (!conn || conn === null) { + reject("Failed to connect"); + return; + } + this.buffer = new Uint8Array(1024); + this.conn = conn; + resolve(this); + }).catch((e) => { + reject(e); + }); + }, + ); + } +} + +export default WebTransport; + +// Path: mod/client.ts diff --git a/mod/code.ts b/mod/code.ts new file mode 100644 index 0000000..c8b4a6b --- /dev/null +++ b/mod/code.ts @@ -0,0 +1,61 @@ +// This needs to be implemented in some way... might not be the smartest way to do it though + +export const C_TYPES = { + "PROC_OK": 0, + "PROC_ERR": 1, + "PROC_CONN_CLOSED": 2, + "PROC_CONN_TIMEOUT": 3, + "PROC_CONN_H3_ERROR": 4, + "PROC_CONN_QUIC_ERROR": 5, + "PROC_CONN_CLOSED_BY_APP": 6, + "PROC_CONN_CLOSED_LOCALLY": 7, + "PROC_CONN_TIMEDOUT": 8, +} as const; +export type C_TYPES = typeof C_TYPES[keyof typeof C_TYPES]; +//make so i can get the KEY from the VALUE of C_TYPES +const C_TYPES_KEYS = Object.keys(C_TYPES) as (keyof typeof C_TYPES)[]; + +export const FFI_CODES = { + 0: { + type: C_TYPES.PROC_OK, + name: C_TYPES_KEYS[C_TYPES.PROC_OK], + desc: "Success", + }, + 1: { + type: C_TYPES.PROC_ERR, + name: C_TYPES_KEYS[C_TYPES.PROC_ERR], + desc: "Generic error", + }, + 2: { + type: C_TYPES.PROC_CONN_CLOSED, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_CLOSED], + desc: "Connection closed by peer", + }, + 3: { + type: C_TYPES.PROC_CONN_TIMEOUT, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_TIMEOUT], + desc: "Connection timed out", + }, + 4: { + type: C_TYPES.PROC_CONN_H3_ERROR, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_H3_ERROR], + desc: "Connection closed by peer due to H3 error", + }, + 5: { + type: C_TYPES.PROC_CONN_QUIC_ERROR, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_QUIC_ERROR], + desc: "Connection closed by peer due to QUIC error", + }, + 6: { + type: C_TYPES.PROC_CONN_CLOSED_BY_APP, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_CLOSED_BY_APP], + desc: "Connection closed by application", + }, + 7: { + type: C_TYPES.PROC_CONN_CLOSED_LOCALLY, + name: C_TYPES_KEYS[C_TYPES.PROC_CONN_CLOSED_LOCALLY], + desc: "Connection closed locally", + }, +} as const; + +export type FFI_CODES = typeof FFI_CODES[keyof typeof FFI_CODES]; diff --git a/mod/connection.ts b/mod/connection.ts new file mode 100644 index 0000000..ef9d9c2 --- /dev/null +++ b/mod/connection.ts @@ -0,0 +1,182 @@ +// import { FFI_CODES } from "./code.ts"; + +import { + WebTransportBidirectionalStream, + WebTransportDatagramDuplexStream, + WebTransportReceiveStream, + WebTransportSendStream, +} from "./streams.ts"; + +/// +export interface WebTransportSendStreamOptions { + sendOrder?: number | null; +} +export interface WebTransportCloseInfo { + errorCode?: number; + reason?: string; +} + +export default class WebTransportConnection { + state: + | "connected" + | "closed" + | "draining" + | "failed" + | "connecting" = "closed" as const; + + constructor( + public readonly pointer: Deno.PointerValue, + private readonly buffer: Uint8Array, + ) { + this.state = "connected"; + } + get datagrams() { + return new WebTransportDatagramDuplexStream( + this.pointer, + this.buffer, + this.error.pointer, + ); + } + get incomingBidirectionalStreams() { + const pointer = this.pointer; + const errorPTR = this.error.pointer; + return new ReadableStream< + WebTransportBidirectionalStream + >({ + async start(controller) { + try { + if ( + (!pointer || pointer === null) || + (!errorPTR || errorPTR === null) + ) { + controller.close(); + return; + } + const stream = await window.WTLIB.symbols + .proc_accept_bi( + pointer, + errorPTR, + ); + if (!stream || stream === null) { + console.error( + "[incoming BIDI] Stream not accepted", + ); + controller.close(); + return; + } + controller.enqueue( + new WebTransportBidirectionalStream( + stream, + errorPTR, + ), + ); + } catch (e) { + controller.error(e); + } + }, + cancel() { + console.info("[incoming BIDI] Cancelled"); + }, + }); + } + get incomingUnidirectionalStreams() { + const pointer = this.pointer; + const errorPTR = this.error.pointer; + return new ReadableStream< + ReadableStream + >({ + async start(controller) { + if ( + (!pointer || pointer === null) || + (!errorPTR || errorPTR === null) + ) { + controller.close(); + return; + } + try { + const stream = await window.WTLIB.symbols + .proc_accept_uni( + pointer, + errorPTR, + ); + if (!stream || stream === null) { + console.error("[incoming UNI] Stream not accepted"); + controller.close(); + return; + } + controller.enqueue(WebTransportReceiveStream.from( + stream, + undefined, + errorPTR, + )); + } catch (e) { + controller.error(e); + } + }, + cancel() { + console.info("[incoming UNI] Cancelled"); + }, + }); + } + public async createBidirectionalStream( + _options?: WebTransportSendStreamOptions, + ): Promise { + //The following operation block the thread until the stream is created. + if (!this.pointer || this.pointer === null) { + throw new Error("Connection is closed"); + } + const _streams = await window.WTLIB.symbols.proc_open_bi( + this.pointer, + this.error.pointer, + ); + if (!_streams || _streams === null) { + throw new Error("Failed to create bi stream"); + } + return new WebTransportBidirectionalStream( + _streams, + this.error.pointer, + ); + } + + public async createUnidirectionalStream( + _options?: WebTransportSendStreamOptions, + ): Promise { + if (!this.pointer || this.pointer === null) { + throw new Error("Connection is closed"); + } + //The following operation block the thread until the stream is created. + const _streams = await window.WTLIB.symbols.proc_open_uni( + this.pointer, + this.error.pointer, + ); + + if (!_streams || _streams === null) { + throw new Error("Failed to create uni stream"); + } + const stream = WebTransportSendStream.from( + _streams, + this.error.pointer, + ); + + return stream; + } + + error = new Deno.UnsafeCallback({ + parameters: ["u32", "buffer", "u32"], + result: "void", + }, (code, _pointer, _buflen) => { + if (code === 154) { + this.close(); + } + }); + close(_closeInfo?: WebTransportCloseInfo) { + if (!this.pointer || this.pointer === null) { + throw new Error("Connection is closed"); + } + + this.state = "closed"; + window.WTLIB.symbols.proc_client_close( + this.pointer, + ); + } +} diff --git a/mod/crypto.ts b/mod/crypto.ts new file mode 100644 index 0000000..a7751f7 --- /dev/null +++ b/mod/crypto.ts @@ -0,0 +1,106 @@ +if (import.meta.main) { + throw new Error("This module is not meant to be imported."); +} +import { join } from "./deps.ts"; +import { encodeBuf } from "./utils.ts"; +export function base64ToArrayBuffer(base64: string) { + const binaryString = atob(base64); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + return bytes; +} + +export function readCertFile(certpath: string) { + try { + Deno.statSync(certpath); + } catch { + throw new Error("Certificate file does not exist"); + } + const cert = Deno.readTextFileSync(certpath); + const certBase64 = cert.replace(/-----BEGIN CERTIFICATE-----/g, "").replace( + /-----END CERTIFICATE-----/g, + "", + ).replace(/\n/g, ""); + const certBuffer = base64ToArrayBuffer(certBase64); + return certBuffer; +} + +export function GenerateCertKey( + domainStr: string, + start: number, + end: number, +) { + if (start > end) throw new Error("Invalid date range"); + if (domainStr.length === 0) throw new Error("Invalid domain name"); + + const domain = encodeBuf(domainStr); + const certBUFF = new Uint8Array(2048); + const certLenPTR = new Uint32Array(1); + const keyBUFF = new Uint8Array(2048); + const keyLenPTR = new Uint32Array(1); + try { + const struct = window.WTLIB.symbols.proc_gencert( + domain[0], + domain[1], + start, + end, + certBUFF, + certLenPTR, + keyBUFF, + keyLenPTR, + ); + if (!struct) { + throw new Error("Failed to generate certificate"); + } + + const cert = certBUFF.subarray(0, certLenPTR[0]); + const key = keyBUFF.subarray(0, keyLenPTR[0]); + + return [cert, key]; + } catch (e) { + console.error(e); + Deno.exit(1); + } +} + +/// Generate certificate and key file +/// GenerateCertKeyFile("localhost", 0, 10); +export function GenerateCertKeyFile( + domainStr: string, + start: number, + end: number, + path?: string, + keyFileName?: string, + certFileName?: string, +) { + if (start > end) throw new Error("Invalid date range"); + path = path ?? join(Deno.cwd(), "./certs/"); + //check path + try { + Deno.statSync(path); + } catch { + console.info("[Webtransport] Creating directory: ", path); + Deno.mkdirSync(path, { + recursive: true, + }); + } + const [cert, key] = GenerateCertKey(domainStr, start, end); + const certpath = join(path, `${certFileName ?? domainStr + ".crt"}`); + const keypath = join(path, `${keyFileName ?? domainStr + ".key"}`); + try { + Deno.writeFileSync(certpath, cert, { + mode: 0o766, + create: true, + }); + Deno.writeFileSync(keypath, key, { + mode: 0o766, + create: true, + }); + return [certpath, keypath]; + } catch (e) { + console.error(e); + throw new Error("Failed to write certificate or key file ", e); + } +} diff --git a/mod/deps.ts b/mod/deps.ts new file mode 100644 index 0000000..9fdefc2 --- /dev/null +++ b/mod/deps.ts @@ -0,0 +1,3 @@ +import "https://deno.land/std@0.202.0/dotenv/load.ts"; +export { join, toFileUrl } from "https://deno.land/std@0.202.0/path/mod.ts"; +export { EventEmitter } from "https://deno.land/x/event@2.0.1/mod.ts"; diff --git a/mod/interface.ts b/mod/interface.ts new file mode 100644 index 0000000..5b8a50b --- /dev/null +++ b/mod/interface.ts @@ -0,0 +1,28 @@ +export const WebTransportOptions = { + maxTimeout: 10, + keepAlive: 3, +}; + +export const CertificateOptions = { + certFile: "", + keyFile: "", +}; + +export const CertificateGenParams = { + domain: Deno.hostname(), + notBefore: 0, + notAfter: 10, +}; +type CertificateOptions = + & typeof CertificateOptions + & typeof CertificateGenParams; + +export const WebTransportServerOptions = { + maxTimeout: 10, + keepAlive: 3, +}; +export type WebTransportServerOptions = + & typeof WebTransportServerOptions + & Partial; + +export type WebTransportOptions = typeof WebTransportOptions; diff --git a/mod/lib.ts b/mod/lib.ts new file mode 100644 index 0000000..07bfa95 --- /dev/null +++ b/mod/lib.ts @@ -0,0 +1,113 @@ +import { toFileUrl } from "./deps.ts"; +import LIB_URL from "../utils/download_lib.ts"; +import { symbols } from "./symbols.ts"; + +let local = false; +let download_lib: URL; +if (import.meta.url.startsWith("file://")) { + local = true; + download_lib = new URL(`http://localhost/`); +} else { + local = false; + if (LIB_URL) { + download_lib = LIB_URL; + } else { + throw new Error("LIB_URL is not defined"); + } +} +const askPerm = async (): Promise => { + if ((await Deno.permissions.query({ name: "write" })).state !== "granted") { + console.info( + `We need to download the library to use this module, please grant write permission to Deno.`, + ); + await Deno.permissions.request({ + name: "write", + path: Deno.cwd(), + }); + } + + if ( + (await Deno.permissions.query({ name: "read" })) + .state !== + "granted" + ) { + console.info( + `We need to download the library to use this module, please grant read permission to Deno.`, + ); + await Deno.permissions.request({ + name: "read", + }); + } +}; +const currentPath = toFileUrl(Deno.cwd() + "/"); + +const dirpath = local ? "./target/debug/" : "./.lib/"; +let [fileExt, fileprefix] = ["", Deno.build.os === "windows" ? "" : "lib"]; +switch (Deno.build.os) { + case "windows": + fileExt = ".dll"; + break; + case "linux": + fileExt = ".so"; + break; + case "darwin": { + fileExt = ".dylib"; + break; + } +} +const buildFilename = `${fileprefix}webtransport${ + !Deno.env.has("CI_BUILD") + ? "" + : `${Deno.build.arch == "aarch64" ? `_${Deno.build.arch}` : ""}` +}${fileExt}`; + +const DURL = new URL(dirpath + buildFilename, currentPath); +if (!local) { + const remoteLIb = await fetch(download_lib, { + headers: { + Accept: "application/octet-stream", + }, + }); + const remoteBuffer = new Uint8Array(await remoteLIb.arrayBuffer()); + let dirs = false; + try { + const dir = Deno.statSync(`./.lib`); + if (dir.isDirectory) { + dirs = true; + Deno.statSync(DURL); + const file = Deno.readFileSync(DURL); + const localhash = await crypto.subtle.digest("SHA-1", file); + const remotehash = await crypto.subtle.digest( + "SHA-1", + remoteBuffer, + ); + const localhashHex = Array.from(new Uint8Array(localhash)).map(( + b, + ) => b.toString(16).padStart(2, "0")).join(""); + const remotehashHex = Array.from(new Uint8Array(remotehash)).map(( + b, + ) => b.toString(16).padStart(2, "0")).join(""); + if (localhashHex !== remotehashHex) { + await askPerm(); + Deno.writeFileSync(DURL, remoteBuffer); + } + } else { + dirs = false; + throw ""; + } + } catch { + await askPerm(); + if (!dirs) { + Deno.mkdirSync(`./dist`, { recursive: false }); + } + Deno.writeFileSync(DURL, remoteBuffer, { + create: true, + mode: 0o755, + }); + } + console.log(`Downloaded library to ${DURL}`); +} + +export const LIB = async () => await Deno.dlopen(DURL, symbols); + +export default { LIB, symbols }; diff --git a/mod/mod.ts b/mod/mod.ts new file mode 100644 index 0000000..0bc1c00 --- /dev/null +++ b/mod/mod.ts @@ -0,0 +1,47 @@ +import { WebTransportOptions, WebTransportServerOptions } from "./interface.ts"; +import { symbols } from "./symbols.ts"; +import { LIB } from "./lib.ts"; + +if (window.WTLIB_STATE) { + throw new Error("[Webtransport] Main module already imported."); +} + +window.WTLIB = await LIB(); +window.WTLIB_STATE = true; +import { WebTransportServer } from "./server.ts"; +import { WebTransport } from "./client.ts"; + +declare global { + export interface Window { + WTLIB: Deno.DynamicLibrary; + WTLIB_STATE: boolean; + } +} + +declare global { + namespace globalThis { + const WebTransport: { + readonly prototype: WebTransport; + new ( + _client: URL | string, + _options?: WebTransportOptions, + ): WebTransport; + }; + const WebTransportServer: { + readonly prototype: WebTransportServer; + new ( + _url: URL | string, + _options: WebTransportServerOptions, + ): WebTransportServer; + }; + } +} + +((globalThis) => { + Object.assign(globalThis, { + WebTransport: WebTransport, + WebTransportServer: WebTransportServer, + }); +})(globalThis); + +// Path: mod/mod.ts diff --git a/mod/server.ts b/mod/server.ts new file mode 100644 index 0000000..031d515 --- /dev/null +++ b/mod/server.ts @@ -0,0 +1,198 @@ +if (import.meta.main) { + throw new Error("This module is not meant to be imported."); +} +import WebTransportConnection from "./connection.ts"; +import { GenerateCertKeyFile } from "./crypto.ts"; +import { EventEmitter } from "./deps.ts"; +import { + type WebTransportServerOptions, + WebTransportServerOptions as ServerOpts, +} from "./interface.ts"; +import { encodeBuf } from "./utils.ts"; + +export type WebTransportServerEvents = { + listening: [Event]; + connection: [WebTransportConnection]; + event: [MessageEvent]; + // Error Event + error: [ErrorEvent | string]; + // Close Event + close: [CloseEvent]; +}; + +export class WebTransportServer extends EventEmitter { + public connections: Map = new Map(); + #SRV_PTR: Deno.PointerValue | undefined; + + #CONNECTION_CB = new Deno.UnsafeCallback( + { + parameters: ["pointer"], + result: "void", + }, + this.connection.bind(this), + ); + constructor( + _url: URL | string, + _options: WebTransportServerOptions = ServerOpts, + ) { + super(); + //if _url is a string, we need to convert it to an URL + if (typeof _url === "string") { + _url = new URL(_url); + } + if (_url.port.length == 0) { + throw new TypeError("Invalid port"); + } + if (_url.protocol != "https:") { + throw new TypeError("Invalid protocol"); + } + if (_url.hostname.length == 0) { + throw new TypeError("Invalid hostname"); + } + _options.domain ??= _url.hostname ?? Deno.hostname(); + + const [certificate, key] = this.checkArgs(_options); + + const certbuf = encodeBuf(certificate); + const keybuf = encodeBuf(key); + + this.#SRV_PTR = window.WTLIB.symbols.proc_server_init( + parseInt(_url.port), + true, + _options.keepAlive, + _options.maxTimeout, + certbuf[0], + certbuf[1], + keybuf[0], + keybuf[1], + ); + + if (!this.#SRV_PTR) { + throw new Error("Failed to initialize server"); + } + + this.#CONNECTION_CB.ref(); + this.emit("listening", new Event("listening")); + } + /** + * @callback connection + * @param {Deno.PointerValue} client + * @returns {void} + * @description This function is called when a new connection is received from the server + */ + private connection(client: Deno.PointerValue) { + const SHARED_BUF = new ArrayBuffer(65536); + const CONN_BUFFER = new Uint8Array(SHARED_BUF); + + const conn = new WebTransportConnection( + client, + CONN_BUFFER, + ); + + this.connections.set( + this.connections.size, + conn, + ); + this.emit("connection", conn); + } + + async close() { + console.info("[JS] SERVER CLOSE CALLED"); + //free all the connections + if (this.#SRV_PTR) { + if (this.connections.size > 0) { + await window.WTLIB.symbols.proc_server_close_clients( + this.#SRV_PTR, + ); + } + } + this.connections.forEach((conn, id) => { + if (conn.state != "closed" && conn.pointer) { + window.WTLIB.symbols.proc_client_close( + conn.pointer, + ); + window.WTLIB.symbols.free_conn(conn.pointer); + conn.state = "closed"; + } + this.connections.delete(id); + }); + + this.#CONNECTION_CB.close(); + await new Promise((r) => { + setTimeout(() => { + window.WTLIB.symbols.proc_server_close(this.#SRV_PTR!); + r(true); + }, 100); + }); + window.WTLIB.symbols.free_server(this.#SRV_PTR!); + this.#SRV_PTR = undefined; + console.info("[SERVER] Server closed"); + } + get ready() { + return new Promise((resolve, reject) => { + console.info("[SERVER] Server ready"); + const rest = window.WTLIB.symbols.proc_server_listen( + this.#SRV_PTR!, + this.#CONNECTION_CB.pointer, + ); + if (!rest) { + reject("Failed to listen"); + } + resolve(this); + }); + } + private checkArgs(_options: WebTransportServerOptions) { + if ( + ((!_options.certFile || _options.certFile.length == 0) && + (!_options.keyFile || _options.keyFile.length == 0)) && + (typeof _options.notAfter == "undefined" && + typeof _options.notBefore == "undefined" && + !_options.domain) + ) { + throw new TypeError( + "Missing necessary parameters: certFile, keyFile or notAfter, notBefore to generate a new certificate", + ); + } + let certificate = ""; + let key = ""; + + if ( + (_options.certFile && _options.keyFile) + ) { + if (_options.certFile.length == 0 || _options.keyFile.length == 0) { + throw new TypeError( + "Invalid certificate or key file path (empty string)", + ); + } + try { + Deno.statSync(_options.certFile); + Deno.statSync(_options.keyFile); + } catch { + throw new TypeError( + "Invalid certificate or key file path", + ); + } + // + certificate = _options.certFile; + key = _options.keyFile; + return [certificate, key]; + } + if ( + typeof _options.notAfter != "undefined" && + typeof _options.notBefore != "undefined" && _options.domain + ) { + return [certificate, key] = GenerateCertKeyFile( + _options.domain, + _options.notBefore, + _options.notAfter, + ); + } else { + throw new TypeError( + "Missing necessary parameters: notAfter, notBefore, domain to generate a new certificate", + ); + } + } +} +export default WebTransportServer; + +// Path: mod/client.ts diff --git a/mod/streams.ts b/mod/streams.ts new file mode 100644 index 0000000..c8ea829 --- /dev/null +++ b/mod/streams.ts @@ -0,0 +1,227 @@ +/** + * @class WebTransportDatagramDuplexStream + */ +export class WebTransportDatagramDuplexStream { + #READ_BUFFER?: Uint8Array; + readonly incomingHighWaterMark = 1; + readonly incomingMaxAge = 0; + readonly maxDatagramSize = 1024; + readonly outgoingHighWaterMark = 1; + readonly outgoingMaxAge = 0; + constructor( + private readonly connection: Deno.PointerValue, + readonly _buffer: Uint8Array, + private readonly cb: Deno.PointerValue, + ) { + this.#READ_BUFFER = _buffer; + } + get writable() { + const connection = this.connection; + const error = this.cb; + return new WritableStream({ + start(controller) { + if (!connection || connection === null) { + controller.error("Connection is closed"); + return; + } + }, + write(chunk: Uint8Array) { + window.WTLIB.symbols.proc_send_datagram( + connection!, + chunk, + chunk.byteLength, + error, + ); + }, + abort(e) { + console.error("[Error] Write aborted", e); + }, + close() { + console.log("[Info] Write closed"); + }, + }); + } + get readable() { + const connection = this.connection; + const buffer = this.#READ_BUFFER ?? new Uint8Array(1024); + const cb = this.cb; + // const DEFAULT_CHUNK_SIZE = this.maxDatagramSize; + return new ReadableStream({ + type: "bytes", + async start(controller) { + if (!connection || connection === null) { + controller.error("Connection is closed"); + return; + } + const nread = await window.WTLIB.symbols.proc_recv_datagram( + connection, + buffer, + cb, + ) as number; + if (nread <= 0) { + return; + } + controller.enqueue( + buffer.subarray(0, nread as number), + ); + }, + cancel() { + }, + }); + } + close() { + console.log("[Info] Closing datagram duplex stream"); + this.#READ_BUFFER = undefined; + this.readable.cancel(); + this.writable.close(); + } +} +/** + * @class WebTransportBidirectionalStream + * @description This class is used to read and write data from a bidirectional stream + */ +export class WebTransportBidirectionalStream { + readonly readable: ReadableStream; + readonly writable: WritableStream; + + constructor( + public readonly ptr: Deno.PointerValue, + private readonly errorCB: Deno.PointerValue, + ) { + this.readable = WebTransportReceiveStream.from( + this.ptr, + undefined, + errorCB, + ); + console.log(this.ptr); + this.writable = WebTransportSendStream.from(this.ptr, errorCB); + } +} + +/** + * @class WebTransportReceiveStream + * @description This class is used to read data from a stream (uni or bidirectional) + */ +export class WebTransportReceiveStream { + private static readonly ptr: Deno.PointerValue; + constructor(private readonly ptr: Deno.PointerValue) { + } + static from( + ptr: Deno.PointerValue, + DEFAULT_CHUNK_SIZE = 1024, + cb: Deno.PointerValue, + ) { + console.log("BIDI READER"); + return new ReadableStream({ + type: "bytes", + start(controller) { + readRepeatedly().catch((e) => controller.error(e)); + async function readRepeatedly() { + // return socket.select2().then(() => { + // Since the socket can become readable even when there's + // no pending BYOB requests, we need to handle both cases. + let bytesRead; + if (controller.byobRequest) { + const v = controller.byobRequest.view; + bytesRead = await window.WTLIB.symbols.proc_read( + ptr, + v?.buffer!, + v?.byteLength!, + cb, + ); + if (bytesRead === 0) { + controller.close(); + } + controller.byobRequest.respond(bytesRead as number); + } else { + const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE); + bytesRead = await window.WTLIB.symbols.proc_read( + ptr, + buffer, + DEFAULT_CHUNK_SIZE, + cb, + ); + if (bytesRead === 0) { + controller.close(); + } else { + controller.enqueue( + new Uint8Array(buffer, 0, bytesRead as number), + ); + } + } + + if (bytesRead === 0) { + return; + // no more bytes in source + } + return readRepeatedly(); + // }); + } + }, + + async cancel(reason?: string): Promise { + if (!ptr || ptr === null) { + console.debug("Stream is closed"); + return; + } + console.error("Canceled: ", reason); + await window.WTLIB.symbols.proc_recvtream_stop(ptr).catch( + (e) => { + console.error(e); + }, + ); + }, + }); + } +} + +/** + * @class WebTransportSendStream + * @description This class is used to write data to a stream (uni or bidirectional) + */ +export class WebTransportSendStream { + static from( + ptr: Deno.PointerValue, + cb: Deno.PointerValue, + ) { + return new WritableStream({ + async write( + chunk: Uint8Array, + controller: WritableStreamDefaultController, + ) { + let written = 0; + if (!ptr || ptr === null) { + controller.error("Stream is closed"); + return; + } + + written = await window.WTLIB.symbols.proc_write( + ptr, + chunk, + chunk.byteLength, + cb, + ) as number; + if (written === 0) { + controller.error("Write failed"); + return; + } + }, + async abort() { + if (!ptr || ptr === null) { + return; + } + await window.WTLIB.symbols.proc_sendstream_finish( + ptr, + ); + }, + async close() { + if (!ptr || ptr === null) { + return; + } + await window.WTLIB.symbols.proc_sendstream_finish( + ptr, + ); + }, + }); + } +} diff --git a/mod/symbols.ts b/mod/symbols.ts new file mode 100644 index 0000000..4b994c4 --- /dev/null +++ b/mod/symbols.ts @@ -0,0 +1,193 @@ +export const symbols = { + // Server symbols + proc_server_init: { + parameters: [ + // "function", //Callback + "u16", + "bool", + "u64", + "u64", + "buffer", + "usize", + "buffer", + "usize", //KeyLen + ], + result: "pointer", + callback: false, + }, + proc_server_listen: { + parameters: ["pointer", "function"], + result: "pointer", + callback: true, + nonblocking: false, + }, + proc_server_close: { + parameters: ["pointer"], + result: "usize", + nonblocking: false, + }, + proc_server_close_clients: { + parameters: ["pointer"], + result: "usize", + nonblocking: true, + }, + proc_server_client_headers: { + parameters: ["pointer", "buffer"], + result: "pointer", + }, + proc_server_client_path: { + parameters: ["pointer", "buffer"], + result: "pointer", + }, + proc_server_client_authority: { + parameters: ["pointer", "buffer"], + result: "pointer", + }, + // Client symbols + proc_client_init: { + parameters: [ + // "function", //Event Callback + "u64", + "u64", //MaxTimeout + ], + result: "pointer", + }, + proc_client_connect: { + parameters: [ + "pointer", + "buffer", + "usize", //HostLen + ], + result: "pointer", + nonblocking: true, + }, + proc_client_close: { + parameters: ["pointer"], + result: "void", + }, + // Shared symbols + proc_recv_datagram: { + parameters: ["pointer", "buffer", "function"], + result: "usize", + nonblocking: true, + callback: true, + }, + proc_send_datagram: { + parameters: ["pointer", "buffer", "usize", "function"], + result: "void", + nonblocking: true, + callback: true, + }, + proc_accept_bi: { + parameters: [ + "pointer", + "function", //Event Callback + ], + result: "pointer", + nonblocking: true, + callback: true, + }, + proc_open_bi: { + parameters: [ + "pointer", + "function", //Event Callback + ], + result: "pointer", + nonblocking: true, + callback: true, + }, + + proc_accept_uni: { + parameters: [ + "pointer", + "function", //Event Callback + ], + result: "pointer", + nonblocking: true, + callback: true, + }, + proc_open_uni: { + parameters: [ + "pointer", + "function", //Event Callback + ], + result: "pointer", + nonblocking: true, + callback: true, + }, + + proc_read: { + parameters: ["pointer", "buffer", "usize", "function"], + result: "usize", + nonblocking: true, + callback: true, + }, + proc_write: { + parameters: ["pointer", "buffer", "usize", "function"], + result: "usize", + nonblocking: true, + callback: true, + }, + proc_write_all: { + parameters: ["pointer", "buffer", "usize", "function"], + result: "usize", + nonblocking: true, + callback: true, + }, + proc_recvstream_id: { + parameters: ["pointer"], + result: "u64", + }, + proc_sendstream_id: { + parameters: ["pointer"], + result: "u64", + }, + proc_sendstream_priority: { + parameters: ["pointer"], + result: "u64", + }, + proc_sendstream_set_priority: { + parameters: ["pointer", "u64"], + result: "u64", + }, + proc_sendstream_finish: { + parameters: ["pointer"], + result: "void", + nonblocking: true, + }, + proc_recvtream_stop: { + parameters: ["pointer"], + result: "void", + nonblocking: true, + }, + // Crypto symbols + proc_gencert: { + parameters: [ + "buffer", + "usize", + "i64", + "i64", + "buffer", + "buffer", + "buffer", + "buffer", + ], + result: "bool", + }, + free_server: { + parameters: ["pointer"], + result: "void", + }, + free_conn: { + parameters: ["pointer"], + result: "void", + }, + free_all_client: { + parameters: ["pointer", "pointer"], + result: "void", + }, + free_client: { + parameters: ["pointer"], + result: "void", + }, +} as const; diff --git a/mod/utils.ts b/mod/utils.ts new file mode 100644 index 0000000..a002fea --- /dev/null +++ b/mod/utils.ts @@ -0,0 +1,16 @@ +if (import.meta.main) { + throw new Error("This module is not meant to be imported."); +} +export const encoder = new TextEncoder(); + +export function encodeBuf(v: string | Uint8Array): [Uint8Array, number] { + if (typeof v !== "string") return [v, v.byteLength]; + const encoded = encoder.encode(v); + return [encoded, encoded.byteLength]; +} +export const decoder = new TextDecoder(); +export default { + decoder, + encoder, + encodeBuf, +}; diff --git a/src/certificate.rs b/src/certificate.rs new file mode 100644 index 0000000..b708a72 --- /dev/null +++ b/src/certificate.rs @@ -0,0 +1,82 @@ +use anyhow::Result; +use rcgen::BasicConstraints; +use rcgen::CertificateParams; +use rcgen::DistinguishedName; +use rcgen::DnType; +use rcgen::ExtendedKeyUsagePurpose; +use rcgen::IsCa; +use rcgen::KeyPair; +use rcgen::PKCS_ECDSA_P256_SHA256; + +use time::Duration; +use time::OffsetDateTime; + +pub struct SelfCertificate { + /// DER certificate. + pub certificate: rcgen::Certificate, +} + +/// Generates a self-signed certificate for WebTransport connections. +pub fn generate_certificate>( + common_name: S, + start: OffsetDateTime, + end: OffsetDateTime, +) -> Result { + let keypair = KeyPair::generate(&PKCS_ECDSA_P256_SHA256)?; + + let mut dname = DistinguishedName::new(); + dname.push(DnType::CommonName, common_name.as_ref()); + + let mut cert_params = CertificateParams::new(vec![common_name.as_ref().to_string()]); + cert_params.distinguished_name = dname; + cert_params.alg = &PKCS_ECDSA_P256_SHA256; + cert_params.key_pair = Some(keypair); + cert_params.not_before = start; + cert_params.not_after = end; + cert_params.extended_key_usages = vec![ + ExtendedKeyUsagePurpose::ServerAuth, + ExtendedKeyUsagePurpose::ClientAuth, + ]; + cert_params.key_usages = vec![ + rcgen::KeyUsagePurpose::DigitalSignature, + rcgen::KeyUsagePurpose::KeyCertSign, + ]; + cert_params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained); + + let certificate = rcgen::Certificate::from_params(cert_params)?; + + Ok(SelfCertificate { certificate }) +} + +#[no_mangle] +pub unsafe extern "C" fn proc_gencert( + domain_buf: *const u8, + domain_buf_len: usize, + start: i64, + end: i64, + cert_buf: *mut u8, + cert_buf_len: *mut usize, + key_buf: *mut u8, + key_buf_len: *mut usize, +) -> bool { + let buf = ::std::slice::from_raw_parts(domain_buf, domain_buf_len); + let domain = String::from_utf8_lossy(buf); + let start = OffsetDateTime::now_utc() + .checked_add(Duration::days(start)) + .unwrap(); + let end = OffsetDateTime::now_utc() + .checked_add(Duration::days(end)) + .unwrap(); + let cert = generate_certificate(domain, start, end).unwrap(); + let certbuffer = cert.certificate.serialize_pem().unwrap(); + let keybuff = cert.certificate.serialize_private_key_pem(); + + let cert_len = certbuffer.len(); + let key_len = keybuff.len(); + + ::std::slice::from_raw_parts_mut(cert_buf, cert_len).copy_from_slice(certbuffer.as_bytes()); + ::std::slice::from_raw_parts_mut(key_buf, key_len).copy_from_slice(keybuff.as_bytes()); + *cert_buf_len = cert_len; + *key_buf_len = key_len; + true +} diff --git a/src/client.rs b/src/client.rs index cc622d2..06ecbc2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,21 +1,112 @@ -use flume::{Sender, Receiver}; -use wtransport::{Connection, datagram::Datagram}; +use crate::{ + connection::{Client, Conn}, + RUNTIME, +}; +use std::{io::Error, time::Duration}; +use wtransport::endpoint::endpoint_side::Client as endClient; +use wtransport::{ClientConfig, Endpoint}; +pub struct WebTransportClient { + pub client: Option>, + // pub conn_cb: Option)>, + pub state: Option, +} + +impl WebTransportClient { + pub(crate) fn new(config: ClientConfig) -> Result { + let _guard = RUNTIME.enter(); + + let client = match Endpoint::client(config) { + Ok(server) => server, + Err(e) => { + return Err(e); + } + }; + Ok(Self { + // conn_cb: None, + client: Some(client), + state: Some(true), + }) + } + + pub(crate) fn connect(&'static mut self, url: String) -> *mut Conn { + RUNTIME.block_on(async move { + let client = self.client.as_mut().unwrap(); + + match client.connect(url).await { + Ok(conn) => { + let client = Conn::::new(conn); + return Box::into_raw(Box::new(client)); + } + Err(_err) => { + //return null ptr + return std::ptr::null_mut(); + // println!("DBG: Error connecting to server. Err: {}", err.to_string()); + // let mut msg = err.to_string(); + // errocb(141, msg.as_mut_ptr(), msg.len() as u32); + //sender_cb(141, msg.as_mut_ptr(), msg.len() as u32); + } + } + }) + } +} + +#[no_mangle] +pub extern "C" fn proc_client_init( + // send_func: Option, + keepalive: u64, + timeout: u64, +) -> *mut WebTransportClient { + // assert!(!send_func.is_none()); + + let keepalive = if keepalive == 0 { + None + } else { + Some(Duration::from_secs(keepalive)) + }; + let timeout = if timeout == 0 { + None + } else { + Some(Duration::from_secs(timeout)) + }; + + let config = ClientConfig::builder() + .with_bind_config(wtransport::config::IpBindConfig::InAddrAnyDual) + .with_no_cert_validation() + .keep_alive_interval(keepalive) + .max_idle_timeout(timeout) + .unwrap() + .build(); + let client = WebTransportClient::new(config); + match client { + Ok(client) => Box::into_raw(Box::new(client)), + Err(_error) => std::ptr::null_mut(), + } +} +#[no_mangle] +pub unsafe extern "C" fn proc_client_connect( + client: *mut WebTransportClient, + url: *const u8, + url_len: usize, +) { + let url = ::std::slice::from_raw_parts(url, url_len); + let url = std::str::from_utf8(url).unwrap(); + let client = &mut *client; + + client.connect(url.to_string()); +} + +#[no_mangle] +pub unsafe extern "C" fn proc_client_close(conn: *mut Conn) -> usize { + assert!(!conn.is_null()); -pub struct ClientConn { - pub conn: Connection, - pub datagram_ch_sender: Sender, - pub datagram_ch_receiver: Receiver, + let conn = &mut *conn; + conn.close(30, Some(b"Closed by client")); + 0 } +#[no_mangle] +pub unsafe extern "C" fn free_all_client(_a: *mut WebTransportClient, _b: *mut Conn) {} -impl ClientConn { - pub(crate) fn new(conn: Connection) -> Self { - let (sender, receiver) = flume::unbounded(); - Self { - conn, - datagram_ch_sender: sender, - datagram_ch_receiver: receiver, - } - } -} \ No newline at end of file +#[no_mangle] +pub unsafe extern "C" fn free_client(_a: *mut WebTransportClient) {} diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..09f32bc --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,146 @@ +use crate::RUNTIME; +use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin}; +use wtransport::{ + datagram::Datagram, endpoint::SessionRequest, error::ConnectionError, Connection, RecvStream, + SendStream, +}; +use wtransport_proto::varint::VarInt; + +type DynFutureIncomingSession = dyn Future> + Send + Sync; + +pub struct Server(Pin>); + +/// Type of endpoint opening a WebTransport connection. +pub struct Client; + +pub struct Conn { + pub conn: Option, + pub accepted_session: Option, + pub buffer: Option<&'static mut [u8]>, + _marker: PhantomData, +} + +impl Conn { + pub fn read_datagram(&mut self) -> Result { + let conn = self.conn.as_ref().unwrap(); + let stream = RUNTIME.block_on(async move { conn.receive_datagram().await }); + match stream { + Ok(dgram) => Ok(dgram), + Err(error) => Err(error), + } + } + + pub async fn closed(&mut self) { + let conn = self.conn.as_ref().unwrap(); + conn.closed().await + } + + /// Open a unidirectional stream. + pub fn open_uni(&'static mut self) -> Result { + let conn = self.conn.as_ref().unwrap(); + let stream = RUNTIME.block_on(async move { + let stream = conn.open_uni().await; + match stream { + Ok(stream) => Ok(stream.await.unwrap()), + Err(e) => Err(e), + } + }); + stream + } + + /// Open a bidirectional stream. + pub fn open_bi(&'static mut self) -> Result<(SendStream, RecvStream), ConnectionError> { + let conn = self.conn.as_ref().unwrap(); + let stream = RUNTIME.block_on(async move { + let stream = conn.open_bi().await; + match stream { + Ok(stream) => Ok(stream.await.unwrap()), + Err(e) => Err(e), + } + }); + + stream + } + + /// Accept a unidirectional stream. + pub fn accept_uni(&'static mut self) -> Result { + let conn = self.conn.as_ref().unwrap(); + let stream = RUNTIME.block_on(async move { + let stream = conn.accept_uni().await; + match stream { + Ok(stream) => Ok(stream), + Err(e) => Err(e), + } + }); + stream + } + + /// Accept a bidirectional stream. + pub fn accept_bi(&'static mut self) -> Result<(SendStream, RecvStream), ConnectionError> { + let conn = self.conn.as_ref().unwrap(); + let stream = RUNTIME.block_on(async move { + let stream = conn.accept_bi().await; + match stream { + Ok(stream) => Ok(stream), + Err(e) => Err(e), + } + }); + stream + } + + /// Close the connection. + pub fn close(&mut self, code: u32, reason: Option<&[u8]>) { + let reason = match reason { + Some(reason) => reason, + None => b"closed", + }; + let conn = self.conn.as_ref(); + match conn { + Some(conn) => conn.close(VarInt::from_u32(code), reason), + None => println!("Connection is None"), + } + } +} + +impl Conn { + pub(crate) fn new(accepted_session: SessionRequest) -> Self { + Self { + conn: None, + accepted_session: Some(accepted_session), + buffer: None, + _marker: PhantomData, + } + } + + pub fn accepted(&mut self, conn: Connection) { + self.conn = Some(conn); + } + + pub async fn accept(&mut self) -> Result { + let accepted_session = self.accepted_session.take().unwrap(); + accepted_session.accept().await + } + + pub fn path(&self) -> &str { + self.accepted_session.as_ref().unwrap().path() + } + + pub fn authority(&self) -> &str { + self.accepted_session.as_ref().unwrap().authority() + } + + pub fn headers(&self) -> &HashMap { + self.accepted_session.as_ref().unwrap().headers() + } +} + +impl Conn { + pub(crate) fn new(conn: Connection) -> Self { + Self { + conn: Some(conn), + accepted_session: None, + buffer: None, + _marker: PhantomData, + } + } +} diff --git a/src/executor.rs b/src/executor.rs deleted file mode 100644 index 9659362..0000000 --- a/src/executor.rs +++ /dev/null @@ -1,23 +0,0 @@ -use num_cpus; -use once_cell::sync::Lazy; -use smol::{block_on, future, Executor, Task}; -use std::{future::Future, thread}; - -// Runs the given future on the global executor, spawning a new thread if necessary. - -pub fn spawn(future: impl Future + Send + 'static) -> Task { - static GLOBAL: Lazy> = Lazy::new(|| { - for n in 1..=num_cpus::get() { - thread::Builder::new() - .name(format!("ftlt-{}", n)) - .spawn(|| loop { - block_on(GLOBAL.run(future::pending::<()>())) - }) - .expect("cannot spawn executor thread"); - } - - Executor::new() - }); - - GLOBAL.spawn(future) -} diff --git a/src/lib.rs b/src/lib.rs index 309304d..28c29c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,315 +1,13 @@ -use client::ClientConn; -use flume::Receiver; -use flume::Sender; -// use futures_util::{pin_mut, select, FutureExt}; -use std::time::Duration; +use once_cell::sync::Lazy; use tokio::runtime::Runtime; -// use wtransport::datagram::Datagram; -use wtransport::endpoint; -// use wtransport::endpoint::SessionRequest; -use wtransport::tls::Certificate; -use wtransport::Connection; -use wtransport::Endpoint; -use wtransport::ServerConfig; +///------------------------------------ +// static mut SEND_FN: Option = None; +static RUNTIME: Lazy = Lazy::new(|| Runtime::new().unwrap()); +///------------------------------------ +mod certificate; mod client; -mod executor; - -#[repr(C)] -pub struct ErrorMessage { - pub code: i32, - pub message: String, -} - -pub struct WebTransport { - /// The innter Server object - pub server: Option>, - // - conn_ch_sender: Option>, - conn_ch_receiver: Option>, - - // datagram_ch_sender: Option>, - // datagram_ch_receiver: Option>, - pub state: Option, -} -static mut SEND_FN: Option = None; - -impl WebTransport { - pub(crate) fn new( - port: u16, - sender_fn: Option, - _runtime: &mut Runtime, - ) -> Result { - unsafe { - SEND_FN = sender_fn; - }; - let _guard = _runtime.enter(); - - let config = ServerConfig::builder() - .with_bind_default(port) - .with_certificate(Certificate::load("cert.crt", "cert.key").unwrap()) - .keep_alive_interval(Some(Duration::from_secs(1))) - .build(); - - let (conn_sender, conn_reciever) = flume::unbounded(); - // let (datagram_sender, datagram_receiver) = flume::unbounded(); - - let server = match Endpoint::server(config) { - Ok(server) => server, - Err(e) => { - println!("Error creating server: {:?}", e); - return Err(1); - } - }; - // - // self.handle_sess_in(_runtime); - // - Ok(Self { - server: Some(server), - state: Some(true), - conn_ch_sender: Some(conn_sender), - conn_ch_receiver: Some(conn_reciever), - // datagram_ch_sender: Some(datagram_sender), - // datagram_ch_receiver: Some(datagram_receiver), - }) - } - - pub(crate) unsafe fn handle_sess_in(&'static mut self, runtime: *mut Runtime) { - println!("Waiting for session request..."); - let rt = runtime.as_mut().unwrap(); - // let _ = rt.enter(); - let handle = rt.handle().clone(); - - rt.spawn(async move { - for id in 0.. { - let sender = self.conn_ch_sender.as_ref().unwrap(); - let incoming_session = self.server.as_mut().unwrap().accept().await; - println!("SEssion Number {}", id); - handle.spawn(async move { - let _test = sender.capacity(); - let _buffer = vec![0; 65536].into_boxed_slice(); - println!("Waiting for session request..."); - let session_request = incoming_session.await.unwrap(); - println!( - "New session: Authority: '{}', Path: '{}'", - session_request.authority(), - session_request.path() - ); - let connection = session_request.accept().await.unwrap(); - println!("Waiting for data from client..."); - let _ = sender.send_async(connection).await; - // self.conn_ch_sender.as_mut().unwrap().send(connection).unwrap(); - }); - } - // for _ in 0.. { - // // let receiver = self.conn_ch_receiver.as_mut().unwrap(); - // let sender = self.conn_ch_sender.as_ref().unwrap(); - // let next = { - // // let insess_receiver = receiver.recv_async().fuse(); - // let incoming_session = self.server.as_mut().unwrap().accept().fuse(); - - // // pin_mut!(sender); - // pin_mut!(incoming_session); - // select! { - // result = incoming_session => { - // let sess_req = result.await; - // Next::NewSessionRequest(sess_req) - // } - // } - // }; - // match next { - // Next::NewSessionRequest(sessreq) => { - // match sessreq { - // Ok(sessreq) => { - // println!( - // "Received Session Request from client: {:?}", - // sessreq.authority() - // ); - // // self.handle_session_impl(sessreq.accept().await); - // let conn = sessreq.accept().await.unwrap(); - // println!( - // "Accepted session request from client: {:?}", - // conn.remote_address() - // ); - - // let _ = sender.send(conn); - // // self.conn_ch_sender.as_mut().unwrap().send(conn).unwrap(); - // } - // Err(e) => { - // println!("Error accepting session request: {:?}", e); - // } - // } - // // insess_sender.send_async(sessreq).await.unwrap(); - // } - // } - // } - }); - } -} - -#[no_mangle] -pub extern "C" fn init_runtime() -> *mut Runtime { - Box::into_raw(Box::new(Runtime::new().unwrap())) -} - -#[no_mangle] -pub unsafe extern "C" fn start( - send_func: Option, - res: *mut u32, - rt_ptr: *mut Runtime, -) -> *mut WebTransport { - assert!(!rt_ptr.is_null()); - assert!(!res.is_null()); - - let _runtime = &mut *rt_ptr; - let server = WebTransport::new(4433, send_func, _runtime); - match server { - Ok(server) => { - let server_ptr = Box::into_raw(Box::new(server)); - *res = 0; - server_ptr - } - Err(_) => { - *res = 2; - std::ptr::null_mut() - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn handle_session(server_ptr: *mut WebTransport, runtime: *mut Runtime) { - assert!(!server_ptr.is_null()); - assert!(!runtime.is_null()); - let server = &mut *server_ptr; - let _runtime = &mut *runtime; - server.handle_sess_in(_runtime); -} - -#[no_mangle] -pub unsafe extern "C" fn proc_rec(srv: *mut WebTransport) -> *mut ClientConn { - assert!(!srv.is_null()); - - let server = &mut *srv; - println!("DBG: RECEIVER READY"); - - match server.conn_ch_receiver.as_ref().unwrap().recv() { - Ok(conn) => { - // println!("New client : {:?}", conn.remote_address()); - // let connptr = Box::into_raw(Box::new(conn)); - let client_conn = ClientConn::new(conn); - let clientptr = Box::into_raw(Box::new(client_conn)); - // Box::into_raw(Box::new(conn)) - clientptr - } - _ => { - panic!("Error receiving connection"); - } - } -} - -//create a method that creates a unbound channel and returns a pointer to the sender/receiver pair to the FFI - -#[no_mangle] -pub unsafe extern "C" fn proc_rec_streams(srv: *mut WebTransport, rt: *mut Runtime, clientptr: *mut ClientConn) { - assert!(!clientptr.is_null()); - assert!(!srv.is_null()); - let client = &mut *clientptr; - let _server = &mut *srv; - let _runtime = &mut *rt; - // let connection = client.get_conn(); - let sender = client.datagram_ch_sender.clone(); - - println!("CONN RECEIVER PROC SET & READY"); - // let rthandle = _runtime.handle(); - _runtime.spawn(async move { - let mut buffer = vec![0; 65536].into_boxed_slice(); - - loop { - // let sender = sender; - // println!("Waiting for datagram from client"); - tokio::select! { - stream = client.conn.accept_bi() => { - match stream { - Ok(mut stream) => { - - println!("Accepted BI stream"); - let bytes_read = stream.1.read(&mut buffer).await.unwrap().unwrap(); - let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - println!("Received (bi) '{str_data}' from client"); - - stream.0.write_all(b"ACK").await.unwrap(); - }, - _ => { - - } - }; - - } - stream = client.conn.accept_uni() => { - // let mut stream = stream; - match stream { - Ok(mut stream) => { - println!("Accepted UNI stream"); - let bytes_read = match stream.read(&mut buffer).await.unwrap() { - Some(bytes_read) => bytes_read, - None => continue, - }; - - let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - println!("Received (uni) '{str_data}' from client"); - - let mut stream = client.conn.open_uni().await.unwrap().await.unwrap(); - stream.write_all(b"ACK").await.unwrap(); - }, - _ => { - // println!("Error accepting UNI stream: {:?}", e); - } - } - - } - dgram = client.conn.receive_datagram() => { - match dgram { - Ok(dgram) => { - - println!("Received datagram from client"); - sender.send(dgram).unwrap(); - // let str_data = std::str::from_utf8(&dgram).unwrap(); - // println!("Received (dgram) '{str_data}' from client"); - client.conn.send_datagram(b"ACK").unwrap(); - }, - _ => { - // break; - // println!("Error receiving datagram"); - } - } - }, - - } - } - });//.detach(); -} - -#[no_mangle] -pub unsafe extern "C" fn proc_recv_ch_datagram(srv: *mut WebTransport, clientptr: *mut ClientConn, buff: *mut u8) -> usize { - assert!(!clientptr.is_null()); - assert!(!srv.is_null()); - let client = &mut *clientptr; - let _server = &mut *srv; - match client.datagram_ch_receiver.recv() { - Ok(dgram) => { - ::std::slice::from_raw_parts_mut(buff, dgram.len()).copy_from_slice(&dgram); - dgram.len() - } - Err(_) => 0, - } -} - -#[no_mangle] -pub unsafe extern "C" fn test_proc(client: *mut ClientConn) { - assert!(!client.is_null()); - let client = &mut *client; - println!("TEST PROC {} ", client.conn.remote_address()); -} +mod connection; +mod server; +mod shared; diff --git a/src/old_runtime.rs b/src/old_runtime.rs deleted file mode 100644 index 4f447b6..0000000 --- a/src/old_runtime.rs +++ /dev/null @@ -1,181 +0,0 @@ -rt.spawn(async move{ - for _id in 0.. { - let incoming_session = self.server.as_mut().unwrap().accept().await; - let mut buffer = vec![0; 65536].into_boxed_slice(); - - println!("Waiting for session request..."); - let session_request = incoming_session.await; - // println!("Session request received ID : {}", id); - let session_request = match session_request { - Ok(sessreq) => sessreq, - Err(e) => { - println!("Error accepting session request: {:?}", e); - continue ; - } - }; - println!( - "New session: Authority: '{}', Path: '{}'", - session_request.authority(), - session_request.path() - ); - - let connection = session_request.accept().await.unwrap(); - - println!("Waiting for data from client..."); - - executor::spawn(async move { - loop { - tokio::select! { - // stream = connection.accept_bi() => { - // match stream { - // Ok(mut stream) => { - - // println!("Accepted BI stream"); - // let bytes_read = stream.1.read(&mut buffer).await.unwrap().unwrap(); - // let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - // println!("Received (bi) '{str_data}' from client"); - - // stream.0.write_all(b"ACK").await.unwrap(); - // }, - // Err(e) => { - // break ; - // } - // }; - - // } - // stream = connection.accept_uni() => { - // // let mut stream = stream; - // match stream { - // Ok(mut stream) => { - // println!("Accepted UNI stream"); - // let bytes_read = match stream.read(&mut buffer).await.unwrap() { - // Some(bytes_read) => bytes_read, - // None => continue, - // }; - - // let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - // println!("Received (uni) '{str_data}' from client"); - - // let mut stream = connection.open_uni().await.unwrap().await.unwrap(); - // stream.write_all(b"ACK").await.unwrap(); - // }, - // _ => { - // // println!("Error accepting UNI stream: {:?}", e); - // break ; - // } - // } - - // } - dgram = connection.receive_datagram() => { - match dgram { - Ok(dgram) => { - println!("Received datagram from client"); - let str_data = std::str::from_utf8(&dgram).unwrap(); - println!("Received (dgram) '{str_data}' from client"); - connection.send_datagram(b"ACK").unwrap(); - }, - _ => { - // break; - break ; - } - } - } - } - } - }).detach(); - - } -}); - -pub fn handle_incoming(&'static mut self, connection: Connection) { - // executor::spawn(async move { - // loop { - // let sender = self.datagram_ch_sender.as_ref().unwrap(); - // tokio::select! { - // // stream = connection.accept_bi() => { - // // match stream { - // // Ok(mut stream) => { - - // // println!("Accepted BI stream"); - // // let bytes_read = stream.1.read(&mut buffer).await.unwrap().unwrap(); - // // let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - // // println!("Received (bi) '{str_data}' from client"); - - // // stream.0.write_all(b"ACK").await.unwrap(); - // // }, - // // Err(e) => { - // // break ; - // // } - // // }; - - // // } - // // stream = connection.accept_uni() => { - // // // let mut stream = stream; - // // match stream { - // // Ok(mut stream) => { - // // println!("Accepted UNI stream"); - // // let bytes_read = match stream.read(&mut buffer).await.unwrap() { - // // Some(bytes_read) => bytes_read, - // // None => continue, - // // }; - - // // let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - // // println!("Received (uni) '{str_data}' from client"); - - // // let mut stream = connection.open_uni().await.unwrap().await.unwrap(); - // // stream.write_all(b"ACK").await.unwrap(); - // // }, - // // _ => { - // // // println!("Error accepting UNI stream: {:?}", e); - // // break ; - // // } - // // } - - // // } - // dgram = connection.receive_datagram() => { - // match dgram { - // Ok(dgram) => { - // sender.send_async(dgram).await.unwrap(); - // // println!("Received datagram from client"); - // // let str_data = std::str::from_utf8(&dgram).unwrap(); - // // println!("Received (dgram) '{str_data}' from client"); - // // connection.send_datagram(b"ACK").unwrap(); - // }, - // _ => { - // // break; - // break ; - // } - // } - // } - // } - // } - // }).detach(); - // println!("Waiting for session request..."); - // executor::spawn(async move { - // let receiver = self.conn_ch_receiver.as_mut().unwrap(); - // let next = { - // let to_client_receiver_next = receiver.recv_async().fuse(); - // let session_request = incoming_session.fuse(); - - // pin_mut!(to_client_receiver_next); - // pin_mut!(session_request); - // select! { - // from_client_result = session_request => { - // let sess_req = from_client_result.unwrap(); - // println!("Session request received {}", sess_req.authority()); - // Next::NewSessionRequest(sess_req) - // } - // } - // }; - // match next { - // Next::NewSessionRequest(sessreq) => { - // println!("Received datagram from client: {:?}", sessreq.authority()); - // } - - // } - // }).detach(); -} \ No newline at end of file diff --git a/src/oldlib.rs b/src/oldlib.rs deleted file mode 100644 index 171fe6a..0000000 --- a/src/oldlib.rs +++ /dev/null @@ -1,287 +0,0 @@ -use client::ClientConn; -use flume::Receiver; -use flume::Sender; -use futures_util::{pin_mut, select, FutureExt}; -use std::time::Duration; -use tokio::runtime::Runtime; -use wtransport::datagram::Datagram; -use wtransport::endpoint; - -use wtransport::endpoint::SessionRequest; -use wtransport::tls::Certificate; -use wtransport::Connection; -use wtransport::Endpoint; -use wtransport::ServerConfig; - -mod client; -mod executor; - -#[repr(C)] -pub struct ErrorMessage { - pub code: i32, - pub message: String, -} - -pub struct WebTransport { - /// The innter Server object - pub server: Option>, - // - conn_ch_sender: Option>, - conn_ch_receiver: Option>, - pub state: Option, -} -static mut SEND_FN: Option = None; -enum Next { - NewSessionRequest(Result), -} -impl WebTransport { - pub(crate) fn new( - port: u16, - sender_fn: Option, - _runtime: &mut Runtime, - ) -> Result { - unsafe { - SEND_FN = sender_fn; - }; - let _guard = _runtime.enter(); - - let config = ServerConfig::builder() - .with_bind_default(port) - .with_certificate(Certificate::load("cert.crt", "cert.key").unwrap()) - .keep_alive_interval(Some(Duration::from_secs(1))) - .build(); - - let (conn_sender, conn_reciever) = flume::unbounded(); - // let (datagram_sender, datagram_receiver) = flume::unbounded(); - - let server = match Endpoint::server(config) { - Ok(server) => server, - Err(e) => { - println!("Error creating server: {:?}", e); - return Err(1); - } - }; - // - // self.handle_sess_in(_runtime); - // - Ok(Self { - server: Some(server), - state: Some(true), - conn_ch_sender: Some(conn_sender), - conn_ch_receiver: Some(conn_reciever), - // datagram_ch_sender: Some(datagram_sender), - // datagram_ch_receiver: Some(datagram_receiver), - }) - } - - pub(crate) unsafe fn handle_sess_in(&'static mut self, runtime: *mut Runtime) { - println!("Waiting for session request..."); - let rt = runtime.as_mut().unwrap(); - rt.spawn(async move { - loop { - // let receiver = self.conn_ch_receiver.as_mut().unwrap(); - let sender = self.conn_ch_sender.as_ref().unwrap(); - let next = { - // let insess_receiver = receiver.recv_async().fuse(); - let incoming_session = self.server.as_mut().unwrap().accept().fuse(); - - // pin_mut!(sender); - pin_mut!(incoming_session); - select! { - result = incoming_session => { - let sess_req = result.await; - Next::NewSessionRequest(sess_req) - } - } - }; - match next { - Next::NewSessionRequest(sessreq) => { - match sessreq { - Ok(sessreq) => { - println!( - "Received Session Request from client: {:?}", - sessreq.authority() - ); - // self.handle_session_impl(sessreq.accept().await); - let conn = sessreq.accept().await.unwrap(); - println!( - "Accepted session request from client: {:?}", - conn.remote_address() - ); - - let _ = sender.send_async(conn).await; - // self.conn_ch_sender.as_mut().unwrap().send(conn).unwrap(); - } - Err(e) => { - println!("Error accepting session request: {:?}", e); - } - } - // insess_sender.send_async(sessreq).await.unwrap(); - } - } - } - }); - } -} - -#[no_mangle] -pub extern "C" fn init_runtime() -> *mut Runtime { - Box::into_raw(Box::new(Runtime::new().unwrap())) -} - -#[no_mangle] -pub unsafe extern "C" fn start( - send_func: Option, - res: *mut u32, - rt_ptr: *mut Runtime, -) -> *mut WebTransport { - assert!(!rt_ptr.is_null()); - assert!(!res.is_null()); - let _runtime = &mut *rt_ptr; - let server = WebTransport::new(4433, send_func, _runtime); - match server { - Ok(server) => { - let server_ptr = Box::into_raw(Box::new(server)); - *res = 0; - server_ptr - } - Err(_) => { - *res = 2; - std::ptr::null_mut() - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn handle_session(server_ptr: *mut WebTransport, runtime: *mut Runtime) { - assert!(!server_ptr.is_null()); - assert!(!runtime.is_null()); - let server = &mut *server_ptr; - let _runtime = &mut *runtime; - server.handle_sess_in(_runtime); -} - -#[no_mangle] -pub unsafe extern "C" fn proc_rec(srv: *mut WebTransport) -> *mut ClientConn { - assert!(!srv.is_null()); - - let server = &mut *srv; - println!("DBG: RECEIVER READY"); - - match server.conn_ch_receiver.as_ref().unwrap().recv() { - Ok(conn) => { - println!("New client : {:?}", conn.remote_address()); - let connptr = Box::into_raw(Box::new(conn)); - let client_conn = ClientConn::new(connptr); - let clientptr = Box::into_raw(Box::new(client_conn)); - clientptr - } - _ => { - panic!("Error receiving connection"); - } - } -} - -//create a method that creates a unbound channel and returns a pointer to the sender/receiver pair to the FFI - -#[no_mangle] -pub unsafe extern "C" fn proc_rec_streams(client: *mut ClientConn) { - assert!(!client.is_null()); - let client = &mut *client; - let connection = client.get_conn(); - let sender = client.datagram_ch_sender.as_ref().unwrap(); - - println!("CONN RECEIVER PROC SET & READY"); - executor::spawn(async move { - let mut buffer = vec![0; 65536].into_boxed_slice(); - - loop { - // let sender = sender; - println!("Waiting for datagram from client"); - tokio::select! { - stream = connection.accept_bi() => { - match stream { - Ok(mut stream) => { - - println!("Accepted BI stream"); - let bytes_read = stream.1.read(&mut buffer).await.unwrap().unwrap(); - let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - println!("Received (bi) '{str_data}' from client"); - - stream.0.write_all(b"ACK").await.unwrap(); - }, - Err(e) => { - break ; - } - }; - - } - stream = connection.accept_uni() => { - // let mut stream = stream; - match stream { - Ok(mut stream) => { - println!("Accepted UNI stream"); - let bytes_read = match stream.read(&mut buffer).await.unwrap() { - Some(bytes_read) => bytes_read, - None => continue, - }; - - let str_data = std::str::from_utf8(&buffer[..bytes_read]).unwrap(); - - println!("Received (uni) '{str_data}' from client"); - - let mut stream = connection.open_uni().await.unwrap().await.unwrap(); - stream.write_all(b"ACK").await.unwrap(); - }, - _ => { - // println!("Error accepting UNI stream: {:?}", e); - break ; - } - } - - } - dgram = connection.receive_datagram() => { - match dgram { - Ok(dgram) => { - - println!("Received datagram from client"); - sender.send(dgram).unwrap(); - // let str_data = std::str::from_utf8(&dgram).unwrap(); - // println!("Received (dgram) '{str_data}' from client"); - connection.send_datagram(b"ACK").unwrap(); - }, - _ => { - // break; - println!("Error receiving datagram"); - - } - } - }, - - } - } - }) - .detach(); -} - -#[no_mangle] -pub unsafe extern "C" fn proc_recv_ch_datagram(client: *mut ClientConn, buff: *mut u8) -> usize { - assert!(!client.is_null()); - let client = &mut *client; - - match client.datagram_ch_receiver.as_ref().unwrap().recv() { - Ok(dgram) => { - ::std::slice::from_raw_parts_mut(buff, dgram.len()).copy_from_slice(&dgram); - dgram.len() - } - Err(_) => 0, - } -} - -#[no_mangle] -pub unsafe extern "C" fn test_proc(client: *mut ClientConn) { - assert!(!client.is_null()); - let client = &mut *client; - println!("TEST PROC {} ", client.get_conn().remote_address()); -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..b3eeeff --- /dev/null +++ b/src/server.rs @@ -0,0 +1,235 @@ +use crate::{ + connection::{self, Conn, Server}, + RUNTIME, +}; +use std::{path::Path, time::Duration}; +use tokio::runtime::Runtime; +use wtransport::endpoint::endpoint_side::Server as endServer; +use wtransport::{tls::Certificate, Endpoint, ServerConfig}; + +pub struct WebTransportServer { + pub server: Option>, + pub state: Option, +} + +impl WebTransportServer { + pub(crate) unsafe fn new(config: ServerConfig) -> Result { + let _guard = RUNTIME.enter(); + + let server = match Endpoint::server(config) { + Ok(server) => server, + Err(e) => { + println!("Error creating server: {:?}", e); + return Err(1); + } + }; + + Ok(Self { + server: Some(server), + state: Some(true), + }) + } + + pub(crate) async unsafe fn handle_sess_in(&mut self) -> Result<*mut Conn, u32> { + let incoming_session = self.server.as_mut(); + match incoming_session { + Some(incoming_session) => { + let session_request = incoming_session.accept().await; + + let accepted_session = match session_request.await { + Ok(session_request) => { + let client = Conn::::new(session_request); + Ok(client) + } + Err(e) => Err(e), + }; + match accepted_session { + Ok(mut sess) => match sess.accept().await { + Ok(conn) => { + sess.accepted(conn); + let client_ptr = Box::into_raw(Box::new(sess)); + Ok(client_ptr) + } + Err(e) => { + println!("Error accepting connection : {}", e.to_string()); + + Err(0) + } + }, + Err(error) => { + println!("Error accepting session : {}", error.to_string()); + Err(0) + } + } + } + None => { + println!("Server endpoint is None (should be closed by now and not be called.."); + return Err(0); + } + } + } +} + +#[no_mangle] +pub unsafe extern "C" fn proc_server_init( + port: u16, + migration: bool, + keepalive: u64, + timeout: u64, + cert_path: *const u8, + cert_path_len: usize, + key_path: *const u8, + key_path_len: usize, +) -> *mut WebTransportServer { + assert!(port > 0); + + let cert_path = ::std::slice::from_raw_parts(cert_path, cert_path_len); + let key_path = ::std::slice::from_raw_parts(key_path, key_path_len); + let cert_path = Path::new(std::str::from_utf8(cert_path).unwrap()); + let key_path = Path::new(std::str::from_utf8(key_path).unwrap()); + + let certificates = Certificate::load(cert_path, key_path).unwrap(); + + let keepalive = if keepalive == 0 { + None + } else { + Some(Duration::from_secs(keepalive)) + }; + let timeout = if timeout == 0 { + None + } else { + Some(Duration::from_secs(timeout)) + }; + //print the paths for debug + let config = ServerConfig::builder() + .with_bind_config(wtransport::config::IpBindConfig::InAddrAnyDual, port) + .with_certificate(certificates) + .keep_alive_interval(keepalive) + .max_idle_timeout(timeout) + .unwrap() + .allow_migration(migration) + .build(); + let server = WebTransportServer::new(config); + + match server { + Ok(server) => Box::into_raw(Box::new(server)), + Err(_) => std::ptr::null_mut(), + } +} + +#[no_mangle] +pub unsafe extern "C" fn proc_server_listen( + server_ptr: *mut WebTransportServer, + cb: extern "C" fn(*mut Conn), +) { + assert!(!server_ptr.is_null()); + let server = &mut *server_ptr; + + RUNTIME.spawn(async move { + loop { + match server.state { + Some(true) => {} + Some(false) => { + println!("Server state is false, exiting"); + return; + } + None => { + println!("Server state is None, exiting"); + return; + } + } + match server.handle_sess_in().await { + Ok(conn) => { + cb(conn); + } + Err(e) => { + println!("Error accepting sess in : {}", e); + } + } + } + }); +} +#[no_mangle] +pub extern "C" fn proc_server_client_authority( + conn: *mut Conn, + buflen: *mut u32, +) -> *const u8 { + assert!(!conn.is_null()); + let conn = unsafe { &mut *conn }; + let authority = conn.authority(); + unsafe { + *buflen = authority.len() as u32; + } + authority.as_ptr() +} + +#[no_mangle] +pub extern "C" fn proc_server_client_headers( + conn: *mut Conn, + buflen: *mut u32, +) -> *const u8 { + assert!(!conn.is_null()); + let conn = unsafe { &mut *conn }; + let mut json = serde_json::to_string(&conn.headers()).unwrap(); + unsafe { + *buflen = json.len() as u32; + } + json.push('\0'); + json.as_ptr() +} + +#[no_mangle] +pub extern "C" fn proc_server_client_path( + conn: *mut Conn, + buflen: *mut u32, +) -> *const u8 { + assert!(!conn.is_null()); + let conn = unsafe { &mut *conn }; + let path = conn.path(); + let mut path = path.to_string(); + unsafe { + *buflen = path.len() as u32; + } + path.push('\0'); + path.as_ptr() +} + +#[no_mangle] +pub unsafe extern "C" fn proc_server_close(server_ptr: *mut WebTransportServer) -> usize { + assert!(!server_ptr.is_null()); + let server = &mut *server_ptr; + let endpoint = server.server.as_mut(); + match endpoint { + Some(endpoint) => { + endpoint.close(30, b"Server closing"); + } + None => println!("Error closing server"), + } + 0 +} +#[no_mangle] +pub unsafe extern "C" fn proc_server_close_clients(server_ptr: *mut WebTransportServer) -> usize { + assert!(!server_ptr.is_null()); + + let server = &mut *server_ptr; + server.state = Some(false); + let endpoint = server.server.as_mut(); + match endpoint { + Some(endpoint) => { + RUNTIME.block_on(async move { + endpoint.wait_idle().await; + }); + } + None => println!("Error closing clients connections"), + } + 0 +} +//free all above once +#[no_mangle] +pub unsafe extern "C" fn free_all_server(_a: *mut WebTransportServer, _c: *mut Runtime) {} + +#[no_mangle] +pub unsafe extern "C" fn free_server(_v: *mut WebTransportServer) { + let _s = &mut *_v; + drop(_s.server.take()); +} diff --git a/src/shared.rs b/src/shared.rs new file mode 100644 index 0000000..764e7ee --- /dev/null +++ b/src/shared.rs @@ -0,0 +1,382 @@ +use crate::{ + connection::{Conn, Server}, + RUNTIME, +}; + +use std::slice::from_raw_parts_mut; +use tokio::runtime::Runtime; +use wtransport::{error::ConnectionError, RecvStream, SendStream}; + +//impl a way to identify ConnectionError with a number for JS +pub struct ConnectionErrorWrapper(pub ConnectionError); + +// Implement the From trait for the new type +impl From for u32 { + fn from(wrapper: ConnectionErrorWrapper) -> Self { + match wrapper.0 { + ConnectionError::TimedOut => 170, + ConnectionError::ApplicationClosed(_) => 171, + ConnectionError::ConnectionClosed(_) => 172, + ConnectionError::LocalH3Error(_) => 173, + ConnectionError::QuicProto => 174, + ConnectionError::LocallyClosed => 175, + } + } +} + +unsafe fn send_error(code: u32, message: String, errorcb: extern "C" fn(u32, *mut u8, u32)) { + let mut msg = message; + errorcb(code, msg.as_mut_ptr(), msg.len() as u32); +} + +#[repr(C)] +pub struct BidiStreams { + pub send: Option, + pub recv: Option, +} + +/// Send a datagram +/// Error Codes : +/// 161 : Connection closed +/// 162 : Too large +/// 163 : Not supported by peer +/// 164 : Other error +#[no_mangle] +pub unsafe extern "C" fn proc_send_datagram( + connptr: *mut Conn, + buf: *const u8, + buflen: u32, + errorcb: extern "C" fn(u32, *mut u8, u32), +) { + assert!(!connptr.is_null()); + + let client = &mut *connptr; + let buf = ::std::slice::from_raw_parts(buf, buflen as usize); + let conn = client.conn.as_ref().unwrap(); + match conn.send_datagram(buf) { + Ok(_) => {} + Err(err) => { + send_error(200, err.to_string(), errorcb); + //TODO: Handle error better + // match err { + // SendDatagramError::NotConnected => { + // println!("DBG: Rust Connection closed"); + // SEND_FN.unwrap()(161, vec![0].as_mut_ptr(), 1); + // } + // SendDatagramError::TooLarge => { + // println!("DBG: Rust Too large"); + // SEND_FN.unwrap()(162, vec![0].as_mut_ptr(), 1); + // } + // SendDatagramError::UnsupportedByPeer => { + // println!("DBG: Rust not supported by peer"); + // SEND_FN.unwrap()(163, vec![0].as_mut_ptr(), 1); + // } + // }; + } + } +} + +/// Receive a datagram. +/// Error Codes : +/// 170 : Connection closed +/// 171 : ApplicationClosed +/// 172 : ConnectionClosed +/// 173 : LocalH3Error +/// 174 : QuicProto +/// 175 : LocallyClosed +#[no_mangle] +pub unsafe extern "C" fn proc_recv_datagram( + conn_ptr: *mut Conn, + buff: *mut u8, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> usize { + assert!(!conn_ptr.is_null()); + + let client = &mut *conn_ptr; + + match client.read_datagram() { + Ok(dgram) => { + from_raw_parts_mut(buff, dgram.len()).clone_from_slice(&dgram); + dgram.len() + } + Err(error) => { + let message = error.to_string(); + send_error(ConnectionErrorWrapper(error).into(), message, errorcb); + 0 + } + } +} + +/// Open a bidirectional stream. +/// Error Codes : +/// 150 : Connection closed +#[no_mangle] +pub unsafe extern "C" fn proc_open_bi( + connptr: *mut Conn, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> *mut BidiStreams { + assert!(!connptr.is_null()); + + let _client = &mut *connptr; + let stream = _client.open_bi(); + match stream { + Ok((send, recv)) => { + let bidi = BidiStreams { + send: Some(send), + recv: Some(recv), + }; + Box::into_raw(Box::new(bidi)) + } + Err(error) => { + let message = error.to_string(); + send_error(ConnectionErrorWrapper(error).into(), message, errorcb); + std::ptr::null_mut() + } + } +} + +/// Open a unidirectional stream. +#[no_mangle] +pub unsafe extern "C" fn proc_open_uni( + connptr: *mut Conn, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> *mut BidiStreams { + assert!(!connptr.is_null()); + + let _client = &mut *connptr; + let stream = _client.open_uni(); + match stream { + Ok(stream) => Box::into_raw(Box::new(BidiStreams { + send: Some(stream), + recv: None, + })), + Err(error) => { + let message = error.to_string(); + send_error(ConnectionErrorWrapper(error).into(), message, errorcb); + std::ptr::null_mut() + } + } +} + +/// Accept a unidirectional stream. +#[no_mangle] +pub unsafe extern "C" fn proc_accept_uni( + connptr: *mut Conn, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> *mut BidiStreams { + assert!(!connptr.is_null()); + + let _client = &mut *connptr; + let stream = _client.accept_uni(); + match stream { + Ok(stream) => Box::into_raw(Box::new(BidiStreams { + send: None, + recv: Some(stream), + })), + Err(err) => { + let msg = err.to_string(); + send_error(ConnectionErrorWrapper(err).into(), msg, errorcb); + std::ptr::null_mut() + } + } +} + +/// Accept a bidirectional stream. +#[no_mangle] +pub unsafe extern "C" fn proc_accept_bi( + connptr: *mut Conn, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> *mut BidiStreams { + assert!(!connptr.is_null()); + + let _client = &mut *connptr; + let stream = _client.accept_bi(); + match stream { + Ok((send, recv)) => { + let bidi = BidiStreams { + send: Some(send), + recv: Some(recv), + }; + Box::into_raw(Box::new(bidi)) + } + Err(err) => { + let msg = err.to_string(); + send_error(ConnectionErrorWrapper(err).into(), msg, errorcb); + std::ptr::null_mut() + } + } +} + +///Write to a stream +#[no_mangle] +pub unsafe extern "C" fn proc_write( + stream_ptr: *mut BidiStreams, + buf: *const u8, + buflen: usize, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> usize { + assert!(!stream_ptr.is_null()); + assert!(buflen > 0); + + let bidi_streams = &mut *stream_ptr; + let buf = ::std::slice::from_raw_parts(buf, buflen); + let writer = bidi_streams.send.as_mut().unwrap(); + let writenlen = RUNTIME.block_on(async move { + match writer.write(buf).await { + Ok(len) => len, + Err(err) => { + let str = err.to_string(); + send_error(153, str, errorcb); + // SEND_FN.unwrap()(153, str.as_mut_ptr(), str.len() as u32); + 0 + } + } + }); + writenlen +} + +#[no_mangle] +pub unsafe extern "C" fn proc_write_all( + stream_ptr: *mut BidiStreams, + buf: *const u8, + buflen: usize, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> usize { + assert!(!stream_ptr.is_null()); + assert!(buflen > 0); + let stream = &mut *stream_ptr; + let buf = ::std::slice::from_raw_parts(buf, buflen); + let writer = stream.send.as_mut().unwrap(); + let writenlen = RUNTIME.block_on(async move { + match writer.write_all(buf).await { + Ok(_) => buflen, + Err(err) => { + let str = err.to_string(); + send_error(153, str, errorcb); + // SEND_FN.unwrap()(153, str.as_mut_ptr(), str.len() as u32); + 0 + } + } + }); + writenlen +} + +/// Read from a stream +/// +/// Warning : We should always provide valid pointer from JS, if we dont we will crash for safety +/// Rust will automatically panic for any invalid pointer +#[no_mangle] +pub unsafe extern "C" fn proc_read( + stream_ptr: *mut BidiStreams, + buf: *mut u8, + buflen: usize, + errorcb: extern "C" fn(u32, *mut u8, u32), +) -> usize { + assert!(!stream_ptr.is_null()); + assert!(buflen > 0); + + let stream = &mut *stream_ptr; + let buf = ::std::slice::from_raw_parts_mut(buf, buflen); + let readlen = RUNTIME.block_on(async move { + match stream.recv.as_mut().unwrap().read(buf).await { + Ok(len) => len, + Err(err) => { + let strs = err.to_string(); + send_error(154, strs, errorcb); + None + } + } + }); + match readlen { + Some(len) => len, + None => 0, + } +} + +/// Get a rescvstream id +#[no_mangle] +pub unsafe extern "C" fn proc_recvstream_id(stream_ptr: *mut BidiStreams) -> u64 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + stream.recv.as_mut().unwrap().id().into_u64() +} + +/// Get a sendstream id +#[no_mangle] +pub unsafe extern "C" fn proc_sendstream_id(stream_ptr: *mut BidiStreams) -> u64 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + stream.send.as_mut().unwrap().id().into_u64() +} + +/// Close a send stream. +#[no_mangle] +pub unsafe extern "C" fn proc_sendstream_finish(stream_ptr: *mut BidiStreams) -> u32 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + let sendstream = stream.send.as_mut().unwrap(); + RUNTIME.block_on(async move { + match sendstream.finish().await { + Ok(_) => 1, + Err(_err) => { + // let mut msg = err.to_string(); + // SEND_FN.unwrap()(150, msg.as_mut_ptr(), msg.len() as u32); + 0 + } + } + }) +} + +#[no_mangle] +pub unsafe extern "C" fn proc_recvtream_stop(stream_ptr: *mut BidiStreams) -> u32 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + RUNTIME.block_on(async move { + match stream.recv.as_mut().unwrap().stop(0).await { + Ok(_) => { + // drop(stream_ptr); + 1 + } + Err(_) => { + // SEND_FN.unwrap()(158, std::ptr::null_mut(), 0); + 0 + } + } + }) +} +#[no_mangle] +pub unsafe extern "C" fn proc_sendstream_priority(stream_ptr: *mut BidiStreams) -> i32 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + stream.send.as_ref().unwrap().priority() +} + +/// +#[no_mangle] +pub unsafe extern "C" fn proc_sendstream_set_priority( + stream_ptr: *mut BidiStreams, + priority: i32, +) -> i32 { + assert!(!stream_ptr.is_null()); + let stream = &mut *stream_ptr; + stream.send.as_ref().unwrap().set_priority(priority); + priority +} + +//add all the methods to get authority, headers, etc +#[no_mangle] +pub unsafe extern "C" fn free_streams(stream_ptr: *mut BidiStreams) { + let _stream = &mut *stream_ptr; + drop(_stream.send.take()); + drop(_stream.recv.take()); +} +#[no_mangle] +pub unsafe extern "C" fn proc_closed(conn: *mut Conn) { + let _conn = &mut *conn; + RUNTIME.block_on(async move { _conn.closed().await }); +} +#[no_mangle] +pub unsafe extern "C" fn free_conn(_: *mut Conn) {} + +#[no_mangle] +pub unsafe extern "C" fn free_runtime(_: *mut Runtime) {} diff --git a/utils/download_lib.ts b/utils/download_lib.ts new file mode 100644 index 0000000..520d075 --- /dev/null +++ b/utils/download_lib.ts @@ -0,0 +1,47 @@ +import "https://deno.land/std@0.201.0/dotenv/load.ts"; +const LIB_NAME = "webtransport"; +let LIB_URL: URL | undefined; + +if (!Deno.env.has("DEVELOPMENT") && !Deno.env.has("CI_BUILD")) { + const headers = new Headers(); + if (Deno.env.has("DENO_AUTH_TOKENS")) { + headers.set( + "Authorization", + `token ${ + Deno.env.get("DENO_AUTH_TOKENS")!.split("@")[0].split( + ":", + )[1] + }`, + ); + } + headers.set("Accept", "application/vnd.github.v3+json"); + const data = await fetch( + `https://api.github.com/repos/hironichu/${LIB_NAME}/releases`, + { + headers: headers, + }, + ); + + const json = await data.json() as Array<{ + assets: { + [key: string]: Array<{ + url: string; + }>; + }; + }>; + if (json.length === 0) { + throw new Error("No release found"); + } + LIB_URL = new URL( + json[0] + .assets[ + `${LIB_NAME}_${Deno.build.os}_${Deno.build.arch}` + ][0].url, + ); + LIB_URL!.username = Deno.env.has("DENO_AUTH_TOKENS") + ? Deno.env.get("DENO_AUTH_TOKENS")!.split("@")[0]! + : ""; +} +const build_target = Deno.env.get("DEVELOPMENT") ? "debug" : "release"; +LIB_URL = LIB_URL ?? new URL(`../target/${build_target}/`, import.meta.url); +export default LIB_URL;