diff --git a/src/transport.rs b/src/transport.rs index 82884f98..5dec9a1b 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -20,6 +20,7 @@ use std::{error, fmt, io, result}; use bytes::Bytes; use derive_more::Display; +use time::OffsetDateTime; use url::Url; use crate::*; @@ -45,6 +46,7 @@ pub mod s3; /// support streaming or partial reads and writes. #[derive(Clone)] pub struct Transport { + /// The concrete protocol implementation: local, S3, etc. protocol: Arc, } @@ -84,7 +86,7 @@ impl Transport { _other => { return Err(Error { kind: ErrorKind::UrlScheme, - path: Some(url.as_str().to_owned()), + url: Some(url.clone()), source: None, }) } @@ -174,9 +176,18 @@ pub enum WriteMode { trait Protocol: Send + Sync { fn read_file(&self, path: &str) -> Result; + + /// Write a complete file. + /// + /// Depending on the [WriteMode] this may either overwrite existing files, or error. + /// + /// As much as possible, the file should be written atomically so that it is only visible with + /// the complete content. fn write_file(&self, relpath: &str, content: &[u8], mode: WriteMode) -> Result<()>; fn list_dir(&self, relpath: &str) -> Result; fn create_dir(&self, relpath: &str) -> Result<()>; + + /// Get metadata about a file. fn metadata(&self, relpath: &str) -> Result; /// Delete a file. @@ -212,6 +223,9 @@ pub struct Metadata { /// Kind of file. pub kind: Kind, + + /// Last modified time. + pub modified: OffsetDateTime, } /// A list of all the files and directories in a directory. @@ -225,11 +239,11 @@ pub struct ListDir { #[derive(Debug)] pub struct Error { /// What type of generally known error? - kind: ErrorKind, + pub kind: ErrorKind, /// The underlying error: for example an IO or S3 error. - source: Option>, - /// The affected path, possibly relative to the transport. - path: Option, + pub source: Option>, + /// The affected URL, if known. + pub url: Option, } /// General categories of transport errors. @@ -244,6 +258,12 @@ pub enum ErrorKind { #[display(fmt = "Permission denied")] PermissionDenied, + #[display(fmt = "Create transport error")] + CreateTransport, + + #[display(fmt = "Connect error")] + Connect, + #[display(fmt = "Unsupported URL scheme")] UrlScheme, @@ -268,10 +288,17 @@ impl Error { } pub(self) fn io_error(path: &Path, source: io::Error) -> Error { + let kind = match source.kind() { + io::ErrorKind::NotFound => ErrorKind::NotFound, + io::ErrorKind::AlreadyExists => ErrorKind::AlreadyExists, + io::ErrorKind::PermissionDenied => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, + }; + Error { - kind: source.kind().into(), source: Some(Box::new(source)), - path: Some(path.to_string_lossy().to_string()), + url: Url::from_file_path(path).ok(), + kind, } } @@ -279,17 +306,17 @@ impl Error { self.kind == ErrorKind::NotFound } - /// The transport-relative path where this error occurred, if known. - pub fn path(&self) -> Option<&str> { - self.path.as_deref() + /// The URL where this error occurred, if known. + pub fn url(&self) -> Option<&Url> { + self.url.as_ref() } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.kind)?; - if let Some(ref path) = self.path { - write!(f, ": {}", path)?; + if let Some(ref url) = self.url { + write!(f, ": {url}")?; } if let Some(source) = &self.source { // I'm not sure we should write this here; it might be repetitive. diff --git a/src/transport/local.rs b/src/transport/local.rs index 80d5f1e6..275dafdc 100644 --- a/src/transport/local.rs +++ b/src/transport/local.rs @@ -133,9 +133,14 @@ impl super::Protocol for Protocol { fn metadata(&self, relpath: &str) -> Result { let path = self.full_path(relpath); let fsmeta = path.metadata().map_err(|err| Error::io_error(&path, err))?; + let modified = fsmeta + .modified() + .map_err(|err| Error::io_error(&path, err))? + .into(); Ok(Metadata { len: fsmeta.len(), kind: fsmeta.file_type().into(), + modified, }) } @@ -164,9 +169,11 @@ impl super::Protocol for Protocol { #[cfg(test)] mod test { use std::error::Error; + use std::time::Duration; use assert_fs::prelude::*; use predicates::prelude::*; + use time::OffsetDateTime; use super::*; use crate::kind::Kind; @@ -200,7 +207,12 @@ mod test { assert!(message.contains("Not found")); assert!(message.contains("nonexistent.json")); - assert!(err.path().expect("path").ends_with("nonexistent.json")); + assert!(err + .url + .as_ref() + .expect("url") + .path() + .ends_with("/nonexistent.json")); assert_eq!(err.kind(), transport::ErrorKind::NotFound); assert!(err.is_not_found()); @@ -218,13 +230,12 @@ mod test { let transport = Transport::local(temp.path()); - assert_eq!( - transport.metadata(filename).unwrap(), - Metadata { - len: 24, - kind: Kind::File - } - ); + let metadata = transport.metadata(filename).unwrap(); + dbg!(&metadata); + + assert_eq!(metadata.len, 24); + assert_eq!(metadata.kind, Kind::File); + assert!(metadata.modified + Duration::from_secs(60) > OffsetDateTime::now_utc()); assert!(transport.metadata("nopoem").unwrap_err().is_not_found()); } diff --git a/src/transport/s3.rs b/src/transport/s3.rs index a9a4bb48..25683f16 100644 --- a/src/transport/s3.rs +++ b/src/transport/s3.rs @@ -25,8 +25,8 @@ // cargo mutants -f s3.rs --no-config -C --features=s3,s3-integration-test use std::fmt; -use std::path::Path; use std::sync::Arc; +use std::time::SystemTime; use aws_config::{AppName, BehaviorVersion}; use aws_sdk_s3::error::SdkError; @@ -73,7 +73,11 @@ impl Protocol { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .map_err(|err| Error::io_error(Path::new(""), err))?; + .map_err(|source| Error { + kind: ErrorKind::CreateTransport, + url: Some(url.to_owned()), + source: Some(Box::new(source)), + })?; let bucket = url.authority().to_owned(); assert!(!bucket.is_empty(), "S3 bucket name is empty in {url:?}"); @@ -121,6 +125,24 @@ impl Protocol { fn join_path(&self, relpath: &str) -> String { join_paths(&self.base_path, relpath) } + + fn s3_error(&self, key: &str, source: SdkError) -> Error + where + E: std::error::Error + Send + Sync + 'static, + R: std::fmt::Debug + Send + Sync + 'static, + ErrorKind: for<'a> From<&'a E>, + { + debug!(s3_error = ?source); + let kind = match &source { + SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()), + _ => ErrorKind::Other, + }; + Error { + kind, + url: self.url.join(key).ok(), + source: Some(source.into()), + } + } } impl fmt::Debug for Protocol { @@ -221,7 +243,7 @@ impl super::Protocol for Protocol { result.files.push(name.to_owned()); } } - Some(Err(err)) => return Err(s3_error(prefix, err)), + Some(Err(err)) => return Err(self.s3_error(&prefix, err)), None => break, } } @@ -240,13 +262,13 @@ impl super::Protocol for Protocol { let response = self .runtime .block_on(request.send()) - .map_err(|source| s3_error(key.clone(), source))?; + .map_err(|source| self.s3_error(&key, source))?; let body_bytes = self .runtime .block_on(response.body.collect()) .map_err(|source| Error { kind: ErrorKind::Other, - path: Some(key.clone()), + url: self.url.join(relpath).ok(), source: Some(Box::new(source)), })? .into_bytes(); @@ -279,7 +301,7 @@ impl super::Protocol for Protocol { } let response = self.runtime.block_on(request.send()); // trace!(?response); - response.map_err(|err| s3_error(key, err))?; + response.map_err(|err| self.s3_error(&key, err))?; trace!(body_len = content.len(), "wrote file"); Ok(()) } @@ -290,7 +312,7 @@ impl super::Protocol for Protocol { let request = self.client.delete_object().bucket(&self.bucket).key(&key); let response = self.runtime.block_on(request.send()); trace!(?response); - response.map_err(|err| s3_error(key, err))?; + response.map_err(|err| self.s3_error(&key, err))?; trace!("deleted file"); Ok(()) } @@ -311,7 +333,7 @@ impl super::Protocol for Protocol { let mut n_files = 0; while let Some(response) = self.runtime.block_on(stream.next()) { for object in response - .map_err(|err| s3_error(prefix.clone(), err))? + .map_err(|err| self.s3_error(&prefix, err))? .contents .expect("ListObjectsV2Response has contents") { @@ -324,7 +346,7 @@ impl super::Protocol for Protocol { .key(&key) .send(), ) - .map_err(|err| s3_error(key, err))?; + .map_err(|err| self.s3_error(&key, err))?; n_files += 1; } } @@ -346,14 +368,20 @@ impl super::Protocol for Protocol { .expect("S3 HeadObject response should have a content_length") .try_into() .expect("Content length non-negative"); + let modified: SystemTime = response + .last_modified + .expect("S3 HeadObject response should have a last_modified") + .try_into() + .expect("S3 last_modified is valid SystemTime"); trace!(?len, "File exists"); Ok(Metadata { kind: Kind::File, len, + modified: modified.into(), }) } Err(err) => { - let translated = s3_error(key, err); + let translated = self.s3_error(&key, err); if translated.is_not_found() { trace!("file does not exist"); } else { @@ -380,25 +408,6 @@ impl super::Protocol for Protocol { } } -fn s3_error(key: K, source: SdkError) -> Error -where - K: ToOwned, - E: std::error::Error + Send + Sync + 'static, - R: std::fmt::Debug + Send + Sync + 'static, - ErrorKind: for<'a> From<&'a E>, -{ - debug!(s3_error = ?source); - let kind = match &source { - SdkError::ServiceError(service_err) => ErrorKind::from(service_err.err()), - _ => ErrorKind::Other, - }; - Error { - kind, - path: Some(key.to_owned()), - source: Some(source.into()), - } -} - impl From<&GetObjectError> for ErrorKind { fn from(source: &GetObjectError) -> Self { match source { diff --git a/src/transport/sftp.rs b/src/transport/sftp.rs index 8efe1c00..5f409fec 100644 --- a/src/transport/sftp.rs +++ b/src/transport/sftp.rs @@ -9,6 +9,7 @@ use std::path::PathBuf; use std::sync::Arc; use bytes::Bytes; +use time::OffsetDateTime; use tracing::{error, info, instrument, trace, warn}; use url::Url; @@ -32,17 +33,17 @@ impl Protocol { ); let tcp_stream = TcpStream::connect(addr).map_err(|err| { error!(?err, ?url, "Error opening SSH TCP connection"); - io_error(err, url.as_ref()) + io_error(err, url) })?; trace!("got tcp connection"); let mut session = ssh2::Session::new().map_err(|err| { error!(?err, "Error opening SSH session"); - ssh_error(err, url.as_ref()) + ssh_error(err, url) })?; session.set_tcp_stream(tcp_stream); session.handshake().map_err(|err| { error!(?err, "Error in SSH handshake"); - ssh_error(err, url.as_ref()) + ssh_error(err, url) })?; trace!( "SSH hands shaken, banner: {}", @@ -57,12 +58,12 @@ impl Protocol { }; session.userauth_agent(&username).map_err(|err| { error!(?err, username, "Error in SSH user auth with agent"); - ssh_error(err, url.as_ref()) + ssh_error(err, url) })?; trace!("Authenticated!"); let sftp = session.sftp().map_err(|err| { error!(?err, "Error opening SFTP session"); - ssh_error(err, url.as_ref()) + ssh_error(err, url) })?; Ok(Protocol { url: url.to_owned(), @@ -75,7 +76,15 @@ impl Protocol { trace!("lstat {path}"); self.sftp .lstat(&self.base_path.join(path)) - .map_err(|err| ssh_error(err, path)) + .map_err(|err| self.ssh_error(err, path)) + } + + fn relative_url(&self, path: &str) -> Url { + self.url.join(path).expect("join URL") + } + + fn ssh_error(&self, source: ssh2::Error, path: &str) -> Error { + ssh_error(source, &self.relative_url(path)) } } @@ -87,7 +96,7 @@ impl super::Protocol for Protocol { let mut dirs = Vec::new(); let mut dir = self.sftp.opendir(full_path).map_err(|err| { error!(?err, ?full_path, "Error opening directory"); - ssh_error(err, full_path.to_string_lossy().as_ref()) + self.ssh_error(err, path) })?; loop { match dir.readdir() { @@ -112,7 +121,7 @@ impl super::Protocol for Protocol { } Err(err) => { info!("SFTP error {:?}", err); - return Err(ssh_error(err, path)); + return Err(self.ssh_error(err, path)); } } } @@ -121,15 +130,16 @@ impl super::Protocol for Protocol { fn read_file(&self, path: &str) -> Result { let full_path = self.base_path.join(path); - trace!("attempt open {}", full_path.display()); + let url = &self.url.join(path).expect("join URL"); + trace!("read {url}"); let mut buf = Vec::with_capacity(2 << 20); let mut file = self .sftp .open(&full_path) - .map_err(|err| ssh_error(err, path))?; + .map_err(|err| self.ssh_error(err, path))?; let len = file .read_to_end(&mut buf) - .map_err(|err| io_error(err, path))?; + .map_err(|err| io_error(err, url))?; assert_eq!(len, buf.len()); trace!("read {} bytes from {}", len, full_path.display()); Ok(buf.into()) @@ -146,7 +156,7 @@ impl super::Protocol for Protocol { } Err(err) => { warn!(?err, ?relpath); - Err(ssh_error(err, relpath)) + Err(self.ssh_error(err, relpath)) } } } @@ -168,7 +178,7 @@ impl super::Protocol for Protocol { .open_mode(&full_path, flags, 0o644, ssh2::OpenType::File) .map_err(|err| { warn!(?err, ?relpath, "sftp error creating file"); - ssh_error(err, relpath) + self.ssh_error(err, relpath) })?; if let Err(err) = file.write_all(content) { warn!(?err, ?full_path, "sftp error writing file"); @@ -179,7 +189,11 @@ impl super::Protocol for Protocol { "sftp error unlinking file after write error" ); } - return Err(super::Error::io_error(&full_path, err)); + return Err(super::Error { + url: Some(self.relative_url(relpath)), + source: Some(Box::new(err)), + kind: ErrorKind::Other, + }); } Ok(()) } @@ -188,9 +202,26 @@ impl super::Protocol for Protocol { let full_path = self.base_path.join(relpath); let stat = self.lstat(relpath)?; trace!("metadata {full_path:?}"); + let modified = stat.mtime.ok_or_else(|| { + warn!("No mtime for {full_path:?}"); + super::Error { + kind: ErrorKind::Other, + source: None, + url: Some(self.relative_url(relpath)), + } + })?; + let modified = OffsetDateTime::from_unix_timestamp(modified as i64).map_err(|err| { + warn!("Invalid mtime for {full_path:?}"); + super::Error { + kind: ErrorKind::Other, + source: Some(Box::new(err)), + url: Some(self.relative_url(relpath)), + } + })?; Ok(super::Metadata { kind: stat.file_type().into(), len: stat.size.unwrap_or_default(), + modified, }) } @@ -199,7 +230,7 @@ impl super::Protocol for Protocol { trace!("remove_file {full_path:?}"); self.sftp .unlink(&full_path) - .map_err(|err| ssh_error(err, relpath)) + .map_err(|err| self.ssh_error(err, relpath)) } #[instrument] @@ -226,12 +257,9 @@ impl super::Protocol for Protocol { trace!(?dir, "rmdir"); self.sftp .rmdir(&full_path) - .map_err(|err| ssh_error(err, dir))?; + .map_err(|err| self.ssh_error(err, dir))?; } Ok(()) - // let full_path = self.base_path.join(relpath); - // trace!("remove_dir {full_path:?}"); - // self.sftp.rmdir(&full_path).map_err(translate_error) } fn chdir(&self, relpath: &str) -> Arc { @@ -283,18 +311,18 @@ impl From for ErrorKind { } } -fn ssh_error(source: ssh2::Error, path: &str) -> super::Error { +fn ssh_error(source: ssh2::Error, url: &Url) -> super::Error { super::Error { kind: source.code().into(), source: Some(Box::new(source)), - path: Some(path.to_owned()), + url: Some(url.clone()), } } -fn io_error(source: io::Error, path: &str) -> Error { +fn io_error(source: io::Error, url: &Url) -> Error { Error { kind: source.kind().into(), source: Some(Box::new(source)), - path: Some(path.to_owned()), + url: Some(url.clone()), } }