From 7f261a1f18df6bf78556e6a97310fc3cd7e012a3 Mon Sep 17 00:00:00 2001 From: rgallor Date: Thu, 18 May 2023 18:42:58 +0200 Subject: [PATCH 01/11] Start developing a websocket Signed-off-by: rgallor --- Cargo.lock | 607 ++++++++++++++++++++++++++++++++++- rust-remote-shell/Cargo.toml | 3 + rust-remote-shell/src/lib.rs | 80 ++++- 3 files changed, 687 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e501ffe..cf1a6ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,7 +32,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -64,6 +64,27 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "cc" version = "1.0.79" @@ -152,6 +173,41 @@ dependencies = [ "tracing-error", ] +[[package]] +name = "cpufeatures" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" + +[[package]] +name = "digest" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "eyre" version = "0.6.8" @@ -162,6 +218,86 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.16", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-core", + "futures-macro", + "futures-sink", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.27.2" @@ -189,6 +325,42 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indenter" version = "0.3.3" @@ -205,6 +377,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + [[package]] name = "lazy_static" version = "1.4.0" @@ -217,6 +395,25 @@ version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + [[package]] name = "memchr" version = "2.5.0" @@ -232,6 +429,28 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.45.0", +] + +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi 0.2.6", + "libc", +] + [[package]] name = "object" version = "0.30.3" @@ -259,12 +478,53 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.45.0", +] + +[[package]] +name = "percent-encoding" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" + [[package]] name = "pin-project-lite" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -307,6 +567,45 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.7.3" @@ -328,7 +627,10 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" name = "rust-remote-shell" version = "0.1.0" dependencies = [ + "futures-util", "thiserror", + "tokio", + "tokio-tungstenite", ] [[package]] @@ -337,6 +639,23 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "sha1" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -356,6 +675,40 @@ dependencies = [ "regex", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "strsim" version = "0.10.0" @@ -429,6 +782,63 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + +[[package]] +name = "tokio" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.16", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tracing" version = "0.1.37" @@ -471,12 +881,69 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + +[[package]] +name = "unicode-bidi" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" + [[package]] name = "unicode-ident" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "url" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "valuable" version = "0.1.0" @@ -489,6 +956,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.3.9" @@ -519,3 +992,135 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/rust-remote-shell/Cargo.toml b/rust-remote-shell/Cargo.toml index bbad59d..ac069f1 100644 --- a/rust-remote-shell/Cargo.toml +++ b/rust-remote-shell/Cargo.toml @@ -8,4 +8,7 @@ edition = "2021" [lib] [dependencies] +futures-util = "0.3.28" thiserror = "1.0.40" +tokio = { version = "1.28.1", features = ["full"] } +tokio-tungstenite = "0.19.0" diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 8d9922d..c470e47 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -1,9 +1,12 @@ -use std::ffi::OsStr; +use futures_util::StreamExt; // future, TryStreamExt use std::io; +use std::net::SocketAddr; +//use std::os::unix::prelude::OsStrExt; +use std::ffi::OsStr; // , ffi::OsString, collections::VecDeque, use std::process; use std::string::FromUtf8Error; - use thiserror::Error; +use tokio::net::{TcpListener, TcpStream}; #[derive(Error, Debug)] pub enum ShellError { @@ -51,3 +54,76 @@ where // if the conversion from UTF8 to String goes wrong, return an error .map_err(ShellError::WrongOutConversion) } + +// type Command = OsString; + +#[derive(Error, Debug)] +pub enum DeviceServerError { + #[error("Failed to bind on port {0}")] + BindError(u16), + #[error("Connected streams should have a peer address")] + PeerAddrError, + #[error("Error during the websocket handshake occurred")] + WebSocketHandshakeError, +} + +pub struct DeviceServer { + addr: SocketAddr, + // queue used to store incoming commands in case multiple commands are sent to the device while one is processed. + // received_commands: VecDeque, +} + +impl DeviceServer { + pub fn new(addr: SocketAddr) -> Self { + Self { + addr, + //received_commands: VecDeque::new(), + } + } + + pub async fn listen(&self) -> Result<(), DeviceServerError> { + let try_socket = TcpListener::bind(self.addr).await; + let socket = try_socket.map_err(|_| DeviceServerError::BindError(self.addr.port()))?; + + println!("Listening on: {}", self.addr); + + // accept a new connection + while let Ok((stream, _)) = socket.accept().await { + tokio::spawn(handle_connection(stream)); + } + + Ok(()) + } +} + +// create a websocket connection and +async fn handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { + let addr = stream + .peer_addr() + .map_err(|_| DeviceServerError::PeerAddrError)?; + + // create a WebSocket connection + let try_web_socket_stream = tokio_tungstenite::accept_async(stream).await; + let web_socket_stream = + try_web_socket_stream.map_err(|_| DeviceServerError::WebSocketHandshakeError)?; + + println!("New WebSocket connection created: {}", addr); + + // separate ownership between receiving and writing part + let (_write, _read) = web_socket_stream.split(); + + // 0. decomment imports & type Command & remove _ + // 1. read the received command + // 2. convert it from a Vec into a OsString --> OsStr::from_bytes(&Vec).to_owned() + // 3. send the output to the client (either Ok or ShellError) + // 4. IMPLEMENT CLIENT + + /* + read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) + .forward(write) + .await + .expect("Failed to forward messages") + */ + + Ok(()) +} From 62ddcea2697e8ff0d6103a496aeab7a05f145f9d Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 19 May 2023 16:03:46 +0200 Subject: [PATCH 02/11] Implement server functionality. Implement server functionality capable of creating new connections using websocket and execuring shell commands. Partially implement error handling. Signed-off-by: rgallor --- Cargo.lock | 2 + cli/Cargo.toml | 1 + cli/src/main.rs | 18 ++++- rust-remote-shell/Cargo.toml | 1 + rust-remote-shell/src/lib.rs | 133 +++++++++++++++++++++++++---------- 5 files changed, 117 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf1a6ad..1d6d0cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,6 +144,7 @@ dependencies = [ "color-eyre", "rust-remote-shell", "shellwords", + "tokio", ] [[package]] @@ -628,6 +629,7 @@ name = "rust-remote-shell" version = "0.1.0" dependencies = [ "futures-util", + "shellwords", "thiserror", "tokio", "tokio-tungstenite", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 0548d4c..109d46e 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -10,3 +10,4 @@ rust-remote-shell = { path = "../rust-remote-shell" } # cli depends on rust-remo clap = { version = "3.2.25", features = ["derive"] } color-eyre = "0.6.2" shellwords = "1.1.0" +tokio = "1.28.1" diff --git a/cli/src/main.rs b/cli/src/main.rs index 3dbb539..6c2a20d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,8 +1,9 @@ -use std::ops::Deref; +use std::{net::SocketAddr, ops::Deref}; use clap::{Parser, Subcommand}; use color_eyre::Result; +use rust_remote_shell::DeviceServer; /// CLI for a rust remote shell #[derive(Debug, Parser)] @@ -16,9 +17,14 @@ struct Cli { enum Commands { /// Execute a command Command { cmd: String }, + /// Make the device listen on a specific IP and port + Listener { addr: SocketAddr }, + /// Create a client capable of sending command to a Listener + Sender { addr: SocketAddr }, } -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { color_eyre::install()?; let cli = Cli::parse(); @@ -34,6 +40,14 @@ fn main() -> Result<()> { let cmd_out = rust_remote_shell::cmd_from_input(cmd.deref())?; println!("Command output: {}", cmd_out); } + Commands::Listener { addr } => { + let device_server = DeviceServer::new(*addr); + device_server.listen().await?; + } + Commands::Sender { addr: _ } => { + todo!(); + //let sender_client = SenderClient::new(addr); + } } Ok(()) diff --git a/rust-remote-shell/Cargo.toml b/rust-remote-shell/Cargo.toml index ac069f1..5b98982 100644 --- a/rust-remote-shell/Cargo.toml +++ b/rust-remote-shell/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" [dependencies] futures-util = "0.3.28" +shellwords = "1.1.0" thiserror = "1.0.40" tokio = { version = "1.28.1", features = ["full"] } tokio-tungstenite = "0.19.0" diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index c470e47..ed0f162 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -1,7 +1,10 @@ -use futures_util::StreamExt; // future, TryStreamExt +use futures_util::{future, StreamExt, TryStreamExt}; use std::io; use std::net::SocketAddr; -//use std::os::unix::prelude::OsStrExt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_tungstenite::tungstenite::Message; // future, TryStreamExt + //use std::os::unix::prelude::OsStrExt; use std::ffi::OsStr; // , ffi::OsString, collections::VecDeque, use std::process; use std::string::FromUtf8Error; @@ -55,8 +58,6 @@ where .map_err(ShellError::WrongOutConversion) } -// type Command = OsString; - #[derive(Error, Debug)] pub enum DeviceServerError { #[error("Failed to bind on port {0}")] @@ -65,6 +66,14 @@ pub enum DeviceServerError { PeerAddrError, #[error("Error during the websocket handshake occurred")] WebSocketHandshakeError, + #[error("Error while reading the shell command from websocket")] + ReadCommandError, + #[error("Shell error: {0}")] + DeviceShellError(#[from] ShellError), + #[error("Error marshaling to UTF8")] + Utf8Error(#[from] FromUtf8Error), + #[error("Trasport error from Tungstenite")] + Transport(#[from] tokio_tungstenite::tungstenite::Error), } pub struct DeviceServer { @@ -87,43 +96,95 @@ impl DeviceServer { println!("Listening on: {}", self.addr); + // TODO + // Destrutturare self in modo tale da prendere un mutable reference alla lista di comandi + // Usare un mutex per passare la coda di comandi ad ogni handle_connection + // pass the queue of commands to each spawned task. + + let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::(10); + + let handles = Arc::new(Mutex::new(Vec::new())); + let handles_clone = Arc::clone(&handles); + // accept a new connection - while let Ok((stream, _)) = socket.accept().await { - tokio::spawn(handle_connection(stream)); + let handle_connections = tokio::spawn(async move { + while let Ok((stream, _)) = socket.accept().await { + let handle_single_connection = tokio::spawn( + Self::handle_connection(stream, tx_err.clone()), // TODO: GESTIRE ERRORE + ); + handles_clone.lock().await.push(handle_single_connection); + } + }); + + // join connections + if let Some(err) = rx_err.recv().await { + handle_connections.abort(); + let _ = handle_connections.await; // TODO print error + + for h in handles.lock().await.iter() { + h.abort(); + } + + return Err(err); } Ok(()) } -} - -// create a websocket connection and -async fn handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { - let addr = stream - .peer_addr() - .map_err(|_| DeviceServerError::PeerAddrError)?; - // create a WebSocket connection - let try_web_socket_stream = tokio_tungstenite::accept_async(stream).await; - let web_socket_stream = - try_web_socket_stream.map_err(|_| DeviceServerError::WebSocketHandshakeError)?; + // create a websocket connection and + async fn handle_connection( + stream: TcpStream, + _tx_err: tokio::sync::mpsc::Sender, + ) -> Result<(), DeviceServerError> { + let addr = stream + .peer_addr() + .map_err(|_| DeviceServerError::PeerAddrError)?; + + // create a WebSocket connection + let try_web_socket_stream = tokio_tungstenite::accept_async(stream).await; + let web_socket_stream = + try_web_socket_stream.map_err(|_| DeviceServerError::WebSocketHandshakeError)?; + + println!("New WebSocket connection created: {}", addr); + + // separate ownership between receiving and writing part + let (_write, read) = web_socket_stream.split(); + + // Read the received command + read.map_err(DeviceServerError::Transport) + .and_then(|msg| { + let cmd = match msg { + // convert the message from a Vec into a OsString --> OsStr::from_bytes(&Vec).to_owned() + Message::Text(t) => Ok(t), + Message::Binary(v) => { + String::from_utf8(v).map_err(DeviceServerError::Utf8Error) + } + _ => Err(DeviceServerError::ReadCommandError), + }; + future::ready(cmd) + }) + .try_for_each(|cmd| async move { + // convert the command into the correct format + let cmd = shellwords::split(&cmd) + .map_err(|_| DeviceServerError::DeviceShellError(ShellError::MalformedInput))?; + + // compute the command + let cmd_out = cmd_from_input(&cmd).map_err(DeviceServerError::DeviceShellError)?; // TODO: MAKE THIS AN ASYNC FUNCTION + println!("Command output: {}", cmd_out); + + // WE SHOULD COMPUTE THE OUTPUT AND SEND IT TO THE CLIENT, NOOT PRINTING IT + + /* + read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) + .forward(write) + .await + .expect("Failed to forward messages") + */ + + Ok(()) + }) + .await?; - println!("New WebSocket connection created: {}", addr); - - // separate ownership between receiving and writing part - let (_write, _read) = web_socket_stream.split(); - - // 0. decomment imports & type Command & remove _ - // 1. read the received command - // 2. convert it from a Vec into a OsString --> OsStr::from_bytes(&Vec).to_owned() - // 3. send the output to the client (either Ok or ShellError) - // 4. IMPLEMENT CLIENT - - /* - read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) - .forward(write) - .await - .expect("Failed to forward messages") - */ - - Ok(()) + Ok(()) + } } From ed8639d092c9e3871f92d61cabbccb3742d9b655 Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 19 May 2023 17:33:38 +0200 Subject: [PATCH 03/11] Implement error handling function Signed-off-by: rgallor --- cli/src/main.rs | 2 +- rust-remote-shell/src/lib.rs | 33 ++++++++++++++++++++++++++------- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 6c2a20d..69c7a81 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -37,7 +37,7 @@ async fn main() -> Result<()> { let cmd = shellwords::split(cmd.trim()) .map_err(|_| rust_remote_shell::ShellError::MalformedInput)?; - let cmd_out = rust_remote_shell::cmd_from_input(cmd.deref())?; + let cmd_out = rust_remote_shell::cmd_from_input(cmd.deref()).await?; println!("Command output: {}", cmd_out); } Commands::Listener { addr } => { diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index ed0f162..84b78d3 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -27,7 +27,7 @@ pub enum ShellError { WrongOutConversion(#[from] FromUtf8Error), } -fn execute_cmd(cmd: &[S]) -> Result +async fn execute_cmd(cmd: &[S]) -> Result where S: AsRef, { @@ -43,7 +43,7 @@ where }) } -pub fn cmd_from_input(cmd: &[S]) -> Result +pub async fn cmd_from_input(cmd: &[S]) -> Result where S: AsRef, { @@ -51,7 +51,7 @@ where // try executing the command. // If the error states that the command does not exists, throw WrongCommand(cmd.split(' ').first().unwrap()) - let cmd_out = execute_cmd(cmd)?; + let cmd_out = execute_cmd(cmd).await?; std::string::String::from_utf8(cmd_out.stdout) // if the conversion from UTF8 to String goes wrong, return an error @@ -76,6 +76,14 @@ pub enum DeviceServerError { Transport(#[from] tokio_tungstenite::tungstenite::Error), } +impl DeviceServerError { + fn is_fatal(&self) -> bool { + // distinguish between fatal (cause server failure) and non-fatal error + // maatch between different kind of errors. + todo!() + } +} + pub struct DeviceServer { addr: SocketAddr, // queue used to store incoming commands in case multiple commands are sent to the device while one is processed. @@ -112,6 +120,7 @@ impl DeviceServer { let handle_single_connection = tokio::spawn( Self::handle_connection(stream, tx_err.clone()), // TODO: GESTIRE ERRORE ); + handles_clone.lock().await.push(handle_single_connection); } }); @@ -134,8 +143,16 @@ impl DeviceServer { // create a websocket connection and async fn handle_connection( stream: TcpStream, - _tx_err: tokio::sync::mpsc::Sender, - ) -> Result<(), DeviceServerError> { + tx_err: tokio::sync::mpsc::Sender, + ) { + match Self::impl_handle_connection(stream).await { + Ok(_) => {} + Err(err) if err.is_fatal() => tx_err.send(err).await.expect("Error handler failure"), + Err(_) => todo!(), + } + } + + async fn impl_handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { let addr = stream .peer_addr() .map_err(|_| DeviceServerError::PeerAddrError)?; @@ -169,10 +186,12 @@ impl DeviceServer { .map_err(|_| DeviceServerError::DeviceShellError(ShellError::MalformedInput))?; // compute the command - let cmd_out = cmd_from_input(&cmd).map_err(DeviceServerError::DeviceShellError)?; // TODO: MAKE THIS AN ASYNC FUNCTION + let cmd_out = cmd_from_input(&cmd) + .await + .map_err(DeviceServerError::DeviceShellError)?; // TODO: MAKE THIS AN ASYNC FUNCTION println!("Command output: {}", cmd_out); - // WE SHOULD COMPUTE THE OUTPUT AND SEND IT TO THE CLIENT, NOOT PRINTING IT + // WE SHOULD COMPUTE THE OUTPUT AND SEND IT TO THE CLIENT, NOT PRINTING IT /* read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) From 58e2449dccfdf4ad7bfc1031e579f0e781947ffa Mon Sep 17 00:00:00 2001 From: rgallor Date: Tue, 23 May 2023 19:13:49 +0200 Subject: [PATCH 04/11] Implement client side Signed-off-by: rgallor --- Cargo.lock | 94 +++++++++++++++- cli/Cargo.toml | 3 + cli/src/main.rs | 19 +++- rust-remote-shell/Cargo.toml | 4 +- rust-remote-shell/src/lib.rs | 210 +++++++++++++++++++++++------------ 5 files changed, 255 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d6d0cd..4d8b73d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,9 @@ dependencies = [ "rust-remote-shell", "shellwords", "tokio", + "tracing", + "tracing-subscriber", + "url", ] [[package]] @@ -234,12 +237,54 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + [[package]] name = "futures-macro" version = "0.3.28" @@ -269,10 +314,13 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -442,6 +490,16 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.15.0" @@ -473,6 +531,12 @@ version = "6.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "owo-colors" version = "3.5.0" @@ -628,11 +692,13 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" name = "rust-remote-shell" version = "0.1.0" dependencies = [ - "futures-util", + "futures", "shellwords", "thiserror", "tokio", "tokio-tungstenite", + "tracing", + "url", ] [[package]] @@ -849,9 +915,21 @@ checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.16", +] + [[package]] name = "tracing-core" version = "0.1.31" @@ -872,15 +950,29 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ + "nu-ansi-term", "sharded-slab", + "smallvec", "thread_local", "tracing-core", + "tracing-log", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 109d46e..76de6da 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -11,3 +11,6 @@ clap = { version = "3.2.25", features = ["derive"] } color-eyre = "0.6.2" shellwords = "1.1.0" tokio = "1.28.1" +url = "2.3.1" +tracing = "0.1.37" +tracing-subscriber = "0.3.17" diff --git a/cli/src/main.rs b/cli/src/main.rs index 69c7a81..36ab366 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,7 +3,9 @@ use std::{net::SocketAddr, ops::Deref}; use clap::{Parser, Subcommand}; use color_eyre::Result; -use rust_remote_shell::DeviceServer; +use rust_remote_shell::{DeviceServer, SenderClient}; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; /// CLI for a rust remote shell #[derive(Debug, Parser)] @@ -20,13 +22,20 @@ enum Commands { /// Make the device listen on a specific IP and port Listener { addr: SocketAddr }, /// Create a client capable of sending command to a Listener - Sender { addr: SocketAddr }, + Sender { listener_addr: url::Url }, } #[tokio::main] async fn main() -> Result<()> { color_eyre::install()?; + // define a subscriber for logging purposes + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let cli = Cli::parse(); match &cli.command { @@ -44,9 +53,9 @@ async fn main() -> Result<()> { let device_server = DeviceServer::new(*addr); device_server.listen().await?; } - Commands::Sender { addr: _ } => { - todo!(); - //let sender_client = SenderClient::new(addr); + Commands::Sender { listener_addr } => { + let sender_client = SenderClient::new(listener_addr.clone()); + sender_client.connect().await?; } } diff --git a/rust-remote-shell/Cargo.toml b/rust-remote-shell/Cargo.toml index 5b98982..4447a25 100644 --- a/rust-remote-shell/Cargo.toml +++ b/rust-remote-shell/Cargo.toml @@ -8,8 +8,10 @@ edition = "2021" [lib] [dependencies] -futures-util = "0.3.28" +futures = "0.3.28" shellwords = "1.1.0" thiserror = "1.0.40" tokio = { version = "1.28.1", features = ["full"] } tokio-tungstenite = "0.19.0" +tracing = "0.1.37" +url = "2.3.1" diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 84b78d3..15c7a51 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -1,15 +1,18 @@ -use futures_util::{future, StreamExt, TryStreamExt}; +use std::ffi::OsStr; use std::io; use std::net::SocketAddr; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio_tungstenite::tungstenite::Message; // future, TryStreamExt - //use std::os::unix::prelude::OsStrExt; -use std::ffi::OsStr; // , ffi::OsString, collections::VecDeque, -use std::process; use std::string::FromUtf8Error; +use std::sync::Arc; + +use futures::{future, SinkExt, StreamExt, TryStreamExt}; use thiserror::Error; +use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; +use tokio::process; +use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message}; +use tracing::{error, info, instrument}; +use url::Url; #[derive(Error, Debug)] pub enum ShellError { @@ -27,7 +30,7 @@ pub enum ShellError { WrongOutConversion(#[from] FromUtf8Error), } -async fn execute_cmd(cmd: &[S]) -> Result +async fn execute_cmd(cmd: &[S]) -> Result where S: AsRef, { @@ -37,6 +40,7 @@ where process::Command::new(cmd_to_exec) .args(cmd_iter) .output() + .await .map_err(|e| ShellError::WrongCommand { cmd: cmd_to_exec.as_ref().to_string_lossy().to_string(), error: e, @@ -60,55 +64,41 @@ where #[derive(Error, Debug)] pub enum DeviceServerError { - #[error("Failed to bind on port {0}")] - BindError(u16), + #[error("Failed to bind")] + Bind(#[from] io::Error), #[error("Connected streams should have a peer address")] - PeerAddrError, + PeerAddr, #[error("Error during the websocket handshake occurred")] - WebSocketHandshakeError, + WebSocketHandshake, #[error("Error while reading the shell command from websocket")] - ReadCommandError, - #[error("Shell error: {0}")] - DeviceShellError(#[from] ShellError), + ReadCommand, #[error("Error marshaling to UTF8")] Utf8Error(#[from] FromUtf8Error), #[error("Trasport error from Tungstenite")] Transport(#[from] tokio_tungstenite::tungstenite::Error), } -impl DeviceServerError { - fn is_fatal(&self) -> bool { - // distinguish between fatal (cause server failure) and non-fatal error - // maatch between different kind of errors. - todo!() - } -} +type TxErrorType = tokio::sync::mpsc::Sender; +#[derive(Debug)] pub struct DeviceServer { addr: SocketAddr, - // queue used to store incoming commands in case multiple commands are sent to the device while one is processed. - // received_commands: VecDeque, } impl DeviceServer { pub fn new(addr: SocketAddr) -> Self { - Self { - addr, - //received_commands: VecDeque::new(), - } + Self { addr } } + #[instrument(skip(self))] pub async fn listen(&self) -> Result<(), DeviceServerError> { - let try_socket = TcpListener::bind(self.addr).await; - let socket = try_socket.map_err(|_| DeviceServerError::BindError(self.addr.port()))?; - - println!("Listening on: {}", self.addr); + let socket = TcpListener::bind(self.addr) + .await + .map_err(DeviceServerError::Bind)?; - // TODO - // Destrutturare self in modo tale da prendere un mutable reference alla lista di comandi - // Usare un mutex per passare la coda di comandi ad ogni handle_connection - // pass the queue of commands to each spawned task. + info!("Listening at {}", self.addr); + // channel tx/rx to handle error let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::(10); let handles = Arc::new(Mutex::new(Vec::new())); @@ -117,9 +107,8 @@ impl DeviceServer { // accept a new connection let handle_connections = tokio::spawn(async move { while let Ok((stream, _)) = socket.accept().await { - let handle_single_connection = tokio::spawn( - Self::handle_connection(stream, tx_err.clone()), // TODO: GESTIRE ERRORE - ); + let handle_single_connection = + tokio::spawn(Self::handle_connection(stream, tx_err.clone())); handles_clone.lock().await.push(handle_single_connection); } @@ -127,45 +116,48 @@ impl DeviceServer { // join connections if let Some(err) = rx_err.recv().await { + // terminate all connections handle_connections.abort(); - let _ = handle_connections.await; // TODO print error + let _ = handle_connections.await; for h in handles.lock().await.iter() { h.abort(); } + error!("Received error {:?}. Terminate all connections.", err); return Err(err); } Ok(()) } - // create a websocket connection and - async fn handle_connection( - stream: TcpStream, - tx_err: tokio::sync::mpsc::Sender, - ) { + #[instrument(skip_all)] + async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { match Self::impl_handle_connection(stream).await { Ok(_) => {} - Err(err) if err.is_fatal() => tx_err.send(err).await.expect("Error handler failure"), - Err(_) => todo!(), + Err(err) => { + // TODO: fare in modo che quando un client interrompa la connessione, il server non termini + error!("Fatal error occurred: {}", err); + tx_err.send(err).await.expect("Error handler failure"); + } } } + #[instrument(skip_all)] async fn impl_handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { let addr = stream .peer_addr() - .map_err(|_| DeviceServerError::PeerAddrError)?; + .map_err(|_| DeviceServerError::PeerAddr)?; // create a WebSocket connection - let try_web_socket_stream = tokio_tungstenite::accept_async(stream).await; - let web_socket_stream = - try_web_socket_stream.map_err(|_| DeviceServerError::WebSocketHandshakeError)?; + let web_socket_stream = accept_async(stream) + .await + .map_err(|_| DeviceServerError::WebSocketHandshake)?; - println!("New WebSocket connection created: {}", addr); + info!("New WebSocket connection created: {}", addr); // separate ownership between receiving and writing part - let (_write, read) = web_socket_stream.split(); + let (write, read) = web_socket_stream.split(); // Read the received command read.map_err(DeviceServerError::Transport) @@ -176,34 +168,116 @@ impl DeviceServer { Message::Binary(v) => { String::from_utf8(v).map_err(DeviceServerError::Utf8Error) } - _ => Err(DeviceServerError::ReadCommandError), + _ => Err(DeviceServerError::ReadCommand), }; + info!("Received command from the client"); future::ready(cmd) }) - .try_for_each(|cmd| async move { + .and_then(|cmd| async move { // convert the command into the correct format - let cmd = shellwords::split(&cmd) - .map_err(|_| DeviceServerError::DeviceShellError(ShellError::MalformedInput))?; + let cmd = + shellwords::split(&cmd).unwrap_or(vec!["Malformed command.\n".to_string()]); // compute the command let cmd_out = cmd_from_input(&cmd) .await - .map_err(DeviceServerError::DeviceShellError)?; // TODO: MAKE THIS AN ASYNC FUNCTION - println!("Command output: {}", cmd_out); - - // WE SHOULD COMPUTE THE OUTPUT AND SEND IT TO THE CLIENT, NOT PRINTING IT + .unwrap_or(String::from("Incorrect command.\n")); - /* - read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) - .forward(write) - .await - .expect("Failed to forward messages") - */ + info!("Send command output to the client"); - Ok(()) + Ok(Message::Binary(cmd_out.as_bytes().to_vec())) }) + .forward(write.sink_map_err(DeviceServerError::Transport)) .await?; Ok(()) } } + +#[derive(Debug)] +pub struct SenderClient { + listener_url: Url, + // id: usize, +} + +#[derive(Error, Debug)] +pub enum SenderClientError { + #[error("Error while trying to connect with server")] + WebSocketConnect(#[from] tokio_tungstenite::tungstenite::Error), + #[error("IO error occurred while reading from stdin")] + IORead(#[from] std::io::Error), + #[error("IO error occurred while writing to stdout")] + IOWrite { + #[source] + err: std::io::Error, + }, + #[error("Error while trying to send the output of a command to the main task")] + SendOutput(#[from] tokio::sync::mpsc::error::SendError), + #[error("Error from Tungstenite while reading command")] + TungsteniteReadData { + #[source] + err: tokio_tungstenite::tungstenite::Error, + }, +} + +// impl SenderClientError { +// fn handle_error(self) { +// todo!() +// // gracefully stop the client in case of errors +// // cases to handle: +// // 1. send a wrong command suddenly stops the client +// // 2. ... +// } +// } + +impl SenderClient { + pub fn new(listener_url: Url) -> Self { + info!("Create client"); + Self { listener_url } + } + + #[instrument(skip(self))] + pub async fn connect(&self) -> Result<(), SenderClientError> { + // Websocket connection to an existing server + let (mut ws_stream, _) = connect_async(self.listener_url.clone()) + .await + .map_err(SenderClientError::WebSocketConnect)?; + + info!("WebSocket handshake has been successfully completed"); + + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); + let mut cmd = String::new(); + let mut stdout = tokio::io::stdout(); + + // loop to read a command from stdin, wait for its output and write it to stdout + loop { + cmd.clear(); + // read a shell command into the stdin and send it to the server + reader + .read_line(&mut cmd) + .await + .map_err(SenderClientError::IORead)?; + info!("Send cmd \"{}\" to the server", cmd); + ws_stream + .send(Message::Binary(cmd.as_bytes().to_vec())) + .await + .expect("error while sending a command through websocket to the server"); + + // read command shell output from the websocket + let msg = match ws_stream.next().await { + None => todo!(), // connection closed / server stops + Some(res) => res.map_err(|err| SenderClientError::TungsteniteReadData { err })?, + }; + + let data = msg.into_data(); + info!("Returned cmd out to the Client"); + + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + } + } +} From bbc9fed9745edc048a64013ac0677b73f760b60e Mon Sep 17 00:00:00 2001 From: rgallor Date: Wed, 24 May 2023 15:33:32 +0200 Subject: [PATCH 05/11] Handle exit command from a client Signed-off-by: rgallor --- rust-remote-shell/src/lib.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 15c7a51..7e44701 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -76,6 +76,8 @@ pub enum DeviceServerError { Utf8Error(#[from] FromUtf8Error), #[error("Trasport error from Tungstenite")] Transport(#[from] tokio_tungstenite::tungstenite::Error), + #[error("Close websocket connection")] + CloseWebsocket, } type TxErrorType = tokio::sync::mpsc::Sender; @@ -135,8 +137,8 @@ impl DeviceServer { async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { match Self::impl_handle_connection(stream).await { Ok(_) => {} + Err(DeviceServerError::CloseWebsocket) => info!("Websocket connection closed"), // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) Err(err) => { - // TODO: fare in modo che quando un client interrompa la connessione, il server non termini error!("Fatal error occurred: {}", err); tx_err.send(err).await.expect("Error handler failure"); } @@ -168,6 +170,7 @@ impl DeviceServer { Message::Binary(v) => { String::from_utf8(v).map_err(DeviceServerError::Utf8Error) } + Message::Close(_) => Err(DeviceServerError::CloseWebsocket), _ => Err(DeviceServerError::ReadCommand), }; info!("Received command from the client"); @@ -220,16 +223,6 @@ pub enum SenderClientError { }, } -// impl SenderClientError { -// fn handle_error(self) { -// todo!() -// // gracefully stop the client in case of errors -// // cases to handle: -// // 1. send a wrong command suddenly stops the client -// // 2. ... -// } -// } - impl SenderClient { pub fn new(listener_url: Url) -> Self { info!("Create client"); @@ -258,7 +251,18 @@ impl SenderClient { .read_line(&mut cmd) .await .map_err(SenderClientError::IORead)?; - info!("Send cmd \"{}\" to the server", cmd); + + // check if the command is exit. Eventually, close the connection + if cmd.starts_with("exit") { + ws_stream + .close(None) + .await + .expect("Error while closing websocket connection"); + info!("Closed websocket on client side"); + break Ok(()); + } + + info!("Send command to the server"); ws_stream .send(Message::Binary(cmd.as_bytes().to_vec())) .await From 3e110f06e4fa753f3d77e0116d813c0285fbcf15 Mon Sep 17 00:00:00 2001 From: rgallor Date: Wed, 24 May 2023 17:28:00 +0200 Subject: [PATCH 06/11] Add struct to manage shell commands Signed-off-by: rgallor --- cli/src/main.rs | 17 +--- rust-remote-shell/src/lib.rs | 150 ++++++++++++++++++++--------------- 2 files changed, 91 insertions(+), 76 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 36ab366..6f4d6ff 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, ops::Deref}; +use std::net::SocketAddr; use clap::{Parser, Subcommand}; @@ -17,8 +17,6 @@ struct Cli { // these commands can be called from the CLI using lowercase Commands name #[derive(Subcommand, Debug)] enum Commands { - /// Execute a command - Command { cmd: String }, /// Make the device listen on a specific IP and port Listener { addr: SocketAddr }, /// Create a client capable of sending command to a Listener @@ -39,23 +37,14 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match &cli.command { - Commands::Command { cmd } => { - println!("Input command \"{}\"", cmd); // substitute with logging inside the function - - // parse the cmd into a slice - let cmd = shellwords::split(cmd.trim()) - .map_err(|_| rust_remote_shell::ShellError::MalformedInput)?; - - let cmd_out = rust_remote_shell::cmd_from_input(cmd.deref()).await?; - println!("Command output: {}", cmd_out); - } Commands::Listener { addr } => { let device_server = DeviceServer::new(*addr); device_server.listen().await?; } Commands::Sender { listener_addr } => { - let sender_client = SenderClient::new(listener_addr.clone()); + let mut sender_client = SenderClient::new(listener_addr.clone()); sender_client.connect().await?; + sender_client.send().await?; } } diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 7e44701..66e3c34 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -11,7 +11,8 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::process; use tokio::{io::AsyncWriteExt, sync::Mutex}; use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message}; -use tracing::{error, info, instrument}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tracing::{error, info, instrument, warn}; use url::Url; #[derive(Error, Debug)] @@ -30,36 +31,42 @@ pub enum ShellError { WrongOutConversion(#[from] FromUtf8Error), } -async fn execute_cmd(cmd: &[S]) -> Result -where - S: AsRef, -{ - let mut cmd_iter = cmd.iter(); - let cmd_to_exec = cmd_iter.next().ok_or(ShellError::EmptyCommand)?; - - process::Command::new(cmd_to_exec) - .args(cmd_iter) - .output() - .await - .map_err(|e| ShellError::WrongCommand { - cmd: cmd_to_exec.as_ref().to_string_lossy().to_string(), - error: e, - }) -} +pub struct CommandHandler; + +impl CommandHandler { + fn new() -> Self { + Self + // TODO: open a remote shell (to the input IP addr) + } -pub async fn cmd_from_input(cmd: &[S]) -> Result -where - S: AsRef, -{ - // before calling this function the binary should ensure that the input in in the correct sintactic format + pub async fn execute(&self, cmd: String) -> Result { + // convert the command into the correct format + let cmd = shellwords::split(&cmd).map_err(|_| ShellError::MalformedInput)?; - // try executing the command. - // If the error states that the command does not exists, throw WrongCommand(cmd.split(' ').first().unwrap()) - let cmd_out = execute_cmd(cmd).await?; + // try executing the command. + let cmd_out = self.inner_execute(&cmd).await?; + + std::string::String::from_utf8(cmd_out.stdout) + // if the conversion from UTF8 to String goes wrong, return an error + .map_err(ShellError::WrongOutConversion) + } - std::string::String::from_utf8(cmd_out.stdout) - // if the conversion from UTF8 to String goes wrong, return an error - .map_err(ShellError::WrongOutConversion) + async fn inner_execute(&self, cmd: &[S]) -> Result + where + S: AsRef, + { + let mut cmd_iter = cmd.iter(); + let cmd_to_exec = cmd_iter.next().ok_or(ShellError::EmptyCommand)?; + + process::Command::new(cmd_to_exec) + .args(cmd_iter) + .output() + .await + .map_err(|e| ShellError::WrongCommand { + cmd: cmd_to_exec.as_ref().to_string_lossy().to_string(), + error: e, + }) + } } #[derive(Error, Debug)] @@ -76,6 +83,8 @@ pub enum DeviceServerError { Utf8Error(#[from] FromUtf8Error), #[error("Trasport error from Tungstenite")] Transport(#[from] tokio_tungstenite::tungstenite::Error), + #[error("Error while precessing the shell command")] + ShellError(#[from] ShellError), #[error("Close websocket connection")] CloseWebsocket, } @@ -116,7 +125,7 @@ impl DeviceServer { } }); - // join connections + // join connections and handle errors if let Some(err) = rx_err.recv().await { // terminate all connections handle_connections.abort(); @@ -137,7 +146,10 @@ impl DeviceServer { async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { match Self::impl_handle_connection(stream).await { Ok(_) => {} - Err(DeviceServerError::CloseWebsocket) => info!("Websocket connection closed"), // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) + Err(DeviceServerError::CloseWebsocket) => { + info!("Websocket connection closed"); + // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) + } Err(err) => { error!("Fatal error occurred: {}", err); tx_err.send(err).await.expect("Error handler failure"); @@ -165,29 +177,27 @@ impl DeviceServer { read.map_err(DeviceServerError::Transport) .and_then(|msg| { let cmd = match msg { - // convert the message from a Vec into a OsString --> OsStr::from_bytes(&Vec).to_owned() - Message::Text(t) => Ok(t), + // convert the message from a Vec into a OsString Message::Binary(v) => { String::from_utf8(v).map_err(DeviceServerError::Utf8Error) } - Message::Close(_) => Err(DeviceServerError::CloseWebsocket), + Message::Close(_) => Err(DeviceServerError::CloseWebsocket), // the client closed the connection _ => Err(DeviceServerError::ReadCommand), }; info!("Received command from the client"); future::ready(cmd) }) .and_then(|cmd| async move { - // convert the command into the correct format - let cmd = - shellwords::split(&cmd).unwrap_or(vec!["Malformed command.\n".to_string()]); + // define a command handler + let cmd_handler = CommandHandler::new(); - // compute the command - let cmd_out = cmd_from_input(&cmd) - .await - .unwrap_or(String::from("Incorrect command.\n")); + // execute the command and eventually return the error + let cmd_out = cmd_handler.execute(cmd).await.unwrap_or_else(|err| { + warn!("Shell error: {}", err); + format!("Shell error: {}\n", err) + }); info!("Send command output to the client"); - Ok(Message::Binary(cmd_out.as_bytes().to_vec())) }) .forward(write.sink_map_err(DeviceServerError::Transport)) @@ -197,12 +207,6 @@ impl DeviceServer { } } -#[derive(Debug)] -pub struct SenderClient { - listener_url: Url, - // id: usize, -} - #[derive(Error, Debug)] pub enum SenderClientError { #[error("Error while trying to connect with server")] @@ -223,20 +227,44 @@ pub enum SenderClientError { }, } +#[derive(Debug)] +pub struct SenderClient { + listener_url: Url, + ws_stream: Option>>, +} + impl SenderClient { pub fn new(listener_url: Url) -> Self { info!("Create client"); - Self { listener_url } + Self { + listener_url, + ws_stream: None, + } } #[instrument(skip(self))] - pub async fn connect(&self) -> Result<(), SenderClientError> { + pub async fn connect(&mut self) -> Result<(), SenderClientError> { // Websocket connection to an existing server - let (mut ws_stream, _) = connect_async(self.listener_url.clone()) + let (ws_stream, _) = connect_async(self.listener_url.clone()) .await .map_err(SenderClientError::WebSocketConnect)?; + self.ws_stream = Some(ws_stream); + info!("WebSocket handshake has been successfully completed"); + Ok(()) + } + + #[instrument(skip(self))] + pub async fn send(&mut self) -> Result<(), SenderClientError> { + let SenderClient { + listener_url: _, + ws_stream, + } = self; + + let ws_stream = ws_stream + .as_mut() + .expect("expect existing websocket stream"); let stdin = tokio::io::stdin(); let mut reader = BufReader::new(stdin); @@ -269,19 +297,17 @@ impl SenderClient { .expect("error while sending a command through websocket to the server"); // read command shell output from the websocket - let msg = match ws_stream.next().await { - None => todo!(), // connection closed / server stops - Some(res) => res.map_err(|err| SenderClientError::TungsteniteReadData { err })?, - }; + if let Some(res) = ws_stream.next().await { + let data = res + .map_err(|err| SenderClientError::TungsteniteReadData { err })? + .into_data(); - let data = msg.into_data(); - info!("Returned cmd out to the Client"); - - stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - stdout.flush().await.expect("writing stdout"); + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + }; } } } From 0672b971d7a558c1c2242a0b4e1d02a1e1ef8956 Mon Sep 17 00:00:00 2001 From: rgallor Date: Thu, 25 May 2023 19:23:35 +0200 Subject: [PATCH 07/11] Implement I/O handler and fix server error Signed-off-by: rgallor --- cli/src/main.rs | 1 - rust-remote-shell/src/lib.rs | 325 +++++++++++++++++++++++++++++------ 2 files changed, 276 insertions(+), 50 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 6f4d6ff..103153a 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -44,7 +44,6 @@ async fn main() -> Result<()> { Commands::Sender { listener_addr } => { let mut sender_client = SenderClient::new(listener_addr.clone()); sender_client.connect().await?; - sender_client.send().await?; } } diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 66e3c34..0e6223c 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -4,11 +4,16 @@ use std::net::SocketAddr; use std::string::FromUtf8Error; use std::sync::Arc; +use futures::stream::SplitSink; use futures::{future, SinkExt, StreamExt, TryStreamExt}; use thiserror::Error; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, BufReader, Stdin, Stdout}; use tokio::net::{TcpListener, TcpStream}; use tokio::process; +use tokio::sync::mpsc::error::{SendError, TryRecvError}; +use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::sync::MutexGuard; +use tokio::task::JoinHandle; use tokio::{io::AsyncWriteExt, sync::Mutex}; use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -129,7 +134,11 @@ impl DeviceServer { if let Some(err) = rx_err.recv().await { // terminate all connections handle_connections.abort(); - let _ = handle_connections.await; + + match handle_connections.await { + Err(err) if !err.is_cancelled() => error!("Join failed: {}", err), + _ => {} + } for h in handles.lock().await.iter() { h.abort(); @@ -219,62 +228,170 @@ pub enum SenderClientError { err: std::io::Error, }, #[error("Error while trying to send the output of a command to the main task")] - SendOutput(#[from] tokio::sync::mpsc::error::SendError), + Channel(#[from] SendError), #[error("Error from Tungstenite while reading command")] TungsteniteReadData { #[source] err: tokio_tungstenite::tungstenite::Error, }, + #[error("Server disconnected")] + Disconnected, } #[derive(Debug)] -pub struct SenderClient { - listener_url: Url, - ws_stream: Option>>, +pub struct IOHandler { + stdout: Stdout, + reader: BufReader, + write: SplitSink>, Message>, + tx_err: Sender>, + buf_cmd: String, } -impl SenderClient { - pub fn new(listener_url: Url) -> Self { - info!("Create client"); +impl IOHandler { + fn new( + write: SplitSink>, Message>, + tx_err: Sender>, + ) -> Self { Self { - listener_url, - ws_stream: None, + stdout: tokio::io::stdout(), + reader: BufReader::new(tokio::io::stdin()), + write, + tx_err, + buf_cmd: String::new(), } } - #[instrument(skip(self))] - pub async fn connect(&mut self) -> Result<(), SenderClientError> { - // Websocket connection to an existing server - let (ws_stream, _) = connect_async(self.listener_url.clone()) + #[instrument(skip_all)] + async fn read_stdin(&mut self) -> Result<(), SenderClientError> { + self.buf_cmd.clear(); + + // read a shell command into the stdin and send it to the server + self.reader + .read_line(&mut self.buf_cmd) .await - .map_err(SenderClientError::WebSocketConnect)?; + .map_err(SenderClientError::IORead)?; - self.ws_stream = Some(ws_stream); + if self.check_exit() { + self.exit().await?; + } - info!("WebSocket handshake has been successfully completed"); Ok(()) } - #[instrument(skip(self))] - pub async fn send(&mut self) -> Result<(), SenderClientError> { - let SenderClient { - listener_url: _, - ws_stream, - } = self; - - let ws_stream = ws_stream - .as_mut() - .expect("expect existing websocket stream"); - - let stdin = tokio::io::stdin(); - let mut reader = BufReader::new(stdin); - let mut cmd = String::new(); - let mut stdout = tokio::io::stdout(); + #[instrument(skip_all)] + fn check_exit(&self) -> bool { + self.buf_cmd.starts_with("exit") + } + + #[instrument(skip_all)] + async fn exit(&mut self) -> Result<(), SenderClientError> { + // check if the command is exit. Eventually, close the connection + + self.write + .send(Message::Close(None)) + .await + .expect("Error while closing websocket connection"); + info!("Closed websocket on client side"); + + self.tx_err.send(Ok(())).await.expect("channel error"); + + Ok(()) // send Ok(()) to close the connection on client side + //break Ok(()); + } + + #[instrument(skip_all)] + async fn send_to_server(&mut self) -> Result<(), SenderClientError> { + info!("Send command to the server"); + self.write + .send(Message::Binary(self.buf_cmd.as_bytes().to_vec())) + .await + .map_err(|err| SenderClientError::TungsteniteReadData { err })?; - // loop to read a command from stdin, wait for its output and write it to stdout + info!("Command sent: {}", self.buf_cmd); + + Ok(()) + } + + #[instrument(skip_all)] + async fn impl_write_stdout(&mut self, msg: Message) -> Result<(), SenderClientError> { + let data = msg.into_data(); + + self.stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + + self.stdout.flush().await.expect("writing stdout"); + + Ok(()) + } + + #[instrument(skip_all)] + pub async fn write_stdout( + &mut self, + rx: Arc>>, + ) -> Result<(), SenderClientError> { + // check if there are command outputs stored in the channel. Eventually, print them to the stdout + let mut channel = rx.lock().await; + + // wait to receive the first command output + let msg = channel.recv().await.unwrap(); + + self.impl_write_stdout(msg).await?; + + self.empty_buffer(channel).await?; + + Ok(()) + } + + async fn empty_buffer( + &mut self, + mut channel: MutexGuard<'_, UnboundedReceiver>, + ) -> Result<(), SenderClientError> { + loop { + match channel.try_recv() { + Ok(msg) => { + self.impl_write_stdout(msg).await?; + } + Err(TryRecvError::Empty) => { + // the channel is empty but the connection is still open + break Ok(()); // TODO: check that Ok(()) is a good return value + } + Err(TryRecvError::Disconnected) => { + unreachable!("the channel should not be dropped before the task is aborted") + } + } + } + } +} + +#[derive(Debug)] +pub struct SenderClient { + listener_url: Url, +} + +impl SenderClient { + pub fn new(listener_url: Url) -> Self { + Self { listener_url } + } + + async fn read_write( + write: SplitSink>, Message>, + rx: Arc>>, + tx_err: Sender>, + ) -> Result<(), SenderClientError> { + let mut iohandler = IOHandler::new(write, tx_err); + + // read from stdin and, if messages are present on the channel (rx) print them to the stdout + loop { + iohandler.read_stdin().await?; + iohandler.send_to_server().await?; + iohandler.write_stdout(Arc::clone(&rx)).await?; + } + /* loop { cmd.clear(); - // read a shell command into the stdin and send it to the server + // read a shell command from the stdin and send it to the server reader .read_line(&mut cmd) .await @@ -282,32 +399,142 @@ impl SenderClient { // check if the command is exit. Eventually, close the connection if cmd.starts_with("exit") { - ws_stream - .close(None) + write + .send(Message::Close(None)) .await .expect("Error while closing websocket connection"); info!("Closed websocket on client side"); + tx_err.send(Ok(())).await.expect("channel error"); // send Ok(()) to close the connection on client side break Ok(()); } info!("Send command to the server"); - ws_stream + write .send(Message::Binary(cmd.as_bytes().to_vec())) .await .expect("error while sending a command through websocket to the server"); + info!("Command sent: {}", cmd); - // read command shell output from the websocket - if let Some(res) = ws_stream.next().await { - let data = res - .map_err(|err| SenderClientError::TungsteniteReadData { err })? - .into_data(); + // check if there are command outputs stored in the channel. Eventually, print them to the stdout + let mut channel = rx.lock().await; - stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - stdout.flush().await.expect("writing stdout"); - }; + let some = channel.recv().await.unwrap(); + let data = some.into_data(); + + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + + // TODO: define function + loop { + match channel.try_recv() { + Ok(cmd_out) => { + let data = cmd_out.into_data(); + + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + } + Err(TryRecvError::Empty) => { + // the channel is empty but the connection is still open + break; + } + Err(TryRecvError::Disconnected) => { + unreachable!("the channel should not be dropped before the task is aborted") + } + } + } + } + */ + } + + #[instrument(skip(self))] + pub async fn connect(&mut self) -> Result<(), SenderClientError> { + // Websocket connection to an existing server + let (ws_stream, _) = connect_async(self.listener_url.clone()) + .await + .map_err(SenderClientError::WebSocketConnect)?; + + info!("WebSocket handshake has been successfully completed"); + + let (write, read) = ws_stream.split(); + + let (tx_cmd_out, rx_cmd_out) = tokio::sync::mpsc::unbounded_channel::(); + let rx_cmd_out = Arc::new(Mutex::new(rx_cmd_out)); + let rx_cmd_out_clone = Arc::clone(&rx_cmd_out); + + let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::>(1); + + // handle stdin and stdout + let handle_std_in_out = + tokio::spawn(Self::read_write(write, rx_cmd_out_clone, tx_err.clone())); + + let handle_read = tokio::spawn(async move { + let res = read + .map_err(|err| SenderClientError::TungsteniteReadData { err }) + .try_for_each(|cmd_out| async { + tx_cmd_out.send(cmd_out).map_err(SenderClientError::Channel) + }) + .await; + + if let Err(err) = res { + tx_err.send(Err(err)).await.expect("channel error"); + } + + Ok(()) + }); + + let handles = vec![handle_std_in_out, handle_read]; + + match rx_err.recv().await.expect("channel error") { + Ok(()) => { + info!("Closing websocket connection"); + Self::close(handles, rx_cmd_out).await + } + Err(err) => { + error!("Fatal error: {}", err); + Self::close(handles, rx_cmd_out).await + } } } + + #[instrument(skip_all)] + async fn close( + handles: Vec>>, + rx_cmd_out: Arc>>, + ) -> Result<(), SenderClientError> { + // abort the current active tasks + for h in handles.iter() { + h.abort(); + } + + for h in handles { + match h.await { + Err(err) if !err.is_cancelled() => { + error!("Join failed: {}", err) + } + _ => {} + } + } + + // write the remaining elements from cmd out buffer to stdout + let mut channel = rx_cmd_out.lock().await; + let mut stdout = tokio::io::stdout(); + while let Ok(cmd_out) = channel.try_recv() { + let data = cmd_out.into_data(); + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + } + + info!("EXIT"); + + Ok(()) + } } From 29a2a6187b3bdb6fc67fcfe72c16fa5adc1bbea0 Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 26 May 2023 12:33:54 +0200 Subject: [PATCH 08/11] Add support for CRTL+C on both server and client side Signed-off-by: rgallor --- rust-remote-shell/src/lib.rs | 91 ++++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index 0e6223c..c29bc42 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -1,5 +1,5 @@ use std::ffi::OsStr; -use std::io; +use std::io::{self}; use std::net::SocketAddr; use std::string::FromUtf8Error; use std::sync::Arc; @@ -15,9 +15,11 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use tokio::sync::MutexGuard; use tokio::task::JoinHandle; use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio_tungstenite::tungstenite::error::ProtocolError; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tracing::{error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use url::Url; #[derive(Error, Debug)] @@ -32,7 +34,7 @@ pub enum ShellError { #[source] error: io::Error, }, - #[error("The execution of the command caused an error while formatting the output into UTF8")] + #[error("Error while formatting command output into UTF8")] WrongOutConversion(#[from] FromUtf8Error), } @@ -44,16 +46,12 @@ impl CommandHandler { // TODO: open a remote shell (to the input IP addr) } + #[instrument(skip(self))] pub async fn execute(&self, cmd: String) -> Result { - // convert the command into the correct format + debug!("Execute command {}", cmd); let cmd = shellwords::split(&cmd).map_err(|_| ShellError::MalformedInput)?; - - // try executing the command. let cmd_out = self.inner_execute(&cmd).await?; - - std::string::String::from_utf8(cmd_out.stdout) - // if the conversion from UTF8 to String goes wrong, return an error - .map_err(ShellError::WrongOutConversion) + String::from_utf8(cmd_out.stdout).map_err(ShellError::WrongOutConversion) } async fn inner_execute(&self, cmd: &[S]) -> Result @@ -133,19 +131,29 @@ impl DeviceServer { // join connections and handle errors if let Some(err) = rx_err.recv().await { // terminate all connections - handle_connections.abort(); + self.terminate(handle_connections, &handles).await?; + error!("Received error {:?}. Terminate all connections.", err); + return Err(err); + } - match handle_connections.await { - Err(err) if !err.is_cancelled() => error!("Join failed: {}", err), - _ => {} - } + Ok(()) + } - for h in handles.lock().await.iter() { - h.abort(); - } + #[instrument(skip_all)] + async fn terminate( + &self, + handle_connections: JoinHandle<()>, + handles: &Mutex>>, + ) -> Result<(), DeviceServerError> { + handle_connections.abort(); + + match handle_connections.await { + Err(err) if !err.is_cancelled() => error!("Join failed: {}", err), + _ => {} + } - error!("Received error {:?}. Terminate all connections.", err); - return Err(err); + for h in handles.lock().await.iter() { + h.abort(); } Ok(()) @@ -155,8 +163,11 @@ impl DeviceServer { async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { match Self::impl_handle_connection(stream).await { Ok(_) => {} - Err(DeviceServerError::CloseWebsocket) => { - info!("Websocket connection closed"); + Err(DeviceServerError::CloseWebsocket) + | Err(DeviceServerError::Transport(TungsteniteError::Protocol( + ProtocolError::ResetWithoutClosingHandshake, + ))) => { + warn!("Websocket connection closed"); // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) } Err(err) => { @@ -266,16 +277,22 @@ impl IOHandler { self.buf_cmd.clear(); // read a shell command into the stdin and send it to the server - self.reader + let byte_read = self + .reader .read_line(&mut self.buf_cmd) .await .map_err(SenderClientError::IORead)?; - if self.check_exit() { - self.exit().await?; + debug!(?byte_read); + if byte_read == 0 { + info!("EOF received"); + self.exit().await + } else if self.check_exit() { + info!("exit received"); + self.exit().await + } else { + Ok(()) } - - Ok(()) } #[instrument(skip_all)] @@ -329,7 +346,7 @@ impl IOHandler { #[instrument(skip_all)] pub async fn write_stdout( &mut self, - rx: Arc>>, + rx: &Mutex>, ) -> Result<(), SenderClientError> { // check if there are command outputs stored in the channel. Eventually, print them to the stdout let mut channel = rx.lock().await; @@ -386,7 +403,7 @@ impl SenderClient { loop { iohandler.read_stdin().await?; iohandler.send_to_server().await?; - iohandler.write_stdout(Arc::clone(&rx)).await?; + iohandler.write_stdout(&rx).await?; } /* loop { @@ -488,23 +505,24 @@ impl SenderClient { Ok(()) }); - let handles = vec![handle_std_in_out, handle_read]; + let mut handles = [handle_std_in_out, handle_read]; match rx_err.recv().await.expect("channel error") { Ok(()) => { info!("Closing websocket connection"); - Self::close(handles, rx_cmd_out).await + Self::close(&mut handles, rx_cmd_out).await } Err(err) => { error!("Fatal error: {}", err); - Self::close(handles, rx_cmd_out).await + Self::close(&mut handles, rx_cmd_out).await?; + Err(err) } } } #[instrument(skip_all)] async fn close( - handles: Vec>>, + handles: &mut [JoinHandle>], rx_cmd_out: Arc>>, ) -> Result<(), SenderClientError> { // abort the current active tasks @@ -517,7 +535,12 @@ impl SenderClient { Err(err) if !err.is_cancelled() => { error!("Join failed: {}", err) } - _ => {} + Err(_) => { + trace!("Task cancelled") + } + Ok(res) => { + debug!("Task joined with: {:?}", res) + } } } From bdb3dac0b9eb1ba1c13decddef4db942b204cc70 Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 26 May 2023 16:39:39 +0200 Subject: [PATCH 09/11] Divide library into submodules Signed-off-by: rgallor --- cli/src/main.rs | 6 +- rust-remote-shell/src/device_server.rs | 173 ++++++++ rust-remote-shell/src/io_handler.rs | 146 +++++++ rust-remote-shell/src/lib.rs | 567 +------------------------ rust-remote-shell/src/sender_client.rs | 156 +++++++ rust-remote-shell/src/shell.rs | 78 ++++ 6 files changed, 561 insertions(+), 565 deletions(-) create mode 100644 rust-remote-shell/src/device_server.rs create mode 100644 rust-remote-shell/src/io_handler.rs create mode 100644 rust-remote-shell/src/sender_client.rs create mode 100644 rust-remote-shell/src/shell.rs diff --git a/cli/src/main.rs b/cli/src/main.rs index 103153a..a079a0b 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,10 +3,12 @@ use std::net::SocketAddr; use clap::{Parser, Subcommand}; use color_eyre::Result; -use rust_remote_shell::{DeviceServer, SenderClient}; use tracing::Level; use tracing_subscriber::FmtSubscriber; +use rust_remote_shell::device_server::DeviceServer; +use rust_remote_shell::sender_client::SenderClient; + /// CLI for a rust remote shell #[derive(Debug, Parser)] struct Cli { @@ -39,7 +41,7 @@ async fn main() -> Result<()> { match &cli.command { Commands::Listener { addr } => { let device_server = DeviceServer::new(*addr); - device_server.listen().await?; + device_server.listen().await? } Commands::Sender { listener_addr } => { let mut sender_client = SenderClient::new(listener_addr.clone()); diff --git a/rust-remote-shell/src/device_server.rs b/rust-remote-shell/src/device_server.rs new file mode 100644 index 0000000..8542a41 --- /dev/null +++ b/rust-remote-shell/src/device_server.rs @@ -0,0 +1,173 @@ +use std::io::{self}; +use std::net::SocketAddr; +use std::string::FromUtf8Error; +use std::sync::Arc; + +use futures::{future, SinkExt, StreamExt, TryStreamExt}; +use thiserror::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio_tungstenite::tungstenite::error::ProtocolError; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; +use tokio_tungstenite::{accept_async, tungstenite::Message}; +use tracing::{error, info, instrument, warn}; + +use crate::shell::{CommandHandler, ShellError}; + +#[derive(Error, Debug)] +pub enum DeviceServerError { + #[error("Failed to bind")] + Bind(#[from] io::Error), + #[error("Connected streams should have a peer address")] + PeerAddr, + #[error("Error during the websocket handshake occurred")] + WebSocketHandshake, + #[error("Error while reading the shell command from websocket")] + ReadCommand, + #[error("Error marshaling to UTF8")] + Utf8Error(#[from] FromUtf8Error), + #[error("Trasport error from Tungstenite")] + Transport(#[from] tokio_tungstenite::tungstenite::Error), + #[error("Error while precessing the shell command")] + ShellError(#[from] ShellError), + #[error("Close websocket connection")] + CloseWebsocket, +} + +type TxErrorType = tokio::sync::mpsc::Sender; +const MAX_ERRORS_TO_HANDLE: usize = 10; + +#[derive(Debug)] +pub struct DeviceServer { + addr: SocketAddr, +} + +impl DeviceServer { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } + + #[instrument(skip(self))] + pub async fn listen(&self) -> Result<(), DeviceServerError> { + let socket = TcpListener::bind(self.addr) + .await + .map_err(DeviceServerError::Bind)?; + + info!("Listening at {}", self.addr); + + // channel tx/rx to handle error + let (tx_err, mut rx_err) = + tokio::sync::mpsc::channel::(MAX_ERRORS_TO_HANDLE); + + let handles = Arc::new(Mutex::new(Vec::new())); + let handles_clone = Arc::clone(&handles); + + // accept a new connection + let handle_connections = tokio::spawn(async move { + while let Ok((stream, _)) = socket.accept().await { + let handle_single_connection = + tokio::spawn(Self::handle_connection(stream, tx_err.clone())); + + handles_clone.lock().await.push(handle_single_connection); + } + }); + + // join connections and handle errors + if let Some(err) = rx_err.recv().await { + self.terminate(handle_connections, &handles).await?; + error!("Received error {:?}. Terminate all connections.", err); + return Err(err); + } + + Ok(()) + } + + // terminate all connections + #[instrument(skip_all)] + async fn terminate( + &self, + handle_connections: JoinHandle<()>, + handles: &Mutex>>, + ) -> Result<(), DeviceServerError> { + handle_connections.abort(); + + match handle_connections.await { + Err(err) if !err.is_cancelled() => error!("Join failed: {}", err), + _ => {} + } + + for h in handles.lock().await.iter() { + h.abort(); + } + + Ok(()) + } + + #[instrument(skip_all)] + async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { + match Self::impl_handle_connection(stream).await { + Ok(_) => {} + Err(DeviceServerError::CloseWebsocket) + | Err(DeviceServerError::Transport(TungsteniteError::Protocol( + ProtocolError::ResetWithoutClosingHandshake, + ))) => { + warn!("Websocket connection closed"); + // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) + } + Err(err) => { + error!("Fatal error occurred: {}", err); + tx_err.send(err).await.expect("Error handler failure"); + } + } + } + + #[instrument(skip_all)] + async fn impl_handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { + let addr = stream + .peer_addr() + .map_err(|_| DeviceServerError::PeerAddr)?; + + // create a WebSocket connection + let web_socket_stream = accept_async(stream) + .await + .map_err(|_| DeviceServerError::WebSocketHandshake)?; + + info!("New WebSocket connection created: {}", addr); + + // separate ownership between receiving and writing part + let (write, read) = web_socket_stream.split(); + + // Read the received command + read.map_err(DeviceServerError::Transport) + .and_then(|msg| { + let cmd = match msg { + // convert the message from a Vec into a OsString + Message::Binary(v) => { + String::from_utf8(v).map_err(DeviceServerError::Utf8Error) + } + Message::Close(_) => Err(DeviceServerError::CloseWebsocket), // the client closed the connection + _ => Err(DeviceServerError::ReadCommand), + }; + info!("Received command from the client"); + future::ready(cmd) + }) + .and_then(|cmd| async move { + // define a command handler + let cmd_handler = CommandHandler::new(); + + // execute the command and eventually return the error + let cmd_out = cmd_handler.execute(cmd).await.unwrap_or_else(|err| { + warn!("Shell error: {}", err); + format!("Shell error: {}\n", err) + }); + + info!("Send command output to the client"); + Ok(Message::Binary(cmd_out.as_bytes().to_vec())) + }) + .forward(write.sink_map_err(DeviceServerError::Transport)) + .await?; + + Ok(()) + } +} diff --git a/rust-remote-shell/src/io_handler.rs b/rust-remote-shell/src/io_handler.rs new file mode 100644 index 0000000..57bf06f --- /dev/null +++ b/rust-remote-shell/src/io_handler.rs @@ -0,0 +1,146 @@ +use futures::stream::SplitSink; +use futures::SinkExt; +use tokio::io::{AsyncBufReadExt, BufReader, Stdin, Stdout}; +use tokio::net::TcpStream; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::sync::MutexGuard; +use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tracing::{debug, info, instrument, warn}; + +use crate::sender_client::SenderClientError; + +#[derive(Debug)] +pub struct IOHandler { + stdout: Stdout, + reader: BufReader, + write: SplitSink>, Message>, + tx_err: Sender>, + buf_cmd: String, +} + +impl IOHandler { + pub fn new( + write: SplitSink>, Message>, + tx_err: Sender>, + ) -> Self { + Self { + stdout: tokio::io::stdout(), + reader: BufReader::new(tokio::io::stdin()), + write, + tx_err, + buf_cmd: String::new(), + } + } + + #[instrument(skip_all)] + pub async fn read_stdin(&mut self) -> Result<(), SenderClientError> { + self.buf_cmd.clear(); + + // read a shell command into the stdin and send it to the server + let byte_read = self + .reader + .read_line(&mut self.buf_cmd) + .await + .map_err(SenderClientError::IORead)?; + + debug!(?byte_read); + if byte_read == 0 { + info!("EOF received"); + self.exit().await + } else if self.check_exit() { + info!("exit received"); + self.exit().await + } else { + Ok(()) + } + } + + #[instrument(skip_all)] + fn check_exit(&self) -> bool { + self.buf_cmd.starts_with("exit") + } + + #[instrument(skip_all)] + async fn exit(&mut self) -> Result<(), SenderClientError> { + // check if the command is exit. Eventually, close the connection + + self.write + .send(Message::Close(None)) + .await + .expect("Error while closing websocket connection"); + info!("Closed websocket on client side"); + + self.tx_err.send(Ok(())).await.expect("channel error"); + + Ok(()) // send Ok(()) to close the connection on client side + //break Ok(()); + } + + #[instrument(skip_all)] + pub async fn send_to_server(&mut self) -> Result<(), SenderClientError> { + info!("Send command to the server"); + self.write + .send(Message::Binary(self.buf_cmd.as_bytes().to_vec())) + .await + .map_err(|err| SenderClientError::TungsteniteReadData { err })?; + + info!("Command sent: {}", self.buf_cmd); + + Ok(()) + } + + #[instrument(skip_all)] + pub async fn write_stdout( + &mut self, + rx: &Mutex>, + ) -> Result<(), SenderClientError> { + // check if there are command outputs stored in the channel. Eventually, print them to the stdout + let mut channel = rx.lock().await; + + // wait to receive the first command output + let msg = channel.recv().await.unwrap(); + + self.impl_write_stdout(msg).await?; + + self.empty_buffer(channel).await?; + + Ok(()) + } + + #[instrument(skip_all)] + async fn impl_write_stdout(&mut self, msg: Message) -> Result<(), SenderClientError> { + let data = msg.into_data(); + + self.stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + + self.stdout.flush().await.expect("writing stdout"); + + Ok(()) + } + + async fn empty_buffer( + &mut self, + mut channel: MutexGuard<'_, UnboundedReceiver>, + ) -> Result<(), SenderClientError> { + loop { + match channel.try_recv() { + Ok(msg) => { + self.impl_write_stdout(msg).await?; + } + Err(TryRecvError::Empty) => { + // the channel is empty but the connection is still open + break Ok(()); // TODO: check that Ok(()) is a good return value + } + Err(TryRecvError::Disconnected) => { + unreachable!("the channel should not be dropped before the task is aborted") + } + } + } + } +} diff --git a/rust-remote-shell/src/lib.rs b/rust-remote-shell/src/lib.rs index c29bc42..2d17e2c 100644 --- a/rust-remote-shell/src/lib.rs +++ b/rust-remote-shell/src/lib.rs @@ -1,563 +1,4 @@ -use std::ffi::OsStr; -use std::io::{self}; -use std::net::SocketAddr; -use std::string::FromUtf8Error; -use std::sync::Arc; - -use futures::stream::SplitSink; -use futures::{future, SinkExt, StreamExt, TryStreamExt}; -use thiserror::Error; -use tokio::io::{AsyncBufReadExt, BufReader, Stdin, Stdout}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::process; -use tokio::sync::mpsc::error::{SendError, TryRecvError}; -use tokio::sync::mpsc::{Sender, UnboundedReceiver}; -use tokio::sync::MutexGuard; -use tokio::task::JoinHandle; -use tokio::{io::AsyncWriteExt, sync::Mutex}; -use tokio_tungstenite::tungstenite::error::ProtocolError; -use tokio_tungstenite::tungstenite::Error as TungsteniteError; -use tokio_tungstenite::{accept_async, connect_async, tungstenite::Message}; -use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tracing::{debug, error, info, instrument, trace, warn}; -use url::Url; - -#[derive(Error, Debug)] -pub enum ShellError { - #[error("Empty command")] - EmptyCommand, - #[error("Malformed input")] - MalformedInput, - #[error("Command {cmd} does not exists")] - WrongCommand { - cmd: String, - #[source] - error: io::Error, - }, - #[error("Error while formatting command output into UTF8")] - WrongOutConversion(#[from] FromUtf8Error), -} - -pub struct CommandHandler; - -impl CommandHandler { - fn new() -> Self { - Self - // TODO: open a remote shell (to the input IP addr) - } - - #[instrument(skip(self))] - pub async fn execute(&self, cmd: String) -> Result { - debug!("Execute command {}", cmd); - let cmd = shellwords::split(&cmd).map_err(|_| ShellError::MalformedInput)?; - let cmd_out = self.inner_execute(&cmd).await?; - String::from_utf8(cmd_out.stdout).map_err(ShellError::WrongOutConversion) - } - - async fn inner_execute(&self, cmd: &[S]) -> Result - where - S: AsRef, - { - let mut cmd_iter = cmd.iter(); - let cmd_to_exec = cmd_iter.next().ok_or(ShellError::EmptyCommand)?; - - process::Command::new(cmd_to_exec) - .args(cmd_iter) - .output() - .await - .map_err(|e| ShellError::WrongCommand { - cmd: cmd_to_exec.as_ref().to_string_lossy().to_string(), - error: e, - }) - } -} - -#[derive(Error, Debug)] -pub enum DeviceServerError { - #[error("Failed to bind")] - Bind(#[from] io::Error), - #[error("Connected streams should have a peer address")] - PeerAddr, - #[error("Error during the websocket handshake occurred")] - WebSocketHandshake, - #[error("Error while reading the shell command from websocket")] - ReadCommand, - #[error("Error marshaling to UTF8")] - Utf8Error(#[from] FromUtf8Error), - #[error("Trasport error from Tungstenite")] - Transport(#[from] tokio_tungstenite::tungstenite::Error), - #[error("Error while precessing the shell command")] - ShellError(#[from] ShellError), - #[error("Close websocket connection")] - CloseWebsocket, -} - -type TxErrorType = tokio::sync::mpsc::Sender; - -#[derive(Debug)] -pub struct DeviceServer { - addr: SocketAddr, -} - -impl DeviceServer { - pub fn new(addr: SocketAddr) -> Self { - Self { addr } - } - - #[instrument(skip(self))] - pub async fn listen(&self) -> Result<(), DeviceServerError> { - let socket = TcpListener::bind(self.addr) - .await - .map_err(DeviceServerError::Bind)?; - - info!("Listening at {}", self.addr); - - // channel tx/rx to handle error - let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::(10); - - let handles = Arc::new(Mutex::new(Vec::new())); - let handles_clone = Arc::clone(&handles); - - // accept a new connection - let handle_connections = tokio::spawn(async move { - while let Ok((stream, _)) = socket.accept().await { - let handle_single_connection = - tokio::spawn(Self::handle_connection(stream, tx_err.clone())); - - handles_clone.lock().await.push(handle_single_connection); - } - }); - - // join connections and handle errors - if let Some(err) = rx_err.recv().await { - // terminate all connections - self.terminate(handle_connections, &handles).await?; - error!("Received error {:?}. Terminate all connections.", err); - return Err(err); - } - - Ok(()) - } - - #[instrument(skip_all)] - async fn terminate( - &self, - handle_connections: JoinHandle<()>, - handles: &Mutex>>, - ) -> Result<(), DeviceServerError> { - handle_connections.abort(); - - match handle_connections.await { - Err(err) if !err.is_cancelled() => error!("Join failed: {}", err), - _ => {} - } - - for h in handles.lock().await.iter() { - h.abort(); - } - - Ok(()) - } - - #[instrument(skip_all)] - async fn handle_connection(stream: TcpStream, tx_err: TxErrorType) { - match Self::impl_handle_connection(stream).await { - Ok(_) => {} - Err(DeviceServerError::CloseWebsocket) - | Err(DeviceServerError::Transport(TungsteniteError::Protocol( - ProtocolError::ResetWithoutClosingHandshake, - ))) => { - warn!("Websocket connection closed"); - // TODO: check that the connection is effectively closed on the server-side (not only on the client-side) - } - Err(err) => { - error!("Fatal error occurred: {}", err); - tx_err.send(err).await.expect("Error handler failure"); - } - } - } - - #[instrument(skip_all)] - async fn impl_handle_connection(stream: TcpStream) -> Result<(), DeviceServerError> { - let addr = stream - .peer_addr() - .map_err(|_| DeviceServerError::PeerAddr)?; - - // create a WebSocket connection - let web_socket_stream = accept_async(stream) - .await - .map_err(|_| DeviceServerError::WebSocketHandshake)?; - - info!("New WebSocket connection created: {}", addr); - - // separate ownership between receiving and writing part - let (write, read) = web_socket_stream.split(); - - // Read the received command - read.map_err(DeviceServerError::Transport) - .and_then(|msg| { - let cmd = match msg { - // convert the message from a Vec into a OsString - Message::Binary(v) => { - String::from_utf8(v).map_err(DeviceServerError::Utf8Error) - } - Message::Close(_) => Err(DeviceServerError::CloseWebsocket), // the client closed the connection - _ => Err(DeviceServerError::ReadCommand), - }; - info!("Received command from the client"); - future::ready(cmd) - }) - .and_then(|cmd| async move { - // define a command handler - let cmd_handler = CommandHandler::new(); - - // execute the command and eventually return the error - let cmd_out = cmd_handler.execute(cmd).await.unwrap_or_else(|err| { - warn!("Shell error: {}", err); - format!("Shell error: {}\n", err) - }); - - info!("Send command output to the client"); - Ok(Message::Binary(cmd_out.as_bytes().to_vec())) - }) - .forward(write.sink_map_err(DeviceServerError::Transport)) - .await?; - - Ok(()) - } -} - -#[derive(Error, Debug)] -pub enum SenderClientError { - #[error("Error while trying to connect with server")] - WebSocketConnect(#[from] tokio_tungstenite::tungstenite::Error), - #[error("IO error occurred while reading from stdin")] - IORead(#[from] std::io::Error), - #[error("IO error occurred while writing to stdout")] - IOWrite { - #[source] - err: std::io::Error, - }, - #[error("Error while trying to send the output of a command to the main task")] - Channel(#[from] SendError), - #[error("Error from Tungstenite while reading command")] - TungsteniteReadData { - #[source] - err: tokio_tungstenite::tungstenite::Error, - }, - #[error("Server disconnected")] - Disconnected, -} - -#[derive(Debug)] -pub struct IOHandler { - stdout: Stdout, - reader: BufReader, - write: SplitSink>, Message>, - tx_err: Sender>, - buf_cmd: String, -} - -impl IOHandler { - fn new( - write: SplitSink>, Message>, - tx_err: Sender>, - ) -> Self { - Self { - stdout: tokio::io::stdout(), - reader: BufReader::new(tokio::io::stdin()), - write, - tx_err, - buf_cmd: String::new(), - } - } - - #[instrument(skip_all)] - async fn read_stdin(&mut self) -> Result<(), SenderClientError> { - self.buf_cmd.clear(); - - // read a shell command into the stdin and send it to the server - let byte_read = self - .reader - .read_line(&mut self.buf_cmd) - .await - .map_err(SenderClientError::IORead)?; - - debug!(?byte_read); - if byte_read == 0 { - info!("EOF received"); - self.exit().await - } else if self.check_exit() { - info!("exit received"); - self.exit().await - } else { - Ok(()) - } - } - - #[instrument(skip_all)] - fn check_exit(&self) -> bool { - self.buf_cmd.starts_with("exit") - } - - #[instrument(skip_all)] - async fn exit(&mut self) -> Result<(), SenderClientError> { - // check if the command is exit. Eventually, close the connection - - self.write - .send(Message::Close(None)) - .await - .expect("Error while closing websocket connection"); - info!("Closed websocket on client side"); - - self.tx_err.send(Ok(())).await.expect("channel error"); - - Ok(()) // send Ok(()) to close the connection on client side - //break Ok(()); - } - - #[instrument(skip_all)] - async fn send_to_server(&mut self) -> Result<(), SenderClientError> { - info!("Send command to the server"); - self.write - .send(Message::Binary(self.buf_cmd.as_bytes().to_vec())) - .await - .map_err(|err| SenderClientError::TungsteniteReadData { err })?; - - info!("Command sent: {}", self.buf_cmd); - - Ok(()) - } - - #[instrument(skip_all)] - async fn impl_write_stdout(&mut self, msg: Message) -> Result<(), SenderClientError> { - let data = msg.into_data(); - - self.stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - - self.stdout.flush().await.expect("writing stdout"); - - Ok(()) - } - - #[instrument(skip_all)] - pub async fn write_stdout( - &mut self, - rx: &Mutex>, - ) -> Result<(), SenderClientError> { - // check if there are command outputs stored in the channel. Eventually, print them to the stdout - let mut channel = rx.lock().await; - - // wait to receive the first command output - let msg = channel.recv().await.unwrap(); - - self.impl_write_stdout(msg).await?; - - self.empty_buffer(channel).await?; - - Ok(()) - } - - async fn empty_buffer( - &mut self, - mut channel: MutexGuard<'_, UnboundedReceiver>, - ) -> Result<(), SenderClientError> { - loop { - match channel.try_recv() { - Ok(msg) => { - self.impl_write_stdout(msg).await?; - } - Err(TryRecvError::Empty) => { - // the channel is empty but the connection is still open - break Ok(()); // TODO: check that Ok(()) is a good return value - } - Err(TryRecvError::Disconnected) => { - unreachable!("the channel should not be dropped before the task is aborted") - } - } - } - } -} - -#[derive(Debug)] -pub struct SenderClient { - listener_url: Url, -} - -impl SenderClient { - pub fn new(listener_url: Url) -> Self { - Self { listener_url } - } - - async fn read_write( - write: SplitSink>, Message>, - rx: Arc>>, - tx_err: Sender>, - ) -> Result<(), SenderClientError> { - let mut iohandler = IOHandler::new(write, tx_err); - - // read from stdin and, if messages are present on the channel (rx) print them to the stdout - loop { - iohandler.read_stdin().await?; - iohandler.send_to_server().await?; - iohandler.write_stdout(&rx).await?; - } - /* - loop { - cmd.clear(); - // read a shell command from the stdin and send it to the server - reader - .read_line(&mut cmd) - .await - .map_err(SenderClientError::IORead)?; - - // check if the command is exit. Eventually, close the connection - if cmd.starts_with("exit") { - write - .send(Message::Close(None)) - .await - .expect("Error while closing websocket connection"); - info!("Closed websocket on client side"); - tx_err.send(Ok(())).await.expect("channel error"); // send Ok(()) to close the connection on client side - break Ok(()); - } - - info!("Send command to the server"); - write - .send(Message::Binary(cmd.as_bytes().to_vec())) - .await - .expect("error while sending a command through websocket to the server"); - info!("Command sent: {}", cmd); - - // check if there are command outputs stored in the channel. Eventually, print them to the stdout - let mut channel = rx.lock().await; - - let some = channel.recv().await.unwrap(); - let data = some.into_data(); - - stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - stdout.flush().await.expect("writing stdout"); - - // TODO: define function - loop { - match channel.try_recv() { - Ok(cmd_out) => { - let data = cmd_out.into_data(); - - stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - stdout.flush().await.expect("writing stdout"); - } - Err(TryRecvError::Empty) => { - // the channel is empty but the connection is still open - break; - } - Err(TryRecvError::Disconnected) => { - unreachable!("the channel should not be dropped before the task is aborted") - } - } - } - } - */ - } - - #[instrument(skip(self))] - pub async fn connect(&mut self) -> Result<(), SenderClientError> { - // Websocket connection to an existing server - let (ws_stream, _) = connect_async(self.listener_url.clone()) - .await - .map_err(SenderClientError::WebSocketConnect)?; - - info!("WebSocket handshake has been successfully completed"); - - let (write, read) = ws_stream.split(); - - let (tx_cmd_out, rx_cmd_out) = tokio::sync::mpsc::unbounded_channel::(); - let rx_cmd_out = Arc::new(Mutex::new(rx_cmd_out)); - let rx_cmd_out_clone = Arc::clone(&rx_cmd_out); - - let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::>(1); - - // handle stdin and stdout - let handle_std_in_out = - tokio::spawn(Self::read_write(write, rx_cmd_out_clone, tx_err.clone())); - - let handle_read = tokio::spawn(async move { - let res = read - .map_err(|err| SenderClientError::TungsteniteReadData { err }) - .try_for_each(|cmd_out| async { - tx_cmd_out.send(cmd_out).map_err(SenderClientError::Channel) - }) - .await; - - if let Err(err) = res { - tx_err.send(Err(err)).await.expect("channel error"); - } - - Ok(()) - }); - - let mut handles = [handle_std_in_out, handle_read]; - - match rx_err.recv().await.expect("channel error") { - Ok(()) => { - info!("Closing websocket connection"); - Self::close(&mut handles, rx_cmd_out).await - } - Err(err) => { - error!("Fatal error: {}", err); - Self::close(&mut handles, rx_cmd_out).await?; - Err(err) - } - } - } - - #[instrument(skip_all)] - async fn close( - handles: &mut [JoinHandle>], - rx_cmd_out: Arc>>, - ) -> Result<(), SenderClientError> { - // abort the current active tasks - for h in handles.iter() { - h.abort(); - } - - for h in handles { - match h.await { - Err(err) if !err.is_cancelled() => { - error!("Join failed: {}", err) - } - Err(_) => { - trace!("Task cancelled") - } - Ok(res) => { - debug!("Task joined with: {:?}", res) - } - } - } - - // write the remaining elements from cmd out buffer to stdout - let mut channel = rx_cmd_out.lock().await; - let mut stdout = tokio::io::stdout(); - while let Ok(cmd_out) = channel.try_recv() { - let data = cmd_out.into_data(); - stdout - .write(&data) - .await - .map_err(|err| SenderClientError::IOWrite { err })?; - stdout.flush().await.expect("writing stdout"); - } - - info!("EXIT"); - - Ok(()) - } -} +pub mod device_server; +pub mod io_handler; +pub mod sender_client; +pub mod shell; diff --git a/rust-remote-shell/src/sender_client.rs b/rust-remote-shell/src/sender_client.rs new file mode 100644 index 0000000..2f4b2da --- /dev/null +++ b/rust-remote-shell/src/sender_client.rs @@ -0,0 +1,156 @@ +use std::sync::Arc; + +use futures::stream::SplitSink; +use futures::{StreamExt, TryStreamExt}; +use thiserror::Error; +use tokio::net::TcpStream; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::task::JoinHandle; +use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tracing::{debug, error, info, instrument, trace}; +use url::Url; + +use crate::io_handler::IOHandler; + +#[derive(Error, Debug)] +pub enum SenderClientError { + #[error("Error while trying to connect with server")] + WebSocketConnect(#[from] tokio_tungstenite::tungstenite::Error), + #[error("IO error occurred while reading from stdin")] + IORead(#[from] std::io::Error), + #[error("IO error occurred while writing to stdout")] + IOWrite { + #[source] + err: std::io::Error, + }, + #[error("Error while trying to send the output of a command to the main task")] + Channel(#[from] SendError), + #[error("Error from Tungstenite while reading command")] + TungsteniteReadData { + #[source] + err: tokio_tungstenite::tungstenite::Error, + }, + #[error("Server disconnected")] + Disconnected, +} + +#[derive(Debug)] +pub struct SenderClient { + listener_url: Url, +} + +impl SenderClient { + pub fn new(listener_url: Url) -> Self { + Self { listener_url } + } + + async fn read_write( + write: SplitSink>, Message>, + rx: Arc>>, + tx_err: Sender>, + ) -> Result<(), SenderClientError> { + let mut iohandler = IOHandler::new(write, tx_err); + + // read from stdin and, if messages are present on the channel (rx) print them to the stdout + loop { + iohandler.read_stdin().await?; + iohandler.send_to_server().await?; + iohandler.write_stdout(&rx).await?; + } + } + + #[instrument(skip(self))] + pub async fn connect(&mut self) -> Result<(), SenderClientError> { + // Websocket connection to an existing server + let (ws_stream, _) = connect_async(self.listener_url.clone()) + .await + .map_err(SenderClientError::WebSocketConnect)?; + + info!("WebSocket handshake has been successfully completed"); + + let (write, read) = ws_stream.split(); + + let (tx_cmd_out, rx_cmd_out) = tokio::sync::mpsc::unbounded_channel::(); + let rx_cmd_out = Arc::new(Mutex::new(rx_cmd_out)); + let rx_cmd_out_clone = Arc::clone(&rx_cmd_out); + + let (tx_err, mut rx_err) = tokio::sync::mpsc::channel::>(1); + + // handle stdin and stdout + let handle_std_in_out = + tokio::spawn(Self::read_write(write, rx_cmd_out_clone, tx_err.clone())); + + let handle_read = tokio::spawn(async move { + let res = read + .map_err(|err| SenderClientError::TungsteniteReadData { err }) + .try_for_each(|cmd_out| async { + tx_cmd_out.send(cmd_out).map_err(SenderClientError::Channel) + }) + .await; + + if let Err(err) = res { + tx_err.send(Err(err)).await.expect("channel error"); + } + + Ok(()) + }); + + let mut handles = [handle_std_in_out, handle_read]; + + match rx_err.recv().await.expect("channel error") { + Ok(()) => { + info!("Closing websocket connection"); + Self::close(&mut handles, rx_cmd_out).await + } + Err(err) => { + error!("Fatal error: {}", err); + Self::close(&mut handles, rx_cmd_out).await?; + Err(err) + } + } + } + + #[instrument(skip_all)] + async fn close( + handles: &mut [JoinHandle>], + rx_cmd_out: Arc>>, + ) -> Result<(), SenderClientError> { + // abort the current active tasks + for h in handles.iter() { + h.abort(); + } + + for h in handles { + match h.await { + Err(err) if !err.is_cancelled() => { + error!("Join failed: {}", err) + } + Err(_) => { + trace!("Task cancelled") + } + Ok(res) => { + debug!("Task joined with: {:?}", res) + } + } + } + + // write the remaining elements from cmd out buffer to stdout + let mut channel = rx_cmd_out.lock().await; + let mut stdout = tokio::io::stdout(); + while let Ok(cmd_out) = channel.try_recv() { + let data = cmd_out.into_data(); + stdout + .write(&data) + .await + .map_err(|err| SenderClientError::IOWrite { err })?; + stdout.flush().await.expect("writing stdout"); + } + + info!("EXIT"); + + Ok(()) + } +} diff --git a/rust-remote-shell/src/shell.rs b/rust-remote-shell/src/shell.rs new file mode 100644 index 0000000..9f25b90 --- /dev/null +++ b/rust-remote-shell/src/shell.rs @@ -0,0 +1,78 @@ +use std::ffi::OsStr; +use std::io::{self}; +use std::string::FromUtf8Error; + +use thiserror::Error; +use tokio::process; +use tracing::{debug, error, instrument, warn}; + +#[derive(Error, Debug)] +pub enum ShellError { + #[error("Empty command")] + EmptyCommand, + #[error("Malformed input")] + MalformedInput, + #[error("Command {cmd} does not exists")] + WrongCommand { + cmd: String, + #[source] + error: io::Error, + }, + #[error("Error while formatting command output into UTF8")] + WrongOutConversion(#[from] FromUtf8Error), // TODO: find a way to test it +} + +#[derive(Default)] +pub struct CommandHandler; + +impl CommandHandler { + pub fn new() -> Self { + Self::default() + // TODO: open a remote shell (to the input IP addr) + } + + #[instrument(skip(self))] + pub async fn execute(&self, cmd: String) -> Result { + debug!("Execute command {}", cmd); + let cmd = shellwords::split(&cmd).map_err(|_| { + warn!("Malformed input"); + ShellError::MalformedInput + })?; + let cmd_out = match self.inner_execute(&cmd).await { + Ok(cmd_out) => { + debug!("Output computed"); + cmd_out + } + Err(ShellError::EmptyCommand) => { + warn!("Empty command"); + return Err(ShellError::EmptyCommand); + } + Err(ShellError::WrongCommand { cmd, error }) => { + warn!("Wrong command: {}", cmd); + return Err(ShellError::WrongCommand { cmd, error }); + } + _ => unreachable!("no other error can be thrown"), + }; + String::from_utf8(cmd_out.stdout).map_err(|err| { + warn!("Wrong output conversion"); + ShellError::WrongOutConversion(err) + }) + } + + async fn inner_execute(&self, cmd: &[S]) -> Result + where + S: AsRef, + { + let mut cmd_iter = cmd.iter(); + let cmd_to_exec = cmd_iter.next().ok_or(ShellError::EmptyCommand)?; + + process::Command::new(cmd_to_exec) + .args(cmd_iter) + .output() + .await + .map_err(|e| ShellError::WrongCommand { + cmd: cmd_to_exec.as_ref().to_string_lossy().to_string(), + error: e, + }) + } +} From 18ad1240e416686e695a07afb863cd574d278079 Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 26 May 2023 16:39:39 +0200 Subject: [PATCH 10/11] Divide library into submodules and fix tokio-tungstenite version Signed-off-by: rgallor --- Cargo.lock | 22 +++++++++++----------- rust-remote-shell/Cargo.toml | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d8b73d..b540c4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "bitflags" version = "1.3.2" @@ -196,12 +202,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "data-encoding" -version = "2.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" - [[package]] name = "digest" version = "0.10.6" @@ -897,9 +897,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.19.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" dependencies = [ "futures-util", "log", @@ -977,13 +977,13 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.19.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" dependencies = [ + "base64", "byteorder", "bytes", - "data-encoding", "http", "httparse", "log", diff --git a/rust-remote-shell/Cargo.toml b/rust-remote-shell/Cargo.toml index 4447a25..92a0932 100644 --- a/rust-remote-shell/Cargo.toml +++ b/rust-remote-shell/Cargo.toml @@ -12,6 +12,6 @@ futures = "0.3.28" shellwords = "1.1.0" thiserror = "1.0.40" tokio = { version = "1.28.1", features = ["full"] } -tokio-tungstenite = "0.19.0" +tokio-tungstenite = "0.18.0" tracing = "0.1.37" url = "2.3.1" From 77dedd85fd53e18d90043ea414c8f2f5bed59699 Mon Sep 17 00:00:00 2001 From: rgallor Date: Fri, 16 Jun 2023 17:23:12 +0200 Subject: [PATCH 11/11] Fix PR suggestion Signed-off-by: rgallor --- cli/src/main.rs | 4 +--- rust-remote-shell/src/device_server.rs | 13 ++++++------- rust-remote-shell/src/io_handler.rs | 3 ++- rust-remote-shell/src/shell.rs | 5 ----- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index a079a0b..3c6a070 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,13 +1,11 @@ use std::net::SocketAddr; use clap::{Parser, Subcommand}; - use color_eyre::Result; use tracing::Level; use tracing_subscriber::FmtSubscriber; -use rust_remote_shell::device_server::DeviceServer; -use rust_remote_shell::sender_client::SenderClient; +use rust_remote_shell::{device_server::DeviceServer, sender_client::SenderClient}; /// CLI for a rust remote shell #[derive(Debug, Parser)] diff --git a/rust-remote-shell/src/device_server.rs b/rust-remote-shell/src/device_server.rs index 8542a41..75c0f8c 100644 --- a/rust-remote-shell/src/device_server.rs +++ b/rust-remote-shell/src/device_server.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::string::FromUtf8Error; use std::sync::Arc; -use futures::{future, SinkExt, StreamExt, TryStreamExt}; +use futures::{SinkExt, StreamExt, TryStreamExt}; use thiserror::Error; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; @@ -140,21 +140,20 @@ impl DeviceServer { // Read the received command read.map_err(DeviceServerError::Transport) - .and_then(|msg| { - let cmd = match msg { + .and_then(|msg| async move { + info!("Received command from the client"); + match msg { // convert the message from a Vec into a OsString Message::Binary(v) => { String::from_utf8(v).map_err(DeviceServerError::Utf8Error) } Message::Close(_) => Err(DeviceServerError::CloseWebsocket), // the client closed the connection _ => Err(DeviceServerError::ReadCommand), - }; - info!("Received command from the client"); - future::ready(cmd) + } }) .and_then(|cmd| async move { // define a command handler - let cmd_handler = CommandHandler::new(); + let cmd_handler = CommandHandler::default(); // execute the command and eventually return the error let cmd_out = cmd_handler.execute(cmd).await.unwrap_or_else(|err| { diff --git a/rust-remote-shell/src/io_handler.rs b/rust-remote-shell/src/io_handler.rs index 57bf06f..49f6b65 100644 --- a/rust-remote-shell/src/io_handler.rs +++ b/rust-remote-shell/src/io_handler.rs @@ -39,7 +39,7 @@ impl IOHandler { pub async fn read_stdin(&mut self) -> Result<(), SenderClientError> { self.buf_cmd.clear(); - // read a shell command into the stdin and send it to the server + // read a shell command from the stdin and send it to the server let byte_read = self .reader .read_line(&mut self.buf_cmd) @@ -105,6 +105,7 @@ impl IOHandler { self.impl_write_stdout(msg).await?; + // if the channel still contains information, empty it before aborting the task self.empty_buffer(channel).await?; Ok(()) diff --git a/rust-remote-shell/src/shell.rs b/rust-remote-shell/src/shell.rs index 9f25b90..f4caa4f 100644 --- a/rust-remote-shell/src/shell.rs +++ b/rust-remote-shell/src/shell.rs @@ -26,11 +26,6 @@ pub enum ShellError { pub struct CommandHandler; impl CommandHandler { - pub fn new() -> Self { - Self::default() - // TODO: open a remote shell (to the input IP addr) - } - #[instrument(skip(self))] pub async fn execute(&self, cmd: String) -> Result { debug!("Execute command {}", cmd);