diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1e0339a8..92dd5958 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -52,8 +52,7 @@ jobs: fail-fast: true matrix: os: [ubuntu-latest, windows-latest, macOS-latest] - features: ["", "s3"] - version: [stable, nightly, "1.74"] + version: [stable, nightly, "1.79"] steps: - uses: actions/checkout@v4 @@ -61,8 +60,6 @@ jobs: with: toolchain: ${{ matrix.version }} - uses: Swatinem/rust-cache@v2 - with: - key: ${{ matrix.features }} - name: Show version run: | rustup show @@ -70,12 +67,10 @@ jobs: rustc --version - name: Build run: > - cargo build --all-targets --no-default-features --features=${{ - matrix.features }} --features fail/failpoints + cargo build --all-targets --features fail/failpoints - name: Test run: > - cargo test --no-default-features --features=${{ matrix.features }} - --features fail/failpoints -- --include-ignored + cargo test --features fail/failpoints -- --include-ignored # Run rustfmt separately so that it does not block test results rustfmt: @@ -92,7 +87,7 @@ jobs: # S3 integration tests can't run from CI because they need credentials, but we # can at least make sure that they compile. - build-large-tests: + build-all-features: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -101,22 +96,9 @@ jobs: toolchain: stable - uses: swatinem/rust-cache@v2 - name: cargo build - run: cargo build --all-targets --features s3,s3-integration-test + run: cargo build --all-targets --all-features clippy: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@master - with: - toolchain: stable - - uses: swatinem/rust-cache@v2 - - name: clippy - run: cargo clippy --all-targets -- --deny clippy::all - - # Not all features can be run: s3 integration tests require credentials. - # But all of them should compile. - check-all-features: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -149,8 +131,8 @@ jobs: tool: cargo-mutants - name: Mutants in diff run: > - cargo mutants --no-shuffle -vV --in-diff git.diff --in-place -- --features - fail/failpoints + cargo mutants --no-shuffle -vV --in-diff git.diff --in-place -- + --features fail/failpoints - name: Archive mutants.out uses: actions/upload-artifact@v4 if: always() @@ -165,7 +147,7 @@ jobs: # We want to see all the missed mutants so don't fail fast. fail-fast: false matrix: - shard: [0, 1, 2, 3] + shard: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] steps: - name: Checkout uses: actions/checkout@v4 @@ -181,11 +163,10 @@ jobs: # Don't use the S3 features because they require AWS credentials for realistic # testing. 
run: | - cargo mutants --no-shuffle -vV --cargo-arg=--no-default-features \ - --in-place \ - --baseline=skip --shard ${{ matrix.shard }}/4 \ - -- \ - --features fail/failpoints + cargo mutants --no-shuffle -vV --in-place --baseline=skip \ + --shard ${{ matrix.shard }}/10 \ + -- \ + --features fail/failpoints - name: Archive results uses: actions/upload-artifact@v4 if: always() diff --git a/Cargo.lock b/Cargo.lock index 71d82b5d..f7a36a25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,7 +24,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -188,9 +187,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -200,15 +199,16 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.2.0" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4963ac9ff2d33a4231b3806c1c69f578f221a9cabb89ad2bde62ce2b442c8a7" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", "aws-smithy-http", + "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", @@ -216,6 +216,7 @@ dependencies = [ "fastrand", "http 0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -224,9 +225,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.23.0" +version = "1.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4576ae7eb91e4d0ca76a3b443c3be979322fc01836cad7908534ae507fa41d99" +checksum = "cecd672c8d4265fd4fbecacd4a479180e616881bbe639250cf81ddb604e4c301" dependencies = [ "ahash", "aws-credential-types", @@ -326,9 +327,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.0" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d6f29688a4be9895c0ba8bef861ad0c0dac5c15e9618b9b7a6c233990fc263" +checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -366,9 +367,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.60.7" +version = "0.60.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fa43bc04a6b2441968faeab56e68da3812f978a670a5db32accbdcafddd12f" +checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -387,9 +388,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" dependencies = [ "aws-smithy-types", "bytes", @@ -398,9 +399,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.7" +version = "0.60.11" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f10fa66956f01540051b0aa7ad54574640f748f9839e843442d99b970d3aff9" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -438,9 +439,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.3.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de34bcfa1fb3c82a80e252a753db34a6658e07f23d3a5b3fc96919518fa7a3f5" +checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -452,6 +453,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "http-body 1.0.0", + "httparse", "hyper", "hyper-rustls", "once_cell", @@ -464,9 +466,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.4.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cc56a5c96ec741de6c5e6bf1ce6948be969d6506dfa9c39cffc284e31e4979b" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -481,9 +483,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.1.8" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abe14dceea1e70101d38fbf2a99e6a34159477c0fb95e68e05c66bd7ae4c3729" +checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" dependencies = [ "base64-simd", "bytes", @@ -507,24 +509,23 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.7" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "872c68cf019c0e4afc5de7753c4f7288ce4b71663212771bf5e4542eb9346ca9" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.2.0" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a43b56df2c529fe44cb4d92bd64d0479883fb9608ff62daede4df5405381814" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "http 0.2.12", "rustc_version", "tracing", ] @@ -635,6 +636,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "bytes" version = "1.7.1" @@ -662,9 +669,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.94" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +dependencies = [ + "shlex", +] [[package]] name = "cfg-if" @@ -766,6 +776,7 @@ dependencies = [ "indoc", "itertools", "lazy_static", + "libssh2-sys", "lru", "mutants", "nix", @@ -783,6 +794,7 @@ dependencies = [ "serde", "serde_json", "snap", + "ssh2", "strum", "strum_macros", "tempfile", @@ -797,6 +809,7 @@ dependencies = [ "unix_mode", "url", "uzers", + "whoami", ] [[package]] @@ -1092,7 +1105,7 @@ checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" dependencies = [ "cfg-if", "libc", - 
"redox_syscall", + "redox_syscall 0.4.1", "windows-sys 0.52.0", ] @@ -1509,6 +1522,15 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -1535,6 +1557,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1553,6 +1584,32 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libssh2-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -1732,7 +1789,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "210b363fa6901c372f264fa32ef3710c0e86328901deaed31294fecfd51e848b" dependencies = [ "atty", - "parking_lot", + "parking_lot 0.12.1", "terminal_size 0.2.6", "yansi", ] @@ -1758,6 +1815,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "outref" version = "0.5.1" @@ -1781,6 +1850,17 @@ dependencies = [ "sha2", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1788,7 +1868,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -1799,7 +1893,7 @@ checksum 
= "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets 0.48.5", ] @@ -1832,6 +1926,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1895,9 +1995,9 @@ dependencies = [ [[package]] name = "proptest" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf" +checksum = "b4c2511913b88df1637da85cc8d96ec8e43a3f8bb8ccb71ee1ac240d6f3df58d" dependencies = [ "bit-set", "bit-vec", @@ -1915,13 +2015,13 @@ dependencies = [ [[package]] name = "proptest-derive" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf16337405ca084e9c78985114633b6827711d22b9e6ef6c6c0d665eb3f0b6e" +checksum = "6ff7ff745a347b87471d859a377a9a404361e7efc2a971d73424a6d183c0fc77" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.58", ] [[package]] @@ -2004,6 +2104,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73ea134c32fe12df286020949d57d052a90c4001f2dbec4c1c074f39bcb7fc8c" +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2013,6 +2122,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "regex" version = "1.10.4" @@ -2370,6 +2488,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2436,6 +2560,18 @@ dependencies = [ "der", ] +[[package]] +name = "ssh2" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7fe461910559f6d5604c3731d00d2aafc4a83d1665922e280f42f9a168d5455" +dependencies = [ + "bitflags 1.3.2", + "libc", + "libssh2-sys", + "parking_lot 0.11.2", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2622,7 +2758,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2877,6 +3013,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -2923,6 +3065,88 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.58", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" + +[[package]] +name = "web-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall 0.5.7", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index f347dc0c..51534dfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,20 @@ repository = "https://github.com/sourcefrog/conserve/" version = "24.8.0" rust-version = "1.79" +[features] +default = ["s3", "sftp"] +s3 = [ + "dep:aws-config", + "dep:aws-sdk-s3", + "dep:aws-types", + "dep:base64", + "dep:crc32c", + "dep:futures", + "dep:tokio", +] +s3-integration-test = ["s3"] +sftp = ["dep:ssh2", "dep:libssh2-sys"] + [[bin]] doc = false name = "conserve" @@ -18,7 +32,7 @@ name = "conserve" [dependencies] assert_matches = "1.5.0" aws-config = { version = "1.1", optional = true } -aws-sdk-s3 = { version = "1.21", optional = true } +aws-sdk-s3 = { version = "1.56", optional = true } aws-types = { version = "1.1", optional = true } base64 = { version = "0.22", optional = true } blake2-rfc = "0.2.18" @@ -35,6 +49,7 @@ hex = "0.4.2" hostname = "0.3" itertools = "0.12" lazy_static = "1.4.0" +libssh2-sys = { version = "0.3.0", optional = true } lru = "0.12" mutants = "0.0.3" rand = "0.8" @@ -45,12 +60,13 @@ semver = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" snap = "1.0.0" +ssh2 = { version = "0.9.4", optional = true } strum = "0.26" strum_macros = "0.26" tempfile = "3" thiserror = "1.0.19" thousands = "0.2.0" -time = { 
version = "0.3.28", features = [ +time = { version = "0.3.35", features = [ "local-offset", "macros", "serde", @@ -61,7 +77,7 @@ tracing = "0.1" tracing-appender = "0.2" unix_mode = "0.1" url = "2.2.2" -indoc = "2.0" +whoami = "1.5.2" [target.'cfg(unix)'.dependencies] uzers = "0.11" @@ -86,28 +102,15 @@ assert_cmd = "2.0" assert_fs = "1.0" cp_r = "0.5" dir-assert = "0.2" +indoc = "2.0" predicates = "3" pretty_assertions = "1.0" -proptest = "1.0" -proptest-derive = "0.4" +proptest = "1.5" +proptest-derive = "0.5" +rand = "0.8" rstest = { version = "0.19", default-features = false } tracing-test = { version = "0.2", features = ["no-env-filter"] } -[features] -default = ["s3"] -# blake2-rfc/simd_asm needs nightly, so it's no longer a feature here so that --all-features works on stable. -# blake2_simd_asm = ["blake2-rfc/simd_asm"] -s3 = [ - "dep:aws-config", - "dep:aws-sdk-s3", - "dep:aws-types", - "dep:base64", - "dep:crc32c", - "dep:futures", - "dep:tokio", -] -s3-integration-test = ["s3"] - [lib] doctest = false diff --git a/NEWS.md b/NEWS.md index c6b3d851..187ac88b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,9 @@ # Conserve release history +## Unreleased + +- Changed: S3 is no longer built by default, because it adds many dependencies. It can be turned on again with `--features s3`. + ## 24.8.0 - Fixed: `restore --only` specifying a subdirectory no longer fails due to parent directories missing from the destination. diff --git a/README.md b/README.md index 690853e7..c79db9ed 100644 --- a/README.md +++ b/README.md @@ -114,9 +114,6 @@ automatically excluded from backups. From 23.9 Conserve supports storing backups in Amazon S3. AWS IAM credentials are read from the standard sources: the environment, config file, or, on EC2, the instance metadata service. -S3 support can be turned off by passing `cargo install --no-default-features`. (There's no -runtime impact if it is not used, but it does add a lot of build-time dependencies.) - To use this, just specify an S3 URL for the archive location. The bucket must already exist. conserve init s3://my-bucket/ @@ -141,9 +138,12 @@ To install from a git checkout, run [rust]: https://rustup.rs/ -On nightly Rust only, and only on x86_64, you can enable a slight speed-up with +### Optional features + +The following features are enabled by default, but can be turned off with `cargo install --no-default-features` if they are not needed: - cargo +nightly install -f --path . --features blake2-rfc/simd_asm +- `s3`: support for storing backups in Amazon S3 (or compatible services) +- `sftp`: support for storing backups on SFTP servers, addressed with `sftp://` URLs ### Arch Linux diff --git a/doc/sftp.md b/doc/sftp.md new file mode 100644 index 00000000..2cf4a1f2 --- /dev/null +++ b/doc/sftp.md @@ -0,0 +1,12 @@ +# SFTP support + +Conserve can read and write archives over SFTP. + +To use this, just specify an SFTP URL, like `sftp://user@host/path`, for the archive location. + + conserve init sftp://user@host/path + conserve backup sftp://user@host/path ~ + +If no username is present in the URL, Conserve will use the current user's username. + +Currently, Conserve only supports agent authentication. 
diff --git a/src/archive.rs b/src/archive.rs index c241cf83..e30d935a 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -26,7 +26,7 @@ use tracing::{debug, warn}; use crate::jsonio::{read_json, write_json}; use crate::monitor::Monitor; -use crate::transport::local::LocalTransport; +use crate::transport::Transport; use crate::*; const HEADER_FILENAME: &str = "CONSERVE"; @@ -39,7 +39,7 @@ pub struct Archive { pub(crate) block_dir: Arc<BlockDir>, /// Transport to the root directory of the archive. - transport: Arc<dyn Transport>, + transport: Transport, } #[derive(Debug, Serialize, Deserialize)] @@ -56,24 +56,21 @@ pub struct DeleteOptions { impl Archive { /// Make a new archive in a local directory. pub fn create_path(path: &Path) -> Result<Archive> { - Archive::create(Arc::new(LocalTransport::new(path))) + Archive::create(Transport::local(path)) } /// Make a new archive in a new directory accessed by a Transport. - pub fn create(transport: Arc<dyn Transport>) -> Result<Archive> { + pub fn create(transport: Transport) -> Result<Archive> { transport.create_dir("")?; let names = transport.list_dir("")?; if !names.files.is_empty() || !names.dirs.is_empty() { return Err(Error::NewArchiveDirectoryNotEmpty); } - let block_dir = Arc::new(BlockDir::create(transport.sub_transport(BLOCK_DIR))?); - write_json( - &transport, - HEADER_FILENAME, - &ArchiveHeader { - conserve_archive_version: String::from(ARCHIVE_VERSION), - }, - )?; + let block_dir = Arc::new(BlockDir::create(transport.chdir(BLOCK_DIR))?); + let header = ArchiveHeader { + conserve_archive_version: String::from(ARCHIVE_VERSION), + }; + write_json(&transport, HEADER_FILENAME, &header)?; Ok(Archive { block_dir, transport, @@ -84,18 +81,18 @@ impl Archive { /// /// Checks that the header is correct. pub fn open_path(path: &Path) -> Result<Archive> { - Archive::open(Arc::new(LocalTransport::new(path))) + Archive::open(Transport::local(path)) } - pub fn open(transport: Arc<dyn Transport>) -> Result<Archive> { + pub fn open(transport: Transport) -> Result<Archive> { let header: ArchiveHeader = - read_json(transport.as_ref(), HEADER_FILENAME)?.ok_or(Error::NotAnArchive)?; + read_json(&transport, HEADER_FILENAME)?.ok_or(Error::NotAnArchive)?; if header.conserve_archive_version != ARCHIVE_VERSION { return Err(Error::UnsupportedArchiveVersion { version: header.conserve_archive_version, }); } - let block_dir = Arc::new(BlockDir::open(transport.sub_transport(BLOCK_DIR))); + let block_dir = Arc::new(BlockDir::open(transport.chdir(BLOCK_DIR))); debug!(?header, "Opened archive"); Ok(Archive { block_dir, @@ -108,7 +105,7 @@ impl Archive { } pub fn band_exists(&self, band_id: BandId) -> Result<bool> { - self.transport + self.transport() .is_file(&format!("{}/{}", band_id, crate::BAND_HEAD_FILENAME)) .map_err(Error::from) } @@ -138,8 +135,8 @@ impl Archive { Ok(band_ids) } - pub(crate) fn transport(&self) -> &dyn Transport { - self.transport.as_ref() + pub(crate) fn transport(&self) -> &Transport { + &self.transport } pub fn resolve_band_id(&self, band_selection: BandSelectionPolicy) -> Result<BandId> { diff --git a/src/band.rs b/src/band.rs index a207b48a..6da265eb 100644 --- a/src/band.rs +++ b/src/band.rs @@ -24,6 +24,7 @@ use std::borrow::Cow; use std::sync::Arc; +use crate::transport::Transport; use itertools::Itertools; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -75,7 +76,7 @@ pub struct Band { band_id: BandId, /// Transport pointing to the archive directory. - transport: Arc<dyn Transport>, + transport: Transport, /// Deserialized band head info. head: Head, @@ -143,7 +144,7 @@ impl Band { let band_id = archive .last_band_id()?
.map_or_else(BandId::zero, |b| b.next_sibling()); - let transport = archive.transport().sub_transport(&band_id.to_string()); + let transport = archive.transport().chdir(&band_id.to_string()); transport.create_dir("")?; transport.create_dir(INDEX_DIR)?; let band_format_version = if format_flags.is_empty() { @@ -179,9 +180,9 @@ impl Band { /// Open the band with the given id. pub fn open(archive: &Archive, band_id: BandId) -> Result<Band> { - let transport = archive.transport().sub_transport(&band_id.to_string()); - let head: Head = read_json(transport.as_ref(), BAND_HEAD_FILENAME)? - .ok_or(Error::BandHeadMissing { band_id })?; + let transport = archive.transport().chdir(&band_id.to_string()); + let head: Head = + read_json(&transport, BAND_HEAD_FILENAME)?.ok_or(Error::BandHeadMissing { band_id })?; if let Some(version) = &head.band_format_version { if !band_version_supported(version) { return Err(Error::UnsupportedBandVersion { @@ -250,17 +251,17 @@ impl Band { } pub fn index_builder(&self) -> IndexWriter { - IndexWriter::new(self.transport.sub_transport(INDEX_DIR)) + IndexWriter::new(self.transport.chdir(INDEX_DIR)) } /// Get read-only access to the index of this band. pub fn index(&self) -> IndexRead { - IndexRead::open(self.transport.sub_transport(INDEX_DIR)) + IndexRead::open(self.transport.chdir(INDEX_DIR)) } /// Return info about the state of this band. pub fn get_info(&self) -> Result<Info> { - let tail_option: Option<Tail> = read_json(self.transport.as_ref(), BAND_TAIL_FILENAME)?; + let tail_option: Option<Tail> = read_json(&self.transport, BAND_TAIL_FILENAME)?; let start_time = OffsetDateTime::from_unix_timestamp(self.head.start_time).map_err(|_| { Error::InvalidMetadata { diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 23eca0ac..6726b52b 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -28,6 +28,7 @@ use time::UtcOffset; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn, Level}; +use crate::transport::Transport; use conserve::termui::{enable_tracing, TermUiMonitor, TraceTimeStyle}; use conserve::*; @@ -327,7 +328,7 @@ impl Command { ..Default::default() }; let stats = backup( - &Archive::open(open_transport(archive)?)?, + &Archive::open(Transport::new(archive)?)?, source, &options, monitor, @@ -338,7 +339,7 @@ impl Command { } Command::Debug(Debug::Blocks { archive }) => { let mut bw = BufWriter::new(stdout); - for hash in Archive::open(open_transport(archive)?)? + for hash in Archive::open(Transport::new(archive)?)? .block_dir() .blocks(monitor)? .collect::<Vec<BlockHash>>() @@ -353,7 +354,7 @@ impl Command { } Command::Debug(Debug::Referenced { archive }) => { let mut bw = BufWriter::new(stdout); - let archive = Archive::open(open_transport(archive)?)?; + let archive = Archive::open(Transport::new(archive)?)?; for hash in archive.referenced_blocks(&archive.list_band_ids()?, monitor)? { writeln!(bw, "{hash}")?; } @@ -361,7 +362,7 @@ impl Command { Command::Debug(Debug::Unreferenced { archive }) => { print!( "{}", - Archive::open(open_transport(archive)?)? + Archive::open(Transport::new(archive)?)? .unreferenced_blocks(monitor)?
.map(|hash| format!("{}\n", hash)) .collect::<String>() ) } Command::Delete { archive, backup, dry_run, break_lock, no_stats, } => { - let stats = Archive::open(open_transport(archive)?)?.delete_bands( + let stats = Archive::open(Transport::new(archive)?)?.delete_bands( backup, &DeleteOptions { dry_run: *dry_run, @@ -418,7 +419,7 @@ impl Command { break_lock, no_stats, } => { - let archive = Archive::open(open_transport(archive)?)?; + let archive = Archive::open(Transport::new(archive)?)?; let stats = archive.delete_bands( &[], &DeleteOptions { @@ -432,7 +433,7 @@ impl Command { } } Command::Init { archive } => { - Archive::create(open_transport(archive)?)?; + Archive::create(Transport::new(archive)?)?; debug!("Created new archive in {archive:?}"); } Command::Ls { @@ -481,7 +482,7 @@ impl Command { no_stats, } => { let band_selection = band_selection_policy_from_opt(backup); - let archive = Archive::open(open_transport(archive)?)?; + let archive = Archive::open(Transport::new(archive)?)?; let _ = no_stats; // accepted but ignored; we never currently print stats let options = RestoreOptions { exclude: Exclude::from_patterns_and_files(exclude, exclude_from)?, @@ -524,7 +525,7 @@ impl Command { let options = ValidateOptions { skip_block_hashes: *quick, }; - Archive::open(open_transport(archive)?)?.validate(&options, monitor.clone())?; + Archive::open(Transport::new(archive)?)?.validate(&options, monitor.clone())?; if monitor.error_count() != 0 { warn!("Archive has some problems."); } else { @@ -543,7 +544,7 @@ impl Command { } else { Some(*LOCAL_OFFSET.read().unwrap()) }; - let archive = Archive::open(open_transport(archive)?)?; + let archive = Archive::open(Transport::new(archive)?)?; let options = ShowVersionsOptions { newest_first: *newest, tree_size: *sizes, @@ -559,7 +560,7 @@ impl Command { } fn stored_tree_from_opt(archive_location: &str, backup: &Option<BandId>) -> Result<StoredTree> { - let archive = Archive::open(open_transport(archive_location)?)?; + let archive = Archive::open(Transport::new(archive_location)?)?; let policy = band_selection_policy_from_opt(backup); archive.open_stored_tree(policy) } diff --git a/src/blockdir.rs b/src/blockdir.rs index abead85b..d0904b64 100644 --- a/src/blockdir.rs +++ b/src/blockdir.rs @@ -32,11 +32,12 @@ use rayon::prelude::*; use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; use tracing::{instrument, trace}; +use transport::WriteMode; use crate::compress::snappy::{Compressor, Decompressor}; use crate::counters::Counter; use crate::monitor::Monitor; -use crate::transport::ListDir; +use crate::transport::{ListDir, Transport}; use crate::*; // const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2; @@ -65,7 +66,7 @@ pub struct Address { /// A readable, writable directory within a band holding data blocks. #[derive(Debug)] pub struct BlockDir { - transport: Arc<dyn Transport>, + transport: Transport, pub stats: BlockDirStats, // TODO: There are fancier caches and they might help, but this one works, and Stretto did not work for me. cache: RwLock<LruCache<BlockHash, Bytes>>, @@ -85,7 +86,7 @@ pub fn block_relpath(hash: &BlockHash) -> String { } impl BlockDir { - pub fn open(transport: Arc<dyn Transport>) -> BlockDir { + pub fn open(transport: Transport) -> BlockDir { /// Cache this many blocks in memory. // TODO: Change to a cache that tracks the size of stored blocks? // As a safe conservative value, 100 blocks of 20MB each would be 2GB.
@@ -102,7 +103,7 @@ impl BlockDir { } - pub fn create(transport: Arc<dyn Transport>) -> Result<BlockDir> { + pub fn create(transport: Transport) -> Result<BlockDir> { transport.create_dir("")?; Ok(BlockDir::open(transport)) } @@ -131,7 +132,19 @@ impl BlockDir { let hex_hash = hash.to_string(); let relpath = block_relpath(&hash); self.transport.create_dir(subdir_relpath(&hex_hash))?; - self.transport.write_file(&relpath, &compressed)?; + match self + .transport + .write_file(&relpath, &compressed, WriteMode::CreateNew) + { + Ok(()) => {} + Err(err) if err.kind() == transport::ErrorKind::AlreadyExists => { + // let's assume the contents are correct + } + Err(err) => { + warn!(?err, ?hash, "Error writing block"); + return Err(err.into()); + } + } stats.written_blocks += 1; stats.uncompressed_bytes += uncomp_len; stats.compressed_bytes += comp_len; @@ -343,7 +356,6 @@ mod test { use tempfile::TempDir; use crate::monitor::test::TestMonitor; - use crate::transport::open_local_transport; use super::*; @@ -353,7 +365,7 @@ // file with 0 bytes. It's not valid compressed data. We just treat // the block as not present at all. let tempdir = TempDir::new().unwrap(); - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); let mut stats = BackupStats::default(); let monitor = TestMonitor::arc(); let hash = blockdir @@ -367,7 +379,7 @@ assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1); // Since we just wrote it, we know it's there. // Open again to get a fresh cache - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); let monitor = TestMonitor::arc(); OpenOptions::new() .write(true) @@ -383,15 +395,12 @@ #[test] fn temp_files_are_not_returned_as_blocks() { let tempdir = TempDir::new().unwrap(); - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); let monitor = TestMonitor::arc(); let subdir = tempdir.path().join(subdir_relpath("123")); create_dir(&subdir).unwrap(); - write( - subdir.join(format!("{}{}", TMP_PREFIX, "123123123")), - b"123", - ) - .unwrap(); + // Write a temp file as was created by earlier versions of the code.
+ write(subdir.join("tmp123123123"), b"123").unwrap(); let blocks = blockdir .blocks(monitor.clone()) .unwrap() .collect::<Vec<BlockHash>>(); assert_eq!(blocks, []); } @@ -402,7 +411,7 @@ #[test] fn cache_hit() { let tempdir = TempDir::new().unwrap(); - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); let mut stats = BackupStats::default(); let content = Bytes::from("stuff"); let hash = blockdir @@ -432,7 +441,7 @@ #[test] fn existence_cache_hit() { let tempdir = TempDir::new().unwrap(); - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); let mut stats = BackupStats::default(); let content = Bytes::from("stuff"); let monitor = TestMonitor::arc(); @@ -442,7 +451,7 @@ // reopen let monitor = TestMonitor::arc(); - let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap()); + let blockdir = BlockDir::open(Transport::local(tempdir.path())); assert!(blockdir.contains(&hash, monitor.clone()).unwrap()); assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0); assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 0); diff --git a/src/gc_lock.rs b/src/gc_lock.rs index 25fc2869..919d34b1 100644 --- a/src/gc_lock.rs +++ b/src/gc_lock.rs @@ -31,6 +31,8 @@ //! delete but before starting to actually delete them, we check that no //! new bands have been created. +use transport::WriteMode; + use crate::*; pub static GC_LOCK: &str = "GC_LOCK"; @@ -63,7 +65,9 @@ impl GarbageCollectionLock { if archive.transport().is_file(GC_LOCK).unwrap_or(true) { return Err(Error::GarbageCollectionLockHeld); } - archive.transport().write_file(GC_LOCK, b"{}\n")?; + archive + .transport() + .write_file(GC_LOCK, b"{}\n", WriteMode::CreateNew)?; Ok(GarbageCollectionLock { archive, band_id }) } diff --git a/src/index.rs b/src/index.rs index 47a1b94c..c388dcae 100644 --- a/src/index.rs +++ b/src/index.rs @@ -19,16 +19,17 @@ use std::path::Path; use std::sync::Arc; use std::vec; +use crate::transport::Transport; use itertools::Itertools; use time::OffsetDateTime; use tracing::{debug, debug_span, error}; +use transport::WriteMode; use crate::compress::snappy::{Compressor, Decompressor}; use crate::counters::Counter; use crate::entry::KindMeta; use crate::monitor::Monitor; use crate::stats::IndexReadStats; -use crate::transport::local::LocalTransport; use crate::unix_time::FromUnixAndNanos; use crate::*; @@ -177,7 +178,7 @@ impl IndexEntry { /// hunks preserve apath order. pub struct IndexWriter { /// The `i` directory within the band where all files for this index are written. - transport: Arc<dyn Transport>, + transport: Transport, /// Currently queued entries to be written out, in arbitrary order. entries: Vec<IndexEntry>, @@ -199,7 +200,7 @@ pub struct IndexWriter { /// Accumulate and write out index entries into files in an index directory. impl IndexWriter { /// Make a new builder that will write files into the given directory.
- pub fn new(transport: Arc<dyn Transport>) -> IndexWriter { + pub fn new(transport: Transport) -> IndexWriter { IndexWriter { transport, entries: Vec::new(), @@ -253,7 +254,8 @@ impl IndexWriter { self.transport.create_dir(&subdir_relpath(self.sequence))?; } let compressed_bytes = self.compressor.compress(&json)?; - self.transport.write_file(&relpath, &compressed_bytes)?; + self.transport + .write_file(&relpath, &compressed_bytes, WriteMode::CreateNew)?; self.hunks_written += 1; monitor.count(Counter::IndexWrites, 1); monitor.count(Counter::IndexWriteCompressedBytes, compressed_bytes.len()); @@ -279,16 +281,17 @@ fn hunk_relpath(hunk_number: u32) -> String { #[derive(Debug, Clone)] pub struct IndexRead { /// Transport pointing to this index directory. - transport: Arc<dyn Transport>, + transport: Transport, } impl IndexRead { #[allow(unused)] + // TODO: Deprecate, use Transport? pub(crate) fn open_path(path: &Path) -> IndexRead { - IndexRead::open(Arc::new(LocalTransport::new(path))) + IndexRead::open(Transport::local(path)) } - pub(crate) fn open(transport: Arc<dyn Transport>) -> IndexRead { + pub(crate) fn open(transport: Transport) -> IndexRead { IndexRead { transport } } @@ -321,7 +324,7 @@ impl IndexRead { debug!(?hunks); IndexHunkIter { hunks: hunks.into_iter(), - transport: Arc::clone(&self.transport), + transport: self.transport.clone(), decompressor: Decompressor::new(), stats: IndexReadStats::default(), after: None, @@ -335,7 +338,7 @@ pub struct IndexHunkIter { hunks: std::vec::IntoIter<u32>, /// The `i` directory within the band where all files for this index are written. - transport: Arc<dyn Transport>, + transport: Transport, decompressor: Decompressor, pub stats: IndexReadStats, /// If set, yield only entries ordered after this apath. @@ -524,7 +527,7 @@ mod tests { fn setup() -> (TempDir, IndexWriter) { let testdir = TempDir::new().unwrap(); - let ib = IndexWriter::new(Arc::new(LocalTransport::new(testdir.path()))); + let ib = IndexWriter::new(Transport::local(testdir.path())); (testdir, ib) } diff --git a/src/jsonio.rs b/src/jsonio.rs index bc1a6253..54eb6e86 100644 --- a/src/jsonio.rs +++ b/src/jsonio.rs @@ -17,7 +17,7 @@ use std::path::PathBuf; use serde::de::DeserializeOwned; -use crate::transport::{self, Transport}; +use crate::transport::{self, Transport, WriteMode}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -37,10 +37,9 @@ pub enum Error { pub type Result<T> = std::result::Result<T, Error>; /// Write uncompressed json to a file on a Transport. -pub(crate) fn write_json<T, TR>(transport: &TR, relpath: &str, obj: &T) -> Result<()> +pub(crate) fn write_json<T>(transport: &Transport, relpath: &str, obj: &T) -> Result<()> where T: serde::Serialize, - TR: AsRef<dyn Transport>, { let mut s: String = serde_json::to_string(&obj).map_err(|source| Error::Json { source, @@ -48,15 +47,14 @@ where })?; s.push('\n'); transport - .as_ref() - .write_file(relpath, s.as_bytes()) + .write_file(relpath, s.as_bytes(), WriteMode::CreateNew) .map_err(Error::from) } /// Read and deserialize uncompressed json from a file on a Transport. /// /// Returns None if the file does not exist.
-pub(crate) fn read_json<T>(transport: &dyn Transport, path: &str) -> Result<Option<T>> +pub(crate) fn read_json<T>(transport: &Transport, path: &str) -> Result<Option<T>> where T: DeserializeOwned, { @@ -79,8 +77,6 @@ mod tests { use assert_fs::prelude::*; use serde::{Deserialize, Serialize}; - use crate::transport::local::LocalTransport; - use super::*; #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] @@ -98,7 +94,7 @@ }; let filename = "test.json"; - let transport = LocalTransport::new(temp.path()); + let transport = Transport::local(temp.path()); super::write_json(&transport, filename, &entry).unwrap(); let json_child = temp.child("test.json"); @@ -114,7 +110,7 @@ .write_str(r#"{"id": 42, "weather": "cold"}"#) .unwrap(); - let transport = LocalTransport::new(temp.path()); + let transport = Transport::local(temp.path()); let content: TestContents = read_json(&transport, "test.json") .expect("no error") .expect("file exists"); diff --git a/src/lease.rs b/src/lease.rs index 57dcc3c6..9a603e5c 100644 --- a/src/lease.rs +++ b/src/lease.rs @@ -3,7 +3,7 @@ //! Leases controlling write access to an archive. use std::process; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -11,19 +11,22 @@ use time::OffsetDateTime; use tracing::{debug, instrument, trace, warn}; use url::Url; -use crate::{transport, Transport}; +use crate::transport::WriteMode; +use crate::{transport, transport::Transport}; pub static LEASE_FILENAME: &str = "LEASE"; /// A lease on an archive. #[derive(Debug)] pub struct Lease { - transport: Arc<dyn Transport>, + transport: Transport, content: LeaseContent, /// The next refresh after this time must rewrite the lease. next_renewal: OffsetDateTime, /// How often should we renew the lease? renewal_interval: Duration, + /// URL of the lease file. + url: Url, } #[non_exhaustive] @@ -82,10 +85,7 @@ impl Lease { /// Acquire a lease, if one is available. /// /// Returns [Error::Busy] or [Error::Corrupt] if the lease is already held by another process. - pub fn try_acquire( - transport: Arc<dyn Transport>, - lease_options: &LeaseOptions, - ) -> Result<Lease> { + pub fn try_acquire(transport: &Transport, lease_options: &LeaseOptions) -> Result<Lease> { trace!("trying to acquire lease"); let lease_taken = OffsetDateTime::now_utc(); let lease_expiry = lease_taken + lease_options.lease_expiry; @@ -101,15 +101,14 @@ impl Lease { expiry: lease_expiry, nonce: rand::random(), }; - let url = transport.relative_file_url(LEASE_FILENAME); + let url = transport.url().join(LEASE_FILENAME).unwrap(); let mut s: String = serde_json::to_string(&content).expect("serialize lease"); s.push('\n'); - while let Err(err) = transport - .as_ref() - .write_new_file(LEASE_FILENAME, s.as_bytes()) + while let Err(err) = + transport.write_file(LEASE_FILENAME, s.as_bytes(), WriteMode::CreateNew) { if err.kind() == transport::ErrorKind::AlreadyExists { - match Lease::peek(transport.as_ref())? { + match Lease::peek(transport)? { LeaseState::Held(content) => { return Err(Error::Busy { url, @@ -130,10 +129,11 @@ } let next_renewal = lease_taken + Duration::from_secs(60); Ok(Lease { - transport, + transport: transport.clone(), content, next_renewal, renewal_interval: lease_options.renewal_interval, + url, }) } @@ -141,31 +141,37 @@ /// /// This takes the existing lease and returns a new one only if renewal succeeds.
pub fn renew(mut self) -> Result<Lease> { - let state = Lease::peek(self.transport.as_ref())?; - let url = self.transport.relative_file_url(LEASE_FILENAME); + let state = Lease::peek(&self.transport)?; match state { LeaseState::Held(content) => { if content != self.content { warn!(actual = ?content, expected = ?self.content, "lease stolen"); return Err(Error::Stolen { - url, + url: self.url.clone(), content: Box::new(content), }); } } LeaseState::Free => { warn!("lease file disappeared"); - return Err(Error::Disappeared { url }); + return Err(Error::Disappeared { + url: self.url.clone(), + }); } LeaseState::Corrupt(_mtime) => { warn!("lease file is corrupt"); - return Err(Error::Corrupt { url }); + return Err(Error::Corrupt { + url: self.url.clone(), + }); } } self.content.acquired = OffsetDateTime::now_utc(); self.next_renewal = self.content.acquired + self.renewal_interval; let json: String = serde_json::to_string(&self.content).expect("serialize lease"); - self.transport.write_file(LEASE_FILENAME, json.as_bytes())?; + // At this point we know the lease was, at least very recently, still held by us, so + // we can overwrite it. + self.transport + .write_file(LEASE_FILENAME, json.as_bytes(), WriteMode::Overwrite)?; Ok(self) } @@ -173,13 +179,12 @@ pub fn release(self) -> Result<()> { // TODO: Check that it was not stolen? self.transport - .as_ref() .remove_file(LEASE_FILENAME) .map_err(Error::from) } /// Return information about the current leaseholder, if any. - pub fn peek(transport: &dyn Transport) -> Result<LeaseState> { + pub fn peek(transport: &Transport) -> Result<LeaseState> { // TODO: Atomically get the content and mtime; that should be one call on s3. let metadata = match transport.metadata(LEASE_FILENAME) { Ok(m) => m, @@ -242,7 +247,7 @@ mod test { use tempfile::TempDir; use crate::lease::LEASE_FILENAME; - use crate::transport::open_local_transport; + use crate::transport::Transport; use super::{Lease, LeaseState}; @@ -253,12 +258,13 @@ renewal_interval: Duration::from_secs(10), }; let tmp = TempDir::new().unwrap(); - let transport = open_local_transport(tmp.path()).unwrap(); - let lease = Lease::try_acquire(transport.clone(), &options).unwrap(); + let transport = Transport::local(tmp.path()); + let lease = Lease::try_acquire(&transport, &options) + .expect("original lease in new dir should be acquired"); assert!(tmp.path().join("LEASE").exists()); let orig_lease_taken = lease.content.acquired; - let peeked = Lease::peek(transport.as_ref()).unwrap(); + let peeked = Lease::peek(&transport).expect("should be able to peek newly acquired lease"); let LeaseState::Held(content) = peeked else { panic!("lease not held") }; assert_eq!( content.acquired, ); assert_eq!(content.pid, Some(process::id())); - let lease = lease.renew().unwrap(); - let state2 = Lease::peek(transport.as_ref()).unwrap(); + let lease = lease.renew().expect("renew already-held lease"); + let state2 = Lease::peek(&transport).expect("should be able to peek renewed lease"); match state2 { LeaseState::Held(content) => { assert!(content.acquired > orig_lease_taken); } } @@ renewal_interval: Duration::from_secs(10), }; let tmp = TempDir::new().unwrap(); - let transport = open_local_transport(tmp.path()).unwrap(); - let lease = Lease::try_acquire(transport.clone(), &options).unwrap(); + let transport = Transport::local(tmp.path()); + let lease = Lease::try_acquire(&transport, &options).unwrap(); assert!(tmp.path().join("LEASE").exists()); transport.remove_file(LEASE_FILENAME).unwrap(); @@ -305,13
+311,13 @@ mod test { renewal_interval: Duration::from_secs(10), }; let tmp = TempDir::new().unwrap(); - let transport = open_local_transport(tmp.path()).unwrap(); - let lease1 = Lease::try_acquire(transport.clone(), &options).unwrap(); + let transport = Transport::local(tmp.path()); + let lease1 = Lease::try_acquire(&transport, &options).unwrap(); assert!(tmp.path().join("LEASE").exists()); // Delete the lease to make it easy to steal. transport.remove_file(LEASE_FILENAME).unwrap(); - let lease2 = Lease::try_acquire(transport.clone(), &options).unwrap(); + let lease2 = Lease::try_acquire(&transport, &options).unwrap(); assert!(tmp.path().join("LEASE").exists()); // Renewal through the first handle should now fail. @@ -325,7 +331,7 @@ mod test { #[test] fn peek_fixed_lease_content() { let tmp = TempDir::new().unwrap(); - let transport = open_local_transport(tmp.path()).unwrap(); + let transport = Transport::local(tmp.path()); write( tmp.path().join("LEASE"), r#" @@ -339,7 +345,7 @@ mod test { }"#, ) .unwrap(); - let state = Lease::peek(transport.as_ref()).unwrap(); + let state = Lease::peek(&transport).unwrap(); dbg!(&state); match state { LeaseState::Held(content) => { @@ -362,9 +368,9 @@ mod test { #[test] fn peek_corrupt_empty_lease() { let tmp = TempDir::new().unwrap(); - let transport = open_local_transport(tmp.path()).unwrap(); + let transport = Transport::local(tmp.path()); File::create(tmp.path().join("LEASE")).unwrap(); - let state = Lease::peek(transport.as_ref()).unwrap(); + let state = Lease::peek(&transport).unwrap(); match state { LeaseState::Corrupt(mtime) => { let now = time::OffsetDateTime::now_utc(); diff --git a/src/lib.rs b/src/lib.rs index 512fdaf7..88e0eba8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,6 @@ pub use crate::restore::{restore, RestoreOptions}; pub use crate::show::{show_versions, ShowVersionsOptions}; pub use crate::stats::DeleteStats; pub use crate::stored_tree::StoredTree; -pub use crate::transport::{open_transport, Transport}; pub use crate::tree::{ReadTree, TreeSize}; pub use crate::unix_mode::UnixMode; pub use crate::validate::ValidateOptions; @@ -94,9 +93,6 @@ pub const ARCHIVE_VERSION: &str = "0.6"; pub const SYMLINKS_SUPPORTED: bool = cfg!(target_family = "unix"); -/// Temporary files in the archive have this prefix. -const TMP_PREFIX: &str = "tmp"; - /// Metadata file in the band directory. static BAND_HEAD_FILENAME: &str = "BANDHEAD"; diff --git a/src/owner.rs b/src/owner.rs index 3bf9fb90..165e4a21 100644 --- a/src/owner.rs +++ b/src/owner.rs @@ -14,7 +14,7 @@ //! Tracks the file owner user/group. -// There is potentially a more efficient way to do this, but this approach works +// TODO: There is potentially a more efficient way to do this, but this approach works // better than just saving the uid and gid, so that backups may potentially // be restored on a different system. 
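Both `GC_LOCK` and `LEASE` acquisition above lean on the same `WriteMode::CreateNew` idiom: creation doubles as mutual exclusion, because the first writer wins and every later writer gets `AlreadyExists` rather than silently clobbering the file. A minimal sketch of the pattern (the helper is hypothetical, not code from this diff):

```rust
use conserve::transport::{Error, ErrorKind, Transport, WriteMode};

// Hypothetical helper: first writer wins; later writers observe
// AlreadyExists instead of overwriting another process's lock file.
fn try_take_lock(transport: &Transport, name: &str) -> Result<bool, Error> {
    match transport.write_file(name, b"{}\n", WriteMode::CreateNew) {
        Ok(()) => Ok(true),
        Err(err) if err.kind() == ErrorKind::AlreadyExists => Ok(false),
        Err(err) => Err(err),
    }
}
```

Renewal, by contrast, deliberately uses `WriteMode::Overwrite`, since at that point the lease is known (very recently) to be held by the current process.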
diff --git a/src/owner/windows.rs b/src/owner/windows.rs index 14e6fc1d..d40772a5 100644 --- a/src/owner/windows.rs +++ b/src/owner/windows.rs @@ -19,7 +19,6 @@ use std::io; use std::path::Path; use super::Owner; -use crate::Result; impl From<&Metadata> for Owner { fn from(_: &Metadata) -> Self { diff --git a/src/stitch.rs b/src/stitch.rs index 68f1f9f9..dc936df1 100644 --- a/src/stitch.rs +++ b/src/stitch.rs @@ -175,10 +175,9 @@ fn previous_existing_band(archive: &Archive, mut band_id: BandId) -> Option<Band> - pub fn transport(&self) -> &dyn Transport { + pub fn transport(&self) -> &Transport { self.archive.transport() } } diff --git a/src/transport.rs b/src/transport.rs index 794becb3..b85c84b4 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -14,7 +14,7 @@ //! //! Transport operations return std::io::Result to reflect their narrower focus. -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::{error, fmt, io, result}; @@ -26,39 +26,12 @@ use url::Url; use crate::*; pub mod local; -use local::LocalTransport; +#[cfg(feature = "sftp")] +pub mod sftp; #[cfg(feature = "s3")] pub mod s3; -/// Open a `Transport` to access a local directory. -/// -/// `s` may be a local path or a URL. -pub fn open_transport(s: &str) -> crate::Result<Arc<dyn Transport>> { - if let Ok(url) = Url::parse(s) { - match url.scheme() { - "file" => Ok(Arc::new(LocalTransport::new( - &url.to_file_path().expect("extract URL file path"), - ))), - #[cfg(feature = "s3")] - "s3" => Ok(s3::S3Transport::new(&url)?), - d if d.len() == 1 => { - // Probably a Windows path with drive letter, like "c:/thing", not actually a URL. - Ok(Arc::new(LocalTransport::new(Path::new(s)))) - } - other => Err(crate::Error::UrlScheme { - scheme: other.to_owned(), - }), - } - } else { - Ok(Arc::new(LocalTransport::new(Path::new(s)))) - } -} - -pub fn open_local_transport(path: &Path) -> crate::Result<Arc<dyn Transport>> { - Ok(Arc::new(LocalTransport::new(path))) -} - /// Abstracted filesystem IO to access an archive. /// /// This supports operations that are common across local filesystems, SFTP, and cloud storage, and /// ... @@ -67,17 +40,66 @@ pub fn open_local_transport(path: &Path) -> crate::Result<Arc<dyn Transport>> { /// A transport has a root location, which will typically be the top directory of the Archive. /// Below that point everything is accessed with a relative path, expressed as a PathBuf. /// -/// All Transports must be `Send + Sync`, so they can be passed across or shared across threads. +/// Transport objects can be cheaply cloned. /// /// Files in Conserve archives have bounded size and fit in memory so this does not need to /// support streaming or partial reads and writes. -pub trait Transport: Send + Sync + std::fmt::Debug { - /// Get the base URL for this transport. - fn base_url(&self) -> &Url; +#[derive(Clone)] +pub struct Transport { + /// The concrete protocol implementation: local, S3, etc. + protocol: Arc<dyn Protocol>, +} + +impl Transport { + /// Open a new local transport addressing a filesystem directory. + pub fn local(path: &Path) -> Self { + Transport { + protocol: Arc::new(local::Protocol::new(path)), + } + } + + /// Open a new transport from a string that might be a URL or local path.
+ pub fn new(s: &str) -> Result<Transport> { + if let Ok(url) = Url::parse(s) { + Transport::from_url(&url) + } else { + Ok(Transport::local(Path::new(s))) + } + } + + pub fn from_url(url: &Url) -> Result<Transport> { + let protocol: Arc<dyn Protocol> = match url.scheme() { + "file" => Arc::new(local::Protocol::new( + &url.to_file_path().expect("extract URL file path"), + )), + d if d.len() == 1 => { + // Probably a Windows path with drive letter, like "c:/thing", not actually a URL. + Arc::new(local::Protocol::new(Path::new(url.as_str()))) + } + + #[cfg(feature = "s3")] + "s3" => Arc::new(s3::Protocol::new(url)?), + + #[cfg(feature = "sftp")] + "sftp" => Arc::new(sftp::Protocol::new(url)?), - /// Get the URL for a file relative to this transport. - fn relative_file_url(&self, path: &str) -> Url { - self.base_url().join(path).expect("join relative file URL") + _other => { + return Err(Error { + kind: ErrorKind::UrlScheme, + url: Some(url.clone()), + source: None, + }) + } + }; + Ok(Transport { protocol }) + } + + /// Get one complete file into a caller-provided buffer. + /// + /// Files in the archive are of bounded size, so it's OK to always read them entirely into + /// memory, and this is simple to support on all implementations. + pub fn read_file(&self, path: &str) -> Result<Bytes> { + self.protocol.read_file(path) } /// List a directory, separating out file and subdirectory names. @@ -85,16 +107,41 @@ /// Names are in the arbitrary order that they're returned from the transport. /// /// Any error during iteration causes overall failure. - fn list_dir(&self, relpath: &str) -> Result<ListDir>; + pub fn list_dir(&self, relpath: &str) -> Result<ListDir> { + self.protocol.list_dir(relpath) + } - /// Get one complete file into a caller-provided buffer. - /// - /// Files in the archive are of bounded size, so it's OK to always read them entirely into - /// memory, and this is simple to support on all implementations. - fn read_file(&self, path: &str) -> Result<Bytes>; + /// Make a new transport addressing a subdirectory. + pub fn chdir(&self, relpath: &str) -> Self { + Transport { + protocol: self.protocol.chdir(relpath), + } + } + + pub fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()> { + self.protocol.write_file(relpath, content, mode) + } + + pub fn create_dir(&self, relpath: &str) -> Result<()> { + self.protocol.create_dir(relpath) + } + + pub fn metadata(&self, relpath: &str) -> Result<Metadata> { + self.protocol.metadata(relpath) + } + + /// Delete a file. + pub fn remove_file(&self, relpath: &str) -> Result<()> { + self.protocol.remove_file(relpath) + } + + /// Delete a directory and all its contents. + pub fn remove_dir_all(&self, relpath: &str) -> Result<()> { + self.protocol.remove_dir_all(relpath) + } /// Check if a regular file exists. - fn is_file(&self, path: &str) -> Result<bool> { + pub fn is_file(&self, path: &str) -> Result<bool> { match self.metadata(path) { Ok(metadata) => Ok(metadata.kind == Kind::File), Err(err) if err.kind() == ErrorKind::NotFound => Ok(false), @@ -102,26 +149,43 @@ } } - /// Create a directory, if it does not exist. - /// - /// If the directory already exists, it's not an error. - /// - /// This function does not create missing parent directories.
-    fn create_dir(&self, relpath: &str) -> Result<()>;
+    pub fn url(&self) -> &Url {
+        self.protocol.url()
+    }
+
+    #[allow(unused)]
+    fn local_path(&self) -> Option<PathBuf> {
+        self.protocol.local_path()
+    }
+}
+
+impl fmt::Debug for Transport {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Transport({})", self.url())
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum WriteMode {
+    /// Create the file if it does not exist, or overwrite it if it does.
+    Overwrite,
+
+    /// Create the file if it does not exist, or fail if it does.
+    CreateNew,
+}
+
+trait Protocol: Send + Sync {
+    fn read_file(&self, path: &str) -> Result<Bytes>;
 
     /// Write a complete file.
     ///
-    /// As much as possible, the file should be written atomically so that it is only visible with
-    /// the complete content. On a local filesystem the content is written to a temporary file and
-    /// then renamed.
-    /// If a temporary file is used, the name should start with `crate::TMP_PREFIX`.
+    /// Depending on the [WriteMode] this may either overwrite existing files, or error.
     ///
-    /// If the file exists it is replaced. (Across transports, and particularly on S3,
-    /// we can't rely on detecting existing files.)
-    fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()>;
-
-    /// Write a new file and error if the file exists.
-    fn write_new_file(&self, relpath: &str, content: &[u8]) -> Result<()>;
+    /// As much as possible, the file should be written atomically so that it is only visible with
+    /// the complete content.
+    fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>;
+
+    fn list_dir(&self, relpath: &str) -> Result<ListDir>;
+
+    fn create_dir(&self, relpath: &str) -> Result<()>;
 
     /// Get metadata about a file.
     fn metadata(&self, relpath: &str) -> Result<Metadata>;
@@ -133,7 +197,13 @@
     fn remove_dir_all(&self, relpath: &str) -> Result<()>;
 
     /// Make a new transport addressing a subdirectory.
-    fn sub_transport(&self, relpath: &str) -> Arc<dyn Transport>;
+    fn chdir(&self, relpath: &str) -> Arc<dyn Protocol>;
+
+    fn url(&self) -> &Url;
+
+    fn local_path(&self) -> Option<PathBuf> {
+        None
+    }
 }
 
 /// A directory entry read from a transport.
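The `WriteMode` enum above replaces the old `write_file`/`write_new_file` pair with one entry point whose contract is explicit at each call site. A sketch of the intended semantics against a local directory (hypothetical helper and file name; it assumes a failed `CreateNew` surfaces as `ErrorKind::AlreadyExists`, as the `From<io::ErrorKind>` mapping in the next hunk suggests):

```rust
use std::path::Path;

use conserve::transport::{Error, ErrorKind, Transport, WriteMode};

fn take_lock(dir: &Path) -> Result<(), Error> {
    let t = Transport::local(dir);
    // CreateNew: the first writer wins...
    t.write_file("LOCK", b"held by pid 1234", WriteMode::CreateNew)?;
    // ...and a second CreateNew write fails instead of clobbering.
    match t.write_file("LOCK", b"held by pid 5678", WriteMode::CreateNew) {
        Err(err) if err.kind() == ErrorKind::AlreadyExists => (),
        other => panic!("expected AlreadyExists, got {other:?}"),
    }
    // Overwrite replaces existing content unconditionally.
    t.write_file("LOCK", b"held by pid 5678", WriteMode::Overwrite)
}
```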
@@ -191,10 +261,27 @@ pub enum ErrorKind {
     #[display(fmt = "Create transport error")]
     CreateTransport,
 
+    #[display(fmt = "Connect error")]
+    Connect,
+
+    #[display(fmt = "Unsupported URL scheme")]
+    UrlScheme,
+
     #[display(fmt = "Other transport error")]
     Other,
 }
 
+impl From<io::ErrorKind> for ErrorKind {
+    fn from(kind: io::ErrorKind) -> Self {
+        match kind {
+            io::ErrorKind::NotFound => ErrorKind::NotFound,
+            io::ErrorKind::AlreadyExists => ErrorKind::AlreadyExists,
+            io::ErrorKind::PermissionDenied => ErrorKind::PermissionDenied,
+            _ => ErrorKind::Other,
+        }
+    }
+}
+
 impl Error {
     pub fn kind(&self) -> ErrorKind {
         self.kind
@@ -207,6 +294,7 @@ impl Error {
             io::ErrorKind::PermissionDenied => ErrorKind::PermissionDenied,
             _ => ErrorKind::Other,
         };
+
         Error {
             source: Some(Box::new(source)),
             url: Some(url),
@@ -245,3 +333,31 @@ impl error::Error for Error {
 }
 
 type Result<T> = result::Result<T, Error>;
+
+#[cfg(test)]
+mod test {
+    use std::path::Path;
+
+    use super::Transport;
+
+    #[test]
+    fn get_path_from_local_transport() {
+        let transport = Transport::local(Path::new("/tmp"));
+        assert_eq!(transport.local_path().as_deref(), Some(Path::new("/tmp")));
+    }
+
+    #[test]
+    fn local_transport_debug_form() {
+        let transport = Transport::local(Path::new("/tmp"));
+        #[cfg(unix)]
+        assert_eq!(format!("{:?}", transport), "Transport(file:///tmp/)");
+        #[cfg(windows)]
+        {
+            use regex::Regex;
+            let dbg = format!("{:?}", transport);
+            dbg!(&dbg);
+            let re = Regex::new(r#"Transport\(file:///[A-Za-z]:/tmp/\)"#).unwrap();
+            assert!(re.is_match(&dbg));
+        }
+    }
+}
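Because `io::ErrorKind` (and, later in this diff, the ssh2 and S3 error codes) now funnel into one transport `ErrorKind`, callers can probe for optional files identically on every backend. A sketch under that assumption; the helper and file name are hypothetical, and `Bytes` comes from the `bytes` crate:

```rust
use bytes::Bytes;
use conserve::transport::{Error, ErrorKind, Transport};

/// Read a file that may legitimately be absent, on any backend.
fn read_optional(t: &Transport, name: &str) -> Result<Option<Bytes>, Error> {
    match t.read_file(name) {
        Ok(body) => Ok(Some(body)),
        // NotFound is uniform whether it started as io::ErrorKind::NotFound,
        // an SFTP "no such file" code, or an S3 NoSuchKey response.
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
```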
diff --git a/src/transport/local.rs b/src/transport/local.rs
index bfa0075f..15269320 100644
--- a/src/transport/local.rs
+++ b/src/transport/local.rs
@@ -12,41 +12,38 @@
 
 //! Access to an archive on the local filesystem.
 
-use std::borrow::Cow;
-use std::fs::{create_dir, File};
-use std::io;
+use std::fs::{create_dir, remove_file, File};
 use std::io::prelude::*;
-use std::path::{absolute, Path, PathBuf};
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::{io, path};
 
 use bytes::Bytes;
-use tracing::{instrument, trace, warn};
+use tracing::{error, instrument, trace, warn};
 use url::Url;
 
-use super::{Error, ListDir, Metadata, Result, Transport};
+use super::{Error, ListDir, Metadata, Result, WriteMode};
 
-#[derive(Clone, Debug)]
-pub struct LocalTransport {
-    /// Base directory for this transport.
-    root: PathBuf,
+pub(super) struct Protocol {
+    /// Directory addressed by this protocol.
+    path: PathBuf,
 
-    /// URL representation of the base directory.
+    /// URL corresponding to the directory.
     url: Url,
 }
 
-impl LocalTransport {
-    pub fn new(path: &Path) -> Self {
-        LocalTransport {
-            root: path.to_owned(),
-            // TODO: Maybe return an error on construction if we can't find the absolute path.
-            url: Url::from_directory_path(absolute(path).expect("absolute path"))
-                .expect("path to URL"),
+impl Protocol {
+    pub(super) fn new(path: &Path) -> Self {
+        Protocol {
+            path: path.to_owned(),
+            url: Url::from_directory_path(path::absolute(path).expect("make path absolute"))
+                .expect("convert path to URL"),
         }
     }
 
-    pub fn full_path(&self, relpath: &str) -> PathBuf {
+    fn full_path(&self, relpath: &str) -> PathBuf {
         debug_assert!(!relpath.contains("/../"), "path must not contain /../");
-        self.root.join(relpath)
+        self.path.join(relpath)
     }
 
     fn io_error(&self, relpath: &str, err: io::Error) -> Error {
@@ -54,11 +51,50 @@
     }
 }
 
-impl Transport for LocalTransport {
-    fn base_url(&self) -> &Url {
+impl super::Protocol for Protocol {
+    fn url(&self) -> &Url {
         &self.url
     }
 
+    #[instrument(skip(self))]
+    fn read_file(&self, relpath: &str) -> Result<Bytes> {
+        fn try_block(path: &Path) -> io::Result<Bytes> {
+            let mut file = File::open(path)?;
+            let mut out_buf = Vec::new();
+            let actual_len = file.read_to_end(&mut out_buf)?;
+            trace!("Read {actual_len} bytes");
+            Ok(out_buf.into())
+        }
+        try_block(&self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
+    }
+
+    #[instrument(skip(self, content))]
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
+        let full_path = self.full_path(relpath);
+        let oops = |err| super::Error::io_error(self.url.join(relpath).unwrap(), err);
+        let mut options = File::options();
+        options.write(true);
+        match write_mode {
+            WriteMode::CreateNew => {
+                options.create_new(true);
+            }
+            WriteMode::Overwrite => {
+                options.create(true).truncate(true);
+            }
+        }
+        let mut file = options.open(&full_path).map_err(oops)?;
+        if let Err(err) = file.write_all(content) {
+            error!("Failed to write {full_path:?}: {err:?}");
+            drop(file);
+            if let Err(err2) = remove_file(&full_path) {
+                error!("Failed to remove {full_path:?}: {err2:?}");
+            }
+            return Err(oops(err));
+        }
+        trace!("Wrote {} bytes", content.len());
+        Ok(())
+    }
+
     fn list_dir(&self, relpath: &str) -> Result<ListDir> {
         // Archives should never normally contain non-UTF-8 (or even non-ASCII) filenames, but
         // let's pass them back as lossy UTF-8 so they can be reported at a higher level, for
@@ -83,29 +119,6 @@ impl Transport for LocalTransport {
         Ok(names)
     }
 
-    #[instrument(skip(self))]
-    fn read_file(&self, relpath: &str) -> Result<Bytes> {
-        fn try_block(path: &Path) -> io::Result<Bytes> {
-            let mut file = File::open(path)?;
-            let estimated_len: usize = file
-                .metadata()?
-                .len()
-                .try_into()
-                .expect("File size fits in usize");
-            let mut out_buf = Vec::with_capacity(estimated_len);
-            let actual_len = file.read_to_end(&mut out_buf)?;
-            trace!("Read {actual_len} bytes");
-            out_buf.truncate(actual_len);
-            Ok(out_buf.into())
-        }
-        try_block(&self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
-    }
-
-    fn is_file(&self, relpath: &str) -> Result<bool> {
-        let path = self.full_path(relpath);
-        Ok(path.is_file())
-    }
-
     fn create_dir(&self, relpath: &str) -> super::Result<()> {
         let path = self.full_path(relpath);
         create_dir(&path).or_else(|err| {
@@ -117,59 +130,12 @@
         })
     }
 
-    #[instrument(skip(self, content))]
-    fn write_new_file(&self, relpath: &str, content: &[u8]) -> super::Result<()> {
-        File::options()
-            .create_new(true)
-            .write(true)
-            .open(self.full_path(relpath))
-            .and_then(|mut file| file.write_all(content).and_then(|()| file.flush()))
-            .map_err(|err| self.io_error(relpath, err))
-    }
-
-    #[instrument(skip(self, content))]
-    fn write_file(&self, relpath: &str, content: &[u8]) -> super::Result<()> {
-        let full_path = self.full_path(relpath);
-        let dir = full_path.parent().unwrap();
-        let context = |err| self.io_error(relpath, err);
-        let mut temp = tempfile::Builder::new()
-            .prefix(crate::TMP_PREFIX)
-            .tempfile_in(dir)
-            .map_err(context)?;
-        if let Err(err) = temp.write_all(content) {
-            let _ = temp.close();
-            warn!("Failed to write {:?}: {:?}", relpath, err);
-            return Err(context(err));
-        }
-        if let Err(persist_error) = temp.persist(&full_path) {
-            warn!("Failed to persist {:?}: {:?}", full_path, persist_error);
-            persist_error.file.close().map_err(context)?;
-            Err(context(persist_error.error))
-        } else {
-            trace!("Wrote {} bytes", content.len());
-            Ok(())
-        }
-    }
-
     fn remove_file(&self, relpath: &str) -> super::Result<()> {
         std::fs::remove_file(self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
     }
 
     fn remove_dir_all(&self, relpath: &str) -> super::Result<()> {
-        let path = self.full_path(relpath);
-        std::fs::remove_dir_all(&path).map_err(|err| self.io_error(relpath, err))
-    }
-
-    fn sub_transport(&self, relpath: &str) -> Arc<dyn Transport> {
-        let subdir = if relpath.ends_with('/') {
-            Cow::Borrowed(relpath)
-        } else {
-            Cow::Owned(format!("{relpath}/"))
-        };
-        Arc::new(LocalTransport {
-            root: self.root.join(relpath),
-            url: self.url.join(&subdir).expect("join component to URL"),
-        })
+        std::fs::remove_dir_all(self.full_path(relpath)).map_err(|err| self.io_error(relpath, err))
     }
 
     fn metadata(&self, relpath: &str) -> Result<Metadata> {
@@ -187,11 +153,16 @@
             modified,
         })
     }
-}
 
-impl AsRef<dyn Transport> for LocalTransport {
-    fn as_ref(&self) -> &(dyn Transport + 'static) {
-        self
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        Arc::new(Protocol {
+            path: self.path.join(relpath),
+            url: self.url.join(relpath).expect("join URL"),
+        })
+    }
+
+    fn local_path(&self) -> Option<PathBuf> {
+        Some(self.path.clone())
     }
 }
 
@@ -206,7 +177,7 @@ mod test {
 
     use super::*;
     use crate::kind::Kind;
-    use crate::transport;
+    use crate::transport::{self, Transport};
 
     #[test]
     fn read_file() {
@@ -216,7 +187,7 @@
 
         temp.child(filename).write_str(content).unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         let buf = transport.read_file(filename).unwrap();
         assert_eq!(buf, content.as_bytes());
@@ -226,7 +197,7 @@
     #[test]
     fn read_file_not_found() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         let err = transport
             .read_file("nonexistent.json")
             .unwrap_err();
@@ -260,7 +231,7 @@
         let filename = "poem.txt";
         temp.child(filename).write_str(content).unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         let metadata = transport.metadata(filename).unwrap();
         dbg!(&metadata);
@@ -281,7 +252,7 @@
             .write_str("Morning coffee")
             .unwrap();
 
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         let root_list = transport.list_dir(".").unwrap();
         assert_eq!(root_list.files, ["root file"]);
         assert_eq!(root_list.dirs, ["subdir"]);
@@ -299,11 +270,15 @@
     #[test]
     fn write_file() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("subdir").unwrap();
         transport
-            .write_file("subdir/subfile", b"Must I paint you a picture?")
+            .write_file(
+                "subdir/subfile",
+                b"Must I paint you a picture?",
+                WriteMode::CreateNew,
+            )
             .unwrap();
 
         temp.child("subdir").assert(predicate::path::is_dir());
@@ -321,7 +296,7 @@
         use std::os::unix::prelude::PermissionsExt;
 
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         temp.child("file").touch().unwrap();
         fs::set_permissions(temp.child("file").path(), fs::Permissions::from_mode(0o000))
             .expect("set_permissions");
@@ -334,13 +309,13 @@
     #[test]
     fn write_file_can_overwrite() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
         let filename = "filename";
         transport
-            .write_file(filename, b"original content")
+            .write_file(filename, b"original content", WriteMode::Overwrite)
            .expect("first write succeeds");
         transport
-            .write_file(filename, b"new content")
+            .write_file(filename, b"new content", WriteMode::Overwrite)
             .expect("write over existing file succeeds");
         assert_eq!(
             transport.read_file(filename).unwrap().as_ref(),
@@ -351,7 +326,7 @@
     #[test]
     fn create_existing_dir() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa").unwrap();
@@ -363,12 +338,12 @@
     #[test]
     fn sub_transport() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa/bbb").unwrap();
 
-        let sub_transport = transport.sub_transport("aaa");
+        let sub_transport = transport.chdir("aaa");
         let sub_list = sub_transport.list_dir("").unwrap();
 
         assert_eq!(sub_list.dirs, ["bbb"]);
@@ -380,7 +355,7 @@
     #[test]
     fn remove_dir_all() {
         let temp = assert_fs::TempDir::new().unwrap();
-        let transport = LocalTransport::new(temp.path());
+        let transport = Transport::local(temp.path());
 
         transport.create_dir("aaa").unwrap();
         transport.create_dir("aaa/bbb").unwrap();
diff --git a/src/transport/s3.rs b/src/transport/s3.rs
index f5993f4c..4d44f78a 100644
--- a/src/transport/s3.rs
+++ b/src/transport/s3.rs
@@ -24,7 +24,6 @@
 //
 //     cargo mutants -f s3.rs --no-config -C --features=s3,s3-integration-test
 
-use std::borrow::Cow;
 use std::fmt;
 use std::sync::Arc;
 use std::time::SystemTime;
@@ -43,12 +42,13 @@
 use aws_types::SdkConfig;
 use base64::Engine;
 use bytes::Bytes;
 use tokio::runtime::Runtime;
-use tracing::{debug, trace, trace_span};
+use tracing::{debug, instrument, trace, trace_span};
 use url::Url;
 
-use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result, Transport};
+use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result, WriteMode};
 
-pub struct S3Transport {
+pub(super) struct Protocol {
+    url: Url,
     /// Tokio runtime specifically for S3 IO.
     ///
     /// S3 SDK is built on Tokio but the rest of Conserve uses threads.
@@ -61,41 +61,26 @@ pub struct S3Transport {
     bucket: String,
     base_path: String,
 
-    url: Url,
-
     /// Storage class for new objects.
     storage_class: StorageClass,
 }
 
-impl fmt::Debug for S3Transport {
-    #[mutants::skip] // unimportant to test
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("S3Transport")
-            .field("bucket", &self.bucket)
-            .field("base_path", &self.base_path)
-            .finish()
-    }
-}
+impl Protocol {
+    pub(super) fn new(url: &Url) -> Result<Protocol> {
+        assert_eq!(url.scheme(), "s3");
 
-// Clippy false positive here: https://github.com/rust-lang/rust-clippy/issues/12444
-#[allow(clippy::assigning_clones)]
-impl S3Transport {
-    pub fn new(base_url: &Url) -> Result<Arc<S3Transport>> {
         // Like in .
         let runtime = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
             .map_err(|source| Error {
                 kind: ErrorKind::CreateTransport,
-                url: Some(base_url.to_owned()),
+                url: Some(url.to_owned()),
                 source: Some(Box::new(source)),
             })?;
-        let bucket = base_url.authority().to_owned();
-        assert!(
-            !bucket.is_empty(),
-            "S3 bucket name is empty in {base_url:?}"
-        );
+        let bucket = url.authority().to_owned();
+        assert!(!bucket.is_empty(), "S3 bucket name is empty in {url:?}");
 
         // Find the bucket region.
         let config = load_aws_config(&runtime, None);
@@ -117,7 +102,7 @@ impl S3Transport {
         let config = load_aws_config(&runtime, region);
         let client = aws_sdk_s3::Client::new(&config);
 
-        let mut base_path = base_url.path().to_owned();
+        let mut base_path = url.path().to_owned();
         if !base_path.is_empty() {
             base_path = base_path
                 .strip_prefix('/')
                 .expect("URL path starts with /")
                 .to_owned();
         }
         let url = Url::parse(&format!("s3://{bucket}/{base_path}")).expect("valid s3 URL");
-        debug!(%base_url);
+        debug!(%url);
 
-        Ok(Arc::new(S3Transport {
+        Ok(Protocol {
             bucket,
             base_path,
             url,
             client: Arc::new(client),
             runtime: Arc::new(runtime),
             storage_class: StorageClass::IntelligentTiering,
-        }))
+        })
+    }
+
+    fn join_path(&self, relpath: &str) -> String {
+        join_paths(&self.base_path, relpath)
+    }
+}
+
+impl fmt::Debug for Protocol {
+    #[mutants::skip] // unimportant to test
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("conserve::transport::s3::Protocol")
+            .field("bucket", &self.bucket)
+            .field("base_path", &self.base_path)
+            .finish()
     }
 }
 
@@ -186,11 +185,7 @@ fn join_paths(a: &str, b: &str) -> String {
     result
 }
 
-impl Transport for S3Transport {
-    fn base_url(&self) -> &Url {
-        &self.url
-    }
-
+impl super::Protocol for Protocol {
     fn list_dir(&self, relpath: &str) -> Result<ListDir> {
         let _span = trace_span!("S3Transport::list_file", %relpath).entered();
         let mut prefix = self.join_path(relpath);
@@ -271,12 +266,12 @@
         Ok(())
     }
 
-    fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
-        let _span = trace_span!("S3Transport::write_file", %relpath).entered();
+    #[instrument(skip(self, content))]
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
         let key = self.join_path(relpath);
         let crc32c =
             base64::engine::general_purpose::STANDARD.encode(crc32c::crc32c(content).to_be_bytes());
-        let request = self
+        let mut request = self
             .client
             .put_object()
             .bucket(&self.bucket)
@@ -284,6 +279,9 @@
             .storage_class(self.storage_class.clone())
             .checksum_crc32_c(crc32c)
             .body(content.to_owned().into());
+        if write_mode == WriteMode::CreateNew {
+            request = request.if_none_match("*");
+        }
         let response = self.runtime.block_on(request.send());
         // trace!(?response);
         response.map_err(|err| s3_error(&self.url, key, err))?;
@@ -291,10 +289,6 @@
         Ok(())
     }
 
-    fn write_new_file(&self, _relpath: &str, _content: &[u8]) -> Result<()> {
-        unimplemented!("S3Transport::write_new_file");
-    }
-
     fn remove_file(&self, relpath: &str) -> Result<()> {
         let _span = trace_span!("S3Transport::remove_file", %relpath).entered();
         let key = self.join_path(relpath);
@@ -383,32 +377,19 @@
         }
     }
 
-    fn sub_transport(&self, relpath: &str) -> Arc<dyn Transport> {
-        let subdir = if relpath.ends_with('/') {
-            Cow::Borrowed(relpath)
-        } else {
-            Cow::Owned(format!("{relpath}/"))
-        };
-        Arc::new(S3Transport {
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        Arc::new(Protocol {
             base_path: join_paths(&self.base_path, relpath),
-            url: self.url.join(&subdir).expect("join subdir URL"),
+            url: self.url.join(relpath).expect("join subdir URL"),
             bucket: self.bucket.clone(),
             runtime: self.runtime.clone(),
             client: self.client.clone(),
             storage_class: self.storage_class.clone(),
         })
     }
-}
 
-impl S3Transport {
-    fn join_path(&self, relpath: &str) -> String {
-        join_paths(&self.base_path, relpath)
-    }
-}
-
-impl AsRef<dyn Transport> for S3Transport {
-    fn as_ref(&self) -> &(dyn Transport + 'static) {
-        self
+    fn url(&self) -> &Url {
+        &self.url
     }
 }
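Note that on S3 the `WriteMode::CreateNew` case no longer needs an unimplemented `write_new_file`: it maps onto a conditional PUT, which the service rejects when the key already exists. Roughly the equivalent raw SDK call, as an illustration only; the bucket and key names are made up:

```rust
// Conditional create: the service refuses the PUT (HTTP 412) if
// "archive/BANDHEAD" already exists, which is what gives CreateNew
// its first-writer-wins semantics without a read-before-write race.
async fn put_if_absent(client: &aws_sdk_s3::Client) -> Result<(), aws_sdk_s3::Error> {
    client
        .put_object()
        .bucket("my-backup-bucket")
        .key("archive/BANDHEAD")
        .body(b"head".to_vec().into())
        .if_none_match("*")
        .send()
        .await?;
    Ok(())
}
```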
diff --git a/src/transport/sftp.rs b/src/transport/sftp.rs
new file mode 100644
index 00000000..56fcb9cd
--- /dev/null
+++ b/src/transport/sftp.rs
@@ -0,0 +1,322 @@
+// Copyright 2022 Martin Pool
+
+//! Read/write archive over SFTP.
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::net::TcpStream;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use time::OffsetDateTime;
+use tracing::{error, info, instrument, trace, warn};
+use url::Url;
+
+use crate::Kind;
+
+use super::{Error, ErrorKind, ListDir, Result, WriteMode};
+
+pub(super) struct Protocol {
+    url: Url,
+    sftp: Arc<ssh2::Sftp>,
+    base_path: PathBuf,
+}
+
+impl Protocol {
+    pub fn new(url: &Url) -> Result<Protocol> {
+        assert_eq!(url.scheme(), "sftp");
+        let addr = format!(
+            "{}:{}",
+            url.host_str().expect("url must have a host"),
+            url.port().unwrap_or(22)
+        );
+        let tcp_stream = TcpStream::connect(addr).map_err(|err| {
+            error!(?err, ?url, "Error opening TCP connection");
+            io_error(err, url)
+        })?;
+        trace!("got tcp connection");
+        let mut session = ssh2::Session::new().map_err(|err| {
+            error!(?err, "Error creating SSH session");
+            ssh_error(err, url)
+        })?;
+        session.set_tcp_stream(tcp_stream);
+        session.handshake().map_err(|err| {
+            error!(?err, "Error in SSH handshake");
+            ssh_error(err, url)
+        })?;
+        trace!(
+            "SSH connected, banner: {}",
+            session.banner().unwrap_or("(none)")
+        );
+        let username = match url.username() {
+            "" => {
+                trace!("Take default SSH username from environment");
+                whoami::username()
+            }
+            u => u.to_owned(),
+        };
+        session.userauth_agent(&username).map_err(|err| {
+            error!(?err, username, "Error in SSH user auth with agent");
+            ssh_error(err, url)
+        })?;
+        trace!("Authenticated!");
+        let sftp = session.sftp().map_err(|err| {
+            error!(?err, "Error opening SFTP session");
+            ssh_error(err, url)
+        })?;
+        Ok(Protocol {
+            url: url.to_owned(),
+            sftp: Arc::new(sftp),
+            base_path: url.path().into(),
+        })
+    }
+
+    fn lstat(&self, path: &str) -> Result<ssh2::FileStat> {
+        trace!("lstat {path}");
+        self.sftp
+            .lstat(&self.base_path.join(path))
+            .map_err(|err| self.ssh_error(err, path))
+    }
+
+    fn ssh_error(&self, source: ssh2::Error, path: &str) -> super::Error {
+        ssh_error(source, self.url.join(path).as_ref().unwrap_or(&self.url))
+    }
+
+    fn io_error(&self, source: io::Error, path: &str) -> Error {
+        Error {
+            kind: source.kind().into(),
+            source: Some(Box::new(source)),
+            url: self.url.join(path).ok(),
+        }
+    }
+}
+
+fn ssh_error(source: ssh2::Error, url: &Url) -> super::Error {
+    super::Error {
+        kind: source.code().into(),
+        source: Some(Box::new(source)),
+        url: Some(url.to_owned()),
+    }
+}
+
+fn io_error(source: io::Error, url: &Url) -> super::Error {
+    super::Error {
+        kind: source.kind().into(),
+        source: Some(Box::new(source)),
+        url: Some(url.to_owned()),
+    }
+}
+
+impl super::Protocol for Protocol {
+    fn list_dir(&self, path: &str) -> Result<ListDir> {
+        let full_path = &self.base_path.join(path);
+        trace!("iter_dir_entries {:?}", full_path);
+        let mut files = Vec::new();
+        let mut dirs = Vec::new();
+        let mut dir = self.sftp.opendir(full_path).map_err(|err| {
+            error!(?err, ?full_path, "Error opening directory");
+            self.ssh_error(err, full_path.to_string_lossy().as_ref())
+        })?;
+        loop {
+            match dir.readdir() {
+                Ok((pathbuf, file_stat)) => {
+                    let name = pathbuf.to_string_lossy().into();
+                    if name == "." || name == ".." {
+                        continue;
+                    }
+                    trace!("read dir got name {}", name);
+                    match file_stat.file_type().into() {
+                        Kind::File => files.push(name),
+                        Kind::Dir => dirs.push(name),
+                        _ => (),
+                    }
+                }
+                Err(err) if err.code() == ssh2::ErrorCode::Session(-16) => {
+                    // Apparently there's no symbolic version for it, but this is the error
+                    // code.
+                    //
+                    trace!("read dir end");
+                    break;
+                }
+                Err(err) => {
+                    info!("SFTP error {:?}", err);
+                    return Err(self.ssh_error(err, path));
+                }
+            }
+        }
+        Ok(ListDir { files, dirs })
+    }
+
+    fn read_file(&self, path: &str) -> Result<Bytes> {
+        let full_path = self.base_path.join(path);
+        trace!("attempt open {}", full_path.display());
+        let mut buf = Vec::with_capacity(2 << 20);
+        let mut file = self
+            .sftp
+            .open(&full_path)
+            .map_err(|err| self.ssh_error(err, path))?;
+        let len = file
+            .read_to_end(&mut buf)
+            .map_err(|err| self.io_error(err, path))?;
+        assert_eq!(len, buf.len());
+        trace!("read {} bytes from {}", len, full_path.display());
+        Ok(buf.into())
+    }
+
+    fn create_dir(&self, relpath: &str) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!("create_dir {:?}", full_path);
+        match self.sftp.mkdir(&full_path, 0o700) {
+            Ok(()) => Ok(()),
+            Err(err) if err.code() == ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_FAILURE) => {
+                // openssh seems to say failure for "directory exists" :/
+                Ok(())
+            }
+            Err(err) => {
+                warn!(?err, ?relpath);
+                Err(self.ssh_error(err, relpath))
+            }
+        }
+    }
+
+    fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!(
+            "write_file {len:>9} bytes to {full_path:?}",
+            len = content.len()
+        );
+        let flags = ssh2::OpenFlags::WRITE
+            | ssh2::OpenFlags::CREATE
+            | match write_mode {
+                WriteMode::CreateNew => ssh2::OpenFlags::EXCLUSIVE,
+                WriteMode::Overwrite => ssh2::OpenFlags::TRUNCATE,
+            };
+        let mut file = self
+            .sftp
+            .open_mode(&full_path, flags, 0o644, ssh2::OpenType::File)
+            .map_err(|err| {
+                warn!(?err, ?relpath, "sftp error creating file");
+                self.ssh_error(err, relpath)
+            })?;
+        if let Err(err) = file.write_all(content) {
+            warn!(?err, ?full_path, "sftp error writing file");
+            if let Err(err2) = self.sftp.unlink(&full_path) {
+                warn!(
+                    ?err2,
+                    ?full_path,
+                    "sftp error unlinking file after write error"
+                );
+            }
+            return Err(super::Error::io_error(self.url.join(relpath).unwrap(), err));
+        }
+        Ok(())
+    }
+
+    fn metadata(&self, relpath: &str) -> Result<super::Metadata> {
+        let full_path = self.base_path.join(relpath);
+        let stat = self.lstat(relpath)?;
+        // TODO: runtime error, don't panic
+        let modified = OffsetDateTime::from_unix_timestamp(
+            stat.mtime
+                .expect("sftp stat has an mtime")
+                .try_into()
+                .unwrap(),
+        )
+        .expect("convert mtime");
+        trace!("metadata {full_path:?}");
+        Ok(super::Metadata {
+            kind: stat.file_type().into(),
+            len: stat.size.unwrap_or_default(),
+            modified,
+        })
+    }
+
+    fn remove_file(&self, relpath: &str) -> Result<()> {
+        let full_path = self.base_path.join(relpath);
+        trace!("remove_file {full_path:?}");
+        self.sftp
+            .unlink(&full_path)
+            .map_err(|err| self.ssh_error(err, relpath))
+    }
+
+    #[instrument]
+    fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let mut dirs_to_walk = vec![path.to_owned()];
+        let mut dirs_to_delete = vec![path.to_owned()];
+        while let Some(dir) = dirs_to_walk.pop() {
+            trace!(?dir, "Walk down dir");
+            let list = self.list_dir(&dir)?;
+            for file in list.files {
+                self.remove_file(&format!("{dir}/{file}"))?;
+            }
+            list.dirs
+                .iter()
+                .map(|subdir| format!("{dir}/{subdir}"))
+                .for_each(|p| {
+                    dirs_to_delete.push(p.clone());
+                    dirs_to_walk.push(p)
+                });
+        }
+        // Consume them in the reverse order discovered, so bottom up
+        for dir in dirs_to_delete.iter().rev() {
+            let full_path = self.base_path.join(dir);
+            trace!(?dir, "rmdir");
+            self.sftp
+                .rmdir(&full_path)
+                .map_err(|err| self.ssh_error(err, dir))?;
+        }
+        Ok(())
+        // let full_path = self.base_path.join(relpath);
+        // trace!("remove_dir {full_path:?}");
+        // self.sftp.rmdir(&full_path).map_err(translate_error)
+    }
+
+    fn chdir(&self, relpath: &str) -> Arc<dyn super::Protocol> {
+        let base_path = self.base_path.join(relpath);
+        let url = self.url.join(relpath).expect("join URL");
+        Arc::new(Protocol {
+            url,
+            sftp: Arc::clone(&self.sftp),
+            base_path,
+        })
+    }
+
+    fn url(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl fmt::Debug for Protocol {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("sftp::Protocol")
+            .field("url", &self.url)
+            .finish()
+    }
+}
+
+impl From<ssh2::FileType> for Kind {
+    fn from(kind: ssh2::FileType) -> Self {
+        use ssh2::FileType::*;
+        match kind {
+            RegularFile => Kind::File,
+            Directory => Kind::Dir,
+            Symlink => Kind::Symlink,
+            _ => Kind::Unknown,
+        }
+    }
+}
+
+impl From<ssh2::ErrorCode> for ErrorKind {
+    fn from(code: ssh2::ErrorCode) -> Self {
+        // Map ssh2 error codes that libssh doesn't already translate into
+        // io errors onto the matching ErrorKind.
+        //
+        // See https://github.com/alexcrichton/ssh2-rs/issues/244.
+        match code {
+            ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_NO_SUCH_FILE)
+            | ssh2::ErrorCode::SFTP(libssh2_sys::LIBSSH2_FX_NO_SUCH_PATH) => ErrorKind::NotFound,
+            // TODO: Others
+            _ => ErrorKind::Other,
+        }
+    }
+}
diff --git a/tests/cli.rs b/tests/cli.rs
index bd3babf1..df0a5756 100644
--- a/tests/cli.rs
+++ b/tests/cli.rs
@@ -31,7 +31,9 @@ use url::Url;
 use conserve::test_fixtures::{ScratchArchive, TreeFixture};
 
 fn run_conserve() -> Command {
-    Command::cargo_bin("conserve").expect("locate conserve binary")
+    let mut command = Command::cargo_bin("conserve").expect("locate conserve binary");
+    command.env_remove("RUST_LOG");
+    command
 }
 
 #[test]
diff --git a/tests/damage.rs b/tests/damage.rs
index 207b4d95..b887f20e 100644
--- a/tests/damage.rs
+++ b/tests/damage.rs
@@ -46,7 +46,7 @@ use tracing_test::traced_test;
 
 use conserve::counters::Counter;
 use conserve::monitor::test::TestMonitor;
-use conserve::transport::open_local_transport;
+use conserve::transport::Transport;
 use conserve::{
     backup, restore, Apath, Archive, BackupOptions, BandId, BandSelectionPolicy, BlockHash,
     EntryTrait, Exclude, RestoreOptions, ValidateOptions,
@@ -113,9 +113,7 @@ fn backup_after_damage(
     action.damage(&location.to_path(&archive_dir));
 
     // Open the archive again to avoid cache effects.
-    let archive =
-        Archive::open(conserve::transport::open_local_transport(archive_dir.path()).unwrap())
-            .expect("open archive");
+    let archive = Archive::open(Transport::local(archive_dir.path())).expect("open archive");
 
     // A second backup should succeed.
     changes.apply(&source_dir);
@@ -264,9 +262,7 @@ impl DamageLocation {
                 .join(BandId::from(*band_id).to_string())
                 .join("BANDTAIL"),
             DamageLocation::Block(block_index) => {
-                let archive =
-                    Archive::open(open_local_transport(archive_dir).expect("open transport"))
-                        .expect("open archive");
+                let archive = Archive::open(Transport::local(archive_dir)).expect("open archive");
                 let block_dir = archive.block_dir();
                 let block_hash = block_dir
                     .blocks(TestMonitor::arc())
diff --git a/tests/failpoints.rs b/tests/failpoints.rs
index eb1ecf92..43d22895 100644
--- a/tests/failpoints.rs
+++ b/tests/failpoints.rs
@@ -17,18 +17,19 @@ use std::path::Path;
 
 use assert_fs::TempDir;
 use conserve::monitor::test::TestMonitor;
-use conserve::transport::open_local_transport;
 use fail::FailScenario;
 
+use crate::transport::Transport;
 use conserve::*;
 
 #[test]
 fn create_dir_permission_denied() {
     let scenario = FailScenario::setup();
     fail::cfg("restore::create-dir", "return").unwrap();
-    let archive =
-        Archive::open(open_local_transport(Path::new("testdata/archive/simple/v0.6.10")).unwrap())
-            .unwrap();
+    let archive = Archive::open(Transport::local(Path::new(
+        "testdata/archive/simple/v0.6.10",
+    )))
+    .unwrap();
     let options = RestoreOptions {
         ..RestoreOptions::default()
     };
diff --git a/tests/format_flags.rs b/tests/format_flags.rs
index 3e245f2b..ade487d8 100644
--- a/tests/format_flags.rs
+++ b/tests/format_flags.rs
@@ -5,6 +5,7 @@
 
 use conserve::test_fixtures::ScratchArchive;
 use conserve::*;
+use transport::WriteMode;
 
 #[test]
 // This can be updated if/when Conserve does start writing some flags by default.
@@ -42,8 +43,12 @@ fn unknown_format_flag_fails_to_open() {
         "format_flags": ["wibble"]
     });
     af.transport()
-        .sub_transport("b0000")
-        .write_file("BANDHEAD", &serde_json::to_vec(&head).unwrap())
+        .chdir("b0000")
+        .write_file(
+            "BANDHEAD",
+            &serde_json::to_vec(&head).unwrap(),
+            WriteMode::CreateNew,
+        )
         .unwrap();
 
     let err = Band::open(&af, BandId::zero()).unwrap_err();
diff --git a/tests/restore.rs b/tests/restore.rs
index 71ca40fb..ad7de612 100644
--- a/tests/restore.rs
+++ b/tests/restore.rs
@@ -14,13 +14,9 @@
 
 use std::cell::RefCell;
 use std::fs::{create_dir, write};
-#[cfg(unix)]
-use std::fs::{read_link, symlink_metadata};
-use std::path::PathBuf;
 
 use conserve::counters::Counter;
 use conserve::monitor::test::TestMonitor;
-use filetime::{set_symlink_file_times, FileTime};
 use tempfile::TempDir;
 
 use conserve::test_fixtures::ScratchArchive;
@@ -194,6 +190,11 @@ fn exclude_files() {
 #[test]
 #[cfg(unix)]
 fn restore_symlink() {
+    use std::fs::{read_link, symlink_metadata};
+    use std::path::PathBuf;
+
+    use filetime::{set_symlink_file_times, FileTime};
+
     use conserve::monitor::test::TestMonitor;
 
     let af = ScratchArchive::new();
diff --git a/tests/s3_integration.rs b/tests/s3_integration.rs
index 3ad1d75e..5dfc441b 100644
--- a/tests/s3_integration.rs
+++ b/tests/s3_integration.rs
@@ -1,4 +1,4 @@
-// Copyright 2023 Martin Pool
+// Copyright 2023-2024 Martin Pool
 
 #![cfg(feature = "s3-integration-test")]
 
@@ -97,23 +97,23 @@ impl TempBucket {
             .expect("Create bucket");
         println!("Created bucket {bucket_name}");
 
+        let lifecycle_configuration = BucketLifecycleConfiguration::builder()
+            .rules(
+                LifecycleRule::builder()
+                    .id("delete-after-7d")
+                    .filter(LifecycleRuleFilter::builder().prefix("").build())
+                    .status(ExpirationStatus::Enabled)
+                    .expiration(LifecycleExpiration::builder().days(7).build())
+                    .build()
+                    .expect("Build S3 lifecycle rule"),
+            )
+            .build()
+            .expect("Build S3 lifecycle configuration");
+
+        dbg!(&lifecycle_configuration);
         client
             .put_bucket_lifecycle_configuration()
             .bucket(bucket_name)
-            .lifecycle_configuration(
-                BucketLifecycleConfiguration::builder()
-                    .rules(
-                        LifecycleRule::builder()
-                            .id("delete-after-7d")
-                            .filter(LifecycleRuleFilter::ObjectSizeGreaterThan(0))
-                            .status(ExpirationStatus::Enabled)
-                            .expiration(LifecycleExpiration::builder().days(7).build())
-                            .build()
-                            .expect("Build S3 lifecycle rule"),
-                    )
-                    .build()
-                    .expect("Build S3 lifecycle configuration"),
-            )
+            .lifecycle_configuration(lifecycle_configuration)
             .send()
             .await
             .expect("Set bucket lifecycle");
diff --git a/tests/transport.rs b/tests/transport.rs
index 8d0f9dd5..d61fdb41 100644
--- a/tests/transport.rs
+++ b/tests/transport.rs
@@ -10,14 +10,16 @@
 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 // GNU General Public License for more details.
 
+use std::path::Path;
+
 use assert_fs::prelude::*;
 use url::Url;
 
-use conserve::transport::{open_transport, ListDir};
+use conserve::transport::{ListDir, Transport};
 
 #[test]
 fn open_local() {
-    open_transport("/backup").unwrap();
+    Transport::local(Path::new("/backup"));
 }
 
 #[test]
@@ -29,7 +31,7 @@ fn list_dir_names() {
 
     let url = Url::from_directory_path(temp.path()).unwrap();
     dbg!(&url);
 
-    let transport = open_transport(url.as_str()).unwrap();
+    let transport = Transport::new(url.as_str()).unwrap();
     dbg!(&transport);
 
     let ListDir { mut files, dirs } = transport.list_dir("").unwrap();
@@ -49,22 +51,22 @@ fn parse_location_urls() {
         "c:/backup/repo",
         r"c:\backup\repo\",
     ] {
-        assert!(open_transport(n).is_ok(), "Failed to parse {n:?}");
+        assert!(Transport::new(n).is_ok(), "Failed to parse {n:?}");
     }
 }
 
 #[test]
 fn unsupported_location_urls() {
     assert_eq!(
-        open_transport("http://conserve.example/repo")
+        Transport::new("http://conserve.example/repo")
             .unwrap_err()
             .to_string(),
-        "Unsupported URL scheme \"http\""
+        "Unsupported URL scheme: http://conserve.example/repo"
     );
     assert_eq!(
-        open_transport("ftp://user@conserve.example/repo")
+        Transport::new("ftp://user@conserve.example/repo")
             .unwrap_err()
             .to_string(),
-        "Unsupported URL scheme \"ftp\""
+        "Unsupported URL scheme: ftp://user@conserve.example/repo"
    );
 }
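The scheme dispatch these tests exercise can be summarized with one hypothetical probe program (the locations are made up; `s3://` and `sftp://` URLs additionally resolve when their cargo features are compiled in):

```rust
use conserve::transport::Transport;

fn main() {
    for loc in [
        "/backup/repo",                 // bare local path
        "file:///backup/repo",          // explicit file URL
        "c:/backup/repo",               // one-letter scheme, treated as a Windows drive
        "http://conserve.example/repo", // rejected: unsupported scheme
    ] {
        match Transport::new(loc) {
            Ok(t) => println!("{loc} -> {}", t.url()),
            Err(err) => println!("{loc} -> error: {err}"),
        }
    }
}
```

Note how the new error message embeds the whole offending URL rather than just the scheme, which is what the updated assertions above check.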