Skip to content

Commit

Permalink
Merge pull request #277 from sourcefrog/write-mode
Browse files Browse the repository at this point in the history
Distinguish overwrite vs create new in Transport
  • Loading branch information
sourcefrog authored Dec 8, 2024
2 parents e1f198b + 4b6254b commit 00b929a
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 55 deletions.
22 changes: 16 additions & 6 deletions src/blockdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use tracing::{instrument, trace};
use transport::WriteMode;

use crate::compress::snappy::{Compressor, Decompressor};
use crate::counters::Counter;
Expand Down Expand Up @@ -131,7 +132,19 @@ impl BlockDir {
let hex_hash = hash.to_string();
let relpath = block_relpath(&hash);
self.transport.create_dir(subdir_relpath(&hex_hash))?;
self.transport.write_file(&relpath, &compressed)?;
match self
.transport
.write_file(&relpath, &compressed, WriteMode::CreateNew)
{
Ok(()) => {}
Err(err) if err.kind() == transport::ErrorKind::AlreadyExists => {
// let's assume the contents are correct
}
Err(err) => {
warn!(?err, ?hash, "Error writing block");
return Err(err.into());
}
}
stats.written_blocks += 1;
stats.uncompressed_bytes += uncomp_len;
stats.compressed_bytes += comp_len;
Expand Down Expand Up @@ -386,11 +399,8 @@ mod test {
let monitor = TestMonitor::arc();
let subdir = tempdir.path().join(subdir_relpath("123"));
create_dir(&subdir).unwrap();
write(
subdir.join(format!("{}{}", TMP_PREFIX, "123123123")),
b"123",
)
.unwrap();
// Write a temp file as was created by earlier versions of the code.
write(subdir.join("tmp123123123"), b"123").unwrap();
let blocks = blockdir
.blocks(monitor.clone())
.unwrap()
Expand Down
6 changes: 5 additions & 1 deletion src/gc_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
//! delete but before starting to actually delete them, we check that no
//! new bands have been created.
use transport::WriteMode;

use crate::*;

pub static GC_LOCK: &str = "GC_LOCK";
Expand Down Expand Up @@ -63,7 +65,9 @@ impl GarbageCollectionLock {
if archive.transport().is_file(GC_LOCK).unwrap_or(true) {
return Err(Error::GarbageCollectionLockHeld);
}
archive.transport().write_file(GC_LOCK, b"{}\n")?;
archive
.transport()
.write_file(GC_LOCK, b"{}\n", WriteMode::CreateNew)?;
Ok(GarbageCollectionLock { archive, band_id })
}

Expand Down
4 changes: 3 additions & 1 deletion src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::transport::Transport;
use itertools::Itertools;
use time::OffsetDateTime;
use tracing::{debug, debug_span, error};
use transport::WriteMode;

use crate::compress::snappy::{Compressor, Decompressor};
use crate::counters::Counter;
Expand Down Expand Up @@ -253,7 +254,8 @@ impl IndexWriter {
self.transport.create_dir(&subdir_relpath(self.sequence))?;
}
let compressed_bytes = self.compressor.compress(&json)?;
self.transport.write_file(&relpath, &compressed_bytes)?;
self.transport
.write_file(&relpath, &compressed_bytes, WriteMode::CreateNew)?;
self.hunks_written += 1;
monitor.count(Counter::IndexWrites, 1);
monitor.count(Counter::IndexWriteCompressedBytes, compressed_bytes.len());
Expand Down
4 changes: 2 additions & 2 deletions src/jsonio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::path::PathBuf;

use serde::de::DeserializeOwned;

use crate::transport::{self, Transport};
use crate::transport::{self, Transport, WriteMode};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -54,7 +54,7 @@ where
})?;
s.push('\n');
transport
.write_file(relpath, s.as_bytes())
.write_file(relpath, s.as_bytes(), WriteMode::CreateNew)
.map_err(Error::from)
}

Expand Down
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ pub const ARCHIVE_VERSION: &str = "0.6";

pub const SYMLINKS_SUPPORTED: bool = cfg!(target_family = "unix");

/// Temporary files in the archive have this prefix.
const TMP_PREFIX: &str = "tmp";

/// Metadata file in the band directory.
static BAND_HEAD_FILENAME: &str = "BANDHEAD";

Expand Down
15 changes: 12 additions & 3 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ impl Transport {
}
}

pub fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
self.protocol.write_file(relpath, content)
pub fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()> {
self.protocol.write_file(relpath, content, mode)
}

pub fn create_dir(&self, relpath: &str) -> Result<()> {
Expand Down Expand Up @@ -163,9 +163,18 @@ impl fmt::Debug for Transport {
}
}

/// Selects how `write_file` treats a file that already exists at the
/// destination path.
///
/// The mode is passed through `Transport::write_file` to the underlying
/// protocol implementation, which maps it onto that backend's own
/// create/overwrite primitives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
/// Create the file if it does not exist, or overwrite it if it does.
///
/// Any previous content is replaced by the new content.
Overwrite,

/// Create the file if it does not exist, or fail if it does.
///
/// NOTE(review): the block writer treats a failure of kind
/// `ErrorKind::AlreadyExists` as "already stored"; confirm that every
/// protocol reports an existing file with that kind.
CreateNew,
}

trait Protocol: Send + Sync {
fn read_file(&self, path: &str) -> Result<Bytes>;
fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()>;
fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>;
fn list_dir(&self, relpath: &str) -> Result<ListDir>;
fn create_dir(&self, relpath: &str) -> Result<()>;
fn metadata(&self, relpath: &str) -> Result<Metadata>;
Expand Down
53 changes: 30 additions & 23 deletions src/transport/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;
use std::{io, path};

use bytes::Bytes;
use tracing::{instrument, trace, warn};
use tracing::{error, instrument, trace, warn};
use url::Url;

use super::{Error, ListDir, Metadata, Result};
use super::{Error, ListDir, Metadata, Result, WriteMode};

pub(super) struct Protocol {
path: PathBuf,
Expand Down Expand Up @@ -68,28 +68,31 @@ impl super::Protocol for Protocol {
}

#[instrument(skip(self, content))]
fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
// TODO: Just write directly; remove if the write fails.
let full_path = self.full_path(relpath);
let dir = full_path.parent().unwrap();
let context = |err| super::Error::io_error(&full_path, err);
let mut temp = tempfile::Builder::new()
.prefix(crate::TMP_PREFIX)
.tempfile_in(dir)
.map_err(context)?;
if let Err(err) = temp.write_all(content) {
let _ = temp.close();
warn!("Failed to write {:?}: {:?}", relpath, err);
return Err(context(err));
let oops = |err| super::Error::io_error(&full_path, err);
let mut options = File::options();
options.write(true);
match write_mode {
WriteMode::CreateNew => {
options.create_new(true);
}
WriteMode::Overwrite => {
options.create(true).truncate(true);
}
}
if let Err(persist_error) = temp.persist(&full_path) {
warn!("Failed to persist {:?}: {:?}", full_path, persist_error);
persist_error.file.close().map_err(context)?;
Err(context(persist_error.error))
} else {
trace!("Wrote {} bytes", content.len());
Ok(())
let mut file = options.open(&full_path).map_err(oops)?;
if let Err(err) = file.write_all(content) {
error!("Failed to write {full_path:?}: {err:?}");
drop(file);
if let Err(err2) = remove_file(&full_path) {
error!("Failed to remove {full_path:?}: {err2:?}");
}
return Err(oops(err));
}
trace!("Wrote {} bytes", content.len());
Ok(())
}

fn list_dir(&self, relpath: &str) -> Result<ListDir> {
Expand Down Expand Up @@ -257,7 +260,11 @@ mod test {

transport.create_dir("subdir").unwrap();
transport
.write_file("subdir/subfile", b"Must I paint you a picture?")
.write_file(
"subdir/subfile",
b"Must I paint you a picture?",
WriteMode::CreateNew,
)
.unwrap();

temp.child("subdir").assert(predicate::path::is_dir());
Expand Down Expand Up @@ -291,10 +298,10 @@ mod test {
let transport = Transport::local(temp.path());
let filename = "filename";
transport
.write_file(filename, b"original content")
.write_file(filename, b"original content", WriteMode::Overwrite)
.expect("first write succeeds");
transport
.write_file(filename, b"new content")
.write_file(filename, b"new content", WriteMode::Overwrite)
.expect("write over existing file succeeds");
assert_eq!(
transport.read_file(filename).unwrap().as_ref(),
Expand Down
13 changes: 8 additions & 5 deletions src/transport/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ use aws_types::SdkConfig;
use base64::Engine;
use bytes::Bytes;
use tokio::runtime::Runtime;
use tracing::{debug, trace, trace_span};
use tracing::{debug, instrument, trace, trace_span};
use url::Url;

use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result};
use super::{Error, ErrorKind, Kind, ListDir, Metadata, Result, WriteMode};

pub(super) struct Protocol {
url: Url,
Expand Down Expand Up @@ -261,19 +261,22 @@ impl super::Protocol for Protocol {
Ok(())
}

fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
let _span = trace_span!("S3Transport::write_file", %relpath).entered();
#[instrument(skip(self, content))]
fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
let key = self.join_path(relpath);
let crc32c =
base64::engine::general_purpose::STANDARD.encode(crc32c::crc32c(content).to_be_bytes());
let request = self
let mut request = self
.client
.put_object()
.bucket(&self.bucket)
.key(&key)
.storage_class(self.storage_class.clone())
.checksum_crc32_c(crc32c)
.body(content.to_owned().into());
if write_mode == WriteMode::CreateNew {
request = request.if_none_match("*");
}
let response = self.runtime.block_on(request.send());
// trace!(?response);
response.map_err(|err| s3_error(key, err))?;
Expand Down
40 changes: 30 additions & 10 deletions src/transport/sftp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use url::Url;

use crate::Kind;

use super::{Error, ErrorKind, ListDir, Result};
use super::{Error, ErrorKind, ListDir, Result, WriteMode};

pub(super) struct Protocol {
url: Url,
Expand Down Expand Up @@ -151,17 +151,37 @@ impl super::Protocol for Protocol {
}
}

fn write_file(&self, relpath: &str, content: &[u8]) -> Result<()> {
fn write_file(&self, relpath: &str, content: &[u8], write_mode: WriteMode) -> Result<()> {
let full_path = self.base_path.join(relpath);
trace!("write_file {:>9} bytes to {:?}", content.len(), full_path);
let mut file = self.sftp.create(&full_path).map_err(|err| {
warn!(?err, ?relpath, "sftp error creating file");
ssh_error(err, relpath)
})?;
file.write_all(content).map_err(|err| {
trace!(
"write_file {len:>9} bytes to {full_path:?}",
len = content.len()
);
let flags = ssh2::OpenFlags::WRITE
| ssh2::OpenFlags::CREATE
| match write_mode {
WriteMode::CreateNew => ssh2::OpenFlags::EXCLUSIVE,
WriteMode::Overwrite => ssh2::OpenFlags::TRUNCATE,
};
let mut file = self
.sftp
.open_mode(&full_path, flags, 0o644, ssh2::OpenType::File)
.map_err(|err| {
warn!(?err, ?relpath, "sftp error creating file");
ssh_error(err, relpath)
})?;
if let Err(err) = file.write_all(content) {
warn!(?err, ?full_path, "sftp error writing file");
io_error(err, relpath)
})
if let Err(err2) = self.sftp.unlink(&full_path) {
warn!(
?err2,
?full_path,
"sftp error unlinking file after write error"
);
}
return Err(super::Error::io_error(&full_path, err));
}
Ok(())
}

fn metadata(&self, relpath: &str) -> Result<super::Metadata> {
Expand Down
7 changes: 6 additions & 1 deletion tests/format_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use conserve::test_fixtures::ScratchArchive;
use conserve::*;
use transport::WriteMode;

#[test]
// This can be updated if/when Conserve does start writing some flags by default.
Expand Down Expand Up @@ -43,7 +44,11 @@ fn unknown_format_flag_fails_to_open() {
});
af.transport()
.chdir("b0000")
.write_file("BANDHEAD", &serde_json::to_vec(&head).unwrap())
.write_file(
"BANDHEAD",
&serde_json::to_vec(&head).unwrap(),
WriteMode::CreateNew,
)
.unwrap();

let err = Band::open(&af, BandId::zero()).unwrap_err();
Expand Down

0 comments on commit 00b929a

Please sign in to comment.