Skip to content

Commit

Permalink
Merge pull request #281 from sourcefrog/transport-ref
Browse files Browse the repository at this point in the history
Transport metadata mtime, and URLs in Transport::Error
  • Loading branch information
sourcefrog authored Dec 23, 2024
2 parents 896ab59 + b54c9db commit 68d631c
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 72 deletions.
51 changes: 39 additions & 12 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{error, fmt, io, result};

use bytes::Bytes;
use derive_more::Display;
use time::OffsetDateTime;
use url::Url;

use crate::*;
Expand All @@ -45,6 +46,7 @@ pub mod s3;
/// support streaming or partial reads and writes.
#[derive(Clone)]
pub struct Transport {
/// The concrete protocol implementation: local, S3, etc.
protocol: Arc<dyn Protocol + 'static>,
}

Expand Down Expand Up @@ -84,7 +86,7 @@ impl Transport {
_other => {
return Err(Error {
kind: ErrorKind::UrlScheme,
path: Some(url.as_str().to_owned()),
url: Some(url.clone()),
source: None,
})
}
Expand Down Expand Up @@ -174,9 +176,18 @@ pub enum WriteMode {

trait Protocol: Send + Sync {
fn read_file(&self, path: &str) -> Result<Bytes>;

/// Write a complete file.
///
/// Depending on the [WriteMode] this may either overwrite existing files, or error.
///
/// As much as possible, the file should be written atomically so that it is only visible with
/// the complete content.
fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>;
fn list_dir(&self, relpath: &str) -> Result<ListDir>;
fn create_dir(&self, relpath: &str) -> Result<()>;

/// Get metadata about a file.
fn metadata(&self, relpath: &str) -> Result<Metadata>;

/// Delete a file.
Expand Down Expand Up @@ -212,6 +223,9 @@ pub struct Metadata {

/// Kind of file.
pub kind: Kind,

/// Last modified time.
pub modified: OffsetDateTime,
}

/// A list of all the files and directories in a directory.
Expand All @@ -225,11 +239,11 @@ pub struct ListDir {
#[derive(Debug)]
pub struct Error {
/// What type of generally known error?
kind: ErrorKind,
pub kind: ErrorKind,
/// The underlying error: for example an IO or S3 error.
source: Option<Box<dyn error::Error + Send + Sync>>,
/// The affected path, possibly relative to the transport.
path: Option<String>,
pub source: Option<Box<dyn error::Error + Send + Sync>>,
/// The affected URL, if known.
pub url: Option<Url>,
}

/// General categories of transport errors.
Expand All @@ -244,6 +258,12 @@ pub enum ErrorKind {
#[display(fmt = "Permission denied")]
PermissionDenied,

#[display(fmt = "Create transport error")]
CreateTransport,

#[display(fmt = "Connect error")]
Connect,

#[display(fmt = "Unsupported URL scheme")]
UrlScheme,

Expand All @@ -268,28 +288,35 @@ impl Error {
}

pub(self) fn io_error(path: &Path, source: io::Error) -> Error {
let kind = match source.kind() {
io::ErrorKind::NotFound => ErrorKind::NotFound,
io::ErrorKind::AlreadyExists => ErrorKind::AlreadyExists,
io::ErrorKind::PermissionDenied => ErrorKind::PermissionDenied,
_ => ErrorKind::Other,
};

Error {
kind: source.kind().into(),
source: Some(Box::new(source)),
path: Some(path.to_string_lossy().to_string()),
url: Url::from_file_path(path).ok(),
kind,
}
}

pub fn is_not_found(&self) -> bool {
self.kind == ErrorKind::NotFound
}

/// The transport-relative path where this error occurred, if known.
pub fn path(&self) -> Option<&str> {
self.path.as_deref()
/// The URL where this error occurred, if known.
pub fn url(&self) -> Option<&Url> {
self.url.as_ref()
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.kind)?;
if let Some(ref path) = self.path {
write!(f, ": {}", path)?;
if let Some(ref url) = self.url {
write!(f, ": {url}")?;
}
if let Some(source) = &self.source {
// I'm not sure we should write this here; it might be repetitive.
Expand Down
27 changes: 19 additions & 8 deletions src/transport/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,14 @@ impl super::Protocol for Protocol {
fn metadata(&self, relpath: &str) -> Result<Metadata> {
let path = self.full_path(relpath);
let fsmeta = path.metadata().map_err(|err| Error::io_error(&path, err))?;
let modified = fsmeta
.modified()
.map_err(|err| Error::io_error(&path, err))?
.into();
Ok(Metadata {
len: fsmeta.len(),
kind: fsmeta.file_type().into(),
modified,
})
}

Expand Down Expand Up @@ -164,9 +169,11 @@ impl super::Protocol for Protocol {
#[cfg(test)]
mod test {
use std::error::Error;
use std::time::Duration;

use assert_fs::prelude::*;
use predicates::prelude::*;
use time::OffsetDateTime;

use super::*;
use crate::kind::Kind;
Expand Down Expand Up @@ -200,7 +207,12 @@ mod test {
assert!(message.contains("Not found"));
assert!(message.contains("nonexistent.json"));

assert!(err.path().expect("path").ends_with("nonexistent.json"));
assert!(err
.url
.as_ref()
.expect("url")
.path()
.ends_with("/nonexistent.json"));
assert_eq!(err.kind(), transport::ErrorKind::NotFound);
assert!(err.is_not_found());

Expand All @@ -218,13 +230,12 @@ mod test {

let transport = Transport::local(temp.path());

assert_eq!(
transport.metadata(filename).unwrap(),
Metadata {
len: 24,
kind: Kind::File
}
);
let metadata = transport.metadata(filename).unwrap();
dbg!(&metadata);

assert_eq!(metadata.len, 24);
assert_eq!(metadata.kind, Kind::File);
assert!(metadata.modified + Duration::from_secs(60) > OffsetDateTime::now_utc());
assert!(transport.metadata("nopoem").unwrap_err().is_not_found());
}

Expand Down
67 changes: 38 additions & 29 deletions src/transport/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
// cargo mutants -f s3.rs --no-config -C --features=s3,s3-integration-test

use std::fmt;
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;

use aws_config::{AppName, BehaviorVersion};
use aws_sdk_s3::error::SdkError;
Expand Down Expand Up @@ -73,7 +73,11 @@ impl Protocol {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| Error::io_error(Path::new(""), err))?;
.map_err(|source| Error {
kind: ErrorKind::CreateTransport,
url: Some(url.to_owned()),
source: Some(Box::new(source)),
})?;

let bucket = url.authority().to_owned();
assert!(!bucket.is_empty(), "S3 bucket name is empty in {url:?}");
Expand Down Expand Up @@ -121,6 +125,24 @@ impl Protocol {
fn join_path(&self, relpath: &str) -> String {
join_paths(&self.base_path, relpath)
}

fn s3_error<E, R>(&self, key: &str, source: SdkError<E, R>) -> Error
where
E: std::error::Error + Send + Sync + 'static,
R: std::fmt::Debug + Send + Sync + 'static,
ErrorKind: for<'a> From<&'a E>,
{
debug!(s3_error = ?source);
let kind = match &source {
SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
_ => ErrorKind::Other,
};
Error {
kind,
url: self.url.join(key).ok(),
source: Some(source.into()),
}
}
}

impl fmt::Debug for Protocol {
Expand Down Expand Up @@ -221,7 +243,7 @@ impl super::Protocol for Protocol {
result.files.push(name.to_owned());
}
}
Some(Err(err)) => return Err(s3_error(prefix, err)),
Some(Err(err)) => return Err(self.s3_error(&prefix, err)),
None => break,
}
}
Expand All @@ -240,13 +262,13 @@ impl super::Protocol for Protocol {
let response = self
.runtime
.block_on(request.send())
.map_err(|source| s3_error(key.clone(), source))?;
.map_err(|source| self.s3_error(&key, source))?;
let body_bytes = self
.runtime
.block_on(response.body.collect())
.map_err(|source| Error {
kind: ErrorKind::Other,
path: Some(key.clone()),
url: self.url.join(relpath).ok(),
source: Some(Box::new(source)),
})?
.into_bytes();
Expand Down Expand Up @@ -279,7 +301,7 @@ impl super::Protocol for Protocol {
}
let response = self.runtime.block_on(request.send());
// trace!(?response);
response.map_err(|err| s3_error(key, err))?;
response.map_err(|err| self.s3_error(&key, err))?;
trace!(body_len = content.len(), "wrote file");
Ok(())
}
Expand All @@ -290,7 +312,7 @@ impl super::Protocol for Protocol {
let request = self.client.delete_object().bucket(&self.bucket).key(&key);
let response = self.runtime.block_on(request.send());
trace!(?response);
response.map_err(|err| s3_error(key, err))?;
response.map_err(|err| self.s3_error(&key, err))?;
trace!("deleted file");
Ok(())
}
Expand All @@ -311,7 +333,7 @@ impl super::Protocol for Protocol {
let mut n_files = 0;
while let Some(response) = self.runtime.block_on(stream.next()) {
for object in response
.map_err(|err| s3_error(prefix.clone(), err))?
.map_err(|err| self.s3_error(&prefix, err))?
.contents
.expect("ListObjectsV2Response has contents")
{
Expand All @@ -324,7 +346,7 @@ impl super::Protocol for Protocol {
.key(&key)
.send(),
)
.map_err(|err| s3_error(key, err))?;
.map_err(|err| self.s3_error(&key, err))?;
n_files += 1;
}
}
Expand All @@ -346,14 +368,20 @@ impl super::Protocol for Protocol {
.expect("S3 HeadObject response should have a content_length")
.try_into()
.expect("Content length non-negative");
let modified: SystemTime = response
.last_modified
.expect("S3 HeadObject response should have a last_modified")
.try_into()
.expect("S3 last_modified is valid SystemTime");
trace!(?len, "File exists");
Ok(Metadata {
kind: Kind::File,
len,
modified: modified.into(),
})
}
Err(err) => {
let translated = s3_error(key, err);
let translated = self.s3_error(&key, err);
if translated.is_not_found() {
trace!("file does not exist");
} else {
Expand All @@ -380,25 +408,6 @@ impl super::Protocol for Protocol {
}
}

fn s3_error<K, E, R>(key: K, source: SdkError<E, R>) -> Error
where
K: ToOwned<Owned = String>,
E: std::error::Error + Send + Sync + 'static,
R: std::fmt::Debug + Send + Sync + 'static,
ErrorKind: for<'a> From<&'a E>,
{
debug!(s3_error = ?source);
let kind = match &source {
SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()),
_ => ErrorKind::Other,
};
Error {
kind,
path: Some(key.to_owned()),
source: Some(source.into()),
}
}

impl From<&GetObjectError> for ErrorKind {
fn from(source: &GetObjectError) -> Self {
match source {
Expand Down
Loading

0 comments on commit 68d631c

Please sign in to comment.