fix: Only delete hdfs files with owned shuffle-server id (#32)
* fix: Only delete hdfs files with owned shuffle-server id

* fix tests

* fix

* add

* fix2

* fix3

* ignore not found error

* fix tests
zuston authored Jan 15, 2025
1 parent 7f4bfb9 commit 9b6e913
Showing 12 changed files with 509 additions and 122 deletions.
145 changes: 86 additions & 59 deletions src/app.rs
@@ -19,7 +19,7 @@ use crate::config::{Config, StorageType};
use crate::error::WorkerError;
use crate::metric::{
GAUGE_APP_NUMBER, GAUGE_HUGE_PARTITION_NUMBER, GAUGE_PARTITION_NUMBER,
GAUGE_TOPN_APP_RESIDENT_BYTES, TOTAL_APP_FLUSHED_BYTES, TOTAL_APP_NUMBER,
GAUGE_TOPN_APP_RESIDENT_BYTES, PURGE_FAILED_COUNTER, TOTAL_APP_FLUSHED_BYTES, TOTAL_APP_NUMBER,
TOTAL_HUGE_PARTITION_NUMBER, TOTAL_HUGE_PARTITION_REQUIRE_BUFFER_FAILED,
TOTAL_PARTITION_NUMBER, TOTAL_READ_DATA, TOTAL_READ_DATA_FROM_LOCALFILE,
TOTAL_READ_DATA_FROM_MEMORY, TOTAL_READ_INDEX_FROM_LOCALFILE, TOTAL_RECEIVED_DATA,
@@ -42,7 +42,7 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap, HashSet};

use std::hash::{Hash, Hasher};

use std::ops::Deref;
use std::str::FromStr;

use crate::await_tree::AWAIT_TREE_REGISTRY;
@@ -514,11 +514,9 @@ impl App {
Ok(())
}

pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) -> Result<()> {
let removed_size = self
.store
.purge(PurgeDataContext::new(app_id, shuffle_id))
.await?;
pub async fn purge(&self, reason: &PurgeReason) -> Result<()> {
let (app_id, shuffle_id) = reason.extract();
let removed_size = self.store.purge(&PurgeDataContext::new(reason)).await?;
self.total_resident_data_size
.fetch_sub(removed_size as u64, SeqCst);

@@ -564,27 +562,53 @@ impl App {
}
}

#[allow(non_camel_case_types)]
#[derive(Debug, Clone)]
pub struct PurgeDataContext {
pub(crate) app_id: String,
pub(crate) shuffle_id: Option<i32>,
pub enum PurgeReason {
SHUFFLE_LEVEL_EXPLICIT_UNREGISTER(String, i32),
APP_LEVEL_EXPLICIT_UNREGISTER(String),
APP_LEVEL_HEARTBEAT_TIMEOUT(String),
}

impl PurgeDataContext {
pub fn new(app_id: String, shuffle_id: Option<i32>) -> PurgeDataContext {
PurgeDataContext { app_id, shuffle_id }
impl PurgeReason {
pub fn extract(&self) -> (String, Option<i32>) {
match &self {
PurgeReason::SHUFFLE_LEVEL_EXPLICIT_UNREGISTER(x, y) => (x.to_owned(), Some(*y)),
PurgeReason::APP_LEVEL_EXPLICIT_UNREGISTER(x) => (x.to_owned(), None),
PurgeReason::APP_LEVEL_HEARTBEAT_TIMEOUT(x) => (x.to_owned(), None),
}
}

pub fn extract_app_id(&self) -> String {
match &self {
PurgeReason::SHUFFLE_LEVEL_EXPLICIT_UNREGISTER(x, y) => x.to_owned(),
PurgeReason::APP_LEVEL_EXPLICIT_UNREGISTER(x) => x.to_owned(),
PurgeReason::APP_LEVEL_HEARTBEAT_TIMEOUT(x) => x.to_owned(),
}
}
}

impl From<&str> for PurgeDataContext {
fn from(app_id_ref: &str) -> Self {
#[derive(Debug, Clone)]
pub struct PurgeDataContext {
pub purge_reason: PurgeReason,
}

impl PurgeDataContext {
pub fn new(reason: &PurgeReason) -> PurgeDataContext {
PurgeDataContext {
app_id: app_id_ref.to_string(),
shuffle_id: None,
purge_reason: reason.clone(),
}
}
}

impl Deref for PurgeDataContext {
type Target = PurgeReason;

fn deref(&self) -> &Self::Target {
&self.purge_reason
}
}

#[derive(Debug, Clone)]
pub struct ReportBlocksContext {
pub(crate) uid: PartitionedUId,
@@ -704,14 +728,8 @@ pub enum ReadingOptions {
// ==========================================================

#[derive(Debug, Clone)]
#[allow(non_camel_case_types)]
pub enum PurgeEvent {
// app_id
HEARTBEAT_TIMEOUT(String),
// app_id + shuffle_id
APP_PARTIAL_SHUFFLES_PURGE(String, i32),
// app_id
APP_PURGE(String),
pub struct PurgeEvent {
reason: PurgeReason,
}

pub type AppManagerRef = Arc<AppManager>;
@@ -777,7 +795,9 @@ impl AppManager {
key, current, last_time, app_manager_ref_cloned.app_heartbeat_timeout_min);
if app_manager_ref_cloned
.sender
.send(PurgeEvent::HEARTBEAT_TIMEOUT(key.clone()))
.send(PurgeEvent {
reason: PurgeReason::APP_LEVEL_HEARTBEAT_TIMEOUT(key.clone()),
})
.await
.is_err()
{
@@ -833,35 +853,31 @@ impl AppManager {

let app_manager_cloned = app_ref.clone();
runtime_manager.default_runtime.spawn(async move {
let await_root = AWAIT_TREE_REGISTRY.clone()
let await_root = AWAIT_TREE_REGISTRY
.clone()
.register(format!("App periodic purger"))
.await;
await_root.instrument(async move {
info!("Starting purge event handler...");
while let Ok(event) = app_manager_cloned.receiver.recv().instrument_await("waiting events coming...").await {
let _ = match event {
PurgeEvent::HEARTBEAT_TIMEOUT(app_id) => {
info!(
"The app:[{}]'s data will be purged due to heartbeat timeout",
&app_id
);
app_manager_cloned.purge_app_data(app_id, None).await
}
PurgeEvent::APP_PURGE(app_id) => {
info!(
"The app:[{}] has been finished, its data will be purged.",
&app_id
);
app_manager_cloned.purge_app_data(app_id, None).await
}
PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id) => {
info!("The app:[{:?}] with shuffleId: [{:?}] will be purged due to unregister service interface", &app_id, shuffle_id);
app_manager_cloned.purge_app_data(app_id, Some(shuffle_id)).await
await_root
.instrument(async move {
info!("Starting purge event handler...");
while let Ok(event) = app_manager_cloned
.receiver
.recv()
.instrument_await("waiting events coming...")
.await
{
let reason = event.reason;
info!("Purging data with reason: {:?}", &reason);
if let Err(err) = app_manager_cloned.purge_app_data(&reason).await {
PURGE_FAILED_COUNTER.inc();
error!(
"Errors on purging data with reason: {:?}. err: {:?}",
&reason, err
);
}
}
.map_err(|err| error!("Errors on purging data. error: {:?}", err));
}
}).await;
})
.await;
});

app_ref
@@ -887,7 +903,8 @@ impl AppManager {
self.store.get_spill_event_num()
}

async fn purge_app_data(&self, app_id: String, shuffle_id_option: Option<i32>) -> Result<()> {
async fn purge_app_data(&self, reason: &PurgeReason) -> Result<()> {
let (app_id, shuffle_id_option) = reason.extract();
let app = self.get_app(&app_id).ok_or(anyhow!(format!(
"App:{} don't exist when purging data, this should not happen",
&app_id
@@ -907,7 +924,7 @@ impl AppManager {
format!("{:?}", StorageType::HDFS).as_str(),
]);
}
app.purge(app_id.clone(), shuffle_id_option).await?;
app.purge(reason).await?;
Ok(())
}

@@ -947,13 +964,19 @@ impl AppManager {

pub async fn unregister_shuffle(&self, app_id: String, shuffle_id: i32) -> Result<()> {
self.sender
.send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
.send(PurgeEvent {
reason: PurgeReason::SHUFFLE_LEVEL_EXPLICIT_UNREGISTER(app_id, shuffle_id),
})
.await?;
Ok(())
}

pub async fn unregister_app(&self, app_id: String) -> Result<()> {
self.sender.send(PurgeEvent::APP_PURGE(app_id)).await?;
self.sender
.send(PurgeEvent {
reason: PurgeReason::APP_LEVEL_EXPLICIT_UNREGISTER(app_id),
})
.await?;
Ok(())
}

@@ -991,9 +1014,9 @@ impl PartitionedUId {
#[cfg(test)]
pub(crate) mod test {
use crate::app::{
AppManager, GetBlocksContext, GetMultiBlockIdsContext, PartitionedUId, ReadingOptions,
ReadingViewContext, ReportBlocksContext, ReportMultiBlockIdsContext, RequireBufferContext,
WritingViewContext,
AppManager, GetBlocksContext, GetMultiBlockIdsContext, PartitionedUId, PurgeReason,
ReadingOptions, ReadingViewContext, ReportBlocksContext, ReportMultiBlockIdsContext,
RequireBufferContext, WritingViewContext,
};
use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig};
use bytes::Bytes;
@@ -1164,7 +1187,11 @@ pub(crate) mod test {

// case3: purge
runtime_manager
.wait(app_manager_ref.purge_app_data(app_id.to_string(), None))
.wait(
app_manager_ref.purge_app_data(&PurgeReason::APP_LEVEL_HEARTBEAT_TIMEOUT(
app_id.to_owned(),
)),
)
.expect("");

assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
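For orientation, here is a minimal sketch (not part of the commit) of how the reworked purge path in src/app.rs fits together. It only uses types and methods shown in the diff above; example_purge_flow itself is a hypothetical helper assumed to live inside the same crate.

// Sketch only: hypothetical helper, not present in the repository.
use crate::app::{AppManagerRef, PurgeDataContext, PurgeReason};

async fn example_purge_flow(app_manager: AppManagerRef) -> anyhow::Result<()> {
    // A shuffle-level unregister carries both app_id and shuffle_id...
    app_manager.unregister_shuffle("app-1".to_string(), 0).await?;
    // ...while an app-level unregister (or heartbeat timeout) carries only the app_id.
    app_manager.unregister_app("app-1".to_string()).await?;

    // Inside the purge event loop, each event's reason becomes a purge context,
    // which stores can read back through Deref<Target = PurgeReason>.
    let reason = PurgeReason::SHUFFLE_LEVEL_EXPLICIT_UNREGISTER("app-1".to_string(), 0);
    let ctx = PurgeDataContext::new(&reason);
    let (app_id, shuffle_id) = ctx.extract();
    assert_eq!(app_id, "app-1");
    assert_eq!(shuffle_id, Some(0));
    Ok(())
}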
5 changes: 5 additions & 0 deletions src/error.rs
@@ -88,6 +88,9 @@ pub enum WorkerError {
#[error("{0}. error: {1}")]
HDFS_IO_ERROR(String, anyhow::Error),

#[error("dir or file is not found. error: {0}")]
DIR_OR_FILE_NOT_FOUND(anyhow::Error),

#[error("Out of memory. error: {0}")]
OUT_OF_MEMORY(anyhow::Error),

@@ -120,6 +123,8 @@ impl From<std::io::Error> for WorkerError {
fn from(err: std::io::Error) -> Self {
match err.kind() {
std::io::ErrorKind::OutOfMemory => WorkerError::OUT_OF_MEMORY(Error::new(err)),
// todo: should cover the hdfs-native not found error!
std::io::ErrorKind::NotFound => WorkerError::DIR_OR_FILE_NOT_FOUND(Error::new(err)),
_ => WorkerError::Other(Error::new(err)),
}
}
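The new DIR_OR_FILE_NOT_FOUND variant backs the "ignore not found error" item in the commit message: a purge can race with a deletion already performed by the path's owner. Below is a minimal sketch of a call site that treats the missing path as already purged; the helper is hypothetical and assumes it lives inside this crate.

// Sketch only: hypothetical call site, not present in the repository.
use crate::error::WorkerError;
use crate::store::hadoop::HdfsDelegator;

async fn delete_dir_ignoring_missing(
    delegator: &impl HdfsDelegator,
    dir: &str,
) -> Result<(), WorkerError> {
    match delegator.delete_dir(dir).await {
        // The dir may already be gone (e.g. purged earlier by its owner); treat as success.
        Err(WorkerError::DIR_OR_FILE_NOT_FOUND(_)) => Ok(()),
        other => other,
    }
}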
8 changes: 8 additions & 0 deletions src/metric.rs
@@ -349,6 +349,10 @@ pub static URPC_GET_LOCALFILE_DATA_TRANSPORT_TIME: Lazy<Histogram> = Lazy::new(|
pub static URPC_CONNECTION_NUMBER: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("urpc_connection_number", "urpc_connection_number").expect(""));

pub static PURGE_FAILED_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new("purge_failed_count", "purge_failed_count").expect("metric should be created")
});

// ===========

pub static TOTAL_MEMORY_USED: Lazy<IntCounter> = Lazy::new(|| {
@@ -664,6 +668,10 @@ pub static IO_SCHEDULER_APPEND_WAIT: Lazy<IntGaugeVec> =
Lazy::new(|| register_int_gauge_vec!("append_wait", "append_wait", &["root"]).unwrap());

fn register_custom_metrics() {
REGISTRY
.register(Box::new(PURGE_FAILED_COUNTER.clone()))
.expect("purge_failed_count must be registered");

REGISTRY
.register(Box::new(ALIGNMENT_BUFFER_POOL_ACQUIRED_MISS.clone()))
.expect("");
40 changes: 29 additions & 11 deletions src/store/hadoop/hdfs_native.rs
@@ -1,5 +1,5 @@
use crate::error::WorkerError;
use crate::store::hadoop::HdfsDelegator;
use crate::store::hadoop::{FileStatus, HdfsDelegator};
use crate::store::BytesWrapper;
use anyhow::{Error, Result};
use async_trait::async_trait;
@@ -31,10 +31,6 @@ unsafe impl Send for HdfsNativeClient {}
unsafe impl Sync for HdfsNativeClient {}

impl HdfsNativeClient {
fn wrap_root(&self, path: &str) -> String {
format!("{}/{}", &self.inner.root, path)
}

pub(crate) fn new(root: String, configs: HashMap<String, String>) -> Result<HdfsNativeClient> {
// todo: do more optimizations!
let url = Url::parse(root.as_str())?;
@@ -60,7 +56,7 @@ impl HdfsNativeClient {
#[async_trait]
impl HdfsDelegator for HdfsNativeClient {
async fn touch(&self, file_path: &str) -> Result<()> {
let file_path = &self.wrap_root(file_path);
let file_path = &self.with_root(file_path)?;
self.inner
.client
.create(file_path, WriteOptions::default())
@@ -72,7 +68,7 @@ impl HdfsDelegator for HdfsNativeClient {

async fn append(&self, file_path: &str, data: BytesWrapper) -> Result<(), WorkerError> {
debug!("appending to {} with {} bytes", file_path, data.len());
let file_path = &self.wrap_root(file_path);
let file_path = &self.with_root(file_path)?;
let mut file_writer = self
.inner
.client
@@ -88,20 +84,42 @@ impl HdfsDelegator for HdfsNativeClient {
}

async fn len(&self, file_path: &str) -> Result<u64> {
let file_path = &self.wrap_root(file_path);
let file_path = &self.with_root(file_path)?;
let file_info = self.inner.client.get_file_info(file_path).await?;
Ok(file_info.length as u64)
}

async fn create_dir(&self, dir: &str) -> Result<()> {
let dir = &self.wrap_root(dir);
let dir = &self.with_root(dir)?;
let _ = self.inner.client.mkdirs(dir, 777, true).await?;
Ok(())
}

async fn delete_dir(&self, dir: &str) -> Result<()> {
let dir = &self.wrap_root(dir);
async fn delete_dir(&self, dir: &str) -> Result<(), WorkerError> {
let dir = &self.with_root(dir)?;
self.inner.client.delete(dir, true).await?;
Ok(())
}

async fn delete_file(&self, file_path: &str) -> Result<(), WorkerError> {
self.delete_dir(file_path).await
}

async fn list_status(&self, dir: &str) -> Result<Vec<FileStatus>, WorkerError> {
let complete_path = &self.with_root(dir)?;
let list = self.inner.client.list_status(complete_path, false).await?;
let mut vec = vec![];
for file_status in list {
let path = file_status.path;
vec.push(FileStatus {
path: self.without_root(&path)?,
is_dir: file_status.isdir,
});
}
Ok(vec)
}

fn root(&self) -> String {
self.inner.root.to_string()
}
}
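The new delegator methods (list_status, delete_file) are what allow the HDFS store to delete only the files owned by this shuffle server instead of dropping the whole app directory. The store-side filtering lives in a file not shown in this excerpt, so the sketch below is only an illustration under assumptions: delete_owned_files and owned_server_id are hypothetical names, and it assumes data file names embed the owning server's id.

// Sketch only: hypothetical helper; the real filtering is in store code not shown here.
use crate::error::WorkerError;
use crate::store::hadoop::HdfsDelegator;

async fn delete_owned_files(
    delegator: &impl HdfsDelegator,
    app_dir: &str,
    owned_server_id: &str,
) -> Result<(), WorkerError> {
    for status in delegator.list_status(app_dir).await? {
        // Keep other servers' files intact; only remove files this server wrote.
        if !status.is_dir && status.path.contains(owned_server_id) {
            delegator.delete_file(&status.path).await?;
        }
    }
    Ok(())
}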
[Diffs for the remaining 8 changed files are not shown.]
