diff --git a/.gitignore b/.gitignore index a3a6384fe..a506158a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ bazel-* target/ .vscode/ +.idea/ .zed .cache .terraform* diff --git a/Cargo.lock b/Cargo.lock index 6e25bb2b5..2ea8f4f8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.24.2" @@ -116,13 +122,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.3.1", "event-listener-strategy", "pin-project-lite", ] @@ -198,7 +215,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "time", "tokio", @@ -234,7 +251,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http-body 0.4.6", "once_cell", @@ -264,7 +281,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "hex", "hmac", "http 0.2.12", @@ -448,7 +465,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 2.3.0", "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", @@ -581,6 +598,91 @@ dependencies = [ "tower-service", ] +[[package]] +name = "azure_core" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "dyn-clone", + "futures", + "getrandom 0.2.15", + "hmac", + "http-types", + "once_cell", + "paste", + "pin-project", + "quick-xml", + "rand 0.8.5", + "rustc_version", + "serde", + "serde_json", + "sha2", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" +dependencies = [ + "RustyXML", + "async-lock", + "async-trait", + "azure_core", + "bytes", + "serde", + "serde_derive", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_storage_blobs" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97e83c3636ae86d9a6a7962b2112e3b19eb3903915c50ce06ff54ff0a2e6a7e4" +dependencies = [ + "RustyXML", + "azure_core", + "azure_storage", + "azure_svc_blobstorage", + "bytes", + "futures", + "serde", + "serde_derive", + "serde_json", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_svc_blobstorage" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e6c6f20c5611b885ba94c7bae5e02849a267381aecb8aee577e8c35ff4064c6" +dependencies = [ + "azure_core", + "bytes", + "futures", + "log", + "once_cell", + "serde", + "serde_json", + 
"time", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -596,6 +698,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -1006,6 +1114,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1036,6 +1145,12 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "either" version = "1.13.0" @@ -1058,6 +1173,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.3.1" @@ -1075,7 +1196,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener", + "event-listener 5.3.1", "pin-project-lite", ] @@ -1091,6 +1212,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1176,7 +1306,7 @@ dependencies = [ "futures", "log", "parking_lot", - "rand", + "rand 0.8.5", "redis-protocol", "rustls 0.23.20", "rustls-native-certs 0.8.1", @@ -1250,6 +1380,21 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1301,6 +1446,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1308,8 +1464,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1479,6 +1637,26 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" +dependencies = [ + "anyhow", + "async-channel", + 
"base64 0.13.1", + "futures-lite", + "infer", + "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs", + "serde_urlencoded", + "url", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1751,6 +1929,21 @@ dependencies = [ "serde", ] +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1951,7 +2144,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -2109,7 +2302,7 @@ dependencies = [ "parking_lot", "pretty_assertions", "prost", - "rand", + "rand 0.8.5", "scopeguard", "serde", "serde_json", @@ -2167,6 +2360,9 @@ dependencies = [ "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", + "azure_core", + "azure_storage", + "azure_storage_blobs", "bincode", "blake3", "byteorder", @@ -2195,7 +2391,7 @@ dependencies = [ "patricia_tree", "pretty_assertions", "prost", - "rand", + "rand 0.8.5", "serde", "serde_json", "serial_test", @@ -2238,7 +2434,7 @@ dependencies = [ "pretty_assertions", "prost", "prost-types", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -2274,7 +2470,7 @@ dependencies = [ "pretty_assertions", "prost", "prost-types", - "rand", + "rand 0.8.5", "relative-path", "scopeguard", "serde", @@ -2457,6 +2653,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "patricia_tree" version = "0.8.0" @@ -2479,7 +2681,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" dependencies = [ "memchr", - "thiserror 2.0.6", + "thiserror 2.0.9", "ucd-trie", ] @@ -2700,6 +2902,16 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.37" @@ -2709,6 +2921,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2716,8 +2941,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version 
= "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2727,7 +2962,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2736,7 +2980,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -2826,7 +3079,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -3120,6 +3373,29 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror 1.0.69", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serial_test" version = "3.2.0" @@ -3308,7 +3584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.3.0", "once_cell", "rustix", "windows-sys 0.59.0", @@ -3325,11 +3601,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" dependencies = [ - "thiserror-impl 2.0.6", + "thiserror-impl 2.0.9", ] [[package]] @@ -3345,9 +3621,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", @@ -3371,6 +3647,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", + "itoa", + "js-sys", "num-conv", "powerfmt", "serde", @@ -3550,7 +3828,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3704,6 +3982,7 @@ dependencies 
= [
 "form_urlencoded",
 "idna",
 "percent-encoding",
+ "serde",
]

[[package]]
@@ -3743,7 +4022,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [
 "atomic",
- "getrandom",
+ "getrandom 0.2.15",
 "serde",
]

[[package]]
@@ -3765,6 +4044,12 @@
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64"

+[[package]]
+name = "waker-fn"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7"
+
[[package]]
name = "want"
version = "0.3.1"
@@ -3774,6 +4059,12 @@
dependencies = [
 "try-lock",
]

+[[package]]
+name = "wasi"
+version = "0.9.0+wasi-snapshot-preview1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
+
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
diff --git a/nativelink-config/examples/README.md b/nativelink-config/examples/README.md
index 50828e65c..d1779c825 100644
--- a/nativelink-config/examples/README.md
+++ b/nativelink-config/examples/README.md
@@ -41,7 +41,7 @@ The value of `stores` includes top-level keys, which are user supplied names sto
 ### Store Type
 Once the store has been named and its object exists,
-the next key is the type of store. The options are `filesystem`, `memory`, `compression`, `dedup`, `fast_slow`, `verify`, and `experimental_s3_store`.
+the next key is the type of store. The options are `filesystem`, `memory`, `compression`, `dedup`, `fast_slow`, `verify`, `experimental_s3_store`, and `experimental_azure_blob_store`.
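+A complete Azure Blob configuration added with this change is available in `azure_blob_backend_with_local_fast_cas.json5` in this directory.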
 ```json5
 {
diff --git a/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5 b/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5
new file mode 100644
index 000000000..26a6c1259
--- /dev/null
+++ b/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5
@@ -0,0 +1,167 @@
+{
+  "stores": {
+    "CAS_MAIN_STORE": {
+      "verify": {
+        "backend": {
+          "dedup": {
+            "index_store": {
+              "fast_slow": {
+                "fast": {
+                  "filesystem": {
+                    "content_path": "/tmp/nativelink/data/content_path-index",
+                    "temp_path": "/tmp/nativelink/data/tmp_path-index",
+                    "eviction_policy": {
+                      "max_bytes": 500000000
+                    }
+                  }
+                },
+                "slow": {
+                  "experimental_azure_blob_store": {
+                    "account_name": "storageaccount",
+                    "account_key": "${AZURE_STORAGE_KEY}",
+                    "container": "container",
+                    "blob_prefix": "test-prefix-index/",
+                    "endpoint_suffix": "core.windows.net",
+                    "retry": {
+                      "max_retries": 6,
+                      "delay": 0.3,
+                      "jitter": 0.5
+                    },
+                    "max_concurrent_uploads": 10
+                  }
+                }
+              }
+            },
+            "content_store": {
+              "compression": {
+                "compression_algorithm": {
+                  "lz4": {}
+                },
+                "backend": {
+                  "fast_slow": {
+                    "fast": {
+                      "filesystem": {
+                        "content_path": "/tmp/nativelink/data/content_path-content",
+                        "temp_path": "/tmp/nativelink/data/tmp_path-content",
+                        "eviction_policy": {
+                          "max_bytes": 2000000000
+                        }
+                      }
+                    },
+                    "slow": {
+                      "experimental_azure_blob_store": {
+                        "account_name": "storageaccount",
+                        "account_key": "${AZURE_STORAGE_KEY}",
+                        "container": "container",
+                        "blob_prefix": "test-prefix-dedup-cas/",
+                        "endpoint_suffix": "core.windows.net",
+                        "retry": {
+                          "max_retries": 6,
+                          "delay": 0.3,
+                          "jitter": 0.5
+                        },
+                        "max_concurrent_uploads": 10
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        },
+        "verify_size": true,
+        "verify_hash": true
+      }
+    },
+    "AC_MAIN_STORE": {
+      "fast_slow": {
+        "fast": {
+          "filesystem": {
+            "content_path": "/tmp/nativelink/data/content_path-ac",
+            "temp_path": "/tmp/nativelink/data/tmp_path-ac",
+            "eviction_policy": {
+              "max_bytes": 500000000
+            }
+          }
+        },
+        "slow": {
+          "experimental_azure_blob_store": {
+            "account_name": "storageaccount",
+            "account_key": "${AZURE_STORAGE_KEY}",
+            "container": "container",
+            "blob_prefix": "test-prefix-ac/",
+            "endpoint_suffix": "core.windows.net",
+            "retry": {
+              "max_retries": 6,
+              "delay": 0.3,
+              "jitter": 0.5
+            },
+            "max_concurrent_uploads": 10
+          }
+        }
+      }
+    }
+  },
+  "schedulers": {
+    "MAIN_SCHEDULER": {
+      "simple": {
+        "supported_platform_properties": {
+          "cpu_count": "minimum",
+          "memory_kb": "minimum",
+          "network_kbps": "minimum",
+          "disk_read_iops": "minimum",
+          "disk_read_bps": "minimum",
+          "disk_write_iops": "minimum",
+          "disk_write_bps": "minimum",
+          "shm_size": "minimum",
+          "gpu_count": "minimum",
+          "gpu_model": "exact",
+          "cpu_vendor": "exact",
+          "cpu_arch": "exact",
+          "cpu_model": "exact",
+          "kernel_version": "exact",
+          "docker_image": "priority",
+          "lre-rs": "priority"
+        }
+      }
+    }
+  },
+  "servers": [{
+    "listener": {
+      "http": {
+        "socket_address": "0.0.0.0:50051"
+      }
+    },
+    "services": {
+      "cas": {
+        "main": {
+          "cas_store": "CAS_MAIN_STORE"
+        }
+      },
+      "ac": {
+        "main": {
+          "ac_store": "AC_MAIN_STORE"
+        }
+      },
+      "execution": {
+        "main": {
+          "cas_store": "CAS_MAIN_STORE",
+          "scheduler": "MAIN_SCHEDULER"
+        }
+      },
+      "capabilities": {
+        "main": {
+          "remote_execution": {
+            "scheduler": "MAIN_SCHEDULER"
+          }
+        }
+      },
+      "bytestream": {
+        "cas_stores": {
+          "main": "CAS_MAIN_STORE"
+        }
+      },
+      "health": {}
+    }
+  }]
+}
diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs
index 7e6e31821..d51f4dafc 100644
--- a/nativelink-config/src/stores.rs
+++ b/nativelink-config/src/stores.rs
@@ -54,6 +54,32 @@ pub enum StoreSpec {
     ///
     memory(MemorySpec),

+    /// Azure Blob store will use Microsoft's Azure Blob service as a
+    /// backend to store the files. This configuration can be used to
+    /// share files across multiple instances.
+    ///
+    /// This configuration will never delete files, so you are
+    /// responsible for purging old files in other ways.
+    ///
+    /// **Example JSON Config:**
+    /// ```json
+    /// "experimental_azure_blob_store": {
+    ///   "account_name": "cloudshell1393657559",
+    ///   "account_key": "${AZURE_STORAGE_KEY}",
+    ///   "container": "simple-test-container",
+    ///   "blob_prefix": "folder/",
+    ///   "endpoint_suffix": "core.windows.net",
+    ///   "retry": {
+    ///     "max_retries": 6,
+    ///     "delay": 0.3,
+    ///     "jitter": 0.5
+    ///   },
+    ///   "max_concurrent_uploads": 10
+    /// }
+    /// ```
+    ///
+    experimental_azure_blob_store(AzureBlobSpec),
+
     /// S3 store will use Amazon's S3 service as a backend to store
     /// the files. This configuration can be used to share files
     /// across multiple instances.
@@ -787,6 +813,96 @@ pub struct S3Spec {
     pub disable_http2: bool,
 }

+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct AzureBlobSpec {
+    /// The Azure Storage account name
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub account_name: String,
+
+    /// The Azure Storage account key
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub account_key: String,
+
+    /// The container name to use as the backend
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub container: String,
+
+    /// Optional prefix for blob paths within the container. If None, no prefix will be used.
+    #[serde(default)]
+    pub blob_prefix: Option<String>,
+
+    /// Retry configuration to use when a network request fails.
+    #[serde(default)]
+    pub retry: Retry,
+
+    /// If the number of seconds since the blob's last modified time
+    /// is greater than this value, the blob will not be considered
+    /// "existing". This allows for external lifecycle management policies
+    /// to delete blobs that haven't been accessed in a long time.
+    /// If a client receives a `NotFound`, it should re-upload the blob.
+    ///
+    /// There should be sufficient buffer time between the lifecycle policy's
+    /// deletion threshold and this value. A few days' buffer is recommended.
+    ///
+    /// Default: 0. Zero means never consider a blob expired.
+    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
+    pub consider_expired_after_s: u32,
+
+    /// The maximum buffer size to retain for retryable upload errors.
+    /// Setting this to zero will disable upload buffering, meaning
+    /// upload failures will result in a complete abort and likely
+    /// client errors.
+    ///
+    /// Default: 5MB.
+    pub max_retry_buffer_per_request: Option<usize>,
+
+    /// Maximum number of concurrent block uploads per blob.
+    /// This affects the parallelism of block blob uploads.
+    ///
+    /// Default: 10.
+    pub max_concurrent_uploads: Option<usize>,
+
+    /// Allow unencrypted HTTP connections. Only use for local testing
+    /// with Azurite or other local emulators.
+    ///
+    /// Default: false
+    #[serde(default)]
+    pub insecure_allow_http: bool,
+
+    /// The Azure Storage endpoint suffix.
+    /// Examples: "core.windows.net", "core.chinacloudapi.cn"
+    ///
+    /// Default: "core.windows.net"
+    #[serde(default)]
+    pub endpoint_suffix: String,
+
+    /// Service version to use when making requests to Azure Storage.
+    /// Should match one of Azure Storage REST API versions.
+    ///
+    /// Default: Latest supported version
+    #[serde(default)]
+    pub api_version: Option<String>,
+
+    /// Optional connection string for Azure Storage account.
+    /// If provided, this will be used instead of `account_name` and `account_key`.
+    /// Format: "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=..."
+    ///
+    /// Default: None
+    #[serde(default)]
+    pub connection_string: Option<String>,
+
+    /// Disable http/2 connections and only use http/1.1. Default client
+    /// configuration will have http/1.1 and http/2 enabled for connection
+    /// schemes. Http/2 should be disabled if environments have poor support
+    /// or performance related to http/2. Safe to keep default unless the
+    /// underlying network environment or Azure Storage servers specify otherwise.
+    ///
+    /// Default: false
+    #[serde(default)]
+    pub disable_http2: bool,
+}
+
 #[allow(non_camel_case_types)]
 #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
 pub enum StoreType {
diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel
index 4f7691e7d..6bc7a1c39 100644
--- a/nativelink-store/BUILD.bazel
+++ b/nativelink-store/BUILD.bazel
@@ -10,6 +10,7 @@ rust_library(
     name = "nativelink-store",
     srcs = [
         "src/ac_utils.rs",
+        "src/azure_blob_store.rs",
         "src/cas_utils.rs",
         "src/completeness_checking_store.rs",
         "src/compression_store.rs",
@@ -46,6 +47,9 @@ rust_library(
         "@crates//:aws-config",
         "@crates//:aws-sdk-s3",
         "@crates//:aws-smithy-runtime",
+        "@crates//:azure_core",
+        "@crates//:azure_storage",
+        "@crates//:azure_storage_blobs",
         "@crates//:bincode",
         "@crates//:blake3",
         "@crates//:byteorder",
@@ -56,6 +60,7 @@ rust_library(
         "@crates//:fred",
         "@crates//:futures",
         "@crates//:hex",
+        "@crates//:http",
        "@crates//:http-body",
         "@crates//:hyper-0.14.31",
         "@crates//:hyper-rustls",
@@ -79,6 +84,7 @@ rust_test_suite(
     timeout = "short",
     srcs = [
         "tests/ac_utils_test.rs",
+        "tests/azure_blob_store_test.rs",
         "tests/completeness_checking_store_test.rs",
         "tests/compression_store_test.rs",
         "tests/dedup_store_test.rs",
@@ -110,6 +116,9 @@ rust_test_suite(
         "@crates//:aws-smithy-runtime",
         "@crates//:aws-smithy-runtime-api",
         "@crates//:aws-smithy-types",
+        "@crates//:azure_core",
+        "@crates//:azure_storage",
+        "@crates//:azure_storage_blobs",
         "@crates//:bincode",
         "@crates//:bytes",
         "@crates//:filetime",
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index d646c53e6..2263957ca 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -18,6 +18,9 @@ aws-sdk-s3 = { version = "1.65.0", features = [
   "rt-tokio",
 ], default-features = false }
 aws-smithy-runtime = { version = "1.7.4" }
+azure_core = { version = "0.21.0", default-features = false, features = ["hmac_rust"] }
+azure_storage = { version = "0.21.0", default-features = false, features = ["hmac_rust"] }
+azure_storage_blobs = { version = "0.21.0", default-features = false, features = ["hmac_rust"] }
 bincode = "1.3.3"
 blake3 = { version = "1.5.5", default-features = false }
 byteorder = { version = "1.5.0", default-features = false }
@@ -57,6 +60,7 @@ tokio-util = { version = "0.7.13" }
 tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false }
 tracing = { version = "0.1.41", default-features = false }
 uuid = { version =
"1.11.0", default-features = false, features = ["v4", "serde"] } +http = "1.2.0" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-store/src/azure_blob_store.rs b/nativelink-store/src/azure_blob_store.rs new file mode 100644 index 000000000..4140c0996 --- /dev/null +++ b/nativelink-store/src/azure_blob_store.rs @@ -0,0 +1,843 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::cmp; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use async_trait::async_trait; +use azure_core::auth::Secret; +use azure_core::prelude::Range; +use azure_core::{Body, HttpClient, StatusCode, TransportOptions}; +use azure_storage::StorageCredentials; +use azure_storage_blobs::prelude::*; +use bytes::Bytes; +use futures::future::FusedFuture; +use futures::stream::{unfold, FuturesUnordered}; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use http_body::{Frame, SizeHint}; +use hyper::client::connect::{Connected, Connection}; +use hyper::client::HttpConnector; +use hyper::service::Service; +use hyper::{Body as HyperBody, Client, Request as HyperRequest, Uri}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder, MaybeHttpsStream}; +use nativelink_config::stores::AzureBlobSpec; +use nativelink_error::{make_err, Code, Error, ResultExt}; +use nativelink_metric::MetricsComponent; +use nativelink_util::buf_channel::{ + make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, +}; +use nativelink_util::fs; +use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; +use nativelink_util::instant_wrapper::InstantWrapper; +use nativelink_util::retry::{Retrier, RetryResult}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; +use rand::rngs::OsRng; +use rand::Rng; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, SemaphorePermit}; +use tokio::time::sleep; +use tokio_stream::Stream; +use tracing::{event, Level}; + +use crate::cas_utils::is_zero_digest; + +// Check the below doc for the limits specific to Azure. 
+// https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage
+
+// Maximum number of blocks in a block blob or append blob
+const MAX_BLOCKS: usize = 50_000;
+
+// Maximum size of a block in a block blob (4,000 MiB)
+const MAX_BLOCK_SIZE: u64 = 4_000 * 1024 * 1024; // 4,000 MiB
+
+// Default block size for uploads (5 MiB)
+const DEFAULT_BLOCK_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB
+
+// Default maximum retry buffer per request
+const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5 MiB
+
+// Default maximum number of concurrent uploads
+const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10;
+
+// Default idle timeout
+const IDLE_TIMEOUT: Duration = Duration::from_secs(15);
+
+// Maximum number of idle connections per host
+const MAX_IDLE_PER_HOST: usize = 32;
+
+pub struct AzureConnectionWithPermit<T: Connection + AsyncRead + AsyncWrite + Unpin> {
+    connection: T,
+    _permit: SemaphorePermit<'static>,
+}
+
+impl<T: Connection + AsyncRead + AsyncWrite + Unpin> Connection for AzureConnectionWithPermit<T> {
+    fn connected(&self) -> Connected {
+        self.connection.connected()
+    }
+}
+
+impl<T: Connection + AsyncRead + AsyncWrite + Unpin> AsyncRead for AzureConnectionWithPermit<T> {
+    #[inline]
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), tokio::io::Error>> {
+        Pin::new(&mut Pin::get_mut(self).connection).poll_read(cx, buf)
+    }
+}
+
+impl<T: Connection + AsyncRead + AsyncWrite + Unpin> AsyncWrite for AzureConnectionWithPermit<T> {
+    #[inline]
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, tokio::io::Error>> {
+        Pin::new(&mut Pin::get_mut(self).connection).poll_write(cx, buf)
+    }
+
+    #[inline]
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), tokio::io::Error>> {
+        Pin::new(&mut Pin::get_mut(self).connection).poll_flush(cx)
+    }
+
+    #[inline]
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), tokio::io::Error>> {
+        Pin::new(&mut Pin::get_mut(self).connection).poll_shutdown(cx)
+    }
+}
+
+// AzureClient exists to handle the custom HttpsConnector.
+// Unlike the Amazon S3 SDK, the Azure SDK does not accept a custom HTTP
+// connector through its transport options, so we implement `HttpClient`
+// ourselves and specialize it to use a custom HttpsConnector.
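+//
+// A minimal sketch (illustration only, not part of this file's API) of how
+// this client is handed to the Azure SDK through `TransportOptions`,
+// mirroring what `AzureBlobStore::new` does further below:
+//
+//     let http_client = Arc::new(AzureClient::new(spec.clone(), jitter_fn)?);
+//     let transport_options = TransportOptions::new(http_client);
+//     let container_client =
+//         BlobServiceClient::builder(&spec.account_name, storage_credentials)
+//             .transport(transport_options)
+//             .container_client(&spec.container);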
+#[derive(Clone)]
+pub struct AzureClient {
+    client: Client<HttpsConnector<HttpConnector>>,
+    config: Arc<AzureBlobSpec>,
+    retrier: Retrier,
+    connector: HttpsConnector<HttpConnector>,
+}
+
+impl AzureClient {
+    pub fn new(
+        config: AzureBlobSpec,
+        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
+    ) -> Result<Self, Error> {
+        let connector = Self::build_connector(&config);
+        let client = Self::build_client(connector.clone());
+
+        Ok(Self {
+            client,
+            retrier: Retrier::new(
+                Arc::new(|duration| Box::pin(sleep(duration))),
+                jitter_fn,
+                config.retry.clone(),
+            ),
+            config: Arc::new(config),
+            connector,
+        })
+    }
+
+    pub fn get_connector(&self) -> &HttpsConnector<HttpConnector> {
+        &self.connector
+    }
+
+    fn build_connector(config: &AzureBlobSpec) -> HttpsConnector<HttpConnector> {
+        let builder = HttpsConnectorBuilder::new().with_webpki_roots();
+
+        let builder_with_schemes = if config.insecure_allow_http {
+            builder.https_or_http()
+        } else {
+            builder.https_only()
+        };
+
+        if config.disable_http2 {
+            builder_with_schemes.enable_http1().build()
+        } else {
+            builder_with_schemes.enable_http1().enable_http2().build()
+        }
+    }
+
+    fn build_client(
+        connector: HttpsConnector<HttpConnector>,
+    ) -> Client<HttpsConnector<HttpConnector>> {
+        Client::builder()
+            .pool_idle_timeout(IDLE_TIMEOUT)
+            .pool_max_idle_per_host(MAX_IDLE_PER_HOST)
+            .build(connector)
+    }
+
+    async fn call_with_retry(
+        &self,
+        req: &Uri,
+    ) -> Result<AzureConnectionWithPermit<MaybeHttpsStream<TcpStream>>, Error> {
+        let retry_stream_fn = unfold(self.connector.clone(), move |mut client| async move {
+            let _permit = fs::get_permit().await.unwrap();
+            match client.call(req.clone()).await {
+                Ok(connection) => Some((
+                    RetryResult::Ok(AzureConnectionWithPermit {
+                        connection,
+                        _permit,
+                    }),
+                    client,
+                )),
+                Err(e) => Some((
+                    RetryResult::Retry(make_err!(
+                        Code::Unavailable,
+                        "Failed to call Azure Blob Storage: {e:?}"
+                    )),
+                    client,
+                )),
+            }
+        });
+        self.retrier.retry(retry_stream_fn).await
+    }
+}
+
+impl Debug for AzureClient {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AzureClient")
+            .field("config", &self.config)
+            .finish()
+    }
+}
+
+#[async_trait::async_trait]
+impl HttpClient for AzureClient {
+    async fn execute_request(
+        &self,
+        request: &azure_core::Request,
+    ) -> azure_core::Result<azure_core::Response> {
+        let mut builder = HyperRequest::builder()
+            .method(request.method().as_ref())
+            .uri(request.url().as_str());
+
+        // Copy headers from the original request
+        for (name, value) in request.headers().iter() {
+            builder = builder.header(name.as_str(), value.as_str());
+        }
+
+        // Copy the body
+        let body = match request.body() {
+            Body::Bytes(bytes) if bytes.is_empty() => HyperBody::empty(),
+            Body::Bytes(bytes) => HyperBody::from(bytes.to_vec()),
+            Body::SeekableStream(_) => {
+                return Err(azure_core::Error::new(
+                    azure_core::error::ErrorKind::Other,
+                    "Unsupported body type: SeekableStream",
+                ))
+            }
+        };
+
+        let hyper_request = builder
+            .body(body)
+            .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))?;
+
+        let response = tokio::time::timeout(IDLE_TIMEOUT, self.client.request(hyper_request))
+            .await
+            .map_err(|_| {
+                azure_core::Error::new(azure_core::error::ErrorKind::Other, "Request timeout")
+            })?
+            .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))?;
+
+        let (parts, body) = response.into_parts();
+
+        let mapped_stream = body.map(|result| {
+            result.map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))
+        });
+
+        // Convert the response headers into azure_core's header map
+        let headers: HashMap<_, _> = parts
+            .headers
+            .iter()
+            .filter_map(|(k, v)| {
+                Some((
+                    azure_core::headers::HeaderName::from(k.as_str().to_owned()),
+                    azure_core::headers::HeaderValue::from(v.to_str().ok()?.to_owned()),
+                ))
+            })
+            .collect();
+
+        Ok(azure_core::Response::new(
+            azure_core::StatusCode::try_from(parts.status.as_u16()).expect("Invalid status code"),
+            azure_core::headers::Headers::from(headers),
+            Box::pin(mapped_stream),
+        ))
+    }
+}
+
+impl Service<Uri> for AzureClient {
+    type Response = AzureConnectionWithPermit<MaybeHttpsStream<TcpStream>>;
+    type Error = Error;
+    type Future =
+        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.client
+            .poll_ready(cx)
+            .map_err(|e| make_err!(Code::Unavailable, "Failed poll in Azure Blob Storage: {e}"))
+    }
+
+    fn call(&mut self, req: Uri) -> Self::Future {
+        let client_clone = self.clone();
+        Box::pin(async move { client_clone.call_with_retry(&req).await })
+    }
+}
+
+pub struct AzureBodyWrapper {
+    reader: DropCloserReadHalf,
+    size: u64,
+}
+
+impl http_body::Body for AzureBodyWrapper {
+    type Data = Bytes;
+    type Error = std::io::Error;
+
+    fn poll_frame(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+        let reader = Pin::new(&mut Pin::get_mut(self).reader);
+        reader
+            .poll_next(cx)
+            .map(|maybe_bytes_res| maybe_bytes_res.map(|res| res.map(Frame::data)))
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        SizeHint::with_exact(self.size)
+    }
+}
+
+#[derive(MetricsComponent)]
+pub struct AzureBlobStore<NowFn> {
+    client: Arc<ContainerClient>,
+    now_fn: NowFn,
+    #[metric(help = "The container name for the Azure store")]
+    container: String,
+    #[metric(help = "The blob prefix for the Azure store")]
+    blob_prefix: String,
+    retrier: Retrier,
+    #[metric(help = "The number of seconds to consider an object expired")]
+    consider_expired_after_s: i64,
+    #[metric(help = "The number of bytes to buffer for retrying requests")]
+    max_retry_buffer_per_request: usize,
+    #[metric(help = "The number of concurrent uploads allowed")]
+    max_concurrent_uploads: usize,
+}
+
+impl<I, NowFn> AzureBlobStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    pub async fn new(spec: &AzureBlobSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
+        // TODO: Should we use a common jitter function to minimize duplication?
+        let jitter_amt = spec.retry.jitter;
+        let jitter_fn = Arc::new(move |delay: Duration| {
+            if jitter_amt == 0. {
+                return delay;
+            }
+            let min = 1. - (jitter_amt / 2.);
+            let max = 1. + (jitter_amt / 2.);
+            delay.mul_f32(OsRng.gen_range(min..max))
+        });
+
+        let http_client = Arc::new(
+            AzureClient::new(spec.clone(), jitter_fn.clone())
+                .map_err(|e| make_err!(Code::Unavailable, "Failed to create Azure client: {e}"))?,
+        );
+        let transport_options = TransportOptions::new(http_client);
+
+        let storage_credentials = StorageCredentials::access_key(
+            spec.account_name.clone(),
+            Secret::new(spec.account_key.clone()),
+        );
+
+        let container_client = BlobServiceClient::builder(&spec.account_name, storage_credentials)
+            .transport(transport_options)
+            .container_client(&spec.container);
+
+        Self::new_with_client_and_jitter(spec, container_client, jitter_fn, now_fn)
+    }
+
+    pub fn new_with_client_and_jitter(
+        spec: &AzureBlobSpec,
+        client: ContainerClient,
+        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
+        now_fn: NowFn,
+    ) -> Result<Arc<Self>, Error> {
+        Ok(Arc::new(Self {
+            client: Arc::new(client),
+            now_fn,
+            container: spec.container.to_string(),
+            blob_prefix: spec.blob_prefix.as_ref().unwrap_or(&String::new()).clone(),
+            retrier: Retrier::new(
+                Arc::new(|duration| Box::pin(sleep(duration))),
+                jitter_fn,
+                spec.retry.clone(),
+            ),
+            consider_expired_after_s: i64::from(spec.consider_expired_after_s),
+            max_retry_buffer_per_request: spec
+                .max_retry_buffer_per_request
+                .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST),
+            max_concurrent_uploads: spec
+                .max_concurrent_uploads
+                .map_or(DEFAULT_MAX_CONCURRENT_UPLOADS, |v| v),
+        }))
+    }
+
+    fn make_blob_path(&self, key: &StoreKey<'_>) -> String {
+        format!("{}{}", self.blob_prefix, key.as_str())
+    }
+
+    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
+        let blob_path = self.make_blob_path(digest);
+
+        self.retrier
+            .retry(unfold((), move |state| {
+                let blob_path = blob_path.clone();
+                async move {
+                    let result = self.client.blob_client(&blob_path).get_properties().await;
+
+                    match result {
+                        Ok(props) => {
+                            if self.consider_expired_after_s > 0 {
+                                let last_modified = props.blob.properties.last_modified;
+                                let now = (self.now_fn)().unix_timestamp() as i64;
+                                if last_modified.unix_timestamp() + self.consider_expired_after_s
+                                    <= now
+                                {
+                                    return Some((RetryResult::Ok(None), state));
+                                }
+                            }
+                            let blob_size = props.blob.properties.content_length;
+                            Some((RetryResult::Ok(Some(blob_size)), state))
+                        }
+                        Err(err) => {
+                            if err
+                                .as_http_error()
+                                .is_some_and(|e| e.status() == StatusCode::NotFound)
+                            {
+                                Some((RetryResult::Ok(None), state))
+                            } else if err.to_string().contains("ContainerNotFound") {
+                                Some((
+                                    RetryResult::Err(make_err!(
+                                        Code::InvalidArgument,
+                                        "Container not found: {}",
+                                        err
+                                    )),
+                                    state,
+                                ))
+                            } else {
+                                Some((
+                                    RetryResult::Retry(make_err!(
+                                        Code::Unavailable,
+                                        "Failed to get blob properties: {:?}",
+                                        err
+                                    )),
+                                    state,
+                                ))
+                            }
+                        }
+                    }
+                }
+            }))
+            .await
+    }
+}
+
+#[async_trait]
+impl<I, NowFn> StoreDriver for AzureBlobStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    // TODO(Aman): This is also duplicated. Should we have a trait or a macro for this?
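+    // Zero digests never hit the network: `has_with_results` below resolves
+    // them to `Some(0)` locally, matching how NativeLink's other stores treat
+    // the empty blob.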
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        keys.iter()
+            .zip(results.iter_mut())
+            .map(|(key, result)| async move {
+                if is_zero_digest(key.borrow()) {
+                    *result = Some(0);
+                    return Ok::<_, Error>(());
+                }
+                *result = self.has(key).await?;
+                Ok::<_, Error>(())
+            })
+            .collect::<FuturesUnordered<_>>()
+            .try_collect()
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        digest: StoreKey<'_>,
+        mut reader: DropCloserReadHalf,
+        upload_size: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        let blob_path = self.make_blob_path(&digest);
+        // Handle zero-sized content without issuing a request
+        if let UploadSizeInfo::ExactSize(0) = upload_size {
+            return Ok(());
+        }
+
+        let max_size = match upload_size {
+            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
+        };
+
+        // For small files, use a single-block upload
+        if max_size < DEFAULT_BLOCK_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
+            let UploadSizeInfo::ExactSize(sz) = upload_size else {
+                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
+            };
+
+            reader.set_max_recent_data_size(
+                u64::try_from(self.max_retry_buffer_per_request)
+                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
+            );
+
+            return self.retrier
+                .retry(unfold(reader, move |mut reader| {
+                    let client = Arc::clone(&self.client);
+                    let blob_client = client.blob_client(&blob_path);
+                    async move {
+                        let (mut tx, mut rx) = make_buf_channel_pair();
+
+                        let result = {
+                            let reader_ref = &mut reader;
+                            let (upload_res, bind_res) = tokio::join!(
+                                async {
+                                    let mut buffer = Vec::with_capacity(sz as usize);
+                                    while let Ok(Some(chunk)) = rx.try_next().await {
+                                        buffer.extend_from_slice(&chunk);
+                                    }
+
+                                    blob_client
+                                        .put_block_blob(Body::from(buffer))
+                                        .content_type("application/octet-stream")
+                                        .into_future()
+                                        .await
+                                        .map(|_| ())
+                                        .map_err(|e| make_err!(Code::Aborted, "{:?}", e))
+                                },
+                                async { tx.bind_buffered(reader_ref).await }
+                            );
+
+                            upload_res
+                                .and(bind_res)
+                                .err_tip(|| "Failed to upload blob in single chunk")
+                        };
+
+                        match result {
+                            Ok(()) => Some((RetryResult::Ok(()), reader)),
+                            Err(mut err) => {
+                                err.code = Code::Aborted;
+                                let bytes_received = reader.get_bytes_received();
+
+                                if let Err(try_reset_err) = reader.try_reset_stream() {
+                                    event!(
+                                        Level::ERROR,
+                                        ?bytes_received,
+                                        err = ?try_reset_err,
+                                        "Unable to reset stream after failed upload in AzureBlobStore::update"
+                                    );
+                                    Some((RetryResult::Err(err
+                                        .merge(try_reset_err)
+                                        .append(format!("Failed to retry upload with {bytes_received} bytes received in AzureBlobStore::update"))),
+                                        reader))
+                                } else {
+                                    let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in AzureBlobStore::update"));
+                                    event!(
+                                        Level::INFO,
+                                        ?err,
+                                        ?bytes_received,
+                                        "Retryable Azure error"
+                                    );
+                                    Some((RetryResult::Retry(err), reader))
+                                }
+                            }
+                        }
+                    }
+                }))
+                .await;
+        }
+
+        // For larger files, we'll use a block upload strategy
+        let block_size =
+            cmp::min(max_size / (MAX_BLOCKS as u64 - 1), MAX_BLOCK_SIZE).max(DEFAULT_BLOCK_SIZE);
+
+        let (tx, mut rx) = mpsc::channel(self.max_concurrent_uploads);
+        let mut block_ids = Vec::with_capacity(MAX_BLOCKS);
+        let retrier = self.retrier.clone();
+
+        let read_stream_fut = {
+            let tx = tx.clone();
+            let blob_path = blob_path.clone();
+            async move {
+                for block_id in 0..MAX_BLOCKS {
+                    let write_buf = reader
+                        .consume(Some(
+                            usize::try_from(block_size)
+                                .err_tip(|| "Could not convert block_size to usize")?,
+                        ))
+                        .await
+                        .err_tip(|| "Failed to read chunk in azure_store")?;
+
+                    if write_buf.is_empty() {
+                        break;
+                    }
+
+                    let block_id = format!("{block_id:032}");
+                    let blob_path = blob_path.clone();
+
+                    tx.send(async move {
+                        self.retrier
+                            .retry(unfold(
+                                (write_buf, block_id.clone()),
+                                move |(write_buf, block_id)| {
+                                    let client = Arc::clone(&self.client);
+                                    let blob_client = client.blob_client(&blob_path);
+                                    async move {
+                                        let retry_result = blob_client
+                                            .put_block(
+                                                block_id.clone(),
+                                                Body::from(write_buf.clone()),
+                                            )
+                                            .into_future()
+                                            .await
+                                            .map_or_else(
+                                                |e| {
+                                                    RetryResult::Retry(make_err!(
+                                                        Code::Aborted,
+                                                        "Failed to upload block {} in Azure store: {:?}",
+                                                        block_id,
+                                                        e
+                                                    ))
+                                                },
+                                                |_| RetryResult::Ok(block_id.clone()),
+                                            );
+                                        Some((retry_result, (write_buf, block_id)))
+                                    }
+                                },
+                            ))
+                            .await
+                    })
+                    .await
+                    .map_err(|_| make_err!(Code::Internal, "Failed to send block to channel"))?;
+                }
+                Ok::<_, Error>(())
+            }
+            .fuse()
+        };
+
+        let mut upload_futures = FuturesUnordered::new();
+
+        tokio::pin!(read_stream_fut);
+
+        loop {
+            if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() {
+                break;
+            }
+            tokio::select! {
+                result = &mut read_stream_fut => result?,
+                Some(block_id) = upload_futures.next() => block_ids.push(block_id?),
+                Some(fut) = rx.recv() => upload_futures.push(fut),
+            }
+        }
+
+        // Sort block IDs to ensure consistent ordering
+        block_ids.sort_unstable();
+
+        // Commit the block list
+        let block_list = BlockList {
+            blocks: block_ids
+                .into_iter()
+                .map(|id| BlobBlockType::Latest(BlockId::from(id)))
+                .collect(),
+        };
+
+        retrier
+            .retry(unfold(block_list, move |block_list| {
+                let client = Arc::clone(&self.client);
+                let blob_client = client.blob_client(&blob_path);
+
+                async move {
+                    Some((
+                        blob_client
+                            .put_block_list(block_list.clone())
+                            .content_type("application/octet-stream")
+                            .into_future()
+                            .await
+                            .map_or_else(
+                                |e| {
+                                    RetryResult::Retry(make_err!(
+                                        Code::Aborted,
+                                        "Failed to commit block list in Azure store: {e:?}"
+                                    ))
+                                },
+                                |_| RetryResult::Ok(()),
+                            ),
+                        block_list,
+                    ))
+                }
+            }))
+            .await
+    }
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        if is_zero_digest(key.borrow()) {
+            writer
+                .send_eof()
+                .err_tip(|| "Failed to send zero EOF in azure store get_part")?;
+            return Ok(());
+        }
+
+        let blob_path = self.make_blob_path(&key);
+
+        let client = Arc::clone(&self.client);
+        let blob_client = client.blob_client(&blob_path);
+        let range = match length {
+            Some(len) => Range::new(offset, offset + len - 1),
+            None => Range::from(offset..),
+        };
+
+        self.retrier
+            .retry(unfold(writer, move |writer| {
+                let range_clone = range.clone();
+                let blob_client = blob_client.clone();
+                async move {
+                    let result = async {
+                        let mut stream = blob_client.get().range(range_clone.clone()).into_stream();
+
+                        while let Some(chunk_result) = stream.next().await {
+                            match chunk_result {
+                                Ok(response) => {
+                                    let data = response.data.collect().await.map_err(|e| {
+                                        make_err!(
+                                            Code::Aborted,
+                                            "Failed to collect response data: {:?}",
+                                            e
+                                        )
+                                    })?;
+                                    if data.is_empty() {
+                                        continue;
+                                    }
+                                    writer.send(data).await.map_err(|e| {
+                                        make_err!(
+                                            Code::Aborted,
+                                            "Failed to send data to writer: {:?}",
+                                            e
+                                        )
+                                    })?;
+                                }
+                                Err(e) => {
+                                    return match e {
+                                        e if e
+                                            .as_http_error()
+                                            .map(|e| e.status() == StatusCode::NotFound)
+                                            .unwrap_or_default() =>
+                                        {
+                                            Err(make_err!(
+                                                Code::NotFound,
+                                                "Blob not found in Azure: {:?}",
+                                                e
+                                            ))
+                                        }
+                                        _ => Err(make_err!(
+                                            Code::Aborted,
+                                            "Error reading from Azure stream: {:?}",
+                                            e
+                                        )),
+                                    };
+                                }
+                            }
+                        }
+
+                        writer.send_eof().map_err(|e| {
+                            make_err!(Code::Aborted, "Failed to send EOF to writer: {:?}", e)
+                        })?;
+                        Ok(())
+                    }
+                    .await;
+
+                    match result {
+                        Ok(()) => Some((RetryResult::Ok(()), writer)),
+                        Err(e) => {
+                            if e.code == Code::NotFound {
+                                Some((RetryResult::Err(e), writer))
+                            } else {
+                                Some((RetryResult::Retry(e), writer))
+                            }
+                        }
+                    }
+                }
+            }))
+            .await
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &'_ dyn StoreDriver {
+        self
+    }
+
+    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
+        self
+    }
+}
+
+#[async_trait]
+impl<I, NowFn> HealthStatusIndicator for AzureBlobStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    fn get_name(&self) -> &'static str {
+        "AzureBlobStore"
+    }
+
+    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
+        StoreDriver::check_health(Pin::new(self), namespace).await
+    }
+}
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index 506ef6752..0947d67c2 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -23,6 +23,7 @@ use nativelink_error::Error;
 use nativelink_util::health_utils::HealthRegistryBuilder;
 use nativelink_util::store_trait::{Store, StoreDriver};

+use crate::azure_blob_store::AzureBlobStore;
 use crate::completeness_checking_store::CompletenessCheckingStore;
 use crate::compression_store::CompressionStore;
 use crate::dedup_store::DedupStore;
@@ -50,6 +51,9 @@ pub fn store_factory<'a>(
     Box::pin(async move {
         let store: Arc<dyn StoreDriver> = match backend {
             StoreSpec::memory(spec) => MemoryStore::new(spec),
+            StoreSpec::experimental_azure_blob_store(spec) => {
+                AzureBlobStore::new(spec, SystemTime::now).await?
+            }
             StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?,
             StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?,
             StoreSpec::verify(spec) => VerifyStore::new(
diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs
index 04040fa5b..579f4e09d 100644
--- a/nativelink-store/src/lib.rs
+++ b/nativelink-store/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 pub mod ac_utils;
+pub mod azure_blob_store;
 pub mod cas_utils;
 pub mod completeness_checking_store;
 pub mod compression_store;
diff --git a/nativelink-store/tests/azure_blob_store_test.rs b/nativelink-store/tests/azure_blob_store_test.rs
new file mode 100644
index 000000000..5cf0e1481
--- /dev/null
+++ b/nativelink-store/tests/azure_blob_store_test.rs
@@ -0,0 +1,658 @@
+use std::fmt::{Debug, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use azure_core::{base64, HttpClient, StatusCode, TransportOptions};
+use azure_storage::StorageCredentials;
+use azure_storage_blobs::prelude::*;
+use base64::encode;
+use bytes::{BufMut, BytesMut};
+use nativelink_config::stores::AzureBlobSpec;
+use nativelink_error::{Error, ResultExt};
+use nativelink_macro::nativelink_test;
+use nativelink_store::azure_blob_store::AzureBlobStore;
+use nativelink_util::buf_channel::make_buf_channel_pair;
+use nativelink_util::common::DigestInfo;
+use nativelink_util::instant_wrapper::MockInstantWrapped;
+use nativelink_util::origin_context::OriginContext;
+use nativelink_util::store_trait::{StoreKey, StoreLike, UploadSizeInfo};
+use sha2::{Digest, Sha256};
+
+// Test constants
+const TEST_CONTAINER: &str = "test-container";
+const TEST_ACCOUNT: &str = "testaccount";
+const TEST_KEY: &str = "dGVzdGtleQ=="; // base64-encoded "testkey"
+const TEST_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
+const TEST_SIZE: u64 = 100;
+type AzureBlobStoreTest = Arc<AzureBlobStore<fn() -> MockInstantWrapped>>;
+
+/// Test utilities
+mod test_utils {
+    use super::*;
+
+    #[derive(Debug)]
+    pub struct TestResponse {
+        pub status: StatusCode,
+        pub headers: Vec<(&'static str, String)>,
+        pub body: Vec<u8>,
+    }
+
+    impl TestResponse {
+        pub fn ok() -> Self {
+            Self {
+                status: StatusCode::Ok,
+                // A sample set of headers.
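+                // These imitate what Azure Blob Storage returns for a
+                // successful GetBlobProperties call, so the azure_storage_blobs
+                // response parser finds the fields it expects.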
+                headers: vec![
+                    ("content-length", "0".to_string()),
+                    ("last-modified", "Thu, 01 Jan 1970 00:00:00 GMT".to_string()),
+                    ("etag", "\"test-etag\"".to_string()),
+                    (
+                        "x-ms-creation-time",
+                        "Thu, 01 Jan 1970 00:00:00 GMT".to_string(),
+                    ),
+                    ("x-ms-lease-status", "unlocked".to_string()),
+                    ("x-ms-lease-state", "available".to_string()),
+                    ("x-ms-blob-type", "BlockBlob".to_string()),
+                    ("x-ms-server-encrypted", "true".to_string()),
+                    ("x-ms-request-server-encrypted", "true".to_string()),
+                    ("x-ms-blob-committed-block-count", "0".to_string()),
+                    ("x-ms-blob-content-encoding", "utf-8".to_string()),
+                    (
+                        "x-ms-request-id",
+                        "00000000-0000-0000-0000-000000000000".to_string(),
+                    ),
+                    ("x-ms-version", "2020-04-08".to_string()),
+                    ("date", "Thu, 01 Jan 1970 00:00:00 GMT".to_string()),
+                ],
+                body: vec![],
+            }
+        }
+
+        pub fn error(status: StatusCode) -> Self {
+            Self {
+                status,
+                headers: vec![],
+                body: vec![],
+            }
+        }
+
+        pub fn not_found() -> Self {
+            Self::error(StatusCode::NotFound)
+        }
+
+        pub fn with_content_length(mut self, length: usize) -> Self {
+            if let Some(header) = self
+                .headers
+                .iter_mut()
+                .find(|(key, _)| *key == "content-length")
+            {
+                header.1 = length.to_string();
+            } else {
+                self.headers.push(("content-length", length.to_string()));
+            }
+            self
+        }
+
+        pub fn with_body(mut self, body: Vec<u8>) -> Self {
+            self.body = body;
+            self
+        }
+    }
+
+    #[derive(Debug)]
+    pub struct TestRequest {
+        pub method: &'static str,
+        pub url_pattern: String,
+        pub response: TestResponse,
+    }
+
+    impl TestRequest {
+        pub fn new(method: &'static str, url_pattern: String, response: TestResponse) -> Self {
+            Self {
+                method,
+                url_pattern,
+                response,
+            }
+        }
+
+        pub fn head(digest: &str, size: u64, response: TestResponse) -> Self {
+            Self::new(
+                "HEAD",
+                format!("{TEST_CONTAINER}/{digest}-{size}"),
+                response,
+            )
+        }
+
+        pub fn get(digest: &str, size: u64, response: TestResponse) -> Self {
+            Self::new("GET", format!("{TEST_CONTAINER}/{digest}-{size}"), response)
+        }
+
+        pub fn put(digest: &str, size: u64, response: TestResponse) -> Self {
+            Self::new("PUT", format!("{TEST_CONTAINER}/{digest}-{size}"), response)
+        }
+    }
+
+    #[derive(Clone)]
+    pub struct MockAzureClient {
+        expected_requests: Arc<Vec<TestRequest>>,
+        current_request: Arc<AtomicUsize>,
+        request_log: Arc<Mutex<Vec<azure_core::Request>>>,
+    }
+
+    impl Debug for MockAzureClient {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("MockAzureClient")
+                .field("current_request", &self.current_request)
+                .field("request_count", &self.request_log.lock().unwrap().len())
+                .finish()
+        }
+    }
+
+    impl MockAzureClient {
+        pub fn new(requests: Vec<TestRequest>) -> Self {
+            Self {
+                expected_requests: Arc::new(requests),
+                current_request: Arc::new(AtomicUsize::new(0)),
+                request_log: Arc::new(Mutex::new(Vec::new())),
+            }
+        }
+
+        pub fn request_count(&self) -> usize {
+            self.request_log.lock().unwrap().len()
+        }
+
+        pub fn single_head_request(response: TestResponse) -> Self {
+            Self::new(vec![TestRequest::head(TEST_HASH, TEST_SIZE, response)])
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl HttpClient for MockAzureClient {
+        async fn execute_request(
+            &self,
+            request: &azure_core::Request,
+        ) -> azure_core::Result<azure_core::Response> {
+            self.request_log.lock().unwrap().push(request.clone());
+
+            let index = self.current_request.fetch_add(1, Ordering::SeqCst);
+            let expected = &self.expected_requests[index];
+
+            if request.method().to_string() != expected.method
+                || !request.url().as_str().contains(&expected.url_pattern)
+            {
+                return Err(azure_core::Error::new(
+                    azure_core::error::ErrorKind::Other,
format!("Unexpected request: {} {}", request.method(), request.url()), + )); + } + + let mut headers = azure_core::headers::Headers::new(); + for (key, value) in &expected.response.headers { + headers.insert(*key, value); + } + + let body = expected.response.body.clone(); + Ok(azure_core::Response::new( + expected.response.status, + headers, + Box::pin(futures::stream::once(async move { + Ok(hyper::body::Bytes::from(body)) + })), + )) + } + } + + pub fn create_test_store( + client: MockAzureClient, + retry_config: Option, + consider_expired_after_s: Option, + ) -> Result { + let transport = TransportOptions::new(Arc::new(client) as Arc); + let credentials = StorageCredentials::access_key( + TEST_ACCOUNT.to_string(), + azure_core::auth::Secret::new(TEST_KEY.to_string()), + ); + + let container_client = BlobServiceClient::builder(TEST_ACCOUNT, credentials) + .transport(transport) + .container_client(TEST_CONTAINER); + + let spec = AzureBlobSpec { + account_name: TEST_ACCOUNT.to_string(), + account_key: TEST_KEY.to_string(), + container: TEST_CONTAINER.to_string(), + retry: retry_config.unwrap_or_default(), + consider_expired_after_s: consider_expired_after_s.unwrap_or(0), + ..Default::default() + }; + + AzureBlobStore::new_with_client_and_jitter( + &spec, + container_client, + Arc::new(|_| Duration::from_secs(0)), + MockInstantWrapped::default, + ) + } + + pub fn create_test_digest() -> Result { + DigestInfo::try_new(TEST_HASH, TEST_SIZE) + } +} + +use test_utils::*; + +#[nativelink_test] +async fn test_has_object_found() -> Result<(), Error> { + let client = MockAzureClient::single_head_request(TestResponse::ok().with_content_length(512)); + let store = create_test_store(client.clone(), None, None)?; + + let result = store.has(create_test_digest()?).await?; + + assert_eq!(result, Some(512)); + assert_eq!(client.request_count(), 1); + Ok(()) +} + +#[nativelink_test] +async fn test_has_object_not_found() -> Result<(), Error> { + let client = MockAzureClient::single_head_request(TestResponse::not_found()); + let store = create_test_store(client.clone(), None, None)?; + + let result = store.has(create_test_digest()?).await?; + + assert_eq!(result, None); + assert_eq!(client.request_count(), 1); + Ok(()) +} + +#[nativelink_test] +async fn test_has_with_retries() -> Result<(), Error> { + let client = MockAzureClient::new(vec![ + TestRequest::head( + TEST_HASH, + TEST_SIZE, + TestResponse::error(StatusCode::InternalServerError), + ), + TestRequest::head( + TEST_HASH, + TEST_SIZE, + TestResponse::ok().with_content_length(111), + ), + ]); + + let retry_config = nativelink_config::stores::Retry { + max_retries: 3, + delay: 0.0, + jitter: 0.0, + ..Default::default() + }; + + let store = create_test_store(client.clone(), Some(retry_config), None)?; + let result = store.has(create_test_digest()?).await?; + + assert_eq!(result, Some(111)); + assert_eq!(client.request_count(), 2); + Ok(()) +} + +#[nativelink_test] +async fn test_get() -> Result<(), Error> { + const VALUE: &str = "test_content"; + + let client = MockAzureClient::new(vec![TestRequest::get( + TEST_HASH, + TEST_SIZE, + TestResponse::ok().with_body(VALUE.as_bytes().to_vec()), + )]); + + let store = create_test_store(client.clone(), None, None)?; + let result = store + .get_part_unchunked(create_test_digest()?, 0, None) + .await?; + + assert_eq!(result, VALUE.as_bytes()); + assert_eq!(client.request_count(), 1); + Ok(()) +} + +#[nativelink_test] +async fn test_get_with_retries() -> Result<(), Error> { + const VALUE: &str = "test_content"; + + 
+#[nativelink_test]
+async fn test_get_with_retries() -> Result<(), Error> {
+    const VALUE: &str = "test_content";
+
+    let client = MockAzureClient::new(vec![
+        TestRequest::get(
+            TEST_HASH,
+            TEST_SIZE,
+            TestResponse::error(StatusCode::InternalServerError),
+        ),
+        TestRequest::get(
+            TEST_HASH,
+            TEST_SIZE,
+            TestResponse::error(StatusCode::ServiceUnavailable),
+        ),
+        TestRequest::get(
+            TEST_HASH,
+            TEST_SIZE,
+            TestResponse::error(StatusCode::Conflict),
+        ),
+        TestRequest::get(
+            TEST_HASH,
+            TEST_SIZE,
+            TestResponse::ok().with_body(VALUE.as_bytes().to_vec()),
+        ),
+    ]);
+
+    let retry_config = nativelink_config::stores::Retry {
+        max_retries: 1024,
+        delay: 0.0,
+        jitter: 0.0,
+        ..Default::default()
+    };
+
+    let store = create_test_store(client.clone(), Some(retry_config), None)?;
+    let result = store
+        .get_part_unchunked(create_test_digest()?, 0, None)
+        .await?;
+
+    assert_eq!(result, VALUE.as_bytes());
+    assert_eq!(client.request_count(), 4);
+    Ok(())
+}
+
+#[nativelink_test]
+async fn test_update_small_file() -> Result<(), Error> {
+    const CONTENT_LENGTH: usize = 1024; // Small enough for a single-block upload (<5MB).
+    let mut send_data = BytesMut::new();
+    for i in 0..CONTENT_LENGTH {
+        send_data.put_u8(((i % 93) + 33) as u8);
+    }
+    let send_data = send_data.freeze();
+
+    let client = MockAzureClient::new(vec![TestRequest::put(
+        TEST_HASH,
+        TEST_SIZE,
+        TestResponse::ok(),
+    )]);
+
+    let store = create_test_store(client.clone(), None, None)?;
+    let (mut tx, rx) = make_buf_channel_pair();
+
+    // Start the update future.
+    let update_fut = store.update(
+        create_test_digest()?,
+        rx,
+        UploadSizeInfo::ExactSize(CONTENT_LENGTH as u64),
+    );
+
+    // Send the data in small chunks to exercise streaming.
+    let send_data_copy = send_data.clone();
+    let send_fut = Box::pin(async move {
+        const CHUNK_SIZE: usize = 256;
+        for chunk in send_data_copy.chunks(CHUNK_SIZE) {
+            tx.send(hyper::body::Bytes::copy_from_slice(chunk)).await?;
+        }
+        tx.send_eof()
+    });
+
+    // Wait for both futures to complete.
+    let (update_result, send_result) = tokio::join!(update_fut, send_fut);
+
+    update_result?;
+    send_result?;
+    assert_eq!(client.request_count(), 1);
+    Ok(())
+}
+
+#[nativelink_test]
+async fn test_update_zero_size() -> Result<(), Error> {
+    let client = MockAzureClient::new(vec![]); // Should not make any requests.
+    let store = create_test_store(client.clone(), None, None)?;
+    let (mut tx, rx) = make_buf_channel_pair();
+
+    let update_fut = store.update(create_test_digest()?, rx, UploadSizeInfo::ExactSize(0));
+
+    let send_fut = async move { tx.send_eof() };
+
+    let (update_result, send_result) = tokio::join!(update_fut, send_fut);
+
+    update_result?;
+    send_result?;
+    assert_eq!(
+        client.request_count(),
+        0,
+        "Zero-size upload should not make any requests"
+    );
+    Ok(())
+}
+
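+// The payload below is sent on the channel only once, yet the mock expects
+// two PUTs (one failure, one success) — implying the store buffers the
+// uploaded bytes and replays them on the retry attempt.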
+#[nativelink_test]
+async fn test_update_with_retries() -> Result<(), Error> {
+    const CONTENT_LENGTH: usize = 512;
+
+    let mut send_data = BytesMut::new();
+    for i in 0..CONTENT_LENGTH {
+        send_data.put_u8(((i % 93) + 33) as u8);
+    }
+    let send_data = send_data.freeze();
+
+    let client = MockAzureClient::new(vec![
+        TestRequest::put(
+            TEST_HASH,
+            TEST_SIZE,
+            TestResponse::error(StatusCode::InternalServerError),
+        ),
+        TestRequest::put(TEST_HASH, TEST_SIZE, TestResponse::ok()),
+    ]);
+
+    let retry_config = nativelink_config::stores::Retry {
+        max_retries: 3,
+        delay: 0.0,
+        jitter: 0.0,
+        ..Default::default()
+    };
+
+    let store = create_test_store(client.clone(), Some(retry_config), None)?;
+    let (mut tx, rx) = make_buf_channel_pair();
+
+    let update_fut = store.update(
+        create_test_digest()?,
+        rx,
+        UploadSizeInfo::ExactSize(CONTENT_LENGTH as u64),
+    );
+
+    let send_data_copy = send_data.clone();
+    let send_fut = async move {
+        tx.send(send_data_copy).await?;
+        tx.send_eof()
+    };
+
+    let (update_result, send_result) = tokio::join!(update_fut, send_fut);
+
+    update_result?;
+    send_result?;
+    assert_eq!(client.request_count(), 2);
+    Ok(())
+}
+
+#[nativelink_test]
+async fn test_multipart_upload_large_file() -> Result<(), Error> {
+    const MIN_BLOCK_SIZE: usize = 5 * 1024 * 1024; // 5MB
+    const TOTAL_SIZE: usize = MIN_BLOCK_SIZE * 2 + 50;
+    const DIGEST_STR: &str = TEST_HASH;
+
+    let mut send_data = Vec::with_capacity(TOTAL_SIZE);
+    for i in 0..TOTAL_SIZE {
+        send_data.push(((i * 3) % 256) as u8);
+    }
+
+    // Generate the Base64-encoded block IDs and manually percent-encode them
+    // so they match the URLs the client produces.
+    let block_ids: Vec<String> = (0..3)
+        .map(|i| {
+            let block_id = format!("{i:032}");
+            let base64_encoded = encode(block_id);
+            base64_encoded
+                .replace('=', "%3D")
+                .replace('+', "%2B")
+                .replace('/', "%2F")
+        })
+        .collect();
+
+    let client = MockAzureClient::new(vec![
+        TestRequest::new(
+            "PUT",
+            format!(
+                "{TEST_CONTAINER}/{DIGEST_STR}-{TOTAL_SIZE}?blockid={}&comp=block",
+                block_ids[0]
+            ),
+            TestResponse::ok(),
+        ),
+        TestRequest::new(
+            "PUT",
+            format!(
+                "{TEST_CONTAINER}/{DIGEST_STR}-{TOTAL_SIZE}?blockid={}&comp=block",
+                block_ids[1]
+            ),
+            TestResponse::ok(),
+        ),
+        TestRequest::new(
+            "PUT",
+            format!(
+                "{TEST_CONTAINER}/{DIGEST_STR}-{TOTAL_SIZE}?blockid={}&comp=block",
+                block_ids[2]
+            ),
+            TestResponse::ok(),
+        ),
+        TestRequest::new(
+            "PUT",
+            format!("{TEST_CONTAINER}/{DIGEST_STR}-{TOTAL_SIZE}?comp=blocklist"),
+            TestResponse::ok(),
+        ),
+    ]);
+
+    let store = create_test_store(client.clone(), None, None)?;
+    let digest = DigestInfo::try_new(DIGEST_STR, TOTAL_SIZE)?;
+
+    let _origin_guard = OriginContext::new();
+
+    let store_key: StoreKey = StoreKey::from(&digest);
+    store.update_oneshot(store_key, send_data.into()).await?;
+    assert_eq!(client.request_count(), 4);
+    Ok(())
+}
+
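+// The zero-byte digest (SHA-256 of empty input, size 0) is answered locally:
+// both tests below assert a request count of zero, so reads and existence
+// checks for it never reach the Azure client.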
+#[nativelink_test]
+async fn test_get_part_zero_digest() -> Result<(), Error> {
+    let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
+    let client = MockAzureClient::new(vec![]);
+    let store = Arc::new(create_test_store(client.clone(), None, None)?);
+    let (mut writer, mut reader) = make_buf_channel_pair();
+
+    let _origin_guard = OriginContext::new();
+    tokio::task::yield_now().await;
+
+    // Run both operations concurrently with join!.
+    let (get_result, file_data) = tokio::join!(
+        store.get_part(digest, &mut writer, 0, None),
+        reader.consume(Some(1024))
+    );
+
+    get_result?;
+    let file_data = file_data.err_tip(|| "Error reading bytes")?;
+
+    assert_eq!(file_data.len(), 0, "Expected empty file content");
+    assert_eq!(
+        client.request_count(),
+        0,
+        "Expected no requests for zero digest"
+    );
+    Ok(())
+}
+
+#[nativelink_test]
+async fn test_has_with_results_zero_digests() -> Result<(), Error> {
+    let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
+    let store_key: StoreKey = StoreKey::from(&digest);
+    let keys = vec![store_key];
+    let mut results = vec![None];
+
+    let client = MockAzureClient::new(vec![]); // Should make no requests.
+    let store = create_test_store(client.clone(), None, None)?;
+
+    let origin_guard = OriginContext::new();
+    tokio::task::yield_now().await;
+
+    store.has_with_results(&keys, &mut results).await?;
+
+    assert_eq!(results, vec![Some(0)]);
+    assert_eq!(
+        client.request_count(),
+        0,
+        "Expected no requests for zero digest"
+    );
+    drop(origin_guard);
+    Ok(())
+}
+
+#[nativelink_test]
+async fn test_has_with_expired_result() -> Result<(), Error> {
+    const CONTENT_SIZE: usize = 10;
+    use mock_instant::thread_local::MockClock;
+
+    let client = MockAzureClient::new(vec![
+        TestRequest::head(
+            TEST_HASH,
+            CONTENT_SIZE as u64,
+            TestResponse::ok().with_content_length(512),
+        ),
+        TestRequest::head(
+            TEST_HASH,
+            CONTENT_SIZE as u64,
+            TestResponse::ok().with_content_length(512),
+        ),
+    ]);
+
+    let store = create_test_store(
+        client.clone(),
+        Some(nativelink_config::stores::Retry {
+            max_retries: 1,
+            delay: 0.0,
+            jitter: 0.0,
+            ..Default::default()
+        }),
+        Some(2 * 24 * 60 * 60), // Consider objects expired after two days.
+    )?;
+
+    // The mock clock starts at 1970-01-01 00:00:00, matching the
+    // last-modified header served by `TestResponse::ok()`.
+    let digest = DigestInfo::try_new(TEST_HASH, CONTENT_SIZE)?;
+
+    // Check at 1 day: within the 2-day expiry window.
+    {
+        MockClock::advance(Duration::from_secs(24 * 60 * 60));
+        let mut results = vec![None];
+        store
+            .has_with_results(&[digest.into()], &mut results)
+            .await?;
+        assert_eq!(
+            results,
+            vec![Some(512)],
+            "Should find non-expired content after 1 day"
+        );
+    }
+
+    // Advance 3 more days (4 days total): past the 2-day expiry window.
+    {
+        MockClock::advance(Duration::from_secs(3 * 24 * 60 * 60));
+        let mut results = vec![None];
+        store
+            .has_with_results(&[digest.into()], &mut results)
+            .await?;
+        assert_eq!(
+            results,
+            vec![None],
+            "Should not find expired content after 4 days"
+        );
+    }
+
+    assert_eq!(client.request_count(), 2);
+    Ok(())
+}