diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 1e0339a8..92dd5958 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -52,8 +52,7 @@ jobs:
       fail-fast: true
       matrix:
         os: [ubuntu-latest, windows-latest, macOS-latest]
-        features: ["", "s3"]
-        version: [stable, nightly, "1.74"]
+        version: [stable, nightly, "1.79"]
 
     steps:
       - uses: actions/checkout@v4
@@ -61,8 +60,6 @@
        with:
          toolchain: ${{ matrix.version }}
      - uses: Swatinem/rust-cache@v2
-        with:
-          key: ${{ matrix.features }}
      - name: Show version
        run: |
          rustup show
@@ -70,12 +67,10 @@
          rustc --version
      - name: Build
        run: >
-          cargo build --all-targets --no-default-features --features=${{
-          matrix.features }} --features fail/failpoints
+          cargo build --all-targets --features fail/failpoints
      - name: Test
        run: >
-          cargo test --no-default-features --features=${{ matrix.features }}
-          --features fail/failpoints -- --include-ignored
+          cargo test --features fail/failpoints -- --include-ignored
 
  # Run rustfmt separately so that it does not block test results
  rustfmt:
@@ -92,7 +87,7 @@
 
  # S3 integration tests can't run from CI because they need credentials, but we
  # can at least make sure that they compile.
-  build-large-tests:
+  build-all-features:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
@@ -101,22 +96,9 @@
          toolchain: stable
      - uses: swatinem/rust-cache@v2
      - name: cargo build
-        run: cargo build --all-targets --features s3,s3-integration-test
+        run: cargo build --all-targets --all-features
 
  clippy:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-      - uses: dtolnay/rust-toolchain@master
-        with:
-          toolchain: stable
-      - uses: swatinem/rust-cache@v2
-      - name: clippy
-        run: cargo clippy --all-targets -- --deny clippy::all
-
-  # Not all features can be run: s3 integration tests require credentials.
-  # But all of them should compile.
-  check-all-features:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
@@ -149,8 +131,8 @@
          tool: cargo-mutants
      - name: Mutants in diff
        run: >
-          cargo mutants --no-shuffle -vV --in-diff git.diff --in-place -- --features
-          fail/failpoints
+          cargo mutants --no-shuffle -vV --in-diff git.diff --in-place --
+          --features fail/failpoints
      - name: Archive mutants.out
        uses: actions/upload-artifact@v4
        if: always()
@@ -165,7 +147,7 @@
      # We want to see all the missed mutants so don't fail fast.
      fail-fast: false
      matrix:
-        shard: [0, 1, 2, 3]
+        shard: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    steps:
      - name: Checkout
        uses: actions/checkout@v4
@@ -181,11 +163,10 @@
        # Don't use the S3 features because they require AWS credentials for realistic
        # testing.
        run: |
-          cargo mutants --no-shuffle -vV --cargo-arg=--no-default-features \
-            --in-place \
-            --baseline=skip --shard ${{ matrix.shard }}/4 \
-            -- \
-            --features fail/failpoints
+          cargo mutants --no-shuffle -vV --in-place --baseline=skip \
+            --shard ${{ matrix.shard }}/10 \
+            -- \
+            --features fail/failpoints
      - name: Archive results
        uses: actions/upload-artifact@v4
        if: always()
diff --git a/Cargo.lock b/Cargo.lock
index 71d82b5d..f7a36a25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -24,7 +24,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
 dependencies = [
  "cfg-if",
- "getrandom",
  "once_cell",
  "version_check",
  "zerocopy",
@@ -188,9 +187,9 @@ dependencies = [
 
 [[package]]
 name = "aws-credential-types"
-version = "1.2.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9"
+checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-runtime-api",
@@ -200,15 +199,16 @@
 [[package]]
 name = "aws-runtime"
-version = "1.2.0"
+version = "1.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4963ac9ff2d33a4231b3806c1c69f578f221a9cabb89ad2bde62ce2b442c8a7"
+checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468"
 dependencies = [
  "aws-credential-types",
  "aws-sigv4",
  "aws-smithy-async",
  "aws-smithy-eventstream",
  "aws-smithy-http",
+ "aws-smithy-runtime",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
  "aws-types",
@@ -216,6 +216,7 @@
  "fastrand",
  "http 0.2.12",
  "http-body 0.4.6",
+ "once_cell",
  "percent-encoding",
  "pin-project-lite",
  "tracing",
@@ -224,9 +225,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-s3"
-version = "1.23.0"
+version = "1.56.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4576ae7eb91e4d0ca76a3b443c3be979322fc01836cad7908534ae507fa41d99"
+checksum = "cecd672c8d4265fd4fbecacd4a479180e616881bbe639250cf81ddb604e4c301"
 dependencies = [
  "ahash",
  "aws-credential-types",
@@ -326,9 +327,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sigv4"
-version = "1.2.0"
+version = "1.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "11d6f29688a4be9895c0ba8bef861ad0c0dac5c15e9618b9b7a6c233990fc263"
+checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68"
 dependencies = [
  "aws-credential-types",
  "aws-smithy-eventstream",
@@ -366,9 +367,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-checksums"
-version = "0.60.7"
+version = "0.60.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "83fa43bc04a6b2441968faeab56e68da3812f978a670a5db32accbdcafddd12f"
+checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23"
 dependencies = [
  "aws-smithy-http",
  "aws-smithy-types",
@@ -387,9 +388,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-eventstream"
-version = "0.60.4"
+version = "0.60.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858"
+checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90"
 dependencies = [
  "aws-smithy-types",
  "bytes",
@@ -398,9 +399,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-http"
-version = "0.60.7"
+version = "0.60.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f10fa66956f01540051b0aa7ad54574640f748f9839e843442d99b970d3aff9"
+checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6"
 dependencies = [
  "aws-smithy-eventstream",
  "aws-smithy-runtime-api",
@@ -438,9 +439,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime"
-version = "1.3.0"
+version = "1.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de34bcfa1fb3c82a80e252a753db34a6658e07f23d3a5b3fc96919518fa7a3f5"
+checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -452,6 +453,7 @@
  "http 0.2.12",
  "http-body 0.4.6",
  "http-body 1.0.0",
+ "httparse",
  "hyper",
  "hyper-rustls",
  "once_cell",
@@ -464,9 +466,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime-api"
-version = "1.4.0"
+version = "1.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4cc56a5c96ec741de6c5e6bf1ce6948be969d6506dfa9c39cffc284e31e4979b"
+checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-types",
@@ -481,9 +483,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-types"
-version = "1.1.8"
+version = "1.2.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "abe14dceea1e70101d38fbf2a99e6a34159477c0fb95e68e05c66bd7ae4c3729"
+checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b"
 dependencies = [
  "base64-simd",
  "bytes",
@@ -507,24 +509,23 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-xml"
-version = "0.60.7"
+version = "0.60.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "872c68cf019c0e4afc5de7753c4f7288ce4b71663212771bf5e4542eb9346ca9"
+checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc"
 dependencies = [
  "xmlparser",
 ]
 
 [[package]]
 name = "aws-types"
-version = "1.2.0"
+version = "1.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a43b56df2c529fe44cb4d92bd64d0479883fb9608ff62daede4df5405381814"
+checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef"
 dependencies = [
  "aws-credential-types",
  "aws-smithy-async",
  "aws-smithy-runtime-api",
  "aws-smithy-types",
- "http 0.2.12",
  "rustc_version",
  "tracing",
 ]
@@ -635,6 +636,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bumpalo"
+version = "3.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
+
 [[package]]
 name = "bytes"
 version = "1.7.1"
@@ -662,9 +669,12 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.0.94"
+version = "1.1.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7"
+checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945"
+dependencies = [
+ "shlex",
+]
 
 [[package]]
 name = "cfg-if"
@@ -766,6 +776,7 @@ dependencies = [
  "indoc",
  "itertools",
  "lazy_static",
+ "libssh2-sys",
  "lru",
  "mutants",
  "nix",
@@ -783,6 +794,7 @@ dependencies = [
  "serde",
  "serde_json",
  "snap",
+ "ssh2",
  "strum",
  "strum_macros",
  "tempfile",
@@ -797,6 +809,7 @@ dependencies = [
  "unix_mode",
  "url",
  "uzers",
+ "whoami",
 ]
 
 [[package]]
@@ -1092,7 +1105,7 @@ checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
 dependencies = [
  "cfg-if",
  "libc",
- "redox_syscall",
+ "redox_syscall 0.4.1",
  "windows-sys 0.52.0",
 ]
 
@@ -1509,6 +1522,15 @@ version = "2.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
 
+[[package]]
+name = "instant"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
+dependencies = [
+ "cfg-if",
+]
+
 [[package]]
 name = "io-lifetimes"
 version = "1.0.11"
@@ -1535,6 +1557,15 @@ version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
 
+[[package]]
+name = "js-sys"
+version = "0.3.72"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
+dependencies = [
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -1553,6 +1584,32 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
 
+[[package]]
+name = "libssh2-sys"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee"
+dependencies = [
+ "cc",
+ "libc",
+ "libz-sys",
+ "openssl-sys",
+ "pkg-config",
+ "vcpkg",
+]
+
+[[package]]
+name = "libz-sys"
+version = "1.1.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.3.8"
@@ -1732,7 +1789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "210b363fa6901c372f264fa32ef3710c0e86328901deaed31294fecfd51e848b"
 dependencies = [
  "atty",
- "parking_lot",
+ "parking_lot 0.12.1",
  "terminal_size 0.2.6",
  "yansi",
 ]
@@ -1758,6 +1815,18 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 
+[[package]]
+name = "openssl-sys"
+version = "0.9.103"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "outref"
 version = "0.5.1"
@@ -1781,6 +1850,17 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "parking_lot"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
+dependencies = [
+ "instant",
+ "lock_api",
+ "parking_lot_core 0.8.6",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.12.1"
@@ -1788,7 +1868,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
 dependencies = [
  "lock_api",
- "parking_lot_core",
+ "parking_lot_core 0.9.9",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
+dependencies = [
+ "cfg-if",
+ "instant",
+ "libc",
+ "redox_syscall 0.2.16",
+ "smallvec",
+ "winapi",
 ]
 
 [[package]]
@@ -1799,7 +1893,7 @@
 checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
 dependencies = [
  "cfg-if",
  "libc",
- "redox_syscall",
+ "redox_syscall 0.4.1",
  "smallvec",
  "windows-targets 0.48.5",
 ]
 
@@ -1832,6 +1926,12 @@ dependencies = [
  "spki",
 ]
 
+[[package]]
+name = "pkg-config"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
+
 [[package]]
 name = "powerfmt"
 version = "0.2.0"
@@ -1895,9 +1995,9 @@ dependencies = [
 
 [[package]]
 name = "proptest"
-version = "1.4.0"
+version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf"
+checksum = "b4c2511913b88df1637da85cc8d96ec8e43a3f8bb8ccb71ee1ac240d6f3df58d"
 dependencies = [
  "bit-set",
  "bit-vec",
@@ -1915,13 +2015,13 @@ dependencies = [
 
 [[package]]
 name = "proptest-derive"
-version = "0.4.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9cf16337405ca084e9c78985114633b6827711d22b9e6ef6c6c0d665eb3f0b6e"
+checksum = "6ff7ff745a347b87471d859a377a9a404361e7efc2a971d73424a6d183c0fc77"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.58",
 ]
 
 [[package]]
@@ -2004,6 +2104,15 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "73ea134c32fe12df286020949d57d052a90c4001f2dbec4c1c074f39bcb7fc8c"
 
+[[package]]
+name = "redox_syscall"
+version = "0.2.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
+dependencies = [
+ "bitflags 1.3.2",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
@@ -2013,6 +2122,15 @@ dependencies = [
  "bitflags 1.3.2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
+dependencies = [
+ "bitflags 2.5.0",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.4"
@@ -2370,6 +2488,12 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "shlex"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+
 [[package]]
 name = "signal-hook-registry"
 version = "1.4.1"
@@ -2436,6 +2560,18 @@ dependencies = [
  "der",
 ]
 
+[[package]]
+name = "ssh2"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7fe461910559f6d5604c3731d00d2aafc4a83d1665922e280f42f9a168d5455"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+ "libssh2-sys",
+ "parking_lot 0.11.2",
+]
+
 [[package]]
 name = "strsim"
 version = "0.11.1"
@@ -2622,7 +2758,7 @@ dependencies = [
  "libc",
  "mio",
  "num_cpus",
- "parking_lot",
+ "parking_lot 0.12.1",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
@@ -2877,6 +3013,12 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
 
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
 [[package]]
 name = "version_check"
 version = "0.9.4"
@@ -2923,6 +3065,88 @@ version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
+[[package]]
+name = "wasite"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
+
+[[package]]
+name = "wasm-bindgen"
+version = "0.2.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
+dependencies = [
+ "cfg-if",
+ "once_cell",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
+dependencies = [
+ "bumpalo",
+ "log",
+ "once_cell",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.58",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.58",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.95"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
+
+[[package]]
+name = "web-sys"
+version = "0.3.72"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "whoami"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
+dependencies = [
+ "redox_syscall 0.5.7",
+ "wasite",
+ "web-sys",
+]
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
diff --git a/Cargo.toml b/Cargo.toml
index c9fc5d0c..51534dfd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,20 @@ repository = "https://github.com/sourcefrog/conserve/"
 version = "24.8.0"
 rust-version = "1.79"
 
+[features]
+default = ["s3", "sftp"]
+s3 = [
+    "dep:aws-config",
+    "dep:aws-sdk-s3",
+    "dep:aws-types",
+    "dep:base64",
+    "dep:crc32c",
+    "dep:futures",
+    "dep:tokio",
+]
+s3-integration-test = ["s3"]
+sftp = ["dep:ssh2", "dep:libssh2-sys"]
+
 [[bin]]
 doc = false
 name = "conserve"
@@ -18,7 +32,7 @@ name = "conserve"
 [dependencies]
 assert_matches = "1.5.0"
 aws-config = { version = "1.1", optional = true }
-aws-sdk-s3 = { version = "1.21", optional = true }
+aws-sdk-s3 = { version = "1.56", optional = true }
 aws-types = { version = "1.1", optional = true }
 base64 = { version = "0.22", optional = true }
 blake2-rfc = "0.2.18"
@@ -35,8 +49,10 @@ hex = "0.4.2"
 hostname = "0.3"
 itertools = "0.12"
 lazy_static = "1.4.0"
+libssh2-sys = { version = "0.3.0", optional = true }
 lru = "0.12"
 mutants = "0.0.3"
+rand = "0.8"
 rayon = "1.3.0"
 readahead-iterator = "0.1.1"
 regex = "1.3.9"
@@ -44,12 +60,13 @@ semver = "1"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 snap = "1.0.0"
+ssh2 = { version = "0.9.4", optional = true }
 strum = "0.26"
 strum_macros = "0.26"
 tempfile = "3"
 thiserror = "1.0.19"
 thousands = "0.2.0"
-time = { version = "0.3.28", features = [
+time = { version = "0.3.35", features = [
     "local-offset",
     "macros",
     "serde",
@@ -60,7 +77,7 @@ tracing = "0.1"
 tracing-appender = "0.2"
 unix_mode = "0.1"
 url = "2.2.2"
-indoc = "2.0"
+whoami = "1.5.2"
 
 [target.'cfg(unix)'.dependencies]
 uzers = "0.11"
@@ -85,29 +102,15 @@ assert_cmd = "2.0"
 assert_fs = "1.0"
 cp_r = "0.5"
 dir-assert = "0.2"
+indoc = "2.0"
 predicates = "3"
 pretty_assertions = "1.0"
-proptest = "1.0"
-proptest-derive = "0.4"
+proptest = "1.5"
+proptest-derive = "0.5"
 rand = "0.8"
 rstest = { version = "0.19", default-features = false }
 tracing-test = { version = "0.2", features = ["no-env-filter"] }
 
-[features]
-default = ["s3"]
-# blake2-rfc/simd_asm needs nightly, so it's no longer a feature here so that --all-features works on stable.
-# blake2_simd_asm = ["blake2-rfc/simd_asm"]
-s3 = [
-    "dep:aws-config",
-    "dep:aws-sdk-s3",
-    "dep:aws-types",
-    "dep:base64",
-    "dep:crc32c",
-    "dep:futures",
-    "dep:tokio",
-]
-s3-integration-test = ["s3"]
-
 [lib]
 doctest = false
diff --git a/NEWS.md b/NEWS.md
index c6b3d851..187ac88b 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,9 @@
 # Conserve release history
 
+## Unreleased
+
+- New: Backups can be stored on SFTP servers, addressed with `sftp://` URLs. SFTP and S3 support are both cargo features, enabled by default; they can be excluded with `--no-default-features`, which avoids many build-time dependencies.
+
 ## 24.8.0
 
 - Fixed: `restore --only` specifying a subdirectory no longer fails due to parent directories missing from the destination.
diff --git a/README.md b/README.md
index 690853e7..c79db9ed 100644
--- a/README.md
+++ b/README.md
@@ -114,9 +114,6 @@ automatically excluded from backups.
 From 23.9 Conserve supports storing backups in Amazon S3. AWS IAM credentials are read from the
 standard sources: the environment, config file, or, on EC2, the instance metadata service.
 
-S3 support can be turned off by passing `cargo install --no-default-features`. (There's no
-runtime impact if it is not used, but it does add a lot of build-time dependencies.)
-
 To use this, just specify an S3 URL for the archive location. The bucket must already exist.
 
     conserve init s3://my-bucket/
@@ -141,9 +138,12 @@ To install from a git checkout, run
 
 [rust]: https://rustup.rs/
 
-On nightly Rust only, and only on x86_64, you can enable a slight speed-up with
+### Optional features
+
+The following features are enabled by default, but can be turned off with `cargo install --no-default-features` if they are not needed:
 
-    cargo +nightly install -f --path . --features blake2-rfc/simd_asm
+- `s3`: support for storing backups in Amazon S3 (or compatible services)
+- `sftp`: support for storing backups on SFTP servers, addressed with `sftp://` URLs
 
 ### Arch Linux
diff --git a/doc/sftp.md b/doc/sftp.md
new file mode 100644
index 00000000..2cf4a1f2
--- /dev/null
+++ b/doc/sftp.md
@@ -0,0 +1,12 @@
+# SFTP support
+
+Conserve can read and write archives over SFTP.
+
+To use this, just specify an SFTP URL, like `sftp://user@host/path`, for the archive location.
+
+    conserve init sftp://user@host/path
+    conserve backup sftp://user@host/path ~
+
+If no username is present in the URL, Conserve will use the current user's username.
+
+Currently, Conserve only supports agent authentication.
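[Editor's note] doc/sftp.md above shows the user-facing commands; inside the library the same URL goes through the new `Transport` facade introduced in src/transport.rs later in this diff. A minimal sketch of how a location string reaches the SFTP backend, assuming only APIs visible in this diff (`Transport::new`, `Archive::open`, the crate's usual `Result` alias); the URL is a placeholder:

    use conserve::transport::Transport;
    use conserve::Archive;

    // Sketch: Transport::new() dispatches on the URL scheme. Plain paths and
    // file:// stay local; s3:// needs the `s3` feature; sftp:// needs the
    // (default-on) `sftp` feature, otherwise it fails with a UrlScheme error.
    fn open_archive(location: &str) -> conserve::Result<Archive> {
        let transport = Transport::new(location)?;
        Archive::open(transport)
    }

    // e.g. open_archive("sftp://user@host/path")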
diff --git a/src/apath.rs b/src/apath.rs
index f2495354..706a22d0 100644
--- a/src/apath.rs
+++ b/src/apath.rs
@@ -116,7 +116,7 @@ pub struct DecodeFilenameError<'name> {
     name: &'name OsStr,
 }
 
-impl<'name> fmt::Display for DecodeFilenameError<'name> {
+impl fmt::Display for DecodeFilenameError<'_> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "Couldn't decode filename {:?}", self.name)
     }
diff --git a/src/archive.rs b/src/archive.rs
index c241cf83..e30d935a 100644
--- a/src/archive.rs
+++ b/src/archive.rs
@@ -26,7 +26,7 @@ use tracing::{debug, warn};
 
 use crate::jsonio::{read_json, write_json};
 use crate::monitor::Monitor;
-use crate::transport::local::LocalTransport;
+use crate::transport::Transport;
 use crate::*;
 
 const HEADER_FILENAME: &str = "CONSERVE";
@@ -39,7 +39,7 @@ pub struct Archive {
     pub(crate) block_dir: Arc<BlockDir>,
 
     /// Transport to the root directory of the archive.
-    transport: Arc<dyn Transport>,
+    transport: Transport,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -56,24 +56,21 @@ pub struct DeleteOptions {
 impl Archive {
     /// Make a new archive in a local directory.
     pub fn create_path(path: &Path) -> Result<Archive> {
-        Archive::create(Arc::new(LocalTransport::new(path)))
+        Archive::create(Transport::local(path))
     }
 
     /// Make a new archive in a new directory accessed by a Transport.
-    pub fn create(transport: Arc<dyn Transport>) -> Result<Archive> {
+    pub fn create(transport: Transport) -> Result<Archive> {
         transport.create_dir("")?;
         let names = transport.list_dir("")?;
         if !names.files.is_empty() || !names.dirs.is_empty() {
             return Err(Error::NewArchiveDirectoryNotEmpty);
         }
-        let block_dir = Arc::new(BlockDir::create(transport.sub_transport(BLOCK_DIR))?);
-        write_json(
-            &transport,
-            HEADER_FILENAME,
-            &ArchiveHeader {
-                conserve_archive_version: String::from(ARCHIVE_VERSION),
-            },
-        )?;
+        let block_dir = Arc::new(BlockDir::create(transport.chdir(BLOCK_DIR))?);
+        let header = ArchiveHeader {
+            conserve_archive_version: String::from(ARCHIVE_VERSION),
+        };
+        write_json(&transport, HEADER_FILENAME, &header)?;
         Ok(Archive {
             block_dir,
             transport,
@@ -84,18 +81,18 @@ impl Archive {
     ///
     /// Checks that the header is correct.
     pub fn open_path(path: &Path) -> Result<Archive> {
-        Archive::open(Arc::new(LocalTransport::new(path)))
+        Archive::open(Transport::local(path))
     }
 
-    pub fn open(transport: Arc<dyn Transport>) -> Result<Archive> {
+    pub fn open(transport: Transport) -> Result<Archive> {
         let header: ArchiveHeader =
-            read_json(transport.as_ref(), HEADER_FILENAME)?.ok_or(Error::NotAnArchive)?;
+            read_json(&transport, HEADER_FILENAME)?.ok_or(Error::NotAnArchive)?;
         if header.conserve_archive_version != ARCHIVE_VERSION {
             return Err(Error::UnsupportedArchiveVersion {
                 version: header.conserve_archive_version,
             });
         }
-        let block_dir = Arc::new(BlockDir::open(transport.sub_transport(BLOCK_DIR)));
+        let block_dir = Arc::new(BlockDir::open(transport.chdir(BLOCK_DIR)));
         debug!(?header, "Opened archive");
         Ok(Archive {
             block_dir,
@@ -108,7 +105,7 @@ impl Archive {
     }
 
     pub fn band_exists(&self, band_id: BandId) -> Result<bool> {
-        self.transport
+        self.transport()
             .is_file(&format!("{}/{}", band_id, crate::BAND_HEAD_FILENAME))
             .map_err(Error::from)
     }
@@ -138,8 +135,8 @@ impl Archive {
         Ok(band_ids)
     }
 
-    pub(crate) fn transport(&self) -> &dyn Transport {
-        self.transport.as_ref()
+    pub(crate) fn transport(&self) -> &Transport {
+        &self.transport
     }
 
     pub fn resolve_band_id(&self, band_selection: BandSelectionPolicy) -> Result<BandId> {
diff --git a/src/band.rs b/src/band.rs
index a207b48a..6da265eb 100644
--- a/src/band.rs
+++ b/src/band.rs
@@ -24,6 +24,7 @@
 use std::borrow::Cow;
 use std::sync::Arc;
 
+use crate::transport::Transport;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
@@ -75,7 +76,7 @@ pub struct Band {
     band_id: BandId,
 
     /// Transport pointing to the archive directory.
-    transport: Arc<dyn Transport>,
+    transport: Transport,
 
     /// Deserialized band head info.
     head: Head,
@@ -143,7 +144,7 @@ impl Band {
         let band_id = archive
             .last_band_id()?
             .map_or_else(BandId::zero, |b| b.next_sibling());
-        let transport = archive.transport().sub_transport(&band_id.to_string());
+        let transport = archive.transport().chdir(&band_id.to_string());
         transport.create_dir("")?;
         transport.create_dir(INDEX_DIR)?;
         let band_format_version = if format_flags.is_empty() {
@@ -179,9 +180,9 @@ impl Band {
 
     /// Open the band with the given id.
     pub fn open(archive: &Archive, band_id: BandId) -> Result<Band> {
-        let transport = archive.transport().sub_transport(&band_id.to_string());
-        let head: Head = read_json(transport.as_ref(), BAND_HEAD_FILENAME)?
-            .ok_or(Error::BandHeadMissing { band_id })?;
+        let transport = archive.transport().chdir(&band_id.to_string());
+        let head: Head =
+            read_json(&transport, BAND_HEAD_FILENAME)?.ok_or(Error::BandHeadMissing { band_id })?;
         if let Some(version) = &head.band_format_version {
             if !band_version_supported(version) {
                 return Err(Error::UnsupportedBandVersion {
@@ -250,17 +251,17 @@ impl Band {
     }
 
     pub fn index_builder(&self) -> IndexWriter {
-        IndexWriter::new(self.transport.sub_transport(INDEX_DIR))
+        IndexWriter::new(self.transport.chdir(INDEX_DIR))
     }
 
     /// Get read-only access to the index of this band.
     pub fn index(&self) -> IndexRead {
-        IndexRead::open(self.transport.sub_transport(INDEX_DIR))
+        IndexRead::open(self.transport.chdir(INDEX_DIR))
     }
 
     /// Return info about the state of this band.
     pub fn get_info(&self) -> Result<Info> {
-        let tail_option: Option<Tail> = read_json(self.transport.as_ref(), BAND_TAIL_FILENAME)?;
+        let tail_option: Option<Tail> = read_json(&self.transport, BAND_TAIL_FILENAME)?;
         let start_time = OffsetDateTime::from_unix_timestamp(self.head.start_time).map_err(|_| {
             Error::InvalidMetadata {
diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs
index 23eca0ac..6726b52b 100644
--- a/src/bin/conserve.rs
+++ b/src/bin/conserve.rs
@@ -28,6 +28,7 @@ use time::UtcOffset;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn, Level};
 
+use conserve::transport::Transport;
 use conserve::termui::{enable_tracing, TermUiMonitor, TraceTimeStyle};
 use conserve::*;
 
@@ -327,7 +328,7 @@ impl Command {
                     ..Default::default()
                 };
                 let stats = backup(
-                    &Archive::open(open_transport(archive)?)?,
+                    &Archive::open(Transport::new(archive)?)?,
                     source,
                     &options,
                     monitor,
@@ -338,7 +339,7 @@ impl Command {
             }
             Command::Debug(Debug::Blocks { archive }) => {
                 let mut bw = BufWriter::new(stdout);
-                for hash in Archive::open(open_transport(archive)?)?
+                for hash in Archive::open(Transport::new(archive)?)?
                     .block_dir()
                     .blocks(monitor)?
                     .collect::<Vec<_>>()
@@ -353,7 +354,7 @@ impl Command {
             }
             Command::Debug(Debug::Referenced { archive }) => {
                 let mut bw = BufWriter::new(stdout);
-                let archive = Archive::open(open_transport(archive)?)?;
+                let archive = Archive::open(Transport::new(archive)?)?;
                 for hash in archive.referenced_blocks(&archive.list_band_ids()?, monitor)? {
                     writeln!(bw, "{hash}")?;
                 }
@@ -361,7 +362,7 @@ impl Command {
             Command::Debug(Debug::Unreferenced { archive }) => {
                 print!(
                     "{}",
-                    Archive::open(open_transport(archive)?)?
+                    Archive::open(Transport::new(archive)?)?
                         .unreferenced_blocks(monitor)?
                         .map(|hash| format!("{}\n", hash))
                         .collect::<Vec<String>>()
@@ -375,7 +376,7 @@ impl Command {
                 break_lock,
                 no_stats,
             } => {
-                let stats = Archive::open(open_transport(archive)?)?.delete_bands(
+                let stats = Archive::open(Transport::new(archive)?)?.delete_bands(
                     backup,
                     &DeleteOptions {
                         dry_run: *dry_run,
@@ -418,7 +419,7 @@ impl Command {
                 break_lock,
                 no_stats,
             } => {
-                let archive = Archive::open(open_transport(archive)?)?;
+                let archive = Archive::open(Transport::new(archive)?)?;
                 let stats = archive.delete_bands(
                     &[],
                     &DeleteOptions {
@@ -432,7 +433,7 @@ impl Command {
                 }
             }
             Command::Init { archive } => {
-                Archive::create(open_transport(archive)?)?;
+                Archive::create(Transport::new(archive)?)?;
                 debug!("Created new archive in {archive:?}");
             }
             Command::Ls {
@@ -481,7 +482,7 @@ impl Command {
                 no_stats,
             } => {
                 let band_selection = band_selection_policy_from_opt(backup);
-                let archive = Archive::open(open_transport(archive)?)?;
+                let archive = Archive::open(Transport::new(archive)?)?;
                 let _ = no_stats; // accepted but ignored; we never currently print stats
                 let options = RestoreOptions {
                     exclude: Exclude::from_patterns_and_files(exclude, exclude_from)?,
@@ -524,7 +525,7 @@ impl Command {
                 let options = ValidateOptions {
                     skip_block_hashes: *quick,
                 };
-                Archive::open(open_transport(archive)?)?.validate(&options, monitor.clone())?;
+                Archive::open(Transport::new(archive)?)?.validate(&options, monitor.clone())?;
                 if monitor.error_count() != 0 {
                     warn!("Archive has some problems.");
                 } else {
@@ -543,7 +544,7 @@ impl Command {
                 } else {
                     Some(*LOCAL_OFFSET.read().unwrap())
                 };
-                let archive = Archive::open(open_transport(archive)?)?;
+                let archive = Archive::open(Transport::new(archive)?)?;
                 let options = ShowVersionsOptions {
                     newest_first: *newest,
                     tree_size: *sizes,
@@ -559,7 +560,7 @@
 }
 
 fn stored_tree_from_opt(archive_location: &str, backup: &Option<BandId>) -> Result<StoredTree> {
-    let archive = Archive::open(open_transport(archive_location)?)?;
+    let archive = Archive::open(Transport::new(archive_location)?)?;
     let policy = band_selection_policy_from_opt(backup);
     archive.open_stored_tree(policy)
 }
diff --git a/src/blockdir.rs b/src/blockdir.rs
index abead85b..d0904b64 100644
--- a/src/blockdir.rs
+++ b/src/blockdir.rs
@@ -32,11 +32,12 @@ use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
 use tracing::{debug, warn};
 use tracing::{instrument, trace};
+use transport::WriteMode;
 
 use crate::compress::snappy::{Compressor, Decompressor};
 use crate::counters::Counter;
 use crate::monitor::Monitor;
-use crate::transport::ListDir;
+use crate::transport::{ListDir, Transport};
 use crate::*;
 
 // const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2;
@@ -65,7 +66,7 @@ pub struct Address {
 /// A readable, writable directory within a band holding data blocks.
 #[derive(Debug)]
 pub struct BlockDir {
-    transport: Arc<dyn Transport>,
+    transport: Transport,
     pub stats: BlockDirStats,
     // TODO: There are fancier caches and they might help, but this one works, and Stretto did not work for me.
     cache: RwLock<LruCache<BlockHash, Bytes>>,
@@ -85,7 +86,7 @@ pub fn block_relpath(hash: &BlockHash) -> String {
 }
 
 impl BlockDir {
-    pub fn open(transport: Arc<dyn Transport>) -> BlockDir {
+    pub fn open(transport: Transport) -> BlockDir {
         /// Cache this many blocks in memory.
         // TODO: Change to a cache that tracks the size of stored blocks?
         // As a safe conservative value, 100 blocks of 20MB each would be 2GB.
@@ -102,7 +103,7 @@ impl BlockDir {
         }
     }
 
-    pub fn create(transport: Arc<dyn Transport>) -> Result<BlockDir> {
+    pub fn create(transport: Transport) -> Result<BlockDir> {
         transport.create_dir("")?;
         Ok(BlockDir::open(transport))
     }
@@ -131,7 +132,19 @@ impl BlockDir {
         let hex_hash = hash.to_string();
         let relpath = block_relpath(&hash);
         self.transport.create_dir(subdir_relpath(&hex_hash))?;
-        self.transport.write_file(&relpath, &compressed)?;
+        match self
+            .transport
+            .write_file(&relpath, &compressed, WriteMode::CreateNew)
+        {
+            Ok(()) => {}
+            Err(err) if err.kind() == transport::ErrorKind::AlreadyExists => {
+                // let's assume the contents are correct
+            }
+            Err(err) => {
+                warn!(?err, ?hash, "Error writing block");
+                return Err(err.into());
+            }
+        }
         stats.written_blocks += 1;
         stats.uncompressed_bytes += uncomp_len;
         stats.compressed_bytes += comp_len;
@@ -343,7 +356,6 @@ mod test {
     use tempfile::TempDir;
 
     use crate::monitor::test::TestMonitor;
-    use crate::transport::open_local_transport;
 
     use super::*;
 
@@ -353,7 +365,7 @@
         // file with 0 bytes. It's not valid compressed data. We just treat
         // the block as not present at all.
         let tempdir = TempDir::new().unwrap();
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         let mut stats = BackupStats::default();
         let monitor = TestMonitor::arc();
         let hash = blockdir
@@ -367,7 +379,7 @@
         assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1); // Since we just wrote it, we know it's there.
         // Open again to get a fresh cache
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         let monitor = TestMonitor::arc();
         OpenOptions::new()
             .write(true)
@@ -383,15 +395,12 @@ mod test {
     #[test]
     fn temp_files_are_not_returned_as_blocks() {
         let tempdir = TempDir::new().unwrap();
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         let monitor = TestMonitor::arc();
         let subdir = tempdir.path().join(subdir_relpath("123"));
         create_dir(&subdir).unwrap();
-        write(
-            subdir.join(format!("{}{}", TMP_PREFIX, "123123123")),
-            b"123",
-        )
-        .unwrap();
+        // Write a temp file as was created by earlier versions of the code.
+        write(subdir.join("tmp123123123"), b"123").unwrap();
         let blocks = blockdir
             .blocks(monitor.clone())
             .unwrap()
@@ -402,7 +411,7 @@ mod test {
     #[test]
     fn cache_hit() {
         let tempdir = TempDir::new().unwrap();
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         let mut stats = BackupStats::default();
         let content = Bytes::from("stuff");
         let hash = blockdir
@@ -432,7 +441,7 @@ mod test {
     #[test]
     fn existence_cache_hit() {
         let tempdir = TempDir::new().unwrap();
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         let mut stats = BackupStats::default();
         let content = Bytes::from("stuff");
         let monitor = TestMonitor::arc();
@@ -442,7 +451,7 @@ mod test {
 
         // reopen
         let monitor = TestMonitor::arc();
-        let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
+        let blockdir = BlockDir::open(Transport::local(tempdir.path()));
         assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
         assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0);
         assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 0);
diff --git a/src/gc_lock.rs b/src/gc_lock.rs
index 25fc2869..919d34b1 100644
--- a/src/gc_lock.rs
+++ b/src/gc_lock.rs
@@ -31,6 +31,8 @@
 //! delete but before starting to actually delete them, we check that no
 //! new bands have been created.
 
+use transport::WriteMode;
+
 use crate::*;
 
 pub static GC_LOCK: &str = "GC_LOCK";
@@ -63,7 +65,9 @@ impl GarbageCollectionLock {
         if archive.transport().is_file(GC_LOCK).unwrap_or(true) {
             return Err(Error::GarbageCollectionLockHeld);
         }
-        archive.transport().write_file(GC_LOCK, b"{}\n")?;
+        archive
+            .transport()
+            .write_file(GC_LOCK, b"{}\n", WriteMode::CreateNew)?;
         Ok(GarbageCollectionLock { archive, band_id })
     }
 
diff --git a/src/index.rs b/src/index.rs
index 47a1b94c..c388dcae 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -19,16 +19,17 @@ use std::path::Path;
 use std::sync::Arc;
 use std::vec;
 
+use crate::transport::Transport;
 use itertools::Itertools;
 use time::OffsetDateTime;
 use tracing::{debug, debug_span, error};
+use transport::WriteMode;
 
 use crate::compress::snappy::{Compressor, Decompressor};
 use crate::counters::Counter;
 use crate::entry::KindMeta;
 use crate::monitor::Monitor;
 use crate::stats::IndexReadStats;
-use crate::transport::local::LocalTransport;
 use crate::unix_time::FromUnixAndNanos;
 use crate::*;
 
@@ -177,7 +178,7 @@ impl IndexEntry {
 /// hunks preserve apath order.
 pub struct IndexWriter {
     /// The `i` directory within the band where all files for this index are written.
-    transport: Arc<dyn Transport>,
+    transport: Transport,
 
     /// Currently queued entries to be written out, in arbitrary order.
     entries: Vec<IndexEntry>,
@@ -199,7 +200,7 @@ pub struct IndexWriter {
 /// Accumulate and write out index entries into files in an index directory.
 impl IndexWriter {
     /// Make a new builder that will write files into the given directory.
-    pub fn new(transport: Arc<dyn Transport>) -> IndexWriter {
+    pub fn new(transport: Transport) -> IndexWriter {
         IndexWriter {
             transport,
             entries: Vec::new(),
@@ -253,7 +254,8 @@ impl IndexWriter {
             self.transport.create_dir(&subdir_relpath(self.sequence))?;
         }
         let compressed_bytes = self.compressor.compress(&json)?;
-        self.transport.write_file(&relpath, &compressed_bytes)?;
+        self.transport
+            .write_file(&relpath, &compressed_bytes, WriteMode::CreateNew)?;
         self.hunks_written += 1;
         monitor.count(Counter::IndexWrites, 1);
         monitor.count(Counter::IndexWriteCompressedBytes, compressed_bytes.len());
@@ -279,16 +281,17 @@ fn hunk_relpath(hunk_number: u32) -> String {
 #[derive(Debug, Clone)]
 pub struct IndexRead {
     /// Transport pointing to this index directory.
-    transport: Arc<dyn Transport>,
+    transport: Transport,
 }
 
 impl IndexRead {
     #[allow(unused)]
+    // TODO: Deprecate, use Transport?
     pub(crate) fn open_path(path: &Path) -> IndexRead {
-        IndexRead::open(Arc::new(LocalTransport::new(path)))
+        IndexRead::open(Transport::local(path))
     }
 
-    pub(crate) fn open(transport: Arc<dyn Transport>) -> IndexRead {
+    pub(crate) fn open(transport: Transport) -> IndexRead {
         IndexRead { transport }
     }
 
@@ -321,7 +324,7 @@ impl IndexRead {
         debug!(?hunks);
         IndexHunkIter {
             hunks: hunks.into_iter(),
-            transport: Arc::clone(&self.transport),
+            transport: self.transport.clone(),
             decompressor: Decompressor::new(),
             stats: IndexReadStats::default(),
             after: None,
@@ -335,7 +338,7 @@ impl IndexRead {
 pub struct IndexHunkIter {
     hunks: std::vec::IntoIter<u32>,
     /// The `i` directory within the band where all files for this index are written.
-    transport: Arc<dyn Transport>,
+    transport: Transport,
     decompressor: Decompressor,
     pub stats: IndexReadStats,
     /// If set, yield only entries ordered after this apath.
@@ -524,7 +527,7 @@ mod tests {
 
     fn setup() -> (TempDir, IndexWriter) {
         let testdir = TempDir::new().unwrap();
-        let ib = IndexWriter::new(Arc::new(LocalTransport::new(testdir.path())));
+        let ib = IndexWriter::new(Transport::local(testdir.path()));
         (testdir, ib)
     }
 
diff --git a/src/jsonio.rs b/src/jsonio.rs
index bc1a6253..54eb6e86 100644
--- a/src/jsonio.rs
+++ b/src/jsonio.rs
@@ -17,7 +17,7 @@ use std::path::PathBuf;
 
 use serde::de::DeserializeOwned;
 
-use crate::transport::{self, Transport};
+use crate::transport::{self, Transport, WriteMode};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -37,10 +37,9 @@ pub type Result<T> = std::result::Result<T, Error>;
 
 /// Write uncompressed json to a file on a Transport.
-pub(crate) fn write_json<T, TR>(transport: &TR, relpath: &str, obj: &T) -> Result<()>
+pub(crate) fn write_json<T>(transport: &Transport, relpath: &str, obj: &T) -> Result<()>
 where
     T: serde::Serialize,
-    TR: AsRef<dyn Transport>,
 {
     let mut s: String = serde_json::to_string(&obj).map_err(|source| Error::Json {
         source,
@@ -48,15 +47,14 @@ where
     })?;
     s.push('\n');
     transport
-        .as_ref()
-        .write_file(relpath, s.as_bytes())
+        .write_file(relpath, s.as_bytes(), WriteMode::CreateNew)
         .map_err(Error::from)
 }
 
 /// Read and deserialize uncompressed json from a file on a Transport.
 ///
 /// Returns None if the file does not exist.
-pub(crate) fn read_json<T>(transport: &dyn Transport, path: &str) -> Result<Option<T>>
+pub(crate) fn read_json<T>(transport: &Transport, path: &str) -> Result<Option<T>>
 where
     T: DeserializeOwned,
 {
@@ -79,8 +77,6 @@ mod tests {
     use assert_fs::prelude::*;
     use serde::{Deserialize, Serialize};
 
-    use crate::transport::local::LocalTransport;
-
     use super::*;
 
     #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
@@ -98,7 +94,7 @@ mod tests {
         };
 
         let filename = "test.json";
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         super::write_json(&transport, filename, &entry).unwrap();
 
         let json_child = temp.child("test.json");
@@ -114,7 +110,7 @@ mod tests {
             .write_str(r#"{"id": 42, "weather": "cold"}"#)
             .unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         let content: TestContents = read_json(&transport, "test.json")
             .expect("no error")
             .expect("file exists");
diff --git a/src/lease.rs b/src/lease.rs
index 823c9ed6..9a603e5c 100644
--- a/src/lease.rs
+++ b/src/lease.rs
@@ -2,28 +2,31 @@
 
 //! Leases controlling write access to an archive.
 
-use std::{sync::Arc, time::Duration};
+use std::process;
+use std::time::Duration;
 
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 use time::OffsetDateTime;
-use tracing::{debug, instrument};
+use tracing::{debug, instrument, trace, warn};
 use url::Url;
 
-use crate::{
-    jsonio::{self, read_json},
-    transport, Transport,
-};
+use crate::transport::WriteMode;
+use crate::{transport, transport::Transport};
 
-pub static LEASE_FILENAME: &str = "LEASE.json";
+pub static LEASE_FILENAME: &str = "LEASE";
 
 /// A lease on an archive.
 #[derive(Debug)]
 pub struct Lease {
-    transport: Arc<dyn Transport>,
-    lease_taken: OffsetDateTime,
+    transport: Transport,
+    content: LeaseContent,
     /// The next refresh after this time must rewrite the lease.
     next_renewal: OffsetDateTime,
+    /// How often should we renew the lease?
+    renewal_interval: Duration,
+    /// URL of the lease file.
+    url: Url,
 }
 
 #[non_exhaustive]
@@ -35,6 +38,9 @@ pub enum Error {
         content: Box<LeaseContent>,
     },
 
+    #[error("Existing lease file {url} is corrupt")]
+    Corrupt { url: Url },
+
     #[error("Transport error on lease file: {source}")]
     Transport {
         #[from]
@@ -43,45 +49,77 @@ pub enum Error {
 
     #[error("JSON serialization error in lease {url}: {source}")]
     Json { source: serde_json::Error, url: Url },
+
+    #[error("Lease {url} was stolen: {content:?}")]
+    Stolen {
+        url: Url,
+        content: Box<LeaseContent>,
+    },
+
+    #[error("Lease {url} disappeared")]
+    Disappeared { url: Url },
 }
 
 type Result<T> = std::result::Result<T, Error>;
 
+/// Options controlling lease behavior, exposed for testing.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct LeaseOptions {
+    /// How long do leases last before they're assumed stale?
+    lease_expiry: Duration,
+
+    /// Renew the lease soon after it becomes this old.
+    renewal_interval: Duration,
+}
+
+impl Default for LeaseOptions {
+    fn default() -> Self {
+        Self {
+            lease_expiry: Duration::from_secs(60),
+            renewal_interval: Duration::from_secs(10),
+        }
+    }
+}
+
 impl Lease {
     /// Acquire a lease, if one is available.
     ///
-    /// Returns [Error::Busy] if the lease is already held by another process.
-    #[instrument]
-    pub fn acquire(transport: Arc<dyn Transport>) -> Result<Lease> {
+    /// Returns [Error::Busy] or [Error::Corrupt] if the lease is already held by another process.
+    pub fn try_acquire(transport: &Transport, lease_options: &LeaseOptions) -> Result<Lease> {
+        trace!("trying to acquire lease");
         let lease_taken = OffsetDateTime::now_utc();
-        let lease_expiry = lease_taken + Duration::from_secs(5 * 60);
+        let lease_expiry = lease_taken + lease_options.lease_expiry;
         let content = LeaseContent {
             host: hostname::get()
                 .unwrap_or_default()
                 .to_string_lossy()
-                .into_owned(),
-            pid: std::process::id(),
-            client_version: crate::VERSION.to_string(),
-            lease_taken,
-            lease_expiry,
+                .into_owned()
+                .into(),
+            pid: Some(process::id()),
+            client_version: Some(crate::VERSION.to_string()),
+            acquired: lease_taken,
+            expiry: lease_expiry,
+            nonce: rand::random(),
         };
-        let url = transport.relative_file_url(LEASE_FILENAME);
+        let url = transport.url().join(LEASE_FILENAME).unwrap();
         let mut s: String = serde_json::to_string(&content).expect("serialize lease");
         s.push('\n');
-        while let Err(err) = transport
-            .as_ref()
-            .write_new_file(LEASE_FILENAME, s.as_bytes())
+        while let Err(err) =
+            transport.write_file(LEASE_FILENAME, s.as_bytes(), WriteMode::CreateNew)
         {
             if err.kind() == transport::ErrorKind::AlreadyExists {
-                match Lease::peek(transport.as_ref())? {
-                    Some(content) => {
+                match Lease::peek(transport)? {
+                    LeaseState::Held(content) => {
                         return Err(Error::Busy {
                             url,
                             content: Box::new(content),
                         })
                     }
-                    None => {
-                        debug!("Lease file disappeared after conflict");
+                    LeaseState::Corrupt(_mtime) => {
+                        return Err(Error::Corrupt { url });
+                    }
+                    LeaseState::Free => {
+                        debug!("Lease file disappeared after conflict; retrying");
                         continue;
                     }
                 }
@@ -91,73 +129,254 @@ impl Lease {
         }
-        let next_renewal = lease_taken + Duration::from_secs(60);
+        let next_renewal = lease_taken + lease_options.renewal_interval;
         Ok(Lease {
-            transport,
-            lease_taken,
+            transport: transport.clone(),
+            content,
             next_renewal,
+            renewal_interval: lease_options.renewal_interval,
+            url,
         })
     }
 
+    /// Unconditionally renew a held lease, after checking that it was not stolen.
+    ///
+    /// This takes the existing lease and returns a new one only if renewal succeeds.
+    pub fn renew(mut self) -> Result<Lease> {
+        let state = Lease::peek(&self.transport)?;
+        match state {
+            LeaseState::Held(content) => {
+                if content != self.content {
+                    warn!(actual = ?content, expected = ?self.content, "lease stolen");
+                    return Err(Error::Stolen {
+                        url: self.url.clone(),
+                        content: Box::new(content),
+                    });
+                }
+            }
+            LeaseState::Free => {
+                warn!("lease file disappeared");
+                return Err(Error::Disappeared {
+                    url: self.url.clone(),
+                });
+            }
+            LeaseState::Corrupt(_mtime) => {
+                warn!("lease file is corrupt");
+                return Err(Error::Corrupt {
+                    url: self.url.clone(),
+                });
+            }
+        }
+        self.content.acquired = OffsetDateTime::now_utc();
+        self.next_renewal = self.content.acquired + self.renewal_interval;
+        let json: String = serde_json::to_string(&self.content).expect("serialize lease");
+        // At this point we know the lease was, at least very recently, still held by us, so
+        // we can overwrite it.
+        self.transport
+            .write_file(LEASE_FILENAME, json.as_bytes(), WriteMode::Overwrite)?;
+        Ok(self)
+    }
+
     #[instrument]
     pub fn release(self) -> Result<()> {
         // TODO: Check that it was not stolen?
         self.transport
-            .as_ref()
             .remove_file(LEASE_FILENAME)
             .map_err(Error::from)
     }
 
     /// Return information about the current leaseholder, if any.
-    pub fn peek(transport: &dyn Transport) -> Result<Option<LeaseContent>> {
-        read_json(transport, LEASE_FILENAME).map_err(|err| match err {
-            jsonio::Error::Transport { source, .. } => Error::Transport { source },
-            jsonio::Error::Json { source, .. } => Error::Json {
-                source,
-                url: transport.base_url().join(LEASE_FILENAME).unwrap(),
-            },
-        })
+    pub fn peek(transport: &Transport) -> Result<LeaseState> {
+        // TODO: Atomically get the content and mtime; that should be one call on s3.
+        let metadata = match transport.metadata(LEASE_FILENAME) {
+            Ok(m) => m,
+            Err(err) if err.is_not_found() => {
+                trace!("lease file not present");
+                return Ok(LeaseState::Free);
+            }
+            Err(err) => {
+                warn!(?err, "error getting lease file metadata");
+                return Err(err.into());
+            }
+        };
+        let bytes = transport.read_file(LEASE_FILENAME)?;
+        match serde_json::from_slice(&bytes) {
+            Ok(content) => Ok(LeaseState::Held(content)),
+            Err(err) => {
+                warn!(?err, "error deserializing lease file");
+                // We do still at least know that it's held, and when it was taken.
+                Ok(LeaseState::Corrupt(metadata.modified))
+            }
+        }
     }
 }
 
+#[derive(Debug, Clone)]
+pub enum LeaseState {
+    Free,
+    Held(LeaseContent),
+    Corrupt(OffsetDateTime),
+}
+
 /// Contents of the lease file.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
 pub struct LeaseContent {
     /// Hostname of the client process
-    pub host: String,
+    pub host: Option<String>,
     /// Process id of the client.
-    pub pid: u32,
+    pub pid: Option<u32>,
     /// Conserve version string.
-    pub client_version: String,
+    pub client_version: Option<String>,
+    /// Random nonce to distinguish different leases from the same client.
+    pub nonce: u32,
     /// Time when the lease was taken.
     #[serde(with = "time::serde::iso8601")]
-    pub lease_taken: OffsetDateTime,
+    pub acquired: OffsetDateTime,
     /// Unix time after which this lease is stale.
     #[serde(with = "time::serde::iso8601")]
-    pub lease_expiry: OffsetDateTime,
+    pub expiry: OffsetDateTime,
 }
 
 #[cfg(test)]
 mod test {
+    use std::fs::{write, File};
+    use std::process;
+    use std::time::Duration;
+
+    use assert_matches::assert_matches;
     use tempfile::TempDir;
 
-    use crate::transport::open_local_transport;
+    use crate::lease::LEASE_FILENAME;
+    use crate::transport::Transport;
 
-    use super::Lease;
+    use super::{Lease, LeaseState};
 
     #[test]
     fn take_lease() {
+        let options = super::LeaseOptions {
+            lease_expiry: Duration::from_secs(60),
+            renewal_interval: Duration::from_secs(10),
+        };
         let tmp = TempDir::new().unwrap();
-        let transport = open_local_transport(tmp.path()).unwrap();
-        let lease = Lease::acquire(transport.clone()).unwrap();
-        assert!(tmp.path().join("LEASE.json").exists());
-        assert!(lease.next_renewal > lease.lease_taken);
+        let transport = Transport::local(tmp.path());
+        let lease = Lease::try_acquire(&transport, &options)
+            .expect("original lease in new dir should be acquired");
+        assert!(tmp.path().join("LEASE").exists());
+        let orig_lease_taken = lease.content.acquired;
+
+        let peeked = Lease::peek(&transport).expect("should be able to peek newly acquired lease");
+        let LeaseState::Held(content) = peeked else {
+            panic!("lease not held")
+        };
+        assert_eq!(
+            content.host.unwrap(),
+            hostname::get().unwrap().to_string_lossy()
+        );
+        assert_eq!(content.pid, Some(process::id()));
 
-        let peeked = Lease::peek(transport.as_ref()).unwrap().unwrap();
-        assert_eq!(peeked.host, hostname::get().unwrap().to_string_lossy());
-        assert_eq!(peeked.pid, std::process::id());
+        let lease = lease.renew().expect("renew already-held lease");
+        let state2 = Lease::peek(&transport).expect("should be able to peek renewed lease");
+        match state2 {
+            LeaseState::Held(content) => {
+                assert!(content.acquired > orig_lease_taken);
+            }
+            _ => panic!("lease should be held, got {state2:?}"),
+        }
         lease.release().unwrap();
-        assert!(!tmp.path().join("LEASE.json").exists());
+        assert!(!tmp.path().join("LEASE").exists());
+    }
+
+    #[test]
+    fn fail_to_renew_deleted_lease() {
+        let options = super::LeaseOptions {
+            lease_expiry: Duration::from_secs(60),
+            renewal_interval: Duration::from_secs(10),
+        };
+        let tmp = TempDir::new().unwrap();
+        let transport = Transport::local(tmp.path());
+        let lease = Lease::try_acquire(&transport, &options).unwrap();
+        assert!(tmp.path().join("LEASE").exists());
+
+        transport.remove_file(LEASE_FILENAME).unwrap();
+
+        let result = lease.renew();
+        assert_matches!(result, Err(super::Error::Disappeared { .. }));
+    }
+
+    #[test]
+    fn fail_to_renew_stolen_lease() {
+        let options = super::LeaseOptions {
+            lease_expiry: Duration::from_secs(60),
+            renewal_interval: Duration::from_secs(10),
+        };
+        let tmp = TempDir::new().unwrap();
+        let transport = Transport::local(tmp.path());
+        let lease1 = Lease::try_acquire(&transport, &options).unwrap();
+        assert!(tmp.path().join("LEASE").exists());
+
+        // Delete the lease to make it easy to steal.
+        transport.remove_file(LEASE_FILENAME).unwrap();
+        let lease2 = Lease::try_acquire(&transport, &options).unwrap();
+        assert!(tmp.path().join("LEASE").exists());
+
+        // Renewal through the first handle should now fail.
+        let result = lease1.renew();
+        assert_matches!(result, Err(super::Error::Stolen { .. }));
+
+        // Lease 2 can still renew.
+        lease2.renew().unwrap();
+    }
+
+    #[test]
+    fn peek_fixed_lease_content() {
+        let tmp = TempDir::new().unwrap();
+        let transport = Transport::local(tmp.path());
+        write(
+            tmp.path().join("LEASE"),
+            r#"
+            {
+                "host": "somehost",
+                "pid": 1234,
+                "client_version": "0.1.2",
+                "acquired": "2021-01-01T12:34:56Z",
+                "expiry": "2021-01-01T12:35:56Z",
+                "nonce": 12345
+            }"#,
+        )
+        .unwrap();
+        let state = Lease::peek(&transport).unwrap();
+        dbg!(&state);
+        match state {
+            LeaseState::Held(content) => {
+                assert_eq!(content.host.unwrap(), "somehost");
+                assert_eq!(content.pid, Some(1234));
+                assert_eq!(content.client_version.unwrap(), "0.1.2");
+                assert_eq!(content.acquired.year(), 2021);
+                assert_eq!(content.expiry.year(), 2021);
+                assert_eq!(
+                    content.expiry - content.acquired,
+                    time::Duration::seconds(60)
+                );
+            }
+            _ => panic!("lease should be recognized as held, got {state:?}"),
+        }
+    }
+
+    /// An empty lease file is judged by its mtime; the lease can be grabbed a while
+    /// after it was last written.
+    #[test]
+    fn peek_corrupt_empty_lease() {
+        let tmp = TempDir::new().unwrap();
+        let transport = Transport::local(tmp.path());
+        File::create(tmp.path().join("LEASE")).unwrap();
+        let state = Lease::peek(&transport).unwrap();
+        match state {
+            LeaseState::Corrupt(mtime) => {
+                let now = time::OffsetDateTime::now_utc();
+                assert!(now - mtime < time::Duration::seconds(15));
+            }
+            _ => panic!("lease should be recognized as corrupt, got {state:?}"),
+        }
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 512fdaf7..88e0eba8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -74,7 +74,6 @@ pub use crate::restore::{restore, RestoreOptions};
 pub use crate::show::{show_versions, ShowVersionsOptions};
 pub use crate::stats::DeleteStats;
 pub use crate::stored_tree::StoredTree;
-pub use crate::transport::{open_transport, Transport};
 pub use crate::tree::{ReadTree, TreeSize};
 pub use crate::unix_mode::UnixMode;
 pub use crate::validate::ValidateOptions;
@@ -94,9 +93,6 @@ pub const ARCHIVE_VERSION: &str = "0.6";
 
 pub const SYMLINKS_SUPPORTED: bool = cfg!(target_family = "unix");
 
-/// Temporary files in the archive have this prefix.
-const TMP_PREFIX: &str = "tmp";
-
 /// Metadata file in the band directory.
 static BAND_HEAD_FILENAME: &str = "BANDHEAD";
 
diff --git a/src/owner.rs b/src/owner.rs
index 3bf9fb90..165e4a21 100644
--- a/src/owner.rs
+++ b/src/owner.rs
@@ -14,7 +14,7 @@
 
 //! Tracks the file owner user/group.
 
-// There is potentially a more efficient way to do this, but this approach works
+// TODO: There is potentially a more efficient way to do this, but this approach works
 // better than just saving the uid and gid, so that backups may potentially
 // be restored on a different system.
 
diff --git a/src/owner/windows.rs b/src/owner/windows.rs
index 14e6fc1d..d40772a5 100644
--- a/src/owner/windows.rs
+++ b/src/owner/windows.rs
@@ -19,7 +19,6 @@ use std::io;
 use std::path::Path;
 
 use super::Owner;
-use crate::Result;
 
 impl From<&Metadata> for Owner {
     fn from(_: &Metadata) -> Self {
diff --git a/src/stitch.rs b/src/stitch.rs
index 68f1f9f9..dc936df1 100644
--- a/src/stitch.rs
+++ b/src/stitch.rs
@@ -175,10 +175,9 @@ fn previous_existing_band(archive: &Archive, mut band_id: BandId) -> Option<Band>
 
-    pub fn transport(&self) -> &dyn Transport {
+    pub fn transport(&self) -> &Transport {
         self.archive.transport()
     }
 }
diff --git a/src/transport.rs b/src/transport.rs
index 35b9837a..5dec9a1b 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -14,50 +14,24 @@
 //!
 //! Transport operations return std::io::Result to reflect their narrower focus.
 
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::{error, fmt, io, result};
 
 use bytes::Bytes;
 use derive_more::Display;
+use time::OffsetDateTime;
 use url::Url;
 
 use crate::*;
 
 pub mod local;
-use local::LocalTransport;
+#[cfg(feature = "sftp")]
+pub mod sftp;
 
 #[cfg(feature = "s3")]
 pub mod s3;
 
-/// Open a `Transport` to access a local directory.
-///
-/// `s` may be a local path or a URL.
-pub fn open_transport(s: &str) -> crate::Result<Arc<dyn Transport>> {
-    if let Ok(url) = Url::parse(s) {
-        match url.scheme() {
-            "file" => Ok(Arc::new(LocalTransport::new(
-                &url.to_file_path().expect("extract URL file path"),
-            ))),
-            #[cfg(feature = "s3")]
-            "s3" => Ok(s3::S3Transport::new(&url)?),
-            d if d.len() == 1 => {
-                // Probably a Windows path with drive letter, like "c:/thing", not actually a URL.
- Ok(Arc::new(LocalTransport::new(Path::new(s)))) - } - other => Err(crate::Error::UrlScheme { - scheme: other.to_owned(), - }), - } - } else { - Ok(Arc::new(LocalTransport::new(Path::new(s)))) - } -} - -pub fn open_local_transport(path: &Path) -> crate::Result> { - Ok(Arc::new(LocalTransport::new(path))) -} - /// Abstracted filesystem IO to access an archive. /// /// This supports operations that are common across local filesystems, SFTP, and cloud storage, and @@ -66,17 +40,66 @@ pub fn open_local_transport(path: &Path) -> crate::Result> { /// A transport has a root location, which will typically be the top directory of the Archive. /// Below that point everything is accessed with a relative path, expressed as a PathBuf. /// -/// All Transports must be `Send + Sync`, so they can be passed across or shared across threads. +/// Transport objects can be cheaply cloned. /// /// Files in Conserve archives have bounded size and fit in memory so this does not need to /// support streaming or partial reads and writes. -pub trait Transport: Send + Sync + std::fmt::Debug { - /// Get the base URL for this transport. - fn base_url(&self) -> &Url; +#[derive(Clone)] +pub struct Transport { + /// The concrete protocol implementation: local, S3, etc. + protocol: Arc, +} + +impl Transport { + /// Open a new local transport addressing a filesystem directory. + pub fn local(path: &Path) -> Self { + Transport { + protocol: Arc::new(local::Protocol::new(path)), + } + } + + /// Open a new transport from a string that might be a URL or local path. + pub fn new(s: &str) -> Result { + if let Ok(url) = Url::parse(s) { + Transport::from_url(&url) + } else { + Ok(Transport::local(Path::new(s))) + } + } + + pub fn from_url(url: &Url) -> Result { + let protocol: Arc = match url.scheme() { + "file" => Arc::new(local::Protocol::new( + &url.to_file_path().expect("extract URL file path"), + )), + d if d.len() == 1 => { + // Probably a Windows path with drive letter, like "c:/thing", not actually a URL. + Arc::new(local::Protocol::new(Path::new(url.as_str()))) + } + + #[cfg(feature = "s3")] + "s3" => Arc::new(s3::Protocol::new(url)?), + + #[cfg(feature = "sftp")] + "sftp" => Arc::new(sftp::Protocol::new(url)?), - /// Get the URL for a file relative to this transport. - fn relative_file_url(&self, path: &str) -> Url { - self.base_url().join(path).expect("join relative file URL") + _other => { + return Err(Error { + kind: ErrorKind::UrlScheme, + url: Some(url.clone()), + source: None, + }) + } + }; + Ok(Transport { protocol }) + } + + /// Get one complete file into a caller-provided buffer. + /// + /// Files in the archive are of bounded size, so it's OK to always read them entirely into + /// memory, and this is simple to support on all implementations. + pub fn read_file(&self, path: &str) -> Result { + self.protocol.read_file(path) } /// List a directory, separating out file and subdirectory names. @@ -84,16 +107,41 @@ pub trait Transport: Send + Sync + std::fmt::Debug { /// Names are in the arbitrary order that they're returned from the transport. /// /// Any error during iteration causes overall failure. - fn list_dir(&self, relpath: &str) -> Result; + pub fn list_dir(&self, relpath: &str) -> Result { + self.protocol.list_dir(relpath) + } - /// Get one complete file into a caller-provided buffer. - /// - /// Files in the archive are of bounded size, so it's OK to always read them entirely into - /// memory, and this is simple to support on all implementations. 
- fn read_file(&self, path: &str) -> Result; + /// Make a new transport addressing a subdirectory. + pub fn chdir(&self, relpath: &str) -> Self { + Transport { + protocol: self.protocol.chdir(relpath), + } + } + + pub fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()> { + self.protocol.write_file(relpath, content, mode) + } + + pub fn create_dir(&self, relpath: &str) -> Result<()> { + self.protocol.create_dir(relpath) + } + + pub fn metadata(&self, relpath: &str) -> Result { + self.protocol.metadata(relpath) + } + + /// Delete a file. + pub fn remove_file(&self, relpath: &str) -> Result<()> { + self.protocol.remove_file(relpath) + } + + /// Delete a directory and all its contents. + pub fn remove_dir_all(&self, relpath: &str) -> Result<()> { + self.protocol.remove_dir_all(relpath) + } /// Check if a regular file exists. - fn is_file(&self, path: &str) -> Result { + pub fn is_file(&self, path: &str) -> Result { match self.metadata(path) { Ok(metadata) => Ok(metadata.kind == Kind::File), Err(err) if err.kind() == ErrorKind::NotFound => Ok(false), @@ -101,26 +149,43 @@ pub trait Transport: Send + Sync + std::fmt::Debug { } } - /// Create a directory, if it does not exist. - /// - /// If the directory already exists, it's not an error. - /// - /// This function does not create missing parent directories. - fn create_dir(&self, relpath: &str) -> Result<()>; + pub fn url(&self) -> &Url { + self.protocol.url() + } + + #[allow(unused)] + fn local_path(&self) -> Option { + self.protocol.local_path() + } +} + +impl fmt::Debug for Transport { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Transport({})", self.url()) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WriteMode { + /// Create the file if it does not exist, or overwrite it if it does. + Overwrite, + + /// Create the file if it does not exist, or fail if it does. + CreateNew, +} + +trait Protocol: Send + Sync { + fn read_file(&self, path: &str) -> Result; /// Write a complete file. /// - /// As much as possible, the file should be written atomically so that it is only visible with - /// the complete content. On a local filesystem the content is written to a temporary file and - /// then renamed. - /// If a temporary file is used, the name should start with `crate::TMP_PREFIX`. + /// Depending on the [WriteMode] this may either overwrite existing files, or error. /// - /// If the file exists it is replaced. (Across transports, and particularly on S3, - /// we can't rely on detecting existing files.) - fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()>; - - /// Write a new file and error if the file exists. - fn write_new_file(&self, relpath: &str, content: &[u8]) -> Result<()>; + /// As much as possible, the file should be written atomically so that it is only visible with + /// the complete content. + fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>; + fn list_dir(&self, relpath: &str) -> Result; + fn create_dir(&self, relpath: &str) -> Result<()>; /// Get metadata about a file. fn metadata(&self, relpath: &str) -> Result; @@ -132,7 +197,13 @@ pub trait Transport: Send + Sync + std::fmt::Debug { fn remove_dir_all(&self, relpath: &str) -> Result<()>; /// Make a new transport addressing a subdirectory. - fn sub_transport(&self, relpath: &str) -> Arc; + fn chdir(&self, relpath: &str) -> Arc; + + fn url(&self) -> &Url; + + fn local_path(&self) -> Option { + None + } } /// A directory entry read from a transport. 
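The pivotal change in src/transport.rs above is replacing the public `pub trait Transport` with a concrete, cheaply clonable `Transport` struct that delegates to a private `Arc<dyn Protocol>`. In miniature, the pattern looks like this (a simplified sketch with made-up method names, not the real API):

    use std::sync::Arc;

    // Private behavior trait; one implementation per backend (local, S3, SFTP).
    trait Protocol: Send + Sync {
        fn read(&self, path: &str) -> String;
    }

    // Public facade: cloning only bumps the Arc refcount, so handles can be
    // passed freely between threads.
    #[derive(Clone)]
    struct Transport {
        protocol: Arc<dyn Protocol>,
    }

    impl Transport {
        fn read(&self, path: &str) -> String {
            self.protocol.read(path) // every public method delegates
        }
    }

Callers no longer handle `Arc<dyn Transport>` directly, and adding a backend (like the SFTP one later in this diff) only requires a private `Protocol` implementation plus a new arm in `from_url`.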
diff --git a/src/transport/local.rs b/src/transport/local.rs
index fa0fdd1a..1676bf16 100644
--- a/src/transport/local.rs
+++ b/src/transport/local.rs
@@ -12,53 +12,89 @@
 
 //! Access to an archive on the local filesystem.
 
-use std::borrow::Cow;
-use std::fs::{create_dir, File};
-use std::io;
+use std::fs::{create_dir, remove_file, File};
 use std::io::prelude::*;
-use std::path::{absolute, Path, PathBuf};
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::{io, path};
 
 use bytes::Bytes;
-use tracing::{instrument, trace, warn};
+use tracing::{error, instrument, trace, warn};
 use url::Url;
 
-use super::{Error, ListDir, Metadata, Result, Transport};
+use super::{Error, ListDir, Metadata, Result, WriteMode};
 
-#[derive(Clone, Debug)]
-pub struct LocalTransport {
-    /// Base directory for this transport.
-    root: PathBuf,
+pub(super) struct Protocol {
+    /// Directory addressed by this protocol.
+    path: PathBuf,
 
-    /// URL representation of the base directory.
+    /// URL corresponding to the directory.
     url: Url,
 }
 
-impl LocalTransport {
-    pub fn new(path: &Path) -> Self {
-        LocalTransport {
-            root: path.to_owned(),
-            // TODO: Maybe return an error on construction if we can't find the absolute path.
-            url: Url::from_directory_path(absolute(path).expect("absolute path"))
-                .expect("path to URL"),
+impl Protocol {
+    pub(super) fn new(path: &Path) -> Self {
+        Protocol {
+            path: path.to_owned(),
+            url: Url::from_directory_path(path::absolute(path).expect("make path absolute"))
+                .expect("convert path to URL"),
         }
     }
 
-    pub fn full_path(&self, relpath: &str) -> PathBuf {
+    fn full_path(&self, relpath: &str) -> PathBuf {
         debug_assert!(!relpath.contains("/../"), "path must not contain /../");
-        self.root.join(relpath)
+        self.path.join(relpath)
     }
 
     fn io_error(&self, relpath: &str, err: io::Error) -> Error {
-        Error::io_error(self.url.join(relpath).expect("join URL"), err)
+        Error::io_error(&self.path.join(relpath), err)
     }
 }
 
-impl Transport for LocalTransport {
-    fn base_url(&self) -> &Url {
+impl super::Protocol for Protocol {
+    fn url(&self) -> &Url {
         &self.url
     }
 
+    #[instrument(skip(self))]
+    fn read_file(&self, relpath: &str) -> Result<Bytes> {
+        fn try_block(path: &Path) -> io::Result<Bytes> {
+            let mut file = File::open(path)?;
+            let mut out_buf = Vec::new();
+            let actual_len = file.read_to_end(&mut out_buf)?;
+            trace!("Read {actual_len} bytes");
+            Ok(out_buf.into())
+        }
+        try_block(&self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
+    }
+
+    #[instrument(skip(self, content))]
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
+        let full_path = self.full_path(relpath);
+        let oops = |err| self.io_error(relpath, err);
+        let mut options = File::options();
+        options.write(true);
+        match write_mode {
+            WriteMode::CreateNew => {
+                options.create_new(true);
+            }
+            WriteMode::Overwrite => {
+                options.create(true).truncate(true);
+            }
+        }
+        let mut file = options.open(&full_path).map_err(oops)?;
+        if let Err(err) = file.write_all(content) {
+            error!("Failed to write {full_path:?}: {err:?}");
+            drop(file);
+            if let Err(err2) = remove_file(&full_path) {
+                error!("Failed to remove {full_path:?}: {err2:?}");
+            }
+            return Err(oops(err));
+        }
+        trace!("Wrote {} bytes", content.len());
+        Ok(())
+    }
+
     fn list_dir(&self, relpath: &str) -> Result<ListDir> {
         // Archives should never normally contain non-UTF-8 (or even non-ASCII) filenames, but
         // let's pass them back as lossy UTF-8 so they can be reported at a higher level, for
@@ -83,29 +119,6 @@ impl Transport for LocalTransport {
         Ok(names)
     }
 
-    #[instrument(skip(self))]
-    fn read_file(&self, relpath: &str) -> Result<Bytes> {
-        fn try_block(path: &Path) -> io::Result<Bytes> {
-            let mut file = File::open(path)?;
-            let estimated_len: usize = file
-                .metadata()?
-                .len()
-                .try_into()
-                .expect("File size fits in usize");
-            let mut out_buf = Vec::with_capacity(estimated_len);
-            let actual_len = file.read_to_end(&mut out_buf)?;
-            trace!("Read {actual_len} bytes");
-            out_buf.truncate(actual_len);
-            Ok(out_buf.into())
-        }
-        try_block(&self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
-    }
-
-    fn is_file(&self, relpath: &str) -> Result<bool> {
-        let path = self.full_path(relpath);
-        Ok(path.is_file())
-    }
-
     fn create_dir(&self, relpath: &str) -> super::Result<()> {
         let path = self.full_path(relpath);
         create_dir(&path).or_else(|err| {
@@ -117,59 +130,12 @@ impl Transport for LocalTransport {
         })
     }
 
-    #[instrument(skip(self, content))]
-    fn write_new_file(&self, relpath: &str, content: &[u8]) -> super::Result<()> {
-        File::options()
-            .create(true)
-            .write(true)
-            .open(&self.full_path(relpath))
-            .and_then(|mut file| file.write_all(content).and_then(|()| file.flush()))
-            .map_err(|err| self.io_error(relpath, err))
-    }
-
-    #[instrument(skip(self, content))]
-    fn write_file(&self, relpath: &str, content: &[u8]) -> super::Result<()> {
-        let full_path = self.full_path(relpath);
-        let dir = full_path.parent().unwrap();
-        let context = |err| self.io_error(relpath, err);
-        let mut temp = tempfile::Builder::new()
-            .prefix(crate::TMP_PREFIX)
-            .tempfile_in(dir)
-            .map_err(context)?;
-        if let Err(err) = temp.write_all(content) {
-            let _ = temp.close();
-            warn!("Failed to write {:?}: {:?}", relpath, err);
-            return Err(context(err));
-        }
-        if let Err(persist_error) = temp.persist(&full_path) {
-            warn!("Failed to persist {:?}: {:?}", full_path, persist_error);
-            persist_error.file.close().map_err(context)?;
-            Err(context(persist_error.error))
-        } else {
-            trace!("Wrote {} bytes", content.len());
-            Ok(())
-        }
-    }
-
     fn remove_file(&self, relpath: &str) -> super::Result<()> {
         std::fs::remove_file(self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
     }
 
     fn remove_dir_all(&self, relpath: &str) -> super::Result<()> {
-        let path = self.full_path(relpath);
-        std::fs::remove_dir_all(&path).map_err(|err| self.io_error(relpath, err))
-    }
-
-    fn sub_transport(&self, relpath: &str) -> Arc<dyn Transport> {
-        let subdir = if relpath.ends_with('/') {
-            Cow::Borrowed(relpath)
-        } else {
-            Cow::Owned(format!("{relpath}/"))
-        };
-        Arc::new(LocalTransport {
-            root: self.root.join(relpath),
-            url: self.url.join(&subdir).expect("join component to URL"),
-        })
+        std::fs::remove_dir_all(self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
     }
 
     fn metadata(&self, relpath: &str) -> Result<Metadata> {
@@ -177,29 +143,41 @@ impl Transport for LocalTransport {
             .full_path(relpath)
             .metadata()
            .map_err(|err| self.io_error(relpath, err))?;
+        let modified = fsmeta
+            .modified()
+            .map_err(|err| self.io_error(relpath, err))?
+            .into();
         Ok(Metadata {
             len: fsmeta.len(),
             kind: fsmeta.file_type().into(),
+            modified,
         })
     }
-}
 
-impl AsRef<dyn Transport> for LocalTransport {
-    fn as_ref(&self) -> &(dyn Transport + 'static) {
-        self
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        Arc::new(Protocol {
+            path: self.path.join(relpath),
+            url: self.url.join(relpath).expect("join URL"),
+        })
+    }
+
+    fn local_path(&self) -> Option<PathBuf> {
+        Some(self.path.clone())
     }
 }
 
 #[cfg(test)]
 mod test {
     use std::error::Error;
+    use std::time::Duration;
 
     use assert_fs::prelude::*;
     use predicates::prelude::*;
+    use time::OffsetDateTime;
 
     use super::*;
     use crate::kind::Kind;
-    use crate::transport;
+    use crate::transport::{self, Transport};
 
     #[test]
     fn read_file() {
@@ -209,7 +187,7 @@ mod test {
 
         temp.child(filename).write_str(content).unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         let buf = transport.read_file(filename).unwrap();
         assert_eq!(buf, content.as_bytes());
@@ -219,7 +197,7 @@ mod test {
     #[test]
     fn read_file_not_found() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         let err = transport
             .read_file("nonexistent.json")
@@ -253,15 +231,14 @@ mod test {
         let filename = "poem.txt";
         temp.child(filename).write_str(content).unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
-        assert_eq!(
-            transport.metadata(filename).unwrap(),
-            Metadata {
-                len: 24,
-                kind: Kind::File
-            }
-        );
+        let metadata = transport.metadata(filename).unwrap();
+        dbg!(&metadata);
+
+        assert_eq!(metadata.len, 24);
+        assert_eq!(metadata.kind, Kind::File);
+        assert!(metadata.modified + Duration::from_secs(60) > OffsetDateTime::now_utc());
 
         assert!(transport.metadata("nopoem").unwrap_err().is_not_found());
     }
@@ -275,7 +252,7 @@ mod test {
             .write_str("Morning coffee")
             .unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         let root_list = transport.list_dir(".").unwrap();
         assert_eq!(root_list.files, ["root file"]);
         assert_eq!(root_list.dirs, ["subdir"]);
@@ -293,11 +270,15 @@ mod test {
     #[test]
     fn write_file() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("subdir").unwrap();
         transport
-            .write_file("subdir/subfile", b"Must I paint you a picture?")
+            .write_file(
+                "subdir/subfile",
+                b"Must I paint you a picture?",
+                WriteMode::CreateNew,
+            )
             .unwrap();
 
         temp.child("subdir").assert(predicate::path::is_dir());
@@ -315,7 +296,7 @@ mod test {
         use std::os::unix::prelude::PermissionsExt;
 
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         temp.child("file").touch().unwrap();
         fs::set_permissions(temp.child("file").path(), fs::Permissions::from_mode(0o000))
             .expect("set_permissions");
@@ -328,13 +309,13 @@ mod test {
     #[test]
     fn write_file_can_overwrite() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         let filename = "filename";
         transport
-            .write_file(filename, b"original content")
+            .write_file(filename, b"original content", WriteMode::Overwrite)
             .expect("first write succeeds");
         transport
-            .write_file(filename, b"new content")
+            .write_file(filename, b"new content", WriteMode::Overwrite)
            .expect("write over existing file succeeds");
         assert_eq!(
            transport.read_file(filename).unwrap().as_ref(),
@@ -345,7 +326,7 @@ mod test {
     #[test]
     fn create_existing_dir() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa").unwrap();
@@ -357,12 +338,12 @@ mod test {
     #[test]
     fn sub_transport() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa/bbb").unwrap();
 
-        let sub_transport = transport.sub_transport("aaa");
+        let sub_transport = transport.chdir("aaa");
         let sub_list = sub_transport.list_dir("").unwrap();
 
         assert_eq!(sub_list.dirs, ["bbb"]);
@@ -374,7 +355,7 @@ mod test {
     #[test]
     fn remove_dir_all() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa/bbb").unwrap();
diff --git a/src/transport/s3.rs b/src/transport/s3.rs
index c559ef2c..3cb0b0e1 100644
--- a/src/transport/s3.rs
+++ b/src/transport/s3.rs
@@ -24,9 +24,9 @@
 //
 //     cargo mutants -f s3.rs --no-config -C --features=s3,s3-integration-test
 
-use std::borrow::Cow;
 use std::fmt;
 use std::sync::Arc;
+use std::time::SystemTime;
 
 use aws_config::{AppName, BehaviorVersion};
 use aws_sdk_s3::error::SdkError;
@@ -42,12 +42,13 @@ use aws_types::SdkConfig;
 use base64::Engine;
 use bytes::Bytes;
 use tokio::runtime::Runtime;
-use tracing::{debug, trace, trace_span};
+use tracing::{debug, instrument, trace, trace_span};
 use url::Url;
 
-use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result, Transport};
+use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result, WriteMode};
 
-pub struct S3Transport {
+pub(super) struct Protocol {
+    url: Url,
     /// Tokio runtime specifically for S3 IO.
     ///
     /// S3 SDK is built on Tokio but the rest of Conserve uses threads.
@@ -60,41 +61,26 @@ pub struct S3Transport {
     bucket: String,
     base_path: String,
 
-    url: Url,
-
     /// Storage class for new objects.
     storage_class: StorageClass,
 }
 
-impl fmt::Debug for S3Transport {
-    #[mutants::skip] // unimportant to test
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("S3Transport")
-            .field("bucket", &self.bucket)
-            .field("base_path", &self.base_path)
-            .finish()
-    }
-}
+impl Protocol {
+    pub(super) fn new(url: &Url) -> Result<Protocol> {
+        assert_eq!(url.scheme(), "s3");
 
-// Clippy false positive here: https://github.com/rust-lang/rust-clippy/issues/12444
-#[allow(clippy::assigning_clones)]
-impl S3Transport {
-    pub fn new(base_url: &Url) -> Result<Arc<S3Transport>> {
         // Like in .
         let runtime = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
             .map_err(|source| Error {
                 kind: ErrorKind::CreateTransport,
-                url: Some(base_url.to_owned()),
+                url: Some(url.to_owned()),
                 source: Some(Box::new(source)),
             })?;
 
-        let bucket = base_url.authority().to_owned();
-        assert!(
-            !bucket.is_empty(),
-            "S3 bucket name is empty in {base_url:?}"
-        );
+        let bucket = url.authority().to_owned();
+        assert!(!bucket.is_empty(), "S3 bucket name is empty in {url:?}");
 
         // Find the bucket region.
         let config = load_aws_config(&runtime, None);
@@ -116,7 +102,7 @@ impl S3Transport {
         let config = load_aws_config(&runtime, region);
         let client = aws_sdk_s3::Client::new(&config);
 
-        let mut base_path = base_url.path().to_owned();
+        let mut base_path = url.path().to_owned();
         if !base_path.is_empty() {
             base_path = base_path
                 .strip_prefix('/')
@@ -125,16 +111,48 @@ impl S3Transport {
                 .to_owned();
         }
         let url = Url::parse(&format!("s3://{bucket}/{base_path}")).expect("valid s3 URL");
-        debug!(%base_url);
+        debug!(%url);
 
-        Ok(Arc::new(S3Transport {
+        Ok(Protocol {
             bucket,
             base_path,
             url,
             client: Arc::new(client),
             runtime: Arc::new(runtime),
             storage_class: StorageClass::IntelligentTiering,
-        }))
+        })
+    }
+
+    fn join_path(&self, relpath: &str) -> String {
+        join_paths(&self.base_path, relpath)
+    }
+
+    fn s3_error<E, R>(&self, key: &str, source: SdkError<E, R>) -> Error
+    where
+        E: std::error::Error + Send + Sync + 'static,
+        R: std::fmt::Debug + Send + Sync + 'static,
+        ErrorKind: for<'a> From<&'a E>,
+    {
+        debug!(s3_error = ?source);
+        let kind = match &source {
+            SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
+            _ => ErrorKind::Other,
+        };
+        Error {
+            kind,
+            url: self.url.join(key).ok(),
+            source: Some(source.into()),
+        }
+    }
+}
+
+impl fmt::Debug for Protocol {
+    #[mutants::skip] // unimportant to test
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("conserve::transport::s3::Protocol")
+            .field("bucket", &self.bucket)
+            .field("base_path", &self.base_path)
+            .finish()
     }
 }
 
@@ -185,11 +203,7 @@ fn join_paths(a: &str, b: &str) -> String {
     result
 }
 
-impl Transport for S3Transport {
-    fn base_url(&self) -> &Url {
-        &self.url
-    }
-
+impl super::Protocol for Protocol {
     fn list_dir(&self, relpath: &str) -> Result<ListDir> {
         let _span = trace_span!("S3Transport::list_file", %relpath).entered();
         let mut prefix = self.join_path(relpath);
@@ -230,7 +244,7 @@ impl Transport for S3Transport {
                         result.files.push(name.to_owned());
                     }
                 }
-                Some(Err(err)) => return Err(s3_error(&self.url, prefix, err)),
+                Some(Err(err)) => return Err(self.s3_error(&prefix, err)),
                 None => break,
             }
         }
@@ -249,13 +263,13 @@ impl Transport for S3Transport {
         let response = self
             .runtime
             .block_on(request.send())
-            .map_err(|source| s3_error(&self.url, key.clone(), source))?;
+            .map_err(|source| self.s3_error(&key, source))?;
         let body_bytes = self
             .runtime
             .block_on(response.body.collect())
             .map_err(|source| Error {
                 kind: ErrorKind::Other,
-                url: Some(self.url.join(&key).expect("join URL")),
+                url: self.url.join(relpath).ok(),
                 source: Some(Box::new(source)),
             })?
            .into_bytes();
@@ -270,12 +284,12 @@ impl Transport for S3Transport {
         Ok(())
     }
 
-    fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
-        let _span = trace_span!("S3Transport::write_file", %relpath).entered();
+    #[instrument(skip(self, content))]
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
         let key = self.join_path(relpath);
         let crc32c =
             base64::engine::general_purpose::STANDARD.encode(crc32c::crc32c(content).to_be_bytes());
-        let request = self
+        let mut request = self
             .client
             .put_object()
             .bucket(&self.bucket)
@@ -283,24 +297,23 @@ impl Transport for S3Transport {
             .storage_class(self.storage_class.clone())
             .checksum_crc32_c(crc32c)
             .body(content.to_owned().into());
+        if write_mode == WriteMode::CreateNew {
+            request = request.if_none_match("*");
+        }
         let response = self.runtime.block_on(request.send());
         // trace!(?response);
-        response.map_err(|err| s3_error(&self.url, key, err))?;
+        response.map_err(|err| self.s3_error(&key, err))?;
         trace!(body_len = content.len(), "wrote file");
         Ok(())
     }
 
-    fn write_new_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
-        todo!()
-    }
-
     fn remove_file(&self, relpath: &str) -> Result<()> {
         let _span = trace_span!("S3Transport::remove_file", %relpath).entered();
         let key = self.join_path(relpath);
         let request = self.client.delete_object().bucket(&self.bucket).key(&key);
         let response = self.runtime.block_on(request.send());
         trace!(?response);
-        response.map_err(|err| s3_error(&self.url, key, err))?;
+        response.map_err(|err| self.s3_error(&key, err))?;
         trace!("deleted file");
         Ok(())
     }
@@ -321,7 +334,7 @@ impl Transport for S3Transport {
         let mut n_files = 0;
         while let Some(response) = self.runtime.block_on(stream.next()) {
             for object in response
-                .map_err(|err| s3_error(&self.url, prefix.clone(), err))?
+                .map_err(|err| self.s3_error(&prefix, err))?
                 .contents
                 .expect("ListObjectsV2Response has contents")
             {
@@ -334,7 +347,7 @@ impl Transport for S3Transport {
                             .key(&key)
                             .send(),
                     )
-                    .map_err(|err| s3_error(&self.url, key, err))?;
+                    .map_err(|err| self.s3_error(&key, err))?;
                 n_files += 1;
             }
         }
@@ -356,14 +369,20 @@ impl Transport for S3Transport {
                     .expect("S3 HeadObject response should have a content_length")
                     .try_into()
                     .expect("Content length non-negative");
+                let modified: SystemTime = response
+                    .last_modified
+                    .expect("S3 HeadObject response should have a last_modified")
+                    .try_into()
+                    .expect("S3 last_modified is valid SystemTime");
                 trace!(?len, "File exists");
                 Ok(Metadata {
                     kind: Kind::File,
                     len,
+                    modified: modified.into(),
                 })
             }
             Err(err) => {
-                let translated = s3_error(&self.url, key, err);
+                let translated = self.s3_error(&key, err);
                 if translated.is_not_found() {
                     trace!("file does not exist");
                 } else {
@@ -374,51 +393,19 @@ impl Transport for S3Transport {
         }
     }
 
-    fn sub_transport(&self, relpath: &str) -> Arc<dyn Transport> {
-        let subdir = if relpath.ends_with('/') {
-            Cow::Borrowed(relpath)
-        } else {
-            Cow::Owned(format!("{relpath}/"))
-        };
-        Arc::new(S3Transport {
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        Arc::new(Protocol {
             base_path: join_paths(&self.base_path, relpath),
-            url: self.url.join(&subdir).expect("join subdir URL"),
+            url: self.url.join(relpath).expect("join subdir URL"),
             bucket: self.bucket.clone(),
             runtime: self.runtime.clone(),
             client: self.client.clone(),
             storage_class: self.storage_class.clone(),
         })
     }
-}
 
-impl S3Transport {
-    fn join_path(&self, relpath: &str) -> String {
-        join_paths(&self.base_path, relpath)
-    }
-}
-
-impl AsRef<dyn Transport> for S3Transport {
-    fn as_ref(&self) -> &(dyn Transport + 'static) {
-        self
-    }
-}
-
-fn s3_error<K, E, R>(base: &Url, key: K, source: SdkError<E, R>) -> Error
-where
-    K: ToOwned<Owned = String>,
-    E: std::error::Error + Send + Sync + 'static,
-    R: std::fmt::Debug + Send + Sync + 'static,
-    ErrorKind: for<'a> From<&'a E>,
-{
-    debug!(s3_error = ?source);
-    let kind = match &source {
-        SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
-        _ => ErrorKind::Other,
-    };
-    Error {
-        kind,
-        url: base.join(key.to_owned().as_ref()).ok(),
-        source: Some(source.into()),
+    fn url(&self) -> &Url {
+        &self.url
     }
 }
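Note how each backend maps `WriteMode::CreateNew` onto a native precondition: the local implementation uses `File::options().create_new(true)`, and the S3 implementation sends `If-None-Match: *` on the put. A hypothetical test (not part of this patch) sketching the resulting contract through the public API:

    use conserve::transport::{ErrorKind, Transport, WriteMode};

    #[test]
    fn create_new_refuses_to_overwrite() {
        let tmp = tempfile::TempDir::new().unwrap();
        let transport = Transport::local(tmp.path());
        transport.write_file("f", b"one", WriteMode::CreateNew).unwrap();
        // A second CreateNew write fails instead of clobbering the file...
        let err = transport
            .write_file("f", b"two", WriteMode::CreateNew)
            .unwrap_err();
        assert!(matches!(err.kind(), ErrorKind::AlreadyExists));
        // ...while Overwrite replaces the content.
        transport.write_file("f", b"two", WriteMode::Overwrite).unwrap();
        assert_eq!(transport.read_file("f").unwrap().as_ref(), b"two");
    }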
diff --git a/src/transport/sftp.rs b/src/transport/sftp.rs
new file mode 100644
index 00000000..b8a845a5
--- /dev/null
+++ b/src/transport/sftp.rs
@@ -0,0 +1,331 @@
+// Copyright 2022 Martin Pool
+
+//! Read/write archive over SFTP.
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::net::TcpStream;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use time::OffsetDateTime;
+use tracing::{error, info, instrument, trace, warn};
+use url::Url;
+
+use crate::Kind;
+
+use super::{Error, ErrorKind, ListDir, Result, WriteMode};
+
+pub(super) struct Protocol {
+    url: Url,
+    sftp: Arc<ssh2::Sftp>,
+    base_path: PathBuf,
+}
+
+impl Protocol {
+    pub fn new(url: &Url) -> Result<Protocol> {
+        assert_eq!(url.scheme(), "sftp");
+        let addr = format!(
+            "{}:{}",
+            url.host_str().expect("url must have a host"),
+            url.port().unwrap_or(22)
+        );
+        let tcp_stream = TcpStream::connect(addr).map_err(|err| {
+            error!(?err, ?url, "Error opening SSH TCP connection");
+            io_error(err, url)
+        })?;
+        trace!("got tcp connection");
+        let mut session = ssh2::Session::new().map_err(|err| {
+            error!(?err, "Error opening SSH session");
+            ssh_error(err, url)
+        })?;
+        session.set_tcp_stream(tcp_stream);
+        session.handshake().map_err(|err| {
+            error!(?err, "Error in SSH handshake");
+            ssh_error(err, url)
+        })?;
+        trace!(
+            "SSH connected, banner: {}",
+            session.banner().unwrap_or("(none)")
+        );
+        let username = match url.username() {
+            "" => {
+                trace!("Take default SSH username from environment");
+                whoami::username()
+            }
+            u => u.to_owned(),
+        };
+        session.userauth_agent(&username).map_err(|err| {
+            error!(?err, username, "Error in SSH user auth with agent");
+            ssh_error(err, url)
+        })?;
+        trace!("Authenticated!");
+        let sftp = session.sftp().map_err(|err| {
+            error!(?err, "Error opening SFTP session");
+            ssh_error(err, url)
+        })?;
+        Ok(Protocol {
+            url: url.to_owned(),
+            sftp: Arc::new(sftp),
+            base_path: url.path().into(),
+        })
+    }
+
+    fn lstat(&self, path: &str) -> Result<ssh2::FileStat> {
+        trace!("lstat {path}");
+        self.sftp
+            .lstat(&self.base_path.join(path))
+            .map_err(|err| self.ssh_error(err, path))
+    }
+
+    fn io_error(&self, source: io::Error, path: &str) -> Error {
+        Error {
+            kind: source.kind().into(),
+            source: Some(Box::new(source)),
+            url: self.url.join(path).ok(),
+        }
+    }
+
+    fn relative_url(&self, path: &str) -> Url {
+        self.url.join(path).expect("join URL")
+    }
+
+    fn ssh_error(&self, source: ssh2::Error, path: &str) -> Error {
+        ssh_error(source, &self.relative_url(path))
+    }
+}
+
+fn ssh_error(source: ssh2::Error, url: &Url) -> super::Error {
+    super::Error {
+        kind: source.code().into(),
+        source: Some(Box::new(source)),
+        url: Some(url.to_owned()),
+    }
+}
+
+fn io_error(source: io::Error, url: &Url) -> super::Error {
+    super::Error {
+        kind: source.kind().into(),
+        source: Some(Box::new(source)),
+        url: Some(url.to_owned()),
+    }
+}
+
+impl super::Protocol for Protocol {
+    fn list_dir(&self, path: &str) -> Result<ListDir> {
+        let full_path = &self.base_path.join(path);
+        trace!("iter_dir_entries {:?}", full_path);
+        let mut files = Vec::new();
+        let mut dirs = Vec::new();
+        let mut dir = self.sftp.opendir(full_path).map_err(|err| {
+            error!(?err, ?full_path, "Error opening directory");
+            self.ssh_error(err, path)
+        })?;
+        loop {
+            match dir.readdir() {
+                Ok((pathbuf, file_stat)) => {
+                    let name = pathbuf.to_string_lossy().into();
+                    if name == "." || name == ".." {
+                        continue;
+                    }
+                    trace!("read dir got name {}", name);
+                    match file_stat.file_type().into() {
+                        Kind::File => files.push(name),
+                        Kind::Dir => dirs.push(name),
+                        _ => (),
+                    }
+                }
+                Err(err) if err.code() == ssh2::ErrorCode::Session(-16) => {
+                    // Apparently there's no symbolic version for it, but this is the error
+                    // code.
+                    //
+                    trace!("read dir end");
+                    break;
+                }
+                Err(err) => {
+                    info!("SFTP error {:?}", err);
+                    return Err(self.ssh_error(err, path));
+                }
+            }
+        }
+        Ok(ListDir { files, dirs })
+    }
+
+    fn read_file(&self, path: &str) -> Result<Bytes> {
+        let full_path = self.base_path.join(path);
+        let url = &self.url.join(path).expect("join URL");
+        trace!("read {url}");
+        let mut buf = Vec::with_capacity(2 << 20);
+        let mut file = self
+            .sftp
+            .open(&full_path)
+            .map_err(|err| self.ssh_error(err, path))?;
+        let len = file
+            .read_to_end(&mut buf)
+            .map_err(|err| self.io_error(err, path))?;
+        assert_eq!(len, buf.len());
+        trace!("read {} bytes from {}", len, full_path.display());
+        Ok(buf.into())
+    }
+
+    fn create_dir(&self, relpath: &str) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!("create_dir {:?}", full_path);
+        match self.sftp.mkdir(&full_path, 0o700) {
+            Ok(()) => Ok(()),
+            Err(err) if err.code() == ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_FAILURE) => {
+                // openssh seems to say failure for "directory exists" :/
+                Ok(())
+            }
+            Err(err) => {
+                warn!(?err, ?relpath);
+                Err(self.ssh_error(err, relpath))
+            }
+        }
+    }
+
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!(
+            "write_file {len:>9} bytes to {full_path:?}",
+            len = content.len()
+        );
+        let flags = ssh2::OpenFlags::WRITE
+            | ssh2::OpenFlags::CREATE
+            | match write_mode {
+                WriteMode::CreateNew => ssh2::OpenFlags::EXCLUSIVE,
+                WriteMode::Overwrite => ssh2::OpenFlags::TRUNCATE,
+            };
+        let mut file = self
+            .sftp
+            .open_mode(&full_path, flags, 0o644, ssh2::OpenType::File)
+            .map_err(|err| {
+                warn!(?err, ?relpath, "sftp error creating file");
+                self.ssh_error(err, relpath)
+            })?;
+        if let Err(err) = file.write_all(content) {
+            warn!(?err, ?full_path, "sftp error writing file");
+            if let Err(err2) = self.sftp.unlink(&full_path) {
+                warn!(
+                    ?err2,
+                    ?full_path,
+                    "sftp error unlinking file after write error"
+                );
+            }
+            return Err(super::Error {
+                url: Some(self.relative_url(relpath)),
+                source: Some(Box::new(err)),
+                kind: ErrorKind::Other,
+            });
+        }
+        Ok(())
+    }
+
+    fn metadata(&self, relpath: &str) -> Result<super::Metadata> {
+        let full_path = self.base_path.join(relpath);
+        trace!("metadata {full_path:?}");
+        let stat = self.lstat(relpath)?;
+        let modified = stat
+            .mtime
+            .and_then(|mtime| OffsetDateTime::from_unix_timestamp(mtime as i64).ok())
+            .ok_or_else(|| {
+                warn!("No mtime for {full_path:?}");
+                super::Error {
+                    kind: ErrorKind::Other,
+                    source: None,
+                    url: Some(self.relative_url(relpath)),
+                }
+            })?;
+        Ok(super::Metadata {
+            kind: stat.file_type().into(),
+            len: stat.size.unwrap_or_default(),
+            modified,
+        })
+    }
+
+    fn remove_file(&self, relpath: &str) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!("remove_file {full_path:?}");
+        self.sftp
+            .unlink(&full_path)
+            .map_err(|err| self.ssh_error(err, relpath))
+    }
+
+    #[instrument]
+    fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let mut dirs_to_walk = vec![path.to_owned()];
+        let mut dirs_to_delete = vec![path.to_owned()];
+        while let Some(dir) = dirs_to_walk.pop() {
+            trace!(?dir, "Walk down dir");
+            let list = self.list_dir(&dir)?;
+            for file in list.files {
+                self.remove_file(&format!("{dir}/{file}"))?;
+            }
+            list.dirs
+                .iter()
+                .map(|subdir| format!("{dir}/{subdir}"))
+                .for_each(|p| {
+                    dirs_to_delete.push(p.clone());
+                    dirs_to_walk.push(p)
+                });
+        }
+        // Consume them in the reverse order discovered, so bottom up
+        for dir in dirs_to_delete.iter().rev() {
+            let full_path = self.base_path.join(dir);
+            trace!(?dir, "rmdir");
+            self.sftp
+                .rmdir(&full_path)
+                .map_err(|err| self.ssh_error(err, dir))?;
+        }
+        Ok(())
+    }
+
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        let base_path = self.base_path.join(relpath);
+        let url = self.url.join(relpath).expect("join URL");
+        Arc::new(Protocol {
+            url,
+            sftp: Arc::clone(&self.sftp),
+            base_path,
+        })
+    }
+
+    fn url(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl fmt::Debug for Protocol {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("sftp::Protocol")
+            .field("url", &self.url)
+            .finish()
+    }
+}
+
+impl From<ssh2::FileType> for Kind {
+    fn from(kind: ssh2::FileType) -> Self {
+        use ssh2::FileType::*;
+        match kind {
+            RegularFile => Kind::File,
+            Directory => Kind::Dir,
+            Symlink => Kind::Symlink,
+            _ => Kind::Unknown,
+        }
+    }
+}
+
+impl From<ssh2::ErrorCode> for ErrorKind {
+    fn from(code: ssh2::ErrorCode) -> Self {
+        // Map other errors to io::Error that aren't handled by libssh.
+        //
+        // See https://github.com/alexcrichton/ssh2-rs/issues/244.
+        match code {
+            ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_NO_SUCH_FILE)
+            | ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_NO_SUCH_PATH) => ErrorKind::NotFound,
+            // TODO: Others
+            _ => ErrorKind::Other,
+        }
+    }
+}
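SFTP offers no recursive delete, so `remove_dir_all` above walks the tree iteratively: files are removed as directories are discovered, and the directories themselves are removed in reverse discovery order so children always go before parents. The same strategy, sketched over `std::fs` purely for illustration (a hypothetical helper, not in the patch):

    use std::fs;
    use std::io;
    use std::path::{Path, PathBuf};

    fn remove_dir_all_iterative(root: &Path) -> io::Result<()> {
        let mut to_walk: Vec<PathBuf> = vec![root.to_path_buf()];
        let mut to_delete: Vec<PathBuf> = vec![root.to_path_buf()];
        while let Some(dir) = to_walk.pop() {
            for entry in fs::read_dir(&dir)? {
                let entry = entry?;
                if entry.file_type()?.is_dir() {
                    // A child directory is always pushed after its parent.
                    to_walk.push(entry.path());
                    to_delete.push(entry.path());
                } else {
                    fs::remove_file(entry.path())?;
                }
            }
        }
        // Reverse order is bottom-up, so each directory is empty when removed.
        for dir in to_delete.iter().rev() {
            fs::remove_dir(dir)?;
        }
        Ok(())
    }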
diff --git a/tests/cli.rs b/tests/cli.rs
index bd3babf1..df0a5756 100644
--- a/tests/cli.rs
+++ b/tests/cli.rs
@@ -31,9 +31,9 @@ use url::Url;
 use conserve::test_fixtures::{ScratchArchive, TreeFixture};
 
 fn run_conserve() -> Command {
-    Command::cargo_bin("conserve").expect("locate conserve binary")
+    let mut command = Command::cargo_bin("conserve").expect("locate conserve binary");
+    command.env_remove("RUST_LOG");
+    command
 }
 
 #[test]
diff --git a/tests/damage.rs b/tests/damage.rs
index 207b4d95..b887f20e 100644
--- a/tests/damage.rs
+++ b/tests/damage.rs
@@ -46,7 +46,7 @@ use tracing_test::traced_test;
 
 use conserve::counters::Counter;
 use conserve::monitor::test::TestMonitor;
-use conserve::transport::open_local_transport;
+use conserve::transport::Transport;
 use conserve::{
     backup, restore, Apath, Archive, BackupOptions, BandId, BandSelectionPolicy, BlockHash,
     EntryTrait, Exclude, RestoreOptions, ValidateOptions,
@@ -113,9 +113,7 @@ fn backup_after_damage(
     action.damage(&location.to_path(&archive_dir));
 
     // Open the archive again to avoid cache effects.
-    let archive =
-        Archive::open(conserve::transport::open_local_transport(archive_dir.path()).unwrap())
-            .expect("open archive");
+    let archive = Archive::open(Transport::local(archive_dir.path())).expect("open archive");
 
     // A second backup should succeed.
     changes.apply(&source_dir);
@@ -264,9 +262,7 @@ impl DamageLocation {
                 .join(BandId::from(*band_id).to_string())
                 .join("BANDTAIL"),
             DamageLocation::Block(block_index) => {
-                let archive =
-                    Archive::open(open_local_transport(archive_dir).expect("open transport"))
-                        .expect("open archive");
+                let archive = Archive::open(Transport::local(archive_dir)).expect("open archive");
                 let block_dir = archive.block_dir();
                 let block_hash = block_dir
                     .blocks(TestMonitor::arc())
diff --git a/tests/failpoints.rs b/tests/failpoints.rs
index eb1ecf92..43d22895 100644
--- a/tests/failpoints.rs
+++ b/tests/failpoints.rs
@@ -17,18 +17,19 @@ use std::path::Path;
 
 use assert_fs::TempDir;
 use conserve::monitor::test::TestMonitor;
-use conserve::transport::open_local_transport;
 use fail::FailScenario;
 
+use conserve::transport::Transport;
 use conserve::*;
 
 #[test]
 fn create_dir_permission_denied() {
     let scenario = FailScenario::setup();
     fail::cfg("restore::create-dir", "return").unwrap();
-    let archive =
-        Archive::open(open_local_transport(Path::new("testdata/archive/simple/v0.6.10")).unwrap())
-            .unwrap();
+    let archive = Archive::open(Transport::local(Path::new(
+        "testdata/archive/simple/v0.6.10",
+    )))
+    .unwrap();
     let options = RestoreOptions {
         ..RestoreOptions::default()
     };
diff --git a/tests/format_flags.rs b/tests/format_flags.rs
index 3e245f2b..ade487d8 100644
--- a/tests/format_flags.rs
+++ b/tests/format_flags.rs
@@ -5,6 +5,7 @@
 
 use conserve::test_fixtures::ScratchArchive;
 use conserve::*;
+use conserve::transport::WriteMode;
 
 #[test]
 // This can be updated if/when Conserve does start writing some flags by default.
@@ -42,8 +43,12 @@ fn unknown_format_flag_fails_to_open() {
         "format_flags": ["wibble"]
     });
     af.transport()
-        .sub_transport("b0000")
-        .write_file("BANDHEAD", &serde_json::to_vec(&head).unwrap())
+        .chdir("b0000")
+        .write_file(
+            "BANDHEAD",
+            &serde_json::to_vec(&head).unwrap(),
+            WriteMode::CreateNew,
+        )
         .unwrap();
 
     let err = Band::open(&af, BandId::zero()).unwrap_err();
diff --git a/tests/proptest_changes.rs b/tests/proptest_changes.rs
index ae54a9f8..7ced94fa 100644
--- a/tests/proptest_changes.rs
+++ b/tests/proptest_changes.rs
@@ -11,6 +11,8 @@
 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 // GNU General Public License for more details.
 
+#![allow(non_local_definitions)] // side effect of proptest macro?
+
 //! Try backing up and restoring various sequences of changes to a tree.
 
 use std::collections::BTreeMap;
diff --git a/tests/restore.rs b/tests/restore.rs
index 71ca40fb..ad7de612 100644
--- a/tests/restore.rs
+++ b/tests/restore.rs
@@ -14,13 +14,9 @@
 
 use std::cell::RefCell;
 use std::fs::{create_dir, write};
-#[cfg(unix)]
-use std::fs::{read_link, symlink_metadata};
-use std::path::PathBuf;
 
 use conserve::counters::Counter;
 use conserve::monitor::test::TestMonitor;
-use filetime::{set_symlink_file_times, FileTime};
 use tempfile::TempDir;
 
 use conserve::test_fixtures::ScratchArchive;
@@ -194,6 +190,11 @@ fn exclude_files() {
 #[test]
 #[cfg(unix)]
 fn restore_symlink() {
+    use std::fs::{read_link, symlink_metadata};
+    use std::path::PathBuf;
+
+    use filetime::{set_symlink_file_times, FileTime};
+
     use conserve::monitor::test::TestMonitor;
 
     let af = ScratchArchive::new();
diff --git a/tests/s3_integration.rs b/tests/s3_integration.rs
index 3ad1d75e..5dfc441b 100644
--- a/tests/s3_integration.rs
+++ b/tests/s3_integration.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Martin Pool
+// Copyright 2023-2024 Martin Pool
 
 #![cfg(feature = "s3-integration-test")]
 
@@ -97,23 +97,23 @@ impl TempBucket {
             .expect("Create bucket");
         println!("Created bucket {bucket_name}");
 
+        let lifecycle_configuration = BucketLifecycleConfiguration::builder()
+            .rules(
+                LifecycleRule::builder()
+                    .id("delete-after-7d")
+                    .filter(LifecycleRuleFilter::builder().prefix("").build())
+                    .status(ExpirationStatus::Enabled)
+                    .expiration(LifecycleExpiration::builder().days(7).build())
+                    .build()
+                    .expect("Build S3 lifecycle rule"),
+            )
+            .build()
+            .expect("Build S3 lifecycle configuration");
+        dbg!(&lifecycle_configuration);
         client
             .put_bucket_lifecycle_configuration()
             .bucket(bucket_name)
-            .lifecycle_configuration(
-                BucketLifecycleConfiguration::builder()
-                    .rules(
-                        LifecycleRule::builder()
-                            .id("delete-after-7d")
-                            .filter(LifecycleRuleFilter::ObjectSizeGreaterThan(0))
-                            .status(ExpirationStatus::Enabled)
-                            .expiration(LifecycleExpiration::builder().days(7).build())
-                            .build()
-                            .expect("Build S3 lifecycle rule"),
-                    )
-                    .build()
-                    .expect("Build S3 lifecycle configuration"),
-            )
+            .lifecycle_configuration(lifecycle_configuration)
             .send()
             .await
             .expect("Set bucket lifecycle");
diff --git a/tests/transport.rs b/tests/transport.rs
index 8d0f9dd5..d61fdb41 100644
--- a/tests/transport.rs
+++ b/tests/transport.rs
@@ -10,14 +10,16 @@
 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 // GNU General Public License for more details.
 
+use std::path::Path;
+
 use assert_fs::prelude::*;
 use url::Url;
 
-use conserve::transport::{open_transport, ListDir};
+use conserve::transport::{ListDir, Transport};
 
 #[test]
 fn open_local() {
-    open_transport("/backup").unwrap();
+    Transport::local(Path::new("/backup"));
 }
 
 #[test]
@@ -29,7 +31,7 @@ fn list_dir_names() {
 
     let url = Url::from_directory_path(temp.path()).unwrap();
     dbg!(&url);
-    let transport = open_transport(url.as_str()).unwrap();
+    let transport = Transport::new(url.as_str()).unwrap();
     dbg!(&transport);
 
     let ListDir { mut files, dirs } = transport.list_dir("").unwrap();
@@ -49,22 +51,22 @@ fn parse_location_urls() {
         "c:/backup/repo",
         r"c:\backup\repo\",
     ] {
-        assert!(open_transport(n).is_ok(), "Failed to parse {n:?}");
+        assert!(Transport::new(n).is_ok(), "Failed to parse {n:?}");
     }
 }
 
 #[test]
 fn unsupported_location_urls() {
     assert_eq!(
-        open_transport("http://conserve.example/repo")
+        Transport::new("http://conserve.example/repo")
             .unwrap_err()
             .to_string(),
-        "Unsupported URL scheme \"http\""
+        "Unsupported URL scheme: http://conserve.example/repo"
     );
     assert_eq!(
-        open_transport("ftp://user@conserve.example/repo")
+        Transport::new("ftp://user@conserve.example/repo")
             .unwrap_err()
             .to_string(),
-        "Unsupported URL scheme \"ftp\""
+        "Unsupported URL scheme: ftp://user@conserve.example/repo"
    );
 }
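Taken together, the test changes above show the whole new surface: construction through `Transport::local` or `Transport::new`, subdirectory handles through `chdir`, and an explicit `WriteMode` on every write. A minimal end-to-end sketch (assuming the `conserve` and `tempfile` crates as dependencies):

    use conserve::transport::{Transport, WriteMode};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = tempfile::TempDir::new()?;
        // A transport rooted at the temporary directory.
        let transport = Transport::local(tmp.path());
        transport.create_dir("sub")?;
        // chdir returns a new handle sharing the same backend.
        let sub = transport.chdir("sub");
        sub.write_file("greeting", b"hello", WriteMode::CreateNew)?;
        assert_eq!(sub.read_file("greeting")?.as_ref(), b"hello");
        assert!(transport.is_file("sub/greeting")?);
        Ok(())
    }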