Skip to content

Commit

Permalink
feat(core): Implement list with deleted and versions for oss
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jan 9, 2025
1 parent 90c64a1 commit 60036fc
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 11 deletions.
45 changes: 34 additions & 11 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use reqsign::AliyunOssSigner;
use super::core::*;
use super::delete::OssDeleter;
use super::error::parse_error;
use super::lister::OssLister;
use super::lister::{OssLister, OssListers, OssObjectVersionsLister};
use super::writer::OssWriter;
use super::writer::OssWriters;
use crate::raw::*;
Expand Down Expand Up @@ -97,6 +97,13 @@ impl OssBuilder {
self
}

/// Declare whether versioning is enabled on the bucket this backend talks to.
///
/// The value is stored in the builder's config and later copied onto the core.
/// Elsewhere in this backend the flag gates the version-related capabilities
/// (`stat_with_version`, `read_with_version`, `delete_with_version`,
/// `list_with_versions`, `list_with_deleted`) and turns off
/// `write_with_if_not_exists`.
///
/// NOTE(review): this only records what the caller states; nothing visible here
/// queries or changes the bucket's actual versioning status — confirm.
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Set an endpoint for generating presigned urls.
///
/// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presigned url for
Expand Down Expand Up @@ -408,6 +415,7 @@ impl Builder for OssBuilder {
host,
presign_endpoint,
allow_anonymous: self.config.allow_anonymous,
enable_versioning: self.config.enable_versioning,
signer,
loader,
client,
Expand All @@ -428,7 +436,7 @@ pub struct OssBackend {
impl Access for OssBackend {
type Reader = HttpBody;
type Writer = OssWriters;
type Lister = oio::PageLister<OssLister>;
type Lister = OssListers;
type Deleter = oio::BatchDeleter<OssDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -449,16 +457,19 @@ impl Access for OssBackend {
stat_has_content_type: true,
stat_has_content_encoding: true,
stat_has_content_range: true,
stat_with_version: self.core.enable_versioning,
stat_has_etag: true,
stat_has_content_md5: true,
stat_has_last_modified: true,
stat_has_content_disposition: true,
stat_has_user_metadata: true,
stat_has_version: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,

write: true,
write_can_empty: true,
Expand All @@ -468,7 +479,7 @@ impl Access for OssBackend {
write_with_content_type: true,
write_with_content_disposition: true,
// `if_not_exists` writes are turned off while versioning is enabled.
write_with_if_not_exists: true,
write_with_if_not_exists: !self.core.enable_versioning,

// The min multipart size of OSS is 100 KiB.
//
Expand All @@ -485,6 +496,7 @@ impl Access for OssBackend {
write_with_user_metadata: true,

delete: true,
delete_with_version: self.core.enable_versioning,
delete_max_size: Some(self.core.delete_max_size),

copy: true,
Expand All @@ -495,6 +507,8 @@ impl Access for OssBackend {
list_with_recursive: true,
list_has_etag: true,
list_has_content_md5: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_content_length: true,
list_has_last_modified: true,

Expand Down Expand Up @@ -572,14 +586,23 @@ impl Access for OssBackend {
}

/// List entries under `path`.
///
/// When the caller asks for `versions` or `deleted`, the listing is served by
/// `OssObjectVersionsLister` (the `TwoWays::Two` arm); otherwise it is served by
/// the plain `OssLister` (the `TwoWays::One` arm). Either way the lister is
/// wrapped in `oio::PageLister` and returned with a default `RpList`.
///
/// NOTE(review): this rendering of the hunk shows the removed and the added
/// lines back to back without diff markers. The first `let l = OssLister::new(..)`
/// / `Ok(..)` pair appears to be the pre-change body, and the
/// `if args.versions() || args.deleted()` expression its replacement — confirm
/// against the actual file before editing.
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = OssLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);
Ok((RpList::default(), oio::PageLister::new(l)))
// Version-aware listing: the whole `OpList` is handed to the versions lister.
let l = if args.versions() || args.deleted() {
TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
self.core.clone(),
path,
args,
)))
} else {
// Plain listing: only recursive / limit / start_after are forwarded.
TwoWays::One(oio::PageLister::new(OssLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};

Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/oss/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct OssConfig {
/// Bucket for oss.
pub bucket: String,

/// Whether versioning is enabled for this bucket.
pub enable_versioning: bool,

// OSS features
/// Server side encryption for oss.
pub server_side_encryption: Option<String>,
Expand Down
84 changes: 84 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct OssCore {
pub endpoint: String,
pub presign_endpoint: String,
pub allow_anonymous: bool,
pub enable_versioning: bool,

pub server_side_encryption: Option<HeaderValue>,
pub server_side_encryption_key_id: Option<HeaderValue>,
Expand Down Expand Up @@ -504,6 +505,50 @@ impl OssCore {
self.send(req).await
}

/// Send a signed `GET {endpoint}?versions` request that lists object versions.
///
/// Query parameters are appended in a fixed order and only when non-empty:
///
/// - `prefix`: `prefix` resolved against the backend root, percent-encoded.
/// - `delimiter`: passed through as given (not percent-encoded).
/// - `max-keys`: present only when `limit` is `Some`.
/// - `key-marker` / `version-id-marker`: percent-encoded pagination markers.
///
/// The raw response is returned; status handling is left to the caller.
pub async fn oss_list_object_versions(
    &self,
    prefix: &str,
    delimiter: &str,
    limit: Option<usize>,
    key_marker: &str,
    version_id_marker: &str,
) -> Result<Response<Buffer>> {
    let abs_prefix = build_abs_path(&self.root, prefix);

    // One slot per optional `name=value` pair, kept in the order they must
    // appear in the final URL. `None` means the parameter is omitted.
    let params = [
        (!abs_prefix.is_empty())
            .then(|| format!("prefix={}", percent_encode_path(abs_prefix.as_str()))),
        (!delimiter.is_empty()).then(|| format!("delimiter={}", delimiter)),
        limit.map(|max_keys| format!("max-keys={}", max_keys)),
        (!key_marker.is_empty())
            .then(|| format!("key-marker={}", percent_encode_path(key_marker))),
        (!version_id_marker.is_empty()).then(|| {
            format!(
                "version-id-marker={}",
                percent_encode_path(version_id_marker)
            )
        }),
    ];

    // `?versions` is always the first query component, so every pair that
    // follows is joined with `&`.
    let mut url = format!("{}?versions", self.endpoint);
    for pair in params.iter().flatten() {
        url.push('&');
        url.push_str(pair);
    }

    let mut request = Request::get(&url)
        .body(Buffer::new())
        .map_err(new_request_build_error)?;
    self.sign(&mut request).await?;

    self.send(request).await
}

pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
let mut req = self.oss_delete_object_request(path, args)?;
self.sign(&mut req).await?;
Expand Down Expand Up @@ -752,6 +797,45 @@ pub struct CommonPrefix {
pub prefix: String,
}

/// One common-prefix entry of a `ListObjectVersions` response.
///
/// Element type of `ListObjectVersionsOutput::common_prefixes`.
///
/// NOTE(review): carries the same single `prefix` field as the `CommonPrefix`
/// struct declared just above — presumably the two could be shared; confirm.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct OutputCommonPrefix {
/// Deserialized from the `Prefix` element.
pub prefix: String,
}

/// Output of ListObjectVersions
///
/// Field names are mapped to PascalCase element names (`IsTruncated`,
/// `NextKeyMarker`, ...). Because of the container-level `default`, any field
/// that is missing from the response falls back to its `Default` value.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectVersionsOutput {
/// `IsTruncated`; `None` when the element is absent.
pub is_truncated: Option<bool>,
/// `NextKeyMarker`; used together with `next_version_id_marker` to request the next page.
pub next_key_marker: Option<String>,
/// `NextVersionIdMarker`; see `next_key_marker`.
pub next_version_id_marker: Option<String>,
/// `CommonPrefixes` entries; the lister reports each one as a directory.
pub common_prefixes: Vec<OutputCommonPrefix>,
/// `Version` entries (object versions).
pub version: Vec<ListObjectVersionsOutputVersion>,
/// `DeleteMarker` entries.
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

/// One `Version` entry of a `ListObjectVersions` response.
///
/// There is no container-level `default` here, so every non-`Option` field must
/// be present in the XML for deserialization to succeed.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputVersion {
/// `Key`: object key as returned by the service (the lister strips the root from it).
pub key: String,
/// `VersionId`.
pub version_id: String,
/// `IsLatest`: whether this entry is the current version of the key.
pub is_latest: bool,
/// `Size`: the lister stores it as the entry's content length.
pub size: u64,
/// `LastModified`: kept as text here; the lister parses it as RFC 3339.
pub last_modified: String,
/// `ETag`: renamed explicitly because PascalCase would yield `Etag`.
#[serde(rename = "ETag")]
pub etag: Option<String>,
}

/// One `DeleteMarker` entry of a `ListObjectVersions` response.
///
/// Has the same shape as a version entry minus `size` and `etag`.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
/// `Key`: key the delete marker belongs to.
pub key: String,
/// `VersionId` of the delete marker itself.
pub version_id: String,
/// `IsLatest`: whether the delete marker is the current version of the key.
pub is_latest: bool,
/// `LastModified`: kept as text here; the lister parses it as RFC 3339.
pub last_modified: String,
}

#[cfg(test)]
mod tests {
use bytes::Buf;
Expand Down
139 changes: 139 additions & 0 deletions core/src/services/oss/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ use quick_xml::de;

use super::core::*;
use super::error::parse_error;
use crate::raw::oio::PageContext;
use crate::raw::*;
use crate::*;

/// Lister type exposed by the OSS backend.
///
/// `TwoWays::One` carries the plain object lister, `TwoWays::Two` the
/// version-aware lister; both are paged through `oio::PageLister`.
pub type OssListers = TwoWays<oio::PageLister<OssLister>, oio::PageLister<OssObjectVersionsLister>>;

pub struct OssLister {
core: Arc<OssCore>,

Expand Down Expand Up @@ -115,3 +118,139 @@ impl oio::PageList for OssLister {
Ok(())
}
}

/// Page lister backed by the OSS `ListObjectVersions` API.
///
/// Reference: <https://help.aliyun.com/zh/oss/developer-reference/listobjectversions>
pub struct OssObjectVersionsLister {
// Shared backend core used to send the list request and to read `root`.
core: Arc<OssCore>,

// Path handed to `new`, forwarded as the request prefix.
prefix: String,
// Full list options; `limit`, `versions` and `deleted` are read per page.
args: OpList,

// "" for recursive listing, "/" otherwise.
delimiter: &'static str,
// `start_after` resolved against the backend root; used as the first key marker.
abs_start_after: Option<String>,
}

impl OssObjectVersionsLister {
    /// Build a versions lister for `path`.
    ///
    /// Two values are derived from `args` up front so they are not recomputed
    /// on every page:
    ///
    /// - the delimiter: empty for recursive listing, `"/"` otherwise;
    /// - the absolute form of `start_after` (joined with `core.root`), if any.
    pub fn new(core: Arc<OssCore>, path: &str, args: OpList) -> Self {
        // Must be computed before `core` is moved into the struct below.
        let abs_start_after = args
            .start_after()
            .map(|key| build_abs_path(&core.root, key));
        let delimiter = if args.recursive() { "" } else { "/" };

        Self {
            prefix: path.to_owned(),
            delimiter,
            abs_start_after,
            core,
            args,
        }
    }
}

impl oio::PageList for OssObjectVersionsLister {
    /// Fetch one page of `ListObjectVersions` results and append its entries to `ctx`.
    ///
    /// Pagination state is kept in `ctx.token` as `"<key-marker> <version-id-marker>"`.
    /// On the first call the token has no separator, so the listing starts at
    /// the absolute `start_after` key if one was given, otherwise at the beginning.
    ///
    /// Entries are pushed in this order: common prefixes (as directories),
    /// object versions, then delete markers (only when `deleted` was requested).
    ///
    /// # Errors
    ///
    /// - Any non-200 response is converted with `parse_error`.
    /// - XML deserialization failures are returned as temporary errors.
    /// - A `LastModified` value that is not valid RFC 3339 aborts the page.
    async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
        // Split on the LAST space so that keys containing spaces stay intact.
        // NOTE(review): assumes version ids never contain a space — confirm.
        let (key_marker, version_id_marker) = match ctx.token.rsplit_once(" ") {
            Some(markers) => markers,
            None => (self.abs_start_after.as_deref().unwrap_or(""), ""),
        };

        let resp = self
            .core
            .oss_list_object_versions(
                &self.prefix,
                self.delimiter,
                self.args.limit(),
                key_marker,
                version_id_marker,
            )
            .await?;
        if resp.status() != http::StatusCode::OK {
            return Err(parse_error(resp));
        }

        let body = resp.into_body();
        let output: ListObjectVersionsOutput = de::from_reader(body.reader())
            .map_err(new_xml_deserialize_error)
            // Mark XML deserialization errors as temporary so the list request
            // can be retried.
            //
            // NOTE(review): the previous wording of this comment referred to
            // "Cos" and was apparently copied from another service; presumably
            // the same rationale (an incomplete XML body) applies to OSS — confirm.
            .map_err(Error::set_temporary)?;

        let next_key_marker = output.next_key_marker.unwrap_or_default();
        let next_version_id_marker = output.next_version_id_marker.unwrap_or_default();
        let has_next_marker = !next_key_marker.is_empty() || !next_version_id_marker.is_empty();

        // Stop when the service says the listing is complete, and also when it
        // gives no marker to continue from. Without the second condition a
        // response lacking `IsTruncated` (or a truncated one lacking markers)
        // would produce the token " ", which parses to two empty markers and
        // restarts the listing from the beginning on every call.
        ctx.done = !output.is_truncated.unwrap_or(true) || !has_next_marker;
        ctx.token = format!("{} {}", next_key_marker, next_version_id_marker);

        for common_prefix in output.common_prefixes {
            let entry = oio::Entry::new(
                &build_rel_path(&self.core.root, &common_prefix.prefix),
                Metadata::new(EntryMode::DIR),
            );
            ctx.entries.push_back(entry);
        }

        for version_object in output.version {
            // `list` must be additive, so we need to include the latest version object
            // even if `versions` is not enabled.
            //
            // Here we skip all non-latest version objects if `versions` is not enabled.
            if !(self.args.versions() || version_object.is_latest) {
                continue;
            }

            // A key equal to the root itself has an empty relative path; report it as "/".
            let mut path = build_rel_path(&self.core.root, &version_object.key);
            if path.is_empty() {
                path = "/".to_owned();
            }

            let mut meta = Metadata::new(EntryMode::from_path(&path));
            meta.set_version(&version_object.version_id);
            meta.set_is_current(version_object.is_latest);
            meta.set_content_length(version_object.size);
            meta.set_last_modified(parse_datetime_from_rfc3339(
                version_object.last_modified.as_str(),
            )?);
            if let Some(etag) = version_object.etag {
                meta.set_etag(&etag);
                // The content MD5 is taken from the ETag with its quotes removed.
                meta.set_content_md5(etag.trim_matches('"'));
            }

            ctx.entries.push_back(oio::Entry::new(&path, meta));
        }

        if self.args.deleted() {
            for delete_marker in output.delete_marker {
                let mut path = build_rel_path(&self.core.root, &delete_marker.key);
                if path.is_empty() {
                    path = "/".to_owned();
                }

                // Delete markers are always reported as files flagged as deleted.
                let mut meta = Metadata::new(EntryMode::FILE);
                meta.set_version(&delete_marker.version_id);
                meta.set_is_deleted(true);
                meta.set_is_current(delete_marker.is_latest);
                meta.set_last_modified(parse_datetime_from_rfc3339(
                    delete_marker.last_modified.as_str(),
                )?);

                ctx.entries.push_back(oio::Entry::new(&path, meta));
            }
        }

        Ok(())
    }
}

0 comments on commit 60036fc

Please sign in to comment.