diff --git a/Cargo.lock b/Cargo.lock index 759da6164059e..c58fff899d44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -675,7 +675,7 @@ dependencies = [ "arrow-schema", "chrono", "half 2.3.1", - "indexmap 2.2.6", + "indexmap 2.7.0", "lexical-core", "num", "serde", @@ -875,7 +875,7 @@ dependencies = [ "futures-util", "handlebars", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.7.0", "lru 0.7.8", "mime", "multer", @@ -921,7 +921,7 @@ dependencies = [ "Inflector", "async-graphql-parser", "darling 0.20.3", - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "strum 0.25.0", @@ -948,21 +948,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cf4d4e86208f4f9b81a503943c07e6e7f29ad3505e6c9ce6431fe64dc241681" dependencies = [ "bytes", - "indexmap 2.2.6", + "indexmap 2.7.0", "serde", "serde_json", ] -[[package]] -name = "async-lock" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" -dependencies = [ - "event-listener", - "futures-lite", -] - [[package]] name = "async-recursion" version = "1.0.4" @@ -1852,9 +1842,6 @@ name = "beef" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" -dependencies = [ - "serde", -] [[package]] name = "bellpepper" @@ -2028,9 +2015,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bitmaps" @@ -2453,6 +2440,12 @@ dependencies = [ "libc", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -2976,9 +2969,9 @@ dependencies = [ [[package]] name = "core-foundation" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" dependencies = [ "core-foundation-sys", "libc", @@ -2986,9 +2979,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.3" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "core2" @@ -3409,7 +3402,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "718f6cd8c54ae5249fd42b0c86639df0100b8a86eea2e5f1b915cde2e1481453" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.7.0", "lalrpop-util", "logos", ] @@ -3750,7 +3743,7 @@ version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65e13bab2796f412722112327f3e575601a3e9cdcbe426f0d30dbf43f3f5dc71" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "byteorder", "chrono", "diesel_derives", @@ -5461,7 +5454,7 @@ dependencies = [ "fixedbitset 0.4.2", "guppy-summaries", "guppy-workspace-hack", - "indexmap 2.2.6", + "indexmap 2.7.0", "itertools 0.13.0", "nested", "once_cell", @@ -5510,7 +5503,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.9", - "indexmap 2.2.6", + "indexmap 2.7.0", "slab", "tokio", "tokio-util 0.7.10", @@ -5529,7 +5522,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.7.0", "slab", "tokio", "tokio-util 0.7.10", @@ -5624,6 +5617,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" + [[package]] name = "hashers" version = "1.0.1" @@ -5932,7 +5931,6 @@ dependencies = [ "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.23.4", - "webpki-roots 0.22.6", ] [[package]] @@ -6321,12 +6319,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.15.2", "serde", ] @@ -6570,6 +6568,26 @@ dependencies = [ "libc", ] +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror 1.0.64", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.25" @@ -6629,8 +6647,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5c71d8c1a731cc4227c2f698d377e7848ca12c8a48866fc5e6951c43a4db843" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", @@ -6638,100 +6657,117 @@ dependencies = [ "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-ws-client", + "tokio", "tracing", ] [[package]] name = "jsonrpsee-client-transport" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548125b159ba1314104f5bb5f38519e03a41862786aa3925cf349aae9cdd546e" dependencies = [ + "base64 0.22.1", "futures-util", - "http 0.2.9", + "http 1.1.0", "jsonrpsee-core", - "jsonrpsee-types", "pin-project", - "rustls-native-certs 0.6.2", + "rustls 0.23.20", + "rustls-pki-types", + "rustls-platform-verifier", "soketto", "thiserror 1.0.64", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls 0.26.0", "tokio-util 0.7.10", "tracing", - "webpki-roots 0.22.6", + "url", ] [[package]] name = "jsonrpsee-core" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2882f6f8acb9fdaec7cefc4fd607119a9bd709831df7d7672a1d3b644628280" dependencies = [ - "anyhow", - "arrayvec 0.7.2", - "async-lock", "async-trait", - "beef", - "futures-channel", + "bytes", "futures-timer", "futures-util", - "globset", - "hyper 0.14.26", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "jsonrpsee-types", "parking_lot 0.12.1", + "pin-project", "rand 0.8.5", - "rustc-hash 1.1.0", + "rustc-hash 2.0.0", "serde", "serde_json", - "soketto", "thiserror 1.0.64", "tokio", + "tokio-stream", "tracing", ] [[package]] name = "jsonrpsee-http-client" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3638bc4617f96675973253b3a45006933bde93c2fd8a6170b33c777cc389e5b" dependencies = [ "async-trait", - "hyper 0.14.26", - "hyper-rustls 0.23.2", + "base64 0.22.1", + "http-body 1.0.1", + "hyper 1.4.1", + "hyper-rustls 0.27.2", + "hyper-util", "jsonrpsee-core", "jsonrpsee-types", - "rustc-hash 1.1.0", + "rustls 0.23.20", + "rustls-platform-verifier", "serde", "serde_json", "thiserror 1.0.64", "tokio", + "tower 0.4.13", "tracing", + "url", ] [[package]] name = "jsonrpsee-proc-macros" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06c01ae0007548e73412c08e2285ffe5d723195bf268bce67b1b77c3bb2a14d" dependencies = [ - "heck 0.4.1", - "proc-macro-crate", + "heck 0.5.0", + "proc-macro-crate 3.2.0", "proc-macro2 1.0.87", "quote 1.0.37", - "syn 1.0.107", + "syn 2.0.87", ] [[package]] name = "jsonrpsee-server" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82ad8ddc14be1d4290cd68046e7d1d37acd408efed6d3ca08aefcc3ad6da069c" dependencies = [ - "futures-channel", "futures-util", - "http 0.2.9", - "hyper 0.14.26", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "jsonrpsee-core", "jsonrpsee-types", + "pin-project", + "route-recognizer", "serde", "serde_json", "soketto", + "thiserror 1.0.64", "tokio", "tokio-stream", "tokio-util 0.7.10", @@ -6741,26 +6777,27 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a178c60086f24cc35bb82f57c651d0d25d99c4742b4d335de04e97fa1f08a8a1" dependencies = [ - "anyhow", - "beef", + "http 1.1.0", "serde", "serde_json", "thiserror 1.0.64", - "tracing", ] [[package]] name = "jsonrpsee-ws-client" -version = "0.16.2" -source = "git+https://github.com/wlmyng/jsonrpsee.git?rev=b1b300784795f6a64d0fcdf8f03081a9bc38bde8#b1b300784795f6a64d0fcdf8f03081a9bc38bde8" +version = "0.24.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fe322e0896d0955a3ebdd5bf813571c53fea29edd713bc315b76620b327e86d" dependencies = [ - "http 0.2.9", + "http 1.1.0", "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", + "url", ] [[package]] @@ -7579,7 +7616,7 @@ name = "move-bytecode-utils" version = "0.1.0" dependencies = [ "anyhow", - "indexmap 2.2.6", + "indexmap 2.7.0", "move-binary-format", "move-core-types", "petgraph 0.5.1", @@ -7787,7 +7824,7 @@ dependencies = [ "clap", "codespan", "colored", - "indexmap 2.2.6", + "indexmap 2.7.0", "move-abstract-interpreter", "move-binary-format", "move-bytecode-source-map", @@ -8377,7 +8414,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d6d4752e6230d8ef7adf7bd5d8c4b1f6561c1014c5ba9a37445ccefe18aa1db" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro-error", "proc-macro2 1.0.87", "quote 1.0.37", @@ -8487,7 +8524,7 @@ dependencies = [ "fastcrypto-tbls", "hashbrown 0.12.3", "impl-trait-for-tuples", - "indexmap 2.2.6", + "indexmap 2.7.0", "mysten-util-mem-derive", "once_cell", "parking_lot 0.12.1", @@ -8683,7 +8720,7 @@ version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "crossbeam-channel", "filetime", "fsevent-sys", @@ -8732,7 +8769,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "066b468120587a402f0b47d8f80035c921f6a46f8209efd0632a89a16f5188a4" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 1.0.107", @@ -8933,7 +8970,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96667db765a921f7b295ffee8b60472b686a51d4f21c2ee4ffdb94c7013b65a6" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 2.0.87", @@ -8945,7 +8982,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 2.0.87", @@ -9083,7 +9120,7 @@ checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ "futures-core", "futures-sink", - "indexmap 2.2.6", + "indexmap 2.7.0", "js-sys", "once_cell", "pin-project-lite", @@ -9321,7 +9358,7 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1557010476e0595c9b568d16dcfb81b93cdeb157612726f5170d31aa707bed27" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 1.0.107", @@ -9333,7 +9370,7 @@ version = "3.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "312270ee71e1cd70289dacf597cab7b207aa107d2f28191c2ae45b2ece18a260" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 1.0.107", @@ -9464,11 +9501,11 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "499cff8432e71c5f8784d9645aac0f9fca604d67f59b68a606170b5e229c6538" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "ciborium", "coset", "data-encoding", - "indexmap 2.2.6", + "indexmap 2.7.0", "rand 0.8.5", "serde", "serde_json", @@ -9659,7 +9696,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset 0.4.2", - "indexmap 2.2.6", + "indexmap 2.7.0", ] [[package]] @@ -10040,6 +10077,15 @@ dependencies = [ "toml 0.5.11", ] +[[package]] +name = "proc-macro-crate" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b" +dependencies = [ + "toml_edit 0.22.22", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -10158,7 +10204,7 @@ checksum = "14cae93065090804185d3b75f0bf93b8eeda30c7a9b4a33d3bdb3988d6229e50" dependencies = [ "bit-set 0.8.0", "bit-vec 0.8.0", - "bitflags 2.4.1", + "bitflags 2.6.0", "lazy_static", "num-traits", "rand 0.8.5", @@ -10347,7 +10393,7 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "memchr", "unicase", ] @@ -11003,6 +11049,12 @@ dependencies = [ "serde", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "rsa" version = "0.8.2" @@ -11163,7 +11215,7 @@ dependencies = [ "aes", "aes-gcm", "async-trait", - "bitflags 2.4.1", + "bitflags 2.6.0", "byteorder", "chacha20", "ctr", @@ -11321,7 +11373,7 @@ version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.6.0", "errno 0.3.8", "libc", "linux-raw-sys 0.4.12", @@ -11417,6 +11469,33 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" +[[package]] +name = "rustls-platform-verifier" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afbb878bdfdf63a336a5e63561b1835e7a8c91524f51621db870169eac84b490" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls 0.23.20", + "rustls-native-certs 0.7.1", + "rustls-platform-verifier-android", + "rustls-webpki 0.102.8", + "security-framework", + "security-framework-sys", + "webpki-roots 0.26.3", + "winapi", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -11533,7 +11612,7 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abf2c68b89cafb3b8d918dd07b42be0da66ff202cf1155c5739a4e0c1ea0dc19" dependencies = [ - "proc-macro-crate", + "proc-macro-crate 1.1.3", "proc-macro2 1.0.87", "quote 1.0.37", "syn 1.0.107", @@ -11682,22 +11761,23 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.7.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", "core-foundation", "core-foundation-sys", "libc", + "num-bigint 0.4.4", "security-framework-sys", ] [[package]] name = "security-framework-sys" -version = "2.6.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +checksum = "1863fd3768cd83c56a7f60faa4dc0d403f1b6df0a38c3c25f44b7894e45370d5" dependencies = [ "core-foundation-sys", "libc", @@ -11835,7 +11915,7 @@ version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.7.0", "itoa", "memchr", "ryu", @@ -11920,7 +12000,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.2.6", + "indexmap 2.7.0", "serde", "serde_derive", "serde_json", @@ -11987,19 +12067,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha-1" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" -dependencies = [ - "block-buffer 0.9.0", - "cfg-if", - "cpufeatures", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "sha-1" version = "0.10.1" @@ -12377,18 +12444,18 @@ dependencies = [ [[package]] name = "soketto" -version = "0.7.1" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" dependencies = [ - "base64 0.13.1", + "base64 0.22.1", "bytes", "futures", - "http 0.2.9", + "http 1.1.0", "httparse", "log", "rand 0.8.5", - "sha-1 0.9.8", + "sha1", ] [[package]] @@ -13470,7 +13537,7 @@ dependencies = [ "fastcrypto-zkp", "fs_extra", "futures", - "indexmap 2.2.6", + "indexmap 2.7.0", "insta", "jsonrpsee", "move-binary-format", @@ -14118,7 +14185,7 @@ dependencies = [ "futures", "http-body 0.4.5", "hyper 1.4.1", - "indexmap 2.2.6", + "indexmap 2.7.0", "itertools 0.13.0", "jsonrpsee", "mockall", @@ -14430,7 +14497,7 @@ dependencies = [ "fastcrypto", "fastcrypto-vdf", "fastcrypto-zkp", - "indexmap 2.2.6", + "indexmap 2.7.0", "move-binary-format", "move-core-types", "move-stdlib-natives", @@ -14491,7 +14558,7 @@ dependencies = [ "better_any", "fastcrypto", "fastcrypto-zkp", - "indexmap 2.2.6", + "indexmap 2.7.0", "move-binary-format", "move-core-types", "move-stdlib-natives-v2", @@ -15361,7 +15428,7 @@ dependencies = [ "bcs", "clap", "futures", - "indexmap 2.2.6", + "indexmap 2.7.0", "move-binary-format", "move-core-types", "move-package", @@ -15663,7 +15730,7 @@ dependencies = [ "fastcrypto-tbls", "fastcrypto-zkp", "im", - "indexmap 2.2.6", + "indexmap 2.7.0", "itertools 0.13.0", "lru 0.10.0", "move-binary-format", @@ -16269,7 +16336,7 @@ dependencies = [ "hex", "num-traits", "serde", - "sha-1 0.10.1", + "sha-1", "test-fuzz-internal", ] @@ -16703,7 +16770,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime 0.6.8", - "toml_edit 0.22.17", + "toml_edit 0.22.22", ] [[package]] @@ -16760,11 +16827,11 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.17" +version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9f8729f5aea9562aac1cc0441f5d6de3cff1ee0c5d67293eeca5eb36ee7c16" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.7.0", "serde", "serde_spanned", "toml_datetime 0.6.8", @@ -16940,7 +17007,7 @@ checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" dependencies = [ "futures-core", "futures-util", - "indexmap 2.2.6", + "indexmap 2.7.0", "pin-project-lite", "slab", "sync_wrapper 0.1.2", @@ -16959,7 +17026,7 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "async-compression", "base64 0.21.7", - "bitflags 2.4.1", + "bitflags 2.6.0", "bytes", "futures-core", "futures-util", @@ -16989,7 +17056,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ "base64 0.22.1", - "bitflags 2.4.1", + "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", @@ -17850,15 +17917,6 @@ dependencies = [ "untrusted 0.9.0", ] -[[package]] -name = "webpki-roots" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] - [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index 4b68c52720115..31a797718037e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -373,13 +373,7 @@ integer-encoding = "3.0.1" ipnetwork = "0.20.0" itertools = "0.13.0" jemalloc-ctl = "^0.5" -jsonrpsee = { git = "https://github.com/wlmyng/jsonrpsee.git", rev = "b1b300784795f6a64d0fcdf8f03081a9bc38bde8", features = [ - "server", - "macros", - "ws-client", - "http-client", - "jsonrpsee-core", -] } +jsonrpsee = { version = "0.24.7", features = ["server", "macros", "ws-client", "http-client", "jsonrpsee-core"] } json_to_table = { git = "https://github.com/zhiburt/tabled/", rev = "e449317a1c02eb6b29e409ad6617e5d9eb7b3bd4" } leb128 = "0.2.5" lru = "0.10" diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs b/crates/sui-e2e-tests/tests/traffic_control_tests.rs index ca981a0ed58e1..71470781cb0bb 100644 --- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs +++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs @@ -6,10 +6,7 @@ use core::panic; use fastcrypto::encoding::Base64; -use jsonrpsee::{ - core::{client::ClientT, RpcResult}, - rpc_params, -}; +use jsonrpsee::{core::client::ClientT, rpc_params}; use std::fs::File; use std::num::NonZeroUsize; use std::time::Duration; @@ -201,7 +198,7 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> { // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client + let response: Result = jsonrpc_client .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!( @@ -307,7 +304,7 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Erro // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client + let response: Result = jsonrpc_client .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; if let Err(err) = response { @@ -362,7 +359,7 @@ async fn test_fullnode_traffic_control_error_blocked() -> Result<(), anyhow::Err SuiTransactionBlockResponseOptions::new(), ExecuteTransactionRequestType::WaitForLocalExecution ]; - let response: RpcResult = jsonrpc_client + let response: Result = jsonrpc_client .request("sui_executeTransactionBlock", params.clone()) .await; if let Err(err) = response { @@ -517,7 +514,7 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Er assert!(confirmed_local_execution.unwrap()); for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client + let response: Result = jsonrpc_client .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!(response.is_ok(), "Expected request to succeed"); @@ -845,7 +842,7 @@ async fn assert_validator_traffic_control_dry_run( // it should take no more than 4 requests to be added to the blocklist for _ in 0..txn_count { - let response: RpcResult = jsonrpc_client + let response: Result = jsonrpc_client .request("sui_getTransactionBlock", rpc_params![*tx_digest]) .await; assert!( diff --git a/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs b/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs index ba01b9fce33aa..3e19dd721e6f1 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/api/mod.rs @@ -8,11 +8,7 @@ use diesel::query_dsl::methods::LimitDsl; use diesel::result::Error as DieselError; use diesel_async::methods::LoadQuery; use diesel_async::RunQueryDsl; -use jsonrpsee::core::Error as RpcError; -use jsonrpsee::types::{ - error::{CallError, INTERNAL_ERROR_CODE}, - ErrorObject, -}; +use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObject}; use sui_pg_db as db; use tracing::debug; @@ -49,20 +45,16 @@ impl<'p> Connection<'p> { } } -impl From for RpcError { - fn from(err: DbError) -> RpcError { +impl From for ErrorObject<'static> { + fn from(err: DbError) -> Self { match err { - DbError::Connect(err) => RpcError::Call(CallError::Custom(ErrorObject::owned( - INTERNAL_ERROR_CODE, - err.to_string(), - None::<()>, - ))), + DbError::Connect(err) => { + ErrorObject::owned(INTERNAL_ERROR_CODE, err.to_string(), None::<()>) + } - DbError::RunQuery(err) => RpcError::Call(CallError::Custom(ErrorObject::owned( - INTERNAL_ERROR_CODE, - err.to_string(), - None::<()>, - ))), + DbError::RunQuery(err) => { + ErrorObject::owned(INTERNAL_ERROR_CODE, err.to_string(), None::<()>) + } } } } diff --git a/crates/sui-indexer-alt-jsonrpc/src/lib.rs b/crates/sui-indexer-alt-jsonrpc/src/lib.rs index 1b1a6c9358ed8..eb21b2b072499 100644 --- a/crates/sui-indexer-alt-jsonrpc/src/lib.rs +++ b/crates/sui-indexer-alt-jsonrpc/src/lib.rs @@ -76,9 +76,7 @@ impl RpcService { info!("Starting JSON-RPC service on {listen_address}",); - let handle = server - .start(modules) - .context("Failed to start JSON-RPC service")?; + let handle = server.start(modules); Ok(tokio::spawn(async move { handle.stopped().await; diff --git a/crates/sui-indexer/src/apis/extended_api.rs b/crates/sui-indexer/src/apis/extended_api.rs index 9b9827ea2bbe1..41fadcd17bca1 100644 --- a/crates/sui-indexer/src/apis/extended_api.rs +++ b/crates/sui-indexer/src/apis/extended_api.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer_reader::IndexerReader; +use crate::{errors::IndexerError, indexer_reader::IndexerReader}; use jsonrpsee::{core::RpcResult, RpcModule}; use sui_json_rpc::SuiRpcModule; use sui_json_rpc_api::{validate_limit, ExtendedApiServer, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS}; @@ -29,7 +29,8 @@ impl ExtendedApiServer for ExtendedApi { limit: Option, descending_order: Option, ) -> RpcResult { - let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)?; + let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS) + .map_err(IndexerError::from)?; let mut epochs = self .inner .get_epochs( @@ -60,10 +61,7 @@ impl ExtendedApiServer for ExtendedApi { _cursor: Option, _limit: Option, ) -> RpcResult { - Err(jsonrpsee::types::error::CallError::Custom( - jsonrpsee::types::error::ErrorCode::MethodNotFound.into(), - ) - .into()) + Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into()) } async fn get_total_transactions(&self) -> RpcResult> { diff --git a/crates/sui-indexer/src/apis/indexer_api.rs b/crates/sui-indexer/src/apis/indexer_api.rs index ed7beb304fdac..7911b8c131d5f 100644 --- a/crates/sui-indexer/src/apis/indexer_api.rs +++ b/crates/sui-indexer/src/apis/indexer_api.rs @@ -3,9 +3,8 @@ use async_trait::async_trait; use jsonrpsee::core::RpcResult; -use jsonrpsee::types::SubscriptionEmptyError; -use jsonrpsee::types::SubscriptionResult; -use jsonrpsee::{RpcModule, SubscriptionSink}; +use jsonrpsee::core::SubscriptionResult; +use jsonrpsee::{PendingSubscriptionSink, RpcModule}; use tap::TapFallible; use sui_json_rpc::name_service::{Domain, NameRecord, NameServiceConfig, NameServiceError}; @@ -67,7 +66,7 @@ impl IndexerApi { .collect::, _>>() .map_err(|e| { tracing::error!("Error joining object read futures."); - jsonrpsee::core::Error::Custom(format!("Error joining object read futures. {}", e)) + crate::errors::IndexerError::from(e) })? .into_iter() .collect::, _>>() @@ -119,9 +118,11 @@ impl IndexerApi { .await .into_iter() .collect::, _>>() - .map_err(|e: tokio::task::JoinError| anyhow::anyhow!(e))? + .map_err(|e: tokio::task::JoinError| anyhow::anyhow!(e)) + .map_err(IndexerError::from)? .into_iter() - .collect::, anyhow::Error>>()?; + .collect::, anyhow::Error>>() + .map_err(IndexerError::from)?; Ok(Page { data, @@ -168,8 +169,7 @@ impl IndexerApiServer for IndexerApi { limit + 1, descending_order.unwrap_or(false), ) - .await - .map_err(|e: IndexerError| anyhow::anyhow!(e))?; + .await?; let has_next_page = results.len() > limit; results.truncate(limit); @@ -254,7 +254,9 @@ impl IndexerApiServer for IndexerApi { | sui_types::object::ObjectRead::Deleted(_) => {} sui_types::object::ObjectRead::Exists(object_ref, o, layout) => { return Ok(SuiObjectResponse::new_with_data( - (object_ref, o, layout, options, None).try_into()?, + (object_ref, o, layout, options, None) + .try_into() + .map_err(IndexerError::from)?, )); } } @@ -274,7 +276,9 @@ impl IndexerApiServer for IndexerApi { | sui_types::object::ObjectRead::Deleted(_) => {} sui_types::object::ObjectRead::Exists(object_ref, o, layout) => { return Ok(SuiObjectResponse::new_with_data( - (object_ref, o, layout, options, None).try_into()?, + (object_ref, o, layout, options, None) + .try_into() + .map_err(IndexerError::from)?, )); } } @@ -284,16 +288,20 @@ impl IndexerApiServer for IndexerApi { )) } - fn subscribe_event(&self, _sink: SubscriptionSink, _filter: EventFilter) -> SubscriptionResult { - Err(SubscriptionEmptyError) + fn subscribe_event( + &self, + _sink: PendingSubscriptionSink, + _filter: EventFilter, + ) -> SubscriptionResult { + Err("disabled".into()) } fn subscribe_transaction( &self, - _sink: SubscriptionSink, + _sink: PendingSubscriptionSink, _filter: TransactionFilter, ) -> SubscriptionResult { - Err(SubscriptionEmptyError) + Err("disabled".into()) } async fn resolve_name_service_address(&self, name: String) -> RpcResult> { diff --git a/crates/sui-indexer/src/apis/read_api.rs b/crates/sui-indexer/src/apis/read_api.rs index 3e3de5343869d..f7f21726d707e 100644 --- a/crates/sui-indexer/src/apis/read_api.rs +++ b/crates/sui-indexer/src/apis/read_api.rs @@ -142,10 +142,7 @@ impl ReadApiServer for ReadApi { _version: SequenceNumber, _options: Option, ) -> RpcResult { - Err(jsonrpsee::types::error::CallError::Custom( - jsonrpsee::types::error::ErrorCode::MethodNotFound.into(), - ) - .into()) + Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into()) } async fn try_get_object_before_version( @@ -153,10 +150,7 @@ impl ReadApiServer for ReadApi { _: ObjectID, _: SequenceNumber, ) -> RpcResult { - Err(jsonrpsee::types::error::CallError::Custom( - jsonrpsee::types::error::ErrorCode::MethodNotFound.into(), - ) - .into()) + Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into()) } async fn try_multi_get_past_objects( @@ -164,10 +158,7 @@ impl ReadApiServer for ReadApi { _past_objects: Vec, _options: Option, ) -> RpcResult> { - Err(jsonrpsee::types::error::CallError::Custom( - jsonrpsee::types::error::ErrorCode::MethodNotFound.into(), - ) - .into()) + Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into()) } async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult> { @@ -209,20 +200,6 @@ impl ReadApiServer for ReadApi { }) } - async fn get_checkpoints_deprecated_limit( - &self, - cursor: Option>, - limit: Option>, - descending_order: bool, - ) -> RpcResult { - self.get_checkpoints( - cursor, - limit.map(|l| l.into_inner() as usize), - descending_order, - ) - .await - } - async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult> { self.inner .get_transaction_events(transaction_digest) @@ -282,7 +259,11 @@ async fn object_read_to_object_response( Ok(rendered_fields) => display_fields = Some(rendered_fields), Err(e) => { return Ok(SuiObjectResponse::new( - Some((object_ref, o, layout, options, None).try_into()?), + Some( + (object_ref, o, layout, options, None) + .try_into() + .map_err(IndexerError::from)?, + ), Some(SuiObjectResponseError::DisplayError { error: e.to_string(), }), @@ -291,7 +272,9 @@ async fn object_read_to_object_response( } } Ok(SuiObjectResponse::new_with_data( - (object_ref, o, layout, options, display_fields).try_into()?, + (object_ref, o, layout, options, display_fields) + .try_into() + .map_err(IndexerError::from)?, )) } ObjectRead::Deleted((object_id, version, digest)) => Ok(SuiObjectResponse::new_with_error( diff --git a/crates/sui-indexer/src/apis/write_api.rs b/crates/sui-indexer/src/apis/write_api.rs index 71a54c356635b..fdb9156e79b42 100644 --- a/crates/sui-indexer/src/apis/write_api.rs +++ b/crates/sui-indexer/src/apis/write_api.rs @@ -44,7 +44,8 @@ impl WriteApiServer for WriteApi { let sui_transaction_response = self .fullnode .execute_transaction_block(tx_bytes, signatures, options.clone(), request_type) - .await?; + .await + .map_err(crate::errors::client_error_to_error_object)?; Ok(SuiTransactionBlockResponseWithOptions { response: sui_transaction_response, options: options.unwrap_or_default(), @@ -69,13 +70,17 @@ impl WriteApiServer for WriteApi { additional_args, ) .await + .map_err(crate::errors::client_error_to_error_object) } async fn dry_run_transaction_block( &self, tx_bytes: Base64, ) -> RpcResult { - self.fullnode.dry_run_transaction_block(tx_bytes).await + self.fullnode + .dry_run_transaction_block(tx_bytes) + .await + .map_err(crate::errors::client_error_to_error_object) } } diff --git a/crates/sui-indexer/src/errors.rs b/crates/sui-indexer/src/errors.rs index c8971e39781ad..031f1709d7d64 100644 --- a/crates/sui-indexer/src/errors.rs +++ b/crates/sui-indexer/src/errors.rs @@ -2,8 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use fastcrypto::error::FastCryptoError; -use jsonrpsee::core::Error as RpcError; -use jsonrpsee::types::error::CallError; +use jsonrpsee::types::ErrorObjectOwned as RpcError; use sui_json_rpc::name_service::NameServiceError; use thiserror::Error; @@ -155,7 +154,11 @@ impl Context for Result { impl From for RpcError { fn from(e: IndexerError) -> Self { - RpcError::Call(CallError::Failed(e.into())) + RpcError::owned( + jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE, + e.to_string(), + None::<()>, + ) } } @@ -170,3 +173,16 @@ impl From for IndexerError { Self::PgPoolConnectionError(value.to_string()) } } + +pub(crate) fn client_error_to_error_object( + e: jsonrpsee::core::ClientError, +) -> jsonrpsee::types::ErrorObjectOwned { + match e { + jsonrpsee::core::ClientError::Call(e) => e, + _ => jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::error::UNKNOWN_ERROR_CODE, + e.to_string(), + None::<()>, + ), + } +} diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs index 70a96047b7df3..6f058cd3d122e 100644 --- a/crates/sui-indexer/src/lib.rs +++ b/crates/sui-indexer/src/lib.rs @@ -81,8 +81,7 @@ fn get_http_client(rpc_client_url: &str) -> Result { headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("indexer")); HttpClientBuilder::default() - .max_request_body_size(2 << 30) - .max_concurrent_requests(usize::MAX) + .max_request_size(2 << 30) .set_headers(headers.clone()) .build(rpc_client_url) .map_err(|e| { diff --git a/crates/sui-json-rpc-api/src/indexer.rs b/crates/sui-json-rpc-api/src/indexer.rs index 93b1d8dc70fcc..416a93739aab1 100644 --- a/crates/sui-json-rpc-api/src/indexer.rs +++ b/crates/sui-json-rpc-api/src/indexer.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use jsonrpsee::core::RpcResult; +use jsonrpsee::core::{RpcResult, SubscriptionResult}; use jsonrpsee::proc_macros::rpc; use sui_json_rpc_types::SuiTransactionBlockEffects; @@ -71,11 +71,11 @@ pub trait IndexerApi { &self, /// The filter criteria of the event stream. See [Event filter](https://docs.sui.io/build/event_api#event-filters) documentation for examples. filter: EventFilter, - ); + ) -> SubscriptionResult; /// Subscribe to a stream of Sui transaction effects #[subscription(name = "subscribeTransaction", item = SuiTransactionBlockEffects)] - fn subscribe_transaction(&self, filter: TransactionFilter); + fn subscribe_transaction(&self, filter: TransactionFilter) -> SubscriptionResult; /// Return the list of dynamic field objects owned by an object. #[method(name = "getDynamicFields")] diff --git a/crates/sui-json-rpc-api/src/read.rs b/crates/sui-json-rpc-api/src/read.rs index 533a61ec78492..927c7defa0a15 100644 --- a/crates/sui-json-rpc-api/src/read.rs +++ b/crates/sui-json-rpc-api/src/read.rs @@ -120,17 +120,6 @@ pub trait ReadApi { descending_order: bool, ) -> RpcResult; - #[method(name = "getCheckpoints", version <= "0.31")] - async fn get_checkpoints_deprecated_limit( - &self, - /// An optional paging cursor. If provided, the query will start from the next item after the specified cursor. Default to start from the first item if not specified. - cursor: Option>, - /// Maximum item returned per page, default to [QUERY_MAX_RESULT_LIMIT_CHECKPOINTS] if not specified. - limit: Option>, - /// query result ordering, default to false (ascending order), oldest record first. - descending_order: bool, - ) -> RpcResult; - /// Return transaction events. #[method(name = "getEvents")] async fn get_events( diff --git a/crates/sui-json-rpc-tests/tests/routing_tests.rs b/crates/sui-json-rpc-tests/tests/routing_tests.rs deleted file mode 100644 index c5bc6a23edae8..0000000000000 --- a/crates/sui-json-rpc-tests/tests/routing_tests.rs +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use async_trait::async_trait; -use jsonrpsee::core::client::ClientT; -use jsonrpsee::core::RpcResult; -use jsonrpsee::http_client::HttpClientBuilder; -use jsonrpsee::http_client::{HeaderMap, HeaderValue}; -use jsonrpsee::proc_macros::rpc; -use jsonrpsee::rpc_params; -use jsonrpsee::RpcModule; -use prometheus::Registry; -use std::env; -use sui_config::local_ip_utils; -use sui_json_rpc::{JsonRpcServerBuilder, ServerType, SuiRpcModule}; -use sui_json_rpc_api::CLIENT_TARGET_API_VERSION_HEADER; -use sui_open_rpc::Module; -use sui_open_rpc_macros::open_rpc; - -#[tokio::test] -async fn test_rpc_backward_compatibility() { - let mut builder = JsonRpcServerBuilder::new("1.5", &Registry::new(), None, None); - builder.register_module(TestApiModule).unwrap(); - - let address = local_ip_utils::new_local_tcp_socket_for_testing(); - let _handle = builder - .start(address, None, ServerType::Http, None) - .await - .unwrap(); - let url = format!("http://0.0.0.0:{}", address.port()); - - // Test with un-versioned client - let client = HttpClientBuilder::default().build(&url).unwrap(); - let response: String = client.request("test_foo", rpc_params!(true)).await.unwrap(); - assert_eq!("Some string", response); - - // try to access old method directly should fail - let client = HttpClientBuilder::default().build(&url).unwrap(); - let response: RpcResult = client.request("test_foo_1_5", rpc_params!("string")).await; - assert!(response.is_err()); - - // Test with versioned client, version > backward compatible method version - let mut versioned_header = HeaderMap::new(); - versioned_header.insert( - CLIENT_TARGET_API_VERSION_HEADER, - HeaderValue::from_static("1.6"), - ); - let client_with_new_header = HttpClientBuilder::default() - .set_headers(versioned_header) - .build(&url) - .unwrap(); - - let response: String = client_with_new_header - .request("test_foo", rpc_params!(true)) - .await - .unwrap(); - assert_eq!("Some string", response); - - // Test with versioned client, version = backward compatible method version - let mut versioned_header = HeaderMap::new(); - versioned_header.insert( - CLIENT_TARGET_API_VERSION_HEADER, - HeaderValue::from_static("1.5"), - ); - let client_with_new_header = HttpClientBuilder::default() - .set_headers(versioned_header) - .build(&url) - .unwrap(); - - let response: String = client_with_new_header - .request( - "test_foo", - rpc_params!("old version expect string as input"), - ) - .await - .unwrap(); - assert_eq!("Some string from old method", response); - - // Test with versioned client, version < backward compatible method version - let mut versioned_header = HeaderMap::new(); - versioned_header.insert( - CLIENT_TARGET_API_VERSION_HEADER, - HeaderValue::from_static("1.4"), - ); - let client_with_new_header = HttpClientBuilder::default() - .set_headers(versioned_header) - .build(&url) - .unwrap(); - - let response: String = client_with_new_header - .request( - "test_foo", - rpc_params!("old version expect string as input"), - ) - .await - .unwrap(); - assert_eq!("Some string from old method", response); -} - -#[tokio::test] -async fn test_disable_routing() { - env::set_var("DISABLE_BACKWARD_COMPATIBILITY", "true"); - - let mut builder = JsonRpcServerBuilder::new("1.5", &Registry::new(), None, None); - builder.register_module(TestApiModule).unwrap(); - - let address = local_ip_utils::new_local_tcp_socket_for_testing(); - let _handle = builder - .start(address, None, ServerType::Http, None) - .await - .unwrap(); - let url = format!("http://0.0.0.0:{}", address.port()); - - // try to access old method directly should fail - let client = HttpClientBuilder::default().build(&url).unwrap(); - let response: RpcResult = client.request("test_foo_1_5", rpc_params!("string")).await; - assert!(response.is_err()); - - // Test with versioned client, version = backward compatible method version, should fail because routing is disabled. - let mut versioned_header = HeaderMap::new(); - versioned_header.insert( - CLIENT_TARGET_API_VERSION_HEADER, - HeaderValue::from_static("1.5"), - ); - let client_with_new_header = HttpClientBuilder::default() - .set_headers(versioned_header) - .build(&url) - .unwrap(); - - let response: RpcResult = client_with_new_header - .request( - "test_foo", - rpc_params!("old version expect string as input"), - ) - .await; - assert!(response.is_err()); -} - -// TODO(chris): clean up this after March 27th, 2023 -// #[tokio::test] -// async fn test_rpc_backward_compatibility_batched_request() { -// let mut builder = JsonRpcServerBuilder::new( -// "1.5", &Registry::new(), None, None, -// ); -// let mut builder = JsonRpcServerBuilder::new( -// "1.5", &Registry::new(), None, None, -// ); -// builder.register_module(TestApiModule).unwrap(); - -// let port = get_available_port("0.0.0.0"); -// let handle = builder -// .start(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))) -// .await -// .unwrap(); -// let url = format!("http://0.0.0.0:{}", port); - -// // Test with un-versioned client -// let client = HttpClientBuilder::default().build(&url).unwrap(); - -// let mut builder = BatchRequestBuilder::default(); -// builder.insert("test_foo", rpc_params!(true)).unwrap(); -// builder.insert("test_foo", rpc_params!(true)).unwrap(); -// builder.insert("test_foo", rpc_params!(true)).unwrap(); - -// let response = client.batch_request::(builder).await.unwrap(); -// assert_eq!(3, response.num_successful_calls()); - -// // try to access old method directly should fail -// let mut builder = BatchRequestBuilder::default(); -// builder.insert("test_foo_1_5", rpc_params!(true)).unwrap(); -// builder.insert("test_foo", rpc_params!(true)).unwrap(); -// builder.insert("test_foo", rpc_params!(true)).unwrap(); - -// let response = client.batch_request::(builder).await.unwrap(); -// assert_eq!(2, response.num_successful_calls()); - -// // One malformed request shouldn't fail the whole batch -// let client = Client::new(); -// let response = client -// .post(format!("http://127.0.0.1:{}/", port)) -// .json(&vec![ -// json!(&Request { -// jsonrpc: Default::default(), -// id: Id::Number(1), -// method: "test_foo".into(), -// params: Some(&JsonRawValue::from_string("[true]".into()).unwrap()), -// }), -// json!("Bad json input"), -// ]) -// .send() -// .await -// .unwrap(); - -// let responses = response.text().await.unwrap(); -// let responses: Vec<&JsonRawValue> = serde_json::from_str(&responses).unwrap(); - -// // Should have 2 results -// assert_eq!(2, responses.len()); - -// // First response should success -// let response = serde_json::from_str::>(responses[0].get()); -// assert!(matches!(response, Ok(result) if result.result == "Some string")); - -// // Second response should fail -// let response = serde_json::from_str::(responses[1].get()); -// assert!(matches!(response, Ok(result) if result.error_object().message() == "Invalid request")); - -// handle.stop().unwrap() -// } - -#[open_rpc(namespace = "test")] -#[rpc(server, client, namespace = "test")] -trait TestApi { - #[method(name = "foo")] - async fn foo(&self, some_bool: bool) -> RpcResult; - - #[method(name = "foo", version <= "1.5")] - async fn bar(&self, some_str: String) -> RpcResult; -} - -struct TestApiModule; - -#[async_trait] -impl TestApiServer for TestApiModule { - async fn foo(&self, _some_bool: bool) -> RpcResult { - Ok("Some string".into()) - } - - async fn bar(&self, _some_str: String) -> RpcResult { - Ok("Some string from old method".into()) - } -} - -impl SuiRpcModule for TestApiModule { - fn rpc(self) -> RpcModule { - self.into_rpc() - } - fn rpc_doc_module() -> Module { - TestApiOpenRpc::module_doc() - } -} diff --git a/crates/sui-json-rpc-types/src/sui_extended.rs b/crates/sui-json-rpc-types/src/sui_extended.rs index 08f049f59c79f..5eb5cbc6a4f78 100644 --- a/crates/sui-json-rpc-types/src/sui_extended.rs +++ b/crates/sui-json-rpc-types/src/sui_extended.rs @@ -23,7 +23,7 @@ use crate::Page; pub type EpochPage = Page>; #[serde_as] -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct EpochInfo { /// epoch number @@ -59,7 +59,7 @@ impl EpochInfo { } #[serde_as] -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct EndOfEpochInfo { #[schemars(with = "BigInt")] @@ -105,7 +105,7 @@ pub struct EndOfEpochInfo { } #[serde_as] -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct MoveFunctionName { pub package: ObjectID, diff --git a/crates/sui-json-rpc-types/src/sui_move.rs b/crates/sui-json-rpc-types/src/sui_move.rs index dad8832588361..9849cd7cf859d 100644 --- a/crates/sui-json-rpc-types/src/sui_move.rs +++ b/crates/sui-json-rpc-types/src/sui_move.rs @@ -30,7 +30,7 @@ pub type SuiMoveTypeParameterIndex = u16; #[path = "unit_tests/sui_move_tests.rs"] mod sui_move_tests; -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum SuiMoveAbility { Copy, Drop, @@ -38,33 +38,33 @@ pub enum SuiMoveAbility { Key, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub struct SuiMoveAbilitySet { pub abilities: Vec, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum SuiMoveVisibility { Private, Public, Friend, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct SuiMoveStructTypeParameter { pub constraints: SuiMoveAbilitySet, pub is_phantom: bool, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub struct SuiMoveNormalizedField { pub name: String, #[serde(rename = "type")] pub type_: SuiMoveNormalizedType, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct SuiMoveNormalizedStruct { pub abilities: SuiMoveAbilitySet, @@ -72,7 +72,7 @@ pub struct SuiMoveNormalizedStruct { pub fields: Vec, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct SuiMoveNormalizedEnum { pub abilities: SuiMoveAbilitySet, @@ -80,7 +80,7 @@ pub struct SuiMoveNormalizedEnum { pub variants: BTreeMap>, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum SuiMoveNormalizedType { Bool, U8, @@ -104,7 +104,7 @@ pub enum SuiMoveNormalizedType { MutableReference(Box), } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct SuiMoveNormalizedFunction { pub visibility: SuiMoveVisibility, @@ -114,13 +114,13 @@ pub struct SuiMoveNormalizedFunction { pub return_: Vec, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub struct SuiMoveModuleId { address: String, name: String, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct SuiMoveNormalizedModule { pub file_format_version: u32, @@ -325,14 +325,14 @@ impl From for SuiMoveAbilitySet { } } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum ObjectValueKind { ByImmutableReference, ByMutableReference, ByValue, } -#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone)] pub enum MoveFunctionArgType { Pure, Object(ObjectValueKind), diff --git a/crates/sui-json-rpc-types/src/sui_transaction.rs b/crates/sui-json-rpc-types/src/sui_transaction.rs index b1d790f351d3c..a932544bd7d2b 100644 --- a/crates/sui-json-rpc-types/src/sui_transaction.rs +++ b/crates/sui-json-rpc-types/src/sui_transaction.rs @@ -2137,21 +2137,21 @@ impl From for SuiTypeTag { } } -#[derive(Serialize, Deserialize, JsonSchema)] +#[derive(Serialize, Deserialize, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub enum RPCTransactionRequestParams { TransferObjectRequestParams(TransferObjectParams), MoveCallRequestParams(MoveCallParams), } -#[derive(Serialize, Deserialize, JsonSchema)] +#[derive(Serialize, Deserialize, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct TransferObjectParams { pub recipient: SuiAddress, pub object_id: ObjectID, } -#[derive(Serialize, Deserialize, JsonSchema)] +#[derive(Serialize, Deserialize, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct MoveCallParams { pub package_object_id: ObjectID, @@ -2163,7 +2163,7 @@ pub struct MoveCallParams { } #[serde_as] -#[derive(Serialize, Deserialize, JsonSchema)] +#[derive(Serialize, Deserialize, JsonSchema, Clone)] #[serde(rename_all = "camelCase")] pub struct TransactionBlockBytes { /// BCS serialized transaction data bytes without its type tag, as base-64 encoded string. diff --git a/crates/sui-json-rpc/src/axum_router.rs b/crates/sui-json-rpc/src/axum_router.rs deleted file mode 100644 index 47ef61987938f..0000000000000 --- a/crates/sui-json-rpc/src/axum_router.rs +++ /dev/null @@ -1,576 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use std::net::IpAddr; -use std::time::SystemTime; -use std::{net::SocketAddr, sync::Arc}; -use sui_types::traffic_control::RemoteFirewallConfig; - -use axum::extract::{ConnectInfo, Json, State}; -use axum::response::Response; -use futures::StreamExt; -use hyper::header::HeaderValue; -use hyper::HeaderMap; -use jsonrpsee::core::server::helpers::BoundedSubscriptions; -use jsonrpsee::core::server::helpers::MethodResponse; -use jsonrpsee::core::server::helpers::MethodSink; -use jsonrpsee::core::server::rpc_module::MethodKind; -use jsonrpsee::server::logger::{self, TransportProtocol}; -use jsonrpsee::server::RandomIntegerIdProvider; -use jsonrpsee::types::error::{ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; -use jsonrpsee::types::{ErrorObject, Id, InvalidRequest, Params, Request}; -use jsonrpsee::{core::server::rpc_module::Methods, server::logger::Logger}; -use serde_json::value::RawValue; -use sui_core::traffic_controller::{ - metrics::TrafficControllerMetrics, parse_ip, policies::TrafficTally, TrafficController, -}; -use sui_json_rpc_api::TRANSACTION_EXECUTION_CLIENT_ERROR_CODE; -use sui_types::traffic_control::ClientIdSource; -use sui_types::traffic_control::{PolicyConfig, Weight}; -use tracing::error; - -use crate::routing_layer::RpcRouter; -use sui_json_rpc_api::CLIENT_TARGET_API_VERSION_HEADER; - -pub const MAX_RESPONSE_SIZE: u32 = 2 << 30; -const TOO_MANY_REQUESTS_MSG: &str = "Too many requests"; - -#[derive(Clone, Debug)] -pub struct JsonRpcService { - logger: L, - - id_provider: Arc, - - /// Registered server methods. - methods: Methods, - rpc_router: RpcRouter, - traffic_controller: Option>, - client_id_source: Option, -} - -impl JsonRpcService { - pub fn new( - methods: Methods, - rpc_router: RpcRouter, - logger: L, - remote_fw_config: Option, - policy_config: Option, - traffic_controller_metrics: TrafficControllerMetrics, - ) -> Self { - Self { - methods, - rpc_router, - logger, - id_provider: Arc::new(RandomIntegerIdProvider), - traffic_controller: policy_config.clone().map(|policy| { - Arc::new(TrafficController::init( - policy, - traffic_controller_metrics, - remote_fw_config, - )) - }), - client_id_source: policy_config.map(|policy| policy.client_id_source), - } - } -} - -impl JsonRpcService { - fn call_data(&self) -> CallData<'_, L> { - CallData { - logger: &self.logger, - methods: &self.methods, - rpc_router: &self.rpc_router, - max_response_body_size: MAX_RESPONSE_SIZE, - request_start: self.logger.on_request(TransportProtocol::Http), - } - } - - fn ws_call_data<'c, 'a: 'c, 'b: 'c>( - &'a self, - bounded_subscriptions: BoundedSubscriptions, - sink: &'b MethodSink, - ) -> ws::WsCallData<'c, L> { - ws::WsCallData { - logger: &self.logger, - methods: &self.methods, - max_response_body_size: MAX_RESPONSE_SIZE, - request_start: self.logger.on_request(TransportProtocol::Http), - bounded_subscriptions, - id_provider: &*self.id_provider, - sink, - } - } -} - -/// Create a response body. -fn from_template>( - status: hyper::StatusCode, - body: S, - content_type: &'static str, -) -> Response { - Response::builder() - .status(status) - .header( - "content-type", - hyper::header::HeaderValue::from_static(content_type), - ) - .body(body.into()) - // Parsing `StatusCode` and `HeaderValue` is infalliable but - // parsing body content is not. - .expect("Unable to parse response body for type conversion") -} - -/// Create a valid JSON response. -pub(crate) fn ok_response(body: String) -> Response { - const JSON: &str = "application/json; charset=utf-8"; - from_template(hyper::StatusCode::OK, body, JSON) -} - -pub async fn json_rpc_handler( - ConnectInfo(client_addr): ConnectInfo, - State(service): State>, - headers: HeaderMap, - Json(raw_request): Json>, -) -> impl axum::response::IntoResponse { - let headers_clone = headers.clone(); - // Get version from header. - let api_version = headers - .get(CLIENT_TARGET_API_VERSION_HEADER) - .and_then(|h| h.to_str().ok()); - let response = process_raw_request( - &service, - api_version, - raw_request.get(), - client_addr, - headers_clone, - ) - .await; - - ok_response(response.result) -} - -async fn process_raw_request( - service: &JsonRpcService, - api_version: Option<&str>, - raw_request: &str, - client_addr: SocketAddr, - headers: HeaderMap, -) -> MethodResponse { - let client = match service.client_id_source { - Some(ClientIdSource::SocketAddr) => Some(client_addr.ip()), - Some(ClientIdSource::XForwardedFor(num_hops)) => { - let do_header_parse = |header: &HeaderValue| match header.to_str() { - Ok(header_val) => { - let header_contents = header_val.split(',').map(str::trim).collect::>(); - if num_hops == 0 { - error!( - "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \ - number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \ - to this node. Skipping traffic controller request handling.", - header_contents, - ); - return None; - } - let contents_len = header_contents.len(); - let Some(client_ip) = header_contents.get(contents_len - num_hops) else { - error!( - "x-forwarded-for header value of {:?} contains {} values, but {} hops were specificed. \ - Expected {} values. Skipping traffic controller request handling.", - header_contents, - contents_len, - num_hops, - num_hops + 1, - ); - return None; - }; - parse_ip(client_ip) - } - Err(e) => { - error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e); - None - } - }; - if let Some(header) = headers.get("x-forwarded-for") { - do_header_parse(header) - } else if let Some(header) = headers.get("X-Forwarded-For") { - do_header_parse(header) - } else { - error!("x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"); - None - } - } - None => None, - }; - if let Ok(request) = serde_json::from_str::(raw_request) { - // check if either IP is blocked, in which case return early - if let Some(traffic_controller) = &service.traffic_controller { - if let Err(blocked_response) = - handle_traffic_req(traffic_controller.clone(), &client).await - { - return blocked_response; - } - } - - // handle response tallying - let response = process_request(request, api_version, service.call_data()).await; - if let Some(traffic_controller) = &service.traffic_controller { - handle_traffic_resp(traffic_controller.clone(), client, &response); - } - - response - } else if let Ok(_batch) = serde_json::from_str::>(raw_request) { - MethodResponse::error( - Id::Null, - ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), - ) - } else { - let (id, code) = prepare_error(raw_request); - MethodResponse::error(id, ErrorObject::from(code)) - } -} - -async fn handle_traffic_req( - traffic_controller: Arc, - client: &Option, -) -> Result<(), MethodResponse> { - if !traffic_controller.check(client, &None).await { - // Entity in blocklist - let err_obj = - ErrorObject::borrowed(ErrorCode::ServerIsBusy.code(), &TOO_MANY_REQUESTS_MSG, None); - Err(MethodResponse::error(Id::Null, err_obj)) - } else { - Ok(()) - } -} - -fn handle_traffic_resp( - traffic_controller: Arc, - client: Option, - response: &MethodResponse, -) { - let error = response.error_code.map(ErrorCode::from); - traffic_controller.tally(TrafficTally { - direct: client, - through_fullnode: None, - error_info: error.map(|e| { - let error_type = e.to_string(); - let error_weight = normalize(e); - (error_weight, error_type) - }), - // For now, count everything as spam with equal weight - // on the rpc node side, including gas-charging endpoints - // such as `sui_executeTransactionBlock`, as this can enable - // node operators who wish to rate limit their transcation - // traffic and incentivize high volume clients to choose a - // suitable rpc provider (or run their own). Later we may want - // to provide a weight distribution based on the method being called. - spam_weight: Weight::one(), - timestamp: SystemTime::now(), - }); -} - -// TODO: refine error matching here -fn normalize(err: ErrorCode) -> Weight { - match err { - ErrorCode::InvalidRequest | ErrorCode::InvalidParams => Weight::one(), - // e.g. invalid client signature - ErrorCode::ServerError(i) if i == TRANSACTION_EXECUTION_CLIENT_ERROR_CODE => Weight::one(), - _ => Weight::zero(), - } -} - -async fn process_request( - req: Request<'_>, - api_version: Option<&str>, - call: CallData<'_, L>, -) -> MethodResponse { - let CallData { - methods, - rpc_router, - logger, - max_response_body_size, - request_start, - } = call; - let conn_id = 0; // unused - - let name = rpc_router.route(&req.method, api_version); - let raw_params: Option<&RawValue> = req.params; - let params = Params::new(raw_params.map(|params| params.get())); - - let id = req.id; - - let response = match methods.method_with_name(name) { - None => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unknown, - TransportProtocol::Http, - ); - MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)) - } - Some((name, method)) => match method.inner() { - MethodKind::Sync(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); - (callback)(id, params, max_response_body_size as usize) - } - MethodKind::Async(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); - - let id = id.into_owned(); - let params = params.into_owned(); - (callback)(id, params, conn_id, max_response_body_size as usize, None).await - } - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unknown, - TransportProtocol::Http, - ); - // Subscriptions not supported on HTTP - MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)) - } - }, - }; - - logger.on_result( - name, - response.success, - response.error_code, - request_start, - TransportProtocol::Http, - ); - response -} - -/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain -/// unparsable garbage. -pub fn prepare_error(data: &str) -> (Id<'_>, ErrorCode) { - match serde_json::from_str::(data) { - Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest), - Err(_) => (Id::Null, ErrorCode::ParseError), - } -} - -#[derive(Debug, Clone)] -pub(crate) struct CallData<'a, L: Logger> { - logger: &'a L, - methods: &'a Methods, - rpc_router: &'a RpcRouter, - max_response_body_size: u32, - request_start: L::Instant, -} - -pub mod ws { - use axum::{ - extract::{ - ws::{Message, WebSocket}, - WebSocketUpgrade, - }, - response::Response, - }; - use futures::channel::mpsc; - use jsonrpsee::{ - core::server::{ - helpers::{BoundedSubscriptions, MethodSink}, - rpc_module::ConnState, - }, - server::IdProvider, - types::error::reject_too_many_subscriptions, - }; - - use super::*; - - #[derive(Debug, Clone)] - pub(crate) struct WsCallData<'a, L: Logger> { - pub bounded_subscriptions: BoundedSubscriptions, - pub id_provider: &'a dyn IdProvider, - pub methods: &'a Methods, - pub max_response_body_size: u32, - pub sink: &'a MethodSink, - pub logger: &'a L, - pub request_start: L::Instant, - } - - // A WebSocket handler that echos any message it receives. - // - // This one we'll be integration testing so it can be written in the regular way. - pub async fn ws_json_rpc_upgrade( - ws: WebSocketUpgrade, - State(service): State>, - ) -> Response { - ws.on_upgrade(|ws| ws_json_rpc_handler(ws, service)) - } - - async fn ws_json_rpc_handler(mut socket: WebSocket, service: JsonRpcService) { - #[allow(clippy::disallowed_methods)] - let (tx, mut rx) = mpsc::unbounded::(); - let sink = MethodSink::new_with_limit(tx, MAX_RESPONSE_SIZE, MAX_RESPONSE_SIZE); - let bounded_subscriptions = BoundedSubscriptions::new(100); - - loop { - tokio::select! { - maybe_message = socket.recv() => { - if let Some(Ok(message)) = maybe_message { - if let Message::Text(msg) = message { - let response = - process_raw_request(&service, &msg, bounded_subscriptions.clone(), &sink).await; - if let Some(response) = response { - let _ = sink.send_raw(response.result); - } - } - } else { - break; - } - }, - Some(response) = rx.next() => { - if socket.send(Message::Text(response)).await.is_err() { - break; - } - }, - } - } - } - - async fn process_raw_request( - service: &JsonRpcService, - raw_request: &str, - bounded_subscriptions: BoundedSubscriptions, - sink: &MethodSink, - ) -> Option { - if let Ok(request) = serde_json::from_str::(raw_request) { - process_request(request, service.ws_call_data(bounded_subscriptions, sink)).await - } else if let Ok(_batch) = serde_json::from_str::>(raw_request) { - Some(MethodResponse::error( - Id::Null, - ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None), - )) - } else { - let (id, code) = prepare_error(raw_request); - Some(MethodResponse::error(id, ErrorObject::from(code))) - } - } - - async fn process_request( - req: Request<'_>, - call: WsCallData<'_, L>, - ) -> Option { - let WsCallData { - methods, - logger, - max_response_body_size, - request_start, - bounded_subscriptions, - id_provider, - sink, - } = call; - let conn_id = 0; // unused - - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - - let response = match methods.method_with_name(name) { - None => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unknown, - TransportProtocol::Http, - ); - Some(MethodResponse::error( - id, - ErrorObject::from(ErrorCode::MethodNotFound), - )) - } - Some((name, method)) => match method.inner() { - MethodKind::Sync(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); - Some((callback)(id, params, max_response_body_size as usize)) - } - MethodKind::Async(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); - - let id = id.into_owned(); - let params = params.into_owned(); - - Some( - (callback)(id, params, conn_id, max_response_body_size as usize, None) - .await, - ) - } - - MethodKind::Subscription(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Subscription, - TransportProtocol::WebSocket, - ); - if let Some(cn) = bounded_subscriptions.acquire() { - let conn_state = ConnState { - conn_id, - close_notify: cn, - id_provider, - }; - callback(id.clone(), params, sink.clone(), conn_state, None).await; - None - } else { - Some(MethodResponse::error( - id, - reject_too_many_subscriptions(bounded_subscriptions.max()), - )) - } - } - - MethodKind::Unsubscription(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unsubscription, - TransportProtocol::WebSocket, - ); - - Some(callback( - id, - params, - conn_id, - max_response_body_size as usize, - )) - } - }, - }; - - if let Some(response) = &response { - logger.on_result( - name, - response.success, - response.error_code, - request_start, - TransportProtocol::WebSocket, - ); - } - response - } -} diff --git a/crates/sui-json-rpc/src/coin_api.rs b/crates/sui-json-rpc/src/coin_api.rs index 3ca4cda4f5124..41afeb5341fd5 100644 --- a/crates/sui-json-rpc/src/coin_api.rs +++ b/crates/sui-json-rpc/src/coin_api.rs @@ -422,7 +422,6 @@ mod tests { use super::*; use crate::authority_state::{MockStateRead, StateReadError}; use expect_test::expect; - use jsonrpsee::types::ErrorObjectOwned; use mockall::mock; use mockall::predicate; use move_core_types::account_address::AccountAddress; @@ -595,7 +594,6 @@ mod tests { mod get_coins_tests { use super::super::*; use super::*; - use jsonrpsee::types::ErrorObjectOwned; // Success scenarios #[tokio::test] @@ -775,8 +773,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); let expected = expect!["-32602"]; expected.assert_eq(&error_object.code().to_string()); let expected = expect!["Invalid struct type: 0x2::invalid::struct::tag. Got error: Expected end of token stream. Got: ::"]; @@ -794,8 +791,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); let expected = expect!["-32602"]; expected.assert_eq(&error_object.code().to_string()); let expected = @@ -822,8 +818,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), jsonrpsee::types::error::INVALID_PARAMS_CODE @@ -848,8 +843,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), jsonrpsee::types::error::INTERNAL_ERROR_CODE @@ -960,8 +954,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!(error_object.code(), -32602); let expected = expect!["-32602"]; expected.assert_eq(&error_object.code().to_string()); @@ -982,8 +975,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); let expected = expect!["-32602"]; expected.assert_eq(&error_object.code().to_string()); let expected = expect!["cursor not found"]; @@ -994,7 +986,6 @@ mod tests { mod get_balance_tests { use super::super::*; use super::*; - use jsonrpsee::types::ErrorObjectOwned; // Success scenarios #[tokio::test] async fn test_gas_coin() { @@ -1078,8 +1069,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); let expected = expect!["-32602"]; expected.assert_eq(&error_object.code().to_string()); let expected = expect!["Invalid struct type: 0x2::invalid::struct::tag. Got error: Expected end of token stream. Got: ::"]; @@ -1103,8 +1093,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), jsonrpsee::types::error::INVALID_PARAMS_CODE @@ -1128,8 +1117,7 @@ mod tests { .await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), @@ -1143,7 +1131,6 @@ mod tests { mod get_all_balances_tests { use super::super::*; use super::*; - use jsonrpsee::types::ErrorObjectOwned; // Success scenarios #[tokio::test] @@ -1219,8 +1206,7 @@ mod tests { let response = coin_read_api.get_all_balances(owner).await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), jsonrpsee::types::error::INVALID_PARAMS_CODE @@ -1404,8 +1390,7 @@ mod tests { let response = coin_read_api.get_total_supply(coin_name.clone()).await; assert!(response.is_err()); - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); let expected = expect!["-32000"]; expected.assert_eq(&error_object.code().to_string()); let expected = expect!["task 1 panicked"]; @@ -1444,8 +1429,7 @@ mod tests { }; let response = coin_read_api.get_total_supply(coin_name.clone()).await; - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!( error_object.code(), jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE diff --git a/crates/sui-json-rpc/src/error.rs b/crates/sui-json-rpc/src/error.rs index d4abdb05dfe06..681ca63e6c2a8 100644 --- a/crates/sui-json-rpc/src/error.rs +++ b/crates/sui-json-rpc/src/error.rs @@ -6,9 +6,9 @@ use crate::name_service::NameServiceError; use fastcrypto::error::FastCryptoError; use hyper::header::InvalidHeaderValue; use itertools::Itertools; -use jsonrpsee::core::Error as RpcError; -use jsonrpsee::types::error::{CallError, INTERNAL_ERROR_CODE}; -use jsonrpsee::types::ErrorObject; +use jsonrpsee::core::ClientError as RpcError; +use jsonrpsee::types::error::INTERNAL_ERROR_CODE; +use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use std::collections::BTreeMap; use sui_json_rpc_api::{TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, TRANSIENT_ERROR_CODE}; use sui_types::committee::{QUORUM_THRESHOLD, TOTAL_VOTING_POWER}; @@ -33,7 +33,13 @@ pub enum Error { UnexpectedError(String), #[error(transparent)] - RPCServerError(#[from] jsonrpsee::core::Error), + RPCServerError(#[from] jsonrpsee::core::ClientError), + + #[error(transparent)] + RPCError(#[from] jsonrpsee::types::ErrorObjectOwned), + + #[error(transparent)] + RegisterMethodError(#[from] jsonrpsee::server::RegisterMethodError), #[error(transparent)] InvalidHeaderValue(#[from] InvalidHeaderValue), @@ -84,20 +90,34 @@ impl From for Error { } } -impl From for RpcError { +fn invalid_params(e: E) -> ErrorObjectOwned { + ErrorObject::owned( + jsonrpsee::types::error::ErrorCode::InvalidParams.code(), + e.to_string(), + None::<()>, + ) +} + +fn failed(e: E) -> ErrorObjectOwned { + ErrorObject::owned( + jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE, + e.to_string(), + None::<()>, + ) +} + +impl From for ErrorObjectOwned { /// `InvalidParams`/`INVALID_PARAMS_CODE` for client errors. - fn from(e: Error) -> RpcError { + fn from(e: Error) -> ErrorObjectOwned { match e { - Error::UserInputError(_) => RpcError::Call(CallError::InvalidParams(e.into())), - Error::UnsupportedFeature(_) => RpcError::Call(CallError::InvalidParams(e.into())), + Error::UserInputError(_) => invalid_params(e), + Error::UnsupportedFeature(_) => invalid_params(e), Error::SuiObjectResponseError(err) => match err { SuiObjectResponseError::NotExists { .. } | SuiObjectResponseError::DynamicFieldNotFound { .. } | SuiObjectResponseError::Deleted { .. } - | SuiObjectResponseError::DisplayError { .. } => { - RpcError::Call(CallError::InvalidParams(err.into())) - } - _ => RpcError::Call(CallError::Failed(err.into())), + | SuiObjectResponseError::DisplayError { .. } => invalid_params(err), + _ => failed(err), }, Error::NameServiceError(err) => match err { NameServiceError::ExceedsMaxLength { .. } @@ -105,54 +125,43 @@ impl From for RpcError { | NameServiceError::InvalidLength { .. } | NameServiceError::InvalidUnderscore { .. } | NameServiceError::LabelsEmpty { .. } - | NameServiceError::InvalidSeparator { .. } => { - RpcError::Call(CallError::InvalidParams(err.into())) - } - _ => RpcError::Call(CallError::Failed(err.into())), + | NameServiceError::InvalidSeparator { .. } => invalid_params(err), + _ => failed(err), }, - Error::SuiRpcInputError(err) => RpcError::Call(CallError::InvalidParams(err.into())), + Error::SuiRpcInputError(err) => invalid_params(err), Error::SuiError(sui_error) => match sui_error { SuiError::TransactionNotFound { .. } | SuiError::TransactionsNotFound { .. } - | SuiError::TransactionEventsNotFound { .. } => { - RpcError::Call(CallError::InvalidParams(sui_error.into())) - } - _ => RpcError::Call(CallError::Failed(sui_error.into())), + | SuiError::TransactionEventsNotFound { .. } => invalid_params(sui_error), + _ => failed(sui_error), }, Error::StateReadError(err) => match err { - StateReadError::Client(_) => RpcError::Call(CallError::InvalidParams(err.into())), - _ => { - let error_object = ErrorObject::owned( - jsonrpsee::types::error::INTERNAL_ERROR_CODE, - err.to_string(), - None::<()>, - ); - RpcError::Call(CallError::Custom(error_object)) - } + StateReadError::Client(_) => invalid_params(err), + _ => ErrorObject::owned( + jsonrpsee::types::error::INTERNAL_ERROR_CODE, + err.to_string(), + None::<()>, + ), }, Error::QuorumDriverError(err) => { match err { QuorumDriverError::InvalidUserSignature(err) => { - let error_object = ErrorObject::owned( + ErrorObject::owned( TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, format!("Invalid user signature: {err}"), None::<()>, - ); - RpcError::Call(CallError::Custom(error_object)) + ) } QuorumDriverError::TxAlreadyFinalizedWithDifferentUserSignatures => { - let error_object = ErrorObject::owned( + ErrorObject::owned( TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, "The transaction is already finalized but with different user signatures", None::<()>, - ); - RpcError::Call(CallError::Custom(error_object)) + ) } QuorumDriverError::TimeoutBeforeFinality | QuorumDriverError::FailedWithTransientErrorAfterMaximumAttempts { .. } => { - let error_object = - ErrorObject::owned(TRANSIENT_ERROR_CODE, err.to_string(), None::<()>); - RpcError::Call(CallError::Custom(error_object)) + ErrorObject::owned(TRANSIENT_ERROR_CODE, err.to_string(), None::<()>) } QuorumDriverError::ObjectsDoubleUsed { conflicting_txes, @@ -205,12 +214,11 @@ impl From for RpcError { }) .collect::>>(); - let error_object = ErrorObject::owned( + ErrorObject::owned( TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, error_message, Some(new_map), - ); - RpcError::Call(CallError::Custom(error_object)) + ) } QuorumDriverError::NonRecoverableTransactionError { errors } => { let new_errors: Vec = errors @@ -254,30 +262,26 @@ impl From for RpcError { let error_msg = format!("Transaction validator signing failed due to issues with transaction inputs, please review the errors and try again:\n{}", error_list.join("\n")); - let error_object = ErrorObject::owned( + ErrorObject::owned( TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, error_msg, None::<()>, - ); - RpcError::Call(CallError::Custom(error_object)) + ) } QuorumDriverError::QuorumDriverInternalError(_) => { - let error_object = ErrorObject::owned( + ErrorObject::owned( INTERNAL_ERROR_CODE, "Internal error occurred while executing transaction.", None::<()>, - ); - RpcError::Call(CallError::Custom(error_object)) + ) } QuorumDriverError::SystemOverload { .. } | QuorumDriverError::SystemOverloadRetryAfter { .. } => { - let error_object = - ErrorObject::owned(TRANSIENT_ERROR_CODE, err.to_string(), None::<()>); - RpcError::Call(CallError::Custom(error_object)) + ErrorObject::owned(TRANSIENT_ERROR_CODE, err.to_string(), None::<()>) } } } - _ => RpcError::Call(CallError::Failed(e.into())), + _ => failed(e), } } } @@ -323,7 +327,13 @@ pub enum SuiRpcInputError { impl From for RpcError { fn from(e: SuiRpcInputError) -> Self { - RpcError::Call(CallError::InvalidParams(e.into())) + RpcError::Call(invalid_params(e)) + } +} + +impl From for ErrorObjectOwned { + fn from(e: SuiRpcInputError) -> Self { + invalid_params(e) } } @@ -360,9 +370,8 @@ mod tests { error: "Test inner invalid signature".to_string(), }); - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32002"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect![ @@ -375,9 +384,8 @@ mod tests { fn test_timeout_before_finality() { let quorum_driver_error = QuorumDriverError::TimeoutBeforeFinality; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32050"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect!["Transaction timed out before reaching finality"]; @@ -391,9 +399,8 @@ mod tests { total_attempts: 10, }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32050"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect![ @@ -428,9 +435,8 @@ mod tests { retried_tx_status: Some((TransactionDigest::from([1; 32]), true)), }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32002"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect!["Failed to sign transaction by a quorum of validators because one or more of its objects is reserved for another transaction. Retried transaction 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi (succeeded) because it was able to gather the necessary votes. Other transactions locking these objects:\n- 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi (stake 80.0)\n- 8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR (stake 5.0)"]; @@ -469,9 +475,8 @@ mod tests { retried_tx_status: None, }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32002"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect!["Failed to sign transaction by a quorum of validators because one or more of its objects is equivocated until the next epoch. Other transactions locking these objects:\n- 8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR (stake 50.0)\n- 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi (stake 40.0)"]; @@ -510,9 +515,8 @@ mod tests { ], }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32002"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = @@ -542,9 +546,8 @@ mod tests { ], }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32002"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = @@ -558,9 +561,8 @@ mod tests { SuiError::UnexpectedMessage("test".to_string()), ); - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32603"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect!["Internal error occurred while executing transaction."]; @@ -574,9 +576,8 @@ mod tests { errors: vec![(SuiError::UnexpectedMessage("test".to_string()), 0, vec![])], }; - let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); - - let error_object: ErrorObjectOwned = rpc_error.into(); + let error_object: ErrorObjectOwned = + Error::QuorumDriverError(quorum_driver_error).into(); let expected_code = expect!["-32050"]; expected_code.assert_eq(&error_object.code().to_string()); let expected_message = expect!["Transaction is not processed because 10 of validators by stake are overloaded with certificates pending execution."]; diff --git a/crates/sui-json-rpc/src/indexer_api.rs b/crates/sui-json-rpc/src/indexer_api.rs index ff15f1a88e3de..0f7b749ec2bd5 100644 --- a/crates/sui-json-rpc/src/indexer_api.rs +++ b/crates/sui-json-rpc/src/indexer_api.rs @@ -5,11 +5,10 @@ use std::sync::Arc; use anyhow::bail; use async_trait::async_trait; -use futures::{future, Stream}; +use futures::{future, Stream, StreamExt}; use jsonrpsee::{ - core::{error::SubscriptionClosed, RpcResult}, - types::SubscriptionResult, - RpcModule, SubscriptionSink, + core::{RpcResult, SubscriptionResult}, + PendingSubscriptionSink, RpcModule, }; use move_bytecode_utils::layout::TypeLayoutBuilder; use move_core_types::language_storage::TypeTag; @@ -36,7 +35,7 @@ use sui_types::{ event::EventID, }; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; use crate::{ authority_state::{StateRead, StateReadResult}, @@ -46,29 +45,42 @@ use crate::{ }; pub fn spawn_subscription( - mut sink: SubscriptionSink, - rx: S, + sink: PendingSubscriptionSink, + mut rx: S, permit: Option, ) where S: Stream + Unpin + Send + 'static, - T: Serialize, + T: Serialize + Send, { spawn_monitored_task!(async move { - let _permit = permit; - match sink.pipe_from_stream(rx).await { - SubscriptionClosed::Success => { - debug!("Subscription completed."); - sink.close(SubscriptionClosed::Success); - } - SubscriptionClosed::RemotePeerAborted => { - debug!("Subscription aborted by remote peer."); - sink.close(SubscriptionClosed::RemotePeerAborted); - } - SubscriptionClosed::Failed(err) => { - debug!("Subscription failed: {err:?}"); - sink.close(err); - } + let Ok(sink) = sink.accept().await else { + return; }; + let _permit = permit; + + while let Some(item) = rx.next().await { + let Ok(message) = jsonrpsee::server::SubscriptionMessage::from_json(&item) else { + break; + }; + let Ok(()) = sink.send(message).await else { + break; + }; + } + + // match sink.pipe_from_stream(rx).await { + // SubscriptionClosed::Success => { + // debug!("Subscription completed."); + // sink.close(SubscriptionClosed::Success); + // } + // SubscriptionClosed::RemotePeerAborted => { + // debug!("Subscription aborted by remote peer."); + // sink.close(SubscriptionClosed::RemotePeerAborted); + // } + // SubscriptionClosed::Failed(err) => { + // debug!("Subscription failed: {err:?}"); + // sink.close(err); + // } + // }; }); } const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100; @@ -295,7 +307,11 @@ impl IndexerApiServer for IndexerApi { } #[instrument(skip(self))] - fn subscribe_event(&self, sink: SubscriptionSink, filter: EventFilter) -> SubscriptionResult { + fn subscribe_event( + &self, + sink: PendingSubscriptionSink, + filter: EventFilter, + ) -> SubscriptionResult { let permit = self.acquire_subscribe_permit()?; spawn_subscription( sink, @@ -309,7 +325,7 @@ impl IndexerApiServer for IndexerApi { fn subscribe_transaction( &self, - sink: SubscriptionSink, + sink: PendingSubscriptionSink, filter: TransactionFilter, ) -> SubscriptionResult { let permit = self.acquire_subscribe_permit()?; diff --git a/crates/sui-json-rpc/src/lib.rs b/crates/sui-json-rpc/src/lib.rs index d9704bbcef767..977151fd548ba 100644 --- a/crates/sui-json-rpc/src/lib.rs +++ b/crates/sui-json-rpc/src/lib.rs @@ -3,20 +3,25 @@ use std::env; use std::net::SocketAddr; -use std::str::FromStr; +use std::sync::Arc; use axum::body::Body; +use axum::http; use hyper::header::HeaderName; use hyper::header::HeaderValue; use hyper::Method; use hyper::Request; use jsonrpsee::RpcModule; +use metrics::Metrics; +use metrics::MetricsLayer; use prometheus::Registry; use sui_core::traffic_controller::metrics::TrafficControllerMetrics; +use sui_core::traffic_controller::TrafficController; use sui_types::traffic_control::PolicyConfig; use sui_types::traffic_control::RemoteFirewallConfig; use tokio::runtime::Handle; use tokio_util::sync::CancellationToken; +use tower::ServiceBuilder; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::trace::TraceLayer; use tracing::info; @@ -29,13 +34,11 @@ use sui_json_rpc_api::{ CLIENT_TARGET_API_VERSION_HEADER, }; use sui_open_rpc::{Module, Project}; +use traffic_control::TrafficControllerService; use crate::error::Error; -use crate::metrics::MetricsLogger; -use crate::routing_layer::RpcRouter; pub mod authority_state; -pub mod axum_router; mod balance_changes; pub mod bridge_api; pub mod coin_api; @@ -48,7 +51,7 @@ pub mod move_utils; pub mod name_service; mod object_changes; pub mod read_api; -mod routing_layer; +mod traffic_control; pub mod transaction_builder_api; pub mod transaction_execution_api; @@ -164,98 +167,89 @@ impl JsonRpcServerBuilder { } pub async fn to_router(&self, server_type: ServerType) -> Result { - let routing = self.rpc_doc.method_routing.clone(); - - let disable_routing = env::var("DISABLE_BACKWARD_COMPATIBILITY") - .ok() - .and_then(|v| bool::from_str(&v).ok()) - .unwrap_or_default(); - info!( - "Compatibility method routing {}.", - if disable_routing { - "disabled" - } else { - "enabled" - } - ); - let rpc_router = RpcRouter::new(routing, disable_routing); - let rpc_docs = self.rpc_doc.clone(); let mut module = self.module.clone(); - module.register_method("rpc.discover", move |_, _| Ok(rpc_docs.clone()))?; + module.register_method("rpc.discover", move |_, _, _| { + Ok::<_, jsonrpsee::types::ErrorObjectOwned>(rpc_docs.clone()) + })?; let methods_names = module.method_names().collect::>(); - let metrics_logger = MetricsLogger::new(&self.registry, &methods_names); + let metrics = Arc::new(Metrics::new(&self.registry, &methods_names)); let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry); + let traffic_controller = self.policy_config.clone().map(|policy| { + Arc::new(TrafficController::init( + policy, + traffic_controller_metrics, + self.firewall_config.clone(), + )) + }); + let client_id_source = self + .policy_config + .clone() + .map(|policy| policy.client_id_source); - let middleware = tower::ServiceBuilder::new() + let metrics_clone = metrics.clone(); + let middleware = ServiceBuilder::new() .layer(Self::trace_layer()) - .layer(Self::cors()?); - - let service = crate::axum_router::JsonRpcService::new( - module.into(), - rpc_router, - metrics_logger, - self.firewall_config.clone(), - self.policy_config.clone(), - traffic_controller_metrics, - ); + .layer(Self::cors()?) + .map_request(move |mut request: http::Request<_>| { + metrics_clone.on_http_request(request.headers()); + if let Some(client_id_source) = client_id_source.clone() { + traffic_control::determine_client_ip(client_id_source, &mut request); + } + request + }); + + let (stop_handle, server_handle) = jsonrpsee::server::stop_channel(); + std::mem::forget(server_handle); + + let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new() + .layer_fn(move |s| MetricsLayer::new(s, metrics.clone())) + .layer_fn(move |s| TrafficControllerService::new(s, traffic_controller.clone())); + let service_builder = + jsonrpsee::server::ServerBuilder::new().set_rpc_middleware(rpc_middleware); let mut router = axum::Router::new(); - match server_type { ServerType::WebSocket => { + let service = JsonRpcService( + service_builder + .ws_only() + .to_service_builder() + .build(module, stop_handle), + ); router = router - .route( - "/", - axum::routing::get(crate::axum_router::ws::ws_json_rpc_upgrade), - ) - .route( - "/subscribe", - axum::routing::get(crate::axum_router::ws::ws_json_rpc_upgrade), - ); + .route("/", axum::routing::get_service(service.clone())) + .route("/subscribe", axum::routing::get_service(service)); } ServerType::Http => { + let service = JsonRpcService( + service_builder + .http_only() + .to_service_builder() + .build(module, stop_handle), + ); router = router - .route( - "/", - axum::routing::post(crate::axum_router::json_rpc_handler), - ) - .route( - "/json-rpc", - axum::routing::post(crate::axum_router::json_rpc_handler), - ) - .route( - "/public", - axum::routing::post(crate::axum_router::json_rpc_handler), - ); + .route("/", axum::routing::post_service(service.clone())) + .route("/json-rpc", axum::routing::post_service(service.clone())) + .route("/public", axum::routing::post_service(service)); } ServerType::Both => { + let service = JsonRpcService( + service_builder + .to_service_builder() + .build(module, stop_handle), + ); router = router - .route( - "/", - axum::routing::post(crate::axum_router::json_rpc_handler), - ) - .route( - "/", - axum::routing::get(crate::axum_router::ws::ws_json_rpc_upgrade), - ) - .route( - "/subscribe", - axum::routing::get(crate::axum_router::ws::ws_json_rpc_upgrade), - ) - .route( - "/json-rpc", - axum::routing::post(crate::axum_router::json_rpc_handler), - ) - .route( - "/public", - axum::routing::post(crate::axum_router::json_rpc_handler), - ); + .route("/", axum::routing::post_service(service.clone())) + .route("/", axum::routing::get_service(service.clone())) + .route("/subscribe", axum::routing::get_service(service.clone())) + .route("/json-rpc", axum::routing::post_service(service.clone())) + .route("/public", axum::routing::post_service(service)); } } - let app = router.with_state(service).layer(middleware); + let app = router.layer(middleware); info!("Available JSON-RPC methods : {:?}", methods_names); @@ -320,3 +314,44 @@ where fn rpc(self) -> RpcModule; fn rpc_doc_module() -> Module; } + +use jsonrpsee::core::BoxError; + +#[derive(Clone)] +struct JsonRpcService(S); + +impl tower::Service> for JsonRpcService +where + S: tower::Service< + http::Request, + Error = BoxError, + Response = http::Response, + Future: Send + 'static, + >, +{ + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = std::pin::Pin< + Box> + Send>, + >; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let fut = self.0.call(request); + Box::pin(async move { + match fut.await { + Ok(response) => Ok(response), + Err(e) => Ok(http::Response::builder() + .status(http::status::StatusCode::INTERNAL_SERVER_ERROR) + .body(jsonrpsee::server::HttpBody::from(e.to_string())) + .unwrap()), + } + }) + } +} diff --git a/crates/sui-json-rpc/src/logger.rs b/crates/sui-json-rpc/src/logger.rs index 47aff63055141..c978bd2deaa48 100644 --- a/crates/sui-json-rpc/src/logger.rs +++ b/crates/sui-json-rpc/src/logger.rs @@ -5,8 +5,8 @@ macro_rules! with_tracing { ($time_spent_threshold:expr, $future:expr) => {{ use tracing::{info, error, Instrument, Span}; - use jsonrpsee::core::{RpcResult, Error as RpcError}; - use jsonrpsee::types::error::{CallError}; + use jsonrpsee::core::{RpcResult}; + use jsonrpsee::types::error::ErrorObjectOwned; use $crate::error::RpcInterimResult; use anyhow::anyhow; @@ -17,10 +17,9 @@ macro_rules! with_tracing { let result: RpcResult<_> = interim_result.map_err(|e: Error| { let anyhow_error = anyhow!("{:?}", e); - let rpc_error: RpcError = e.into(); - if !matches!(rpc_error, RpcError::Call(CallError::InvalidParams(_))) { - error!(error=?anyhow_error); - } + let rpc_error: ErrorObjectOwned = e.into(); + + error!(error=?anyhow_error); rpc_error }); diff --git a/crates/sui-json-rpc/src/metrics.rs b/crates/sui-json-rpc/src/metrics.rs index 26b8bd6a745f1..e42a99a209253 100644 --- a/crates/sui-json-rpc/src/metrics.rs +++ b/crates/sui-json-rpc/src/metrics.rs @@ -2,11 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashSet; -use std::net::SocketAddr; +use std::sync::Arc; -use http_body::Body; -use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, TransportProtocol}; -use jsonrpsee::types::Params; +use futures::FutureExt; +use jsonrpsee::server::middleware::rpc::RpcServiceT; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, @@ -35,31 +34,17 @@ pub struct Metrics { transient_errors_by_route: IntCounterVec, /// Client info client: IntCounterVec, - /// Connection count - inflight_connection: IntGaugeVec, /// Request size rpc_request_size: HistogramVec, /// Response size rpc_response_size: HistogramVec, -} -#[derive(Clone)] -pub struct MetricsLogger { - metrics: Metrics, method_whitelist: HashSet, } -impl MetricsLogger { - fn check_spam<'a>(&'a self, method_name: &'a str) -> &'a str { - if self.method_whitelist.contains(method_name) { - method_name - } else { - SPAM_LABEL - } - } - +impl Metrics { pub fn new(registry: &prometheus::Registry, method_whitelist: &[&str]) -> Self { - let metrics = Metrics { + Self { requests_by_route: register_int_counter_vec_with_registry!( "rpc_requests_by_route", "Number of requests by route", @@ -117,17 +102,10 @@ impl MetricsLogger { registry, ) .unwrap(), - inflight_connection: register_int_gauge_vec_with_registry!( - "rpc_inflight_connection", - "Number of inflight RPC connection by protocol", - &["protocol"], - registry, - ) - .unwrap(), rpc_request_size: register_histogram_vec_with_registry!( "rpc_request_size", "Request size of rpc requests", - &["protocol"], + &["route"], prometheus::exponential_buckets(32.0, 2.0, 19) .unwrap() .to_vec(), @@ -137,135 +115,129 @@ impl MetricsLogger { rpc_response_size: register_histogram_vec_with_registry!( "rpc_response_size", "Response size of rpc requests", - &["protocol"], + &["route"], prometheus::exponential_buckets(1024.0, 2.0, 20) .unwrap() .to_vec(), registry, ) .unwrap(), - }; - - Self { - metrics, method_whitelist: method_whitelist.iter().map(|s| (*s).into()).collect(), } } -} - -impl Logger for MetricsLogger { - type Instant = Instant; - - fn on_connect(&self, _remote_addr: SocketAddr, request: &HttpRequest, t: TransportProtocol) { - let client_type = request - .headers() - .get(CLIENT_SDK_TYPE_HEADER) - .and_then(|v| v.to_str().ok()) - .unwrap_or("Unknown"); - - let api_version = request - .headers() - .get(CLIENT_TARGET_API_VERSION_HEADER) - .and_then(|v| v.to_str().ok()) - .unwrap_or("Unknown"); - self.metrics - .client - .with_label_values(&[client_type, api_version]) - .inc(); - self.metrics - .inflight_connection - .with_label_values(&[&t.to_string()]) - .inc(); - - self.metrics - .rpc_request_size - .with_label_values(&[&t.to_string()]) - .observe( - request - .size_hint() - .exact() - .unwrap_or_else(|| request.size_hint().lower()) as f64, - ); - } - fn on_request(&self, _transport: TransportProtocol) -> Self::Instant { - Instant::now() + fn check_spam<'a>(&'a self, method_name: &'a str) -> &'a str { + if self.method_whitelist.contains(method_name) { + method_name + } else { + SPAM_LABEL + } } - fn on_call( - &self, - method_name: &str, - _params: Params, - _kind: MethodKind, - _transport: TransportProtocol, - ) { + fn on_request(&self, request: &jsonrpsee::types::Request<'_>) { + let method_name = request.method_name(); let method_name = self.check_spam(method_name); - self.metrics - .inflight_requests_by_route + self.inflight_requests_by_route .with_label_values(&[method_name]) .inc(); - self.metrics - .requests_by_route + self.requests_by_route .with_label_values(&[method_name]) .inc(); + + self.rpc_request_size + .with_label_values(&[method_name]) + .observe(request.params().len_bytes() as f64); } - fn on_result( + fn on_response( &self, method_name: &str, - _success: bool, - error_code: Option, - started_at: Self::Instant, - _transport: TransportProtocol, + started_at: Instant, + response: &jsonrpsee::MethodResponse, ) { let method_name = self.check_spam(method_name); - self.metrics - .inflight_requests_by_route + self.inflight_requests_by_route .with_label_values(&[method_name]) .dec(); let req_latency_secs = (Instant::now() - started_at).as_secs_f64(); - self.metrics - .req_latency_by_route + self.req_latency_by_route .with_label_values(&[method_name]) .observe(req_latency_secs); - if let Some(code) = error_code { + if let Some(code) = response.as_error_code() { if code == jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE || code == jsonrpsee::types::error::INTERNAL_ERROR_CODE { - self.metrics - .server_errors_by_route + self.server_errors_by_route .with_label_values(&[method_name]) .inc(); } else if code == TRANSIENT_ERROR_CODE { - self.metrics - .transient_errors_by_route + self.transient_errors_by_route .with_label_values(&[method_name]) .inc(); } else { - self.metrics - .client_errors_by_route + self.client_errors_by_route .with_label_values(&[method_name]) .inc(); } - self.metrics - .errors_by_route - .with_label_values(&[method_name]) - .inc(); + self.errors_by_route.with_label_values(&[method_name]).inc(); } + + self.rpc_response_size + .with_label_values(&[method_name]) + .observe(response.as_result().len() as f64) + } + + pub fn on_http_request(&self, headers: &axum::http::HeaderMap) { + let client_type = headers + .get(CLIENT_SDK_TYPE_HEADER) + .and_then(|v| v.to_str().ok()) + .unwrap_or("Unknown"); + + let api_version = headers + .get(CLIENT_TARGET_API_VERSION_HEADER) + .and_then(|v| v.to_str().ok()) + .unwrap_or("Unknown"); + self.client + .with_label_values(&[client_type, api_version]) + .inc(); } +} - fn on_response(&self, result: &str, _started_at: Self::Instant, t: TransportProtocol) { - self.metrics - .rpc_response_size - .with_label_values(&[&t.to_string()]) - .observe(std::mem::size_of_val(result) as f64) +#[derive(Clone)] +pub struct MetricsLayer { + inner: S, + metrics: Arc, +} + +impl MetricsLayer { + pub fn new(service: S, metrics: Arc) -> Self { + Self { + inner: service, + metrics, + } } +} - fn on_disconnect(&self, _remote_addr: SocketAddr, t: TransportProtocol) { - self.metrics - .inflight_connection - .with_label_values(&[&t.to_string()]) - .dec(); +impl<'a, S> RpcServiceT<'a> for MetricsLayer +where + S: RpcServiceT<'a> + Send + Sync, + S::Future: 'a, +{ + type Future = futures::future::BoxFuture<'a, jsonrpsee::MethodResponse>; + + fn call(&self, req: jsonrpsee::types::Request<'a>) -> Self::Future { + let metrics = self.metrics.clone(); + metrics.on_request(&req); + let method_name = req.method_name().to_owned(); + let started_at = Instant::now(); + let fut = self.inner.call(req); + + async move { + let response = fut.await; + metrics.on_response(&method_name, started_at, &response); + response + } + .boxed() } } diff --git a/crates/sui-json-rpc/src/move_utils.rs b/crates/sui-json-rpc/src/move_utils.rs index ff94b3fecc730..4834ffa74ad3c 100644 --- a/crates/sui-json-rpc/src/move_utils.rs +++ b/crates/sui-json-rpc/src/move_utils.rs @@ -285,7 +285,6 @@ mod tests { mod get_normalized_move_module_tests { use super::super::*; - use jsonrpsee::types::ErrorObjectOwned; use move_binary_format::file_format::basic_test_module; fn setup() -> (ObjectID, String) { @@ -335,8 +334,7 @@ mod tests { let response = move_utils .get_normalized_move_module(package, module_name) .await; - let error_result = response.unwrap_err(); - let error_object: ErrorObjectOwned = error_result.into(); + let error_object = response.unwrap_err(); assert_eq!(error_object.code(), -32602); assert_eq!(error_object.message(), &error_string); diff --git a/crates/sui-json-rpc/src/read_api.rs b/crates/sui-json-rpc/src/read_api.rs index 3982acc9c5b09..2626cf57d97df 100644 --- a/crates/sui-json-rpc/src/read_api.rs +++ b/crates/sui-json-rpc/src/read_api.rs @@ -993,20 +993,6 @@ impl ReadApiServer for ReadApi { }) } - #[instrument(skip(self))] - async fn get_checkpoints_deprecated_limit( - &self, - cursor: Option>, - limit: Option>, - descending_order: bool, - ) -> RpcResult { - with_tracing!(async move { - self.get_checkpoints(cursor, limit.map(|l| *l as usize), descending_order) - .await - .map_err(Error::from) - }) - } - #[instrument(skip(self))] async fn get_protocol_config( &self, diff --git a/crates/sui-json-rpc/src/routing_layer.rs b/crates/sui-json-rpc/src/routing_layer.rs deleted file mode 100644 index f885cb3bf1194..0000000000000 --- a/crates/sui-json-rpc/src/routing_layer.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use std::collections::{HashMap, HashSet}; -use sui_open_rpc::MethodRouting; - -#[derive(Debug, Clone)] -pub struct RpcRouter { - routes: HashMap, - route_to_methods: HashSet, - disable_routing: bool, -} - -impl RpcRouter { - pub fn new(routes: HashMap, disable_routing: bool) -> Self { - let route_to_methods = routes.values().map(|v| v.route_to.clone()).collect(); - - Self { - routes, - route_to_methods, - disable_routing, - } - } - - pub fn route<'c, 'a: 'c, 'b: 'c>(&'a self, method: &'b str, version: Option<&str>) -> &'c str { - // Reject direct access to the old methods - if self.route_to_methods.contains(method) { - "INVALID_ROUTING" - } else if self.disable_routing { - method - } else { - // Modify the method name if routing is enabled - match (version, self.routes.get(method)) { - (Some(v), Some(route)) if route.matches(v) => route.route_to.as_str(), - _ => method, - } - } - } -} diff --git a/crates/sui-json-rpc/src/traffic_control.rs b/crates/sui-json-rpc/src/traffic_control.rs new file mode 100644 index 0000000000000..adaa8b064b464 --- /dev/null +++ b/crates/sui-json-rpc/src/traffic_control.rs @@ -0,0 +1,170 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use axum::extract::ConnectInfo; +use futures::FutureExt; +use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::types::{ErrorCode, ErrorObject, Id}; +use jsonrpsee::MethodResponse; +use std::net::IpAddr; +use std::time::SystemTime; +use std::{net::SocketAddr, sync::Arc}; +use sui_core::traffic_controller::{parse_ip, policies::TrafficTally, TrafficController}; +use sui_json_rpc_api::TRANSACTION_EXECUTION_CLIENT_ERROR_CODE; +use sui_types::traffic_control::ClientIdSource; +use sui_types::traffic_control::Weight; +use tracing::error; + +const TOO_MANY_REQUESTS_MSG: &str = "Too many requests"; + +#[derive(Clone)] +pub struct TrafficControllerService { + inner: S, + traffic_controller: Option>, +} + +impl TrafficControllerService { + pub fn new(service: S, traffic_controller: Option>) -> Self { + Self { + inner: service, + traffic_controller, + } + } +} + +impl<'a, S> RpcServiceT<'a> for TrafficControllerService +where + S: RpcServiceT<'a> + Send + Sync + Clone + 'static, + S::Future: 'a, +{ + type Future = futures::future::BoxFuture<'a, jsonrpsee::MethodResponse>; + + fn call(&self, req: jsonrpsee::types::Request<'a>) -> Self::Future { + let service = self.inner.clone(); + let traffic_controller = self.traffic_controller.clone(); + + async move { + if let Some(traffic_controller) = traffic_controller { + let client = req.extensions().get::().cloned(); + if let Err(response) = handle_traffic_req(&traffic_controller, &client).await { + response + } else { + let response = service.call(req).await; + handle_traffic_resp(&traffic_controller, client, &response); + response + } + } else { + service.call(req).await + } + } + .boxed() + } +} + +async fn handle_traffic_req( + traffic_controller: &TrafficController, + client: &Option, +) -> Result<(), MethodResponse> { + if !traffic_controller.check(client, &None).await { + // Entity in blocklist + let err_obj = + ErrorObject::borrowed(ErrorCode::ServerIsBusy.code(), TOO_MANY_REQUESTS_MSG, None); + Err(MethodResponse::error(Id::Null, err_obj)) + } else { + Ok(()) + } +} + +fn handle_traffic_resp( + traffic_controller: &TrafficController, + client: Option, + response: &MethodResponse, +) { + let error = response.as_error_code().map(ErrorCode::from); + traffic_controller.tally(TrafficTally { + direct: client, + through_fullnode: None, + error_info: error.map(|e| { + let error_type = e.to_string(); + let error_weight = normalize(e); + (error_weight, error_type) + }), + // For now, count everything as spam with equal weight + // on the rpc node side, including gas-charging endpoints + // such as `sui_executeTransactionBlock`, as this can enable + // node operators who wish to rate limit their transcation + // traffic and incentivize high volume clients to choose a + // suitable rpc provider (or run their own). Later we may want + // to provide a weight distribution based on the method being called. + spam_weight: Weight::one(), + timestamp: SystemTime::now(), + }); +} + +// TODO: refine error matching here +fn normalize(err: ErrorCode) -> Weight { + match err { + ErrorCode::InvalidRequest | ErrorCode::InvalidParams => Weight::one(), + // e.g. invalid client signature + ErrorCode::ServerError(i) if i == TRANSACTION_EXECUTION_CLIENT_ERROR_CODE => Weight::one(), + _ => Weight::zero(), + } +} + +pub fn determine_client_ip( + client_id_source: ClientIdSource, + request: &mut axum::http::Request, +) { + let headers = request.headers(); + let client = match client_id_source { + ClientIdSource::SocketAddr => request + .extensions() + .get::>() + .map(|info| info.0.ip()), + ClientIdSource::XForwardedFor(num_hops) => { + let do_header_parse = |header: &axum::http::HeaderValue| match header.to_str() { + Ok(header_val) => { + let header_contents = header_val.split(',').map(str::trim).collect::>(); + if num_hops == 0 { + error!( + "x-forwarded-for: 0 specified. x-forwarded-for contents: {:?}. Please assign nonzero value for \ + number of hops here, or use `socket-addr` client-id-source type if requests are not being proxied \ + to this node. Skipping traffic controller request handling.", + header_contents, + ); + return None; + } + let contents_len = header_contents.len(); + let Some(client_ip) = header_contents.get(contents_len - num_hops) else { + error!( + "x-forwarded-for header value of {:?} contains {} values, but {} hops were specificed. \ + Expected {} values. Skipping traffic controller request handling.", + header_contents, + contents_len, + num_hops, + num_hops + 1, + ); + return None; + }; + parse_ip(client_ip) + } + Err(e) => { + error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e); + None + } + }; + if let Some(header) = headers.get("x-forwarded-for") { + do_header_parse(header) + } else if let Some(header) = headers.get("X-Forwarded-For") { + do_header_parse(header) + } else { + error!("x-forwarded-for header not present for request despite node configuring x-forwarded-for tracking type"); + None + } + } + }; + + if let Some(ip) = client { + request.extensions_mut().insert(ip); + } +} diff --git a/crates/sui-json-rpc/src/transaction_builder_api.rs b/crates/sui-json-rpc/src/transaction_builder_api.rs index 3a691611beeae..5e27d46adc50b 100644 --- a/crates/sui-json-rpc/src/transaction_builder_api.rs +++ b/crates/sui-json-rpc/src/transaction_builder_api.rs @@ -92,8 +92,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { let data = self .0 .transfer_object(signer, object_id, gas, *gas_budget, recipient) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn transfer_sui( @@ -113,8 +114,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { recipient, amount.map(|a| *a), ) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn pay( @@ -136,8 +138,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { gas, *gas_budget, ) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn pay_sui( @@ -157,8 +160,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { amounts.into_iter().map(|a| *a).collect(), *gas_budget, ) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn pay_all_sui( @@ -171,8 +175,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { let data = self .0 .pay_all_sui(signer, input_coins, recipient, *gas_budget) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn publish( @@ -186,12 +191,14 @@ impl TransactionBuilderServer for TransactionBuilderApi { let compiled_modules = compiled_modules .into_iter() .map(|data| data.to_vec().map_err(|e| anyhow::anyhow!(e))) - .collect::, _>>()?; + .collect::, _>>() + .map_err(crate::Error::from)?; let data = self .0 .publish(sender, compiled_modules, dependencies, gas, *gas_budget) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn split_coin( @@ -206,8 +213,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { let data = self .0 .split_coin(signer, coin_object_id, split_amounts, gas, *gas_budget) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn split_coin_equal( @@ -221,8 +229,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { let data = self .0 .split_coin_equal(signer, coin_object_id, *split_count, gas, *gas_budget) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn merge_coin( @@ -236,8 +245,9 @@ impl TransactionBuilderServer for TransactionBuilderApi { let data = self .0 .merge_coins(signer, primary_coin, coin_to_merge, gas, *gas_budget) - .await?; - Ok(TransactionBlockBytes::from_data(data)?) + .await + .map_err(crate::Error::from)?; + Ok(TransactionBlockBytes::from_data(data).map_err(crate::Error::from)?) } async fn move_call( @@ -265,8 +275,10 @@ impl TransactionBuilderServer for TransactionBuilderApi { *gas_budget, None, ) - .await?, - )?) + .await + .map_err(crate::Error::from)?, + ) + .map_err(crate::Error::from)?) } async fn batch_transaction( @@ -280,8 +292,10 @@ impl TransactionBuilderServer for TransactionBuilderApi { Ok(TransactionBlockBytes::from_data( self.0 .batch_transaction(signer, params, gas, *gas_budget) - .await?, - )?) + .await + .map_err(crate::Error::from)?, + ) + .map_err(crate::Error::from)?) } async fn request_add_stake( @@ -297,8 +311,10 @@ impl TransactionBuilderServer for TransactionBuilderApi { Ok(TransactionBlockBytes::from_data( self.0 .request_add_stake(signer, coins, amount, validator, gas, *gas_budget) - .await?, - )?) + .await + .map_err(crate::Error::from)?, + ) + .map_err(crate::Error::from)?) } async fn request_withdraw_stake( @@ -311,8 +327,10 @@ impl TransactionBuilderServer for TransactionBuilderApi { Ok(TransactionBlockBytes::from_data( self.0 .request_withdraw_stake(signer, staked_sui, gas, *gas_budget) - .await?, - )?) + .await + .map_err(crate::Error::from)?, + ) + .map_err(crate::Error::from)?) } } diff --git a/crates/sui-replay/src/types.rs b/crates/sui-replay/src/types.rs index 045d9ab54ae4c..320fda9b857df 100644 --- a/crates/sui-replay/src/types.rs +++ b/crates/sui-replay/src/types.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use jsonrpsee::core::Error as JsonRpseeError; +use jsonrpsee::core::ClientError as JsonRpseeError; use move_binary_format::CompiledModule; use move_core_types::account_address::AccountAddress; use move_core_types::language_storage::{ModuleId, StructTag}; diff --git a/crates/sui-sdk/src/error.rs b/crates/sui-sdk/src/error.rs index 5344b9466f2e6..7ef191512f3bd 100644 --- a/crates/sui-sdk/src/error.rs +++ b/crates/sui-sdk/src/error.rs @@ -11,12 +11,14 @@ pub type SuiRpcResult = Result; #[derive(Error, Debug)] pub enum Error { #[error(transparent)] - RpcError(#[from] jsonrpsee::core::Error), + RpcError(#[from] jsonrpsee::core::ClientError), #[error(transparent)] JsonRpcError(JsonRpcError), #[error(transparent)] BcsSerialisationError(#[from] bcs::Error), #[error(transparent)] + JsonSerializationError(#[from] serde_json::Error), + #[error(transparent)] UserInputError(#[from] UserInputError), #[error("Subscription error : {0}")] Subscription(String), diff --git a/crates/sui-sdk/src/json_rpc_error.rs b/crates/sui-sdk/src/json_rpc_error.rs index 78f2f2c63539e..ab75b28e03d9a 100644 --- a/crates/sui-sdk/src/json_rpc_error.rs +++ b/crates/sui-sdk/src/json_rpc_error.rs @@ -1,6 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use jsonrpsee::types::{error::UNKNOWN_ERROR_CODE, ErrorObjectOwned}; +use jsonrpsee::types::error::UNKNOWN_ERROR_CODE; pub use sui_json_rpc_api::{TRANSACTION_EXECUTION_CLIENT_ERROR_CODE, TRANSIENT_ERROR_CODE}; use thiserror::Error; @@ -49,11 +49,14 @@ impl Error { } } -impl From for Error { - fn from(err: jsonrpsee::core::Error) -> Self { +impl From for Error { + fn from(err: jsonrpsee::core::ClientError) -> Self { // The following code relies on jsonrpsee's From for ErrorObjectOwned implementation // It converts any variant that is not Error::Call into an ErrorObject with UNKNOWN_ERROR_CODE - let error_object_owned: ErrorObjectOwned = err.into(); + let error_object_owned = match err { + jsonrpsee::core::ClientError::Call(e) => e, + _ => jsonrpsee::types::error::ErrorCode::from(UNKNOWN_ERROR_CODE).into(), + }; Error { code: error_object_owned.code(), message: error_object_owned.message().to_string(), diff --git a/crates/sui-sdk/src/lib.rs b/crates/sui-sdk/src/lib.rs index 2ac15a4bfd021..19ba1ca5f6883 100644 --- a/crates/sui-sdk/src/lib.rs +++ b/crates/sui-sdk/src/lib.rs @@ -231,13 +231,15 @@ impl SuiClientBuilder { let ws = if let Some(url) = self.ws_url { let mut builder = WsClientBuilder::default() - .max_request_body_size(2 << 30) + .max_request_size(2 << 30) .max_concurrent_requests(self.max_concurrent_requests) .set_headers(headers.clone()) .request_timeout(self.request_timeout); if let Some(duration) = self.ws_ping_interval { - builder = builder.ping_interval(duration) + builder = builder.enable_ws_ping( + jsonrpsee::ws_client::PingConfig::new().ping_interval(duration), + ); } builder.build(url).await.ok() @@ -246,7 +248,7 @@ impl SuiClientBuilder { }; let http = HttpClientBuilder::default() - .max_request_body_size(2 << 30) + .max_request_size(2 << 30) .max_concurrent_requests(self.max_concurrent_requests) .set_headers(headers.clone()) .request_timeout(self.request_timeout) diff --git a/deny.toml b/deny.toml index cbebdf3686c7b..1925ea28f0531 100644 --- a/deny.toml +++ b/deny.toml @@ -214,5 +214,4 @@ github = [ "bmwill", "mystenlabs", "nextest-rs", - "wlmyng", # jsonrpsee fork ]