Skip to content

Commit

Permalink
feat(event-bus): Simplify multi level event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Dec 20, 2024
1 parent 269d79f commit 6f8e2b5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 109 deletions.
30 changes: 9 additions & 21 deletions src/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use tracing::Instrument;
pub trait Subscriber: Send + Sync {
type Input;

async fn on_event(&self, event: &Event<Self::Input>) -> bool;
async fn on_event(&self, event: Event<Self::Input>) -> bool;
}

#[derive(Debug, Clone)]
pub struct Event<T> {
pub data: T,
}
Expand Down Expand Up @@ -139,7 +140,7 @@ impl<T: Send + Sync + Clone + 'static> EventBus<T> {

let binding = bus.inner.subscriber.get();
let subscriber = binding.as_ref().unwrap();
let is_succeed = subscriber.on_event(&message).await;
let _ = subscriber.on_event(message).await;

timer.observe_duration();
GAUGE_EVENT_BUS_QUEUE_HANDLING_SIZE
Expand All @@ -151,11 +152,11 @@ impl<T: Send + Sync + Clone + 'static> EventBus<T> {

drop(concurrency_guarder);

let hook = bus.inner.event_executed_hook.clone();
if hook.get().is_some() {
let hook = hook.get().unwrap().clone();
hook(message, is_succeed)
}
// let hook = bus.inner.event_executed_hook.clone();
// if hook.get().is_some() {
// let hook = hook.get().unwrap().clone();
// hook(message, is_succeed)
// }
}));
}
}
Expand Down Expand Up @@ -212,16 +213,6 @@ mod test {
fn test_event_bus() -> anyhow::Result<()> {
let runtime = create_runtime(1, "test");
let mut event_bus = EventBus::new(&runtime, "test".to_string(), 1usize);

// create the hook
let hook_result_ref = Arc::new(AtomicBool::new(true));
let cloned = hook_result_ref.clone();
let func = move |message: Event<String>, is_succeed: bool| {
cloned.store(false, SeqCst);
};
let func = Box::new(func);
event_bus.with_hook(func);

let flag = Arc::new(AtomicI64::new(0));

struct SimpleCallback {
Expand All @@ -232,7 +223,7 @@ mod test {
impl Subscriber for SimpleCallback {
type Input = String;

async fn on_event(&self, event: &Event<Self::Input>) -> bool {
async fn on_event(&self, event: Event<Self::Input>) -> bool {
println!("SimpleCallback has accepted event: {:?}", event.get_data());
self.flag.fetch_add(1, Ordering::SeqCst);
true
Expand Down Expand Up @@ -263,9 +254,6 @@ mod test {
.get()
);

// case3: create the hook
assert_eq!(false, hook_result_ref.load(Relaxed));

Ok(())
}
}
91 changes: 27 additions & 64 deletions src/store/spill/hierarchy_event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::runtime::manager::RuntimeManager;
use crate::store::spill::SpillMessage;
use anyhow::Result;
use dashmap::DashMap;
use log::warn;

// This is the predefined event bus for the spill operations.
// the parent is the dispatcher, it will firstly get the candidate
Expand All @@ -17,7 +16,7 @@ use log::warn;

pub struct HierarchyEventBus<T> {
parent: EventBus<T>,
children: DashMap<StorageType, EventBus<T>>,
pub(crate) children: DashMap<StorageType, EventBus<T>>,
}

impl HierarchyEventBus<SpillMessage> {
Expand Down Expand Up @@ -48,58 +47,11 @@ impl HierarchyEventBus<SpillMessage> {
hdfs_concurrency,
);

// dispatch event into the concrete handler when the selection is finished
let localfile_cloned = child_localfile.clone();
let hdfs_cloned = child_hdfs.clone();
let hook = move |msg: Event<SpillMessage>, is_succeed: bool| {
if !is_succeed {
return;
}

let msg = msg.data;
let stype = &msg.get_candidate_storage_type();
if stype.is_none() {
warn!(
"No any candidates storage. Ignore this. app: {}",
&msg.ctx.uid.app_id
);
} else {
if let Some(stype) = stype {
let _ = match stype {
LOCALFILE => localfile_cloned.sync_publish(msg.into()),
HDFS => hdfs_cloned.sync_publish(msg.into()),
_ => Ok(()),
};
};
}
};
parent.with_hook(Box::new(hook));

// setup the retry or drop hook for children handlers
let parent_cloned = parent.clone();
let hook = move |msg: Event<SpillMessage>, is_succeed: bool| {
if is_succeed {
return;
}
if let Err(err) = parent_cloned.sync_publish(msg) {
warn!(
"Errors on resending the event into parent event bus. err: {:#?}",
err
);
}
};
child_localfile.with_hook(Box::new(hook.clone()));
child_hdfs.with_hook(Box::new(hook));

let parent_cloned = parent.clone();
let children = DashMap::new();
children.insert(LOCALFILE, child_localfile);
children.insert(HDFS, child_hdfs);

Self {
parent: parent_cloned,
children,
}
Self { parent, children }
}

pub fn subscribe<
Expand All @@ -123,8 +75,8 @@ impl HierarchyEventBus<SpillMessage> {

#[cfg(test)]
mod tests {
use crate::config::Config;
use crate::config::StorageType::{HDFS, LOCALFILE};
use crate::config::{Config, StorageType};
use crate::event_bus::{Event, Subscriber};
use crate::runtime::manager::RuntimeManager;
use crate::store::spill::hierarchy_event_bus::HierarchyEventBus;
Expand All @@ -140,16 +92,26 @@ mod tests {
struct SelectionHandler {
ops: Arc<AtomicU64>,
result_ref: Arc<AtomicBool>,
event_bus: Arc<HierarchyEventBus<SpillMessage>>,
}
#[async_trait]
impl Subscriber for SelectionHandler {
type Input = SpillMessage;

async fn on_event(&self, event: &Event<Self::Input>) -> bool {
async fn on_event(&self, event: Event<Self::Input>) -> bool {
let msg = &event.data;
msg.set_candidate_storage_type(LOCALFILE);
self.ops.fetch_add(1, SeqCst);
self.result_ref.load(SeqCst)
if self.result_ref.load(SeqCst) {
let _ = self
.event_bus
.children
.get(&LOCALFILE)
.unwrap()
.publish(event)
.await;
}
true
}
}

Expand All @@ -159,24 +121,23 @@ mod tests {
result_ref: Arc<AtomicBool>,
failure_counter: Arc<AtomicU64>,
failure_max: u64,
event_bus: Arc<HierarchyEventBus<SpillMessage>>,
}
#[async_trait]
impl Subscriber for FlushHandler {
type Input = SpillMessage;

async fn on_event(&self, event: &Event<Self::Input>) -> bool {
async fn on_event(&self, event: Event<Self::Input>) -> bool {
println!("Flushed");
self.ops.fetch_add(1, SeqCst);
let is_succeed = self.result_ref.load(SeqCst);
if !is_succeed {
return if self.failure_counter.load(SeqCst) >= self.failure_max {
true
} else {
self.failure_counter.fetch_add(1, SeqCst);
false
};
if self.result_ref.load(SeqCst) {
return true;
}
if self.failure_counter.load(SeqCst) < self.failure_max {
let _ = self.event_bus.publish(event).await;
self.failure_counter.fetch_add(1, SeqCst);
}
return true;
true
}
}

Expand Down Expand Up @@ -223,7 +184,7 @@ mod tests {
fn test_event_bus() -> Result<()> {
let runtime_manager = RuntimeManager::default();
let config = Config::create_simple_config();
let event_bus = HierarchyEventBus::new(&runtime_manager, &config);
let event_bus = Arc::new(HierarchyEventBus::new(&runtime_manager, &config));

let select_handler_ops = Arc::new(AtomicU64::new(0));
let select_handler_result = Arc::new(AtomicBool::new(true));
Expand All @@ -233,6 +194,7 @@ mod tests {
let select_handler = SelectionHandler {
ops: cloned,
result_ref: result_cloned,
event_bus: event_bus.clone(),
};

let flush_handler_ops = Arc::new(AtomicU64::new(0));
Expand All @@ -245,6 +207,7 @@ mod tests {
result_ref: result_cloned,
failure_counter: Default::default(),
failure_max: 3,
event_bus: event_bus.clone(),
};

event_bus.subscribe(select_handler, flush_handler);
Expand Down
30 changes: 17 additions & 13 deletions src/store/spill/storage_flush_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::store::hybrid::HybridStore;
use crate::store::spill::metrics::FlushingMetricsMonitor;
use crate::store::spill::{handle_spill_failure, handle_spill_success, SpillMessage};
use async_trait::async_trait;
use log::{error, warn};
use std::sync::Arc;

#[derive(Clone)]
Expand All @@ -23,28 +24,31 @@ unsafe impl Sync for StorageFlushHandler {}
impl Subscriber for StorageFlushHandler {
type Input = SpillMessage;

async fn on_event(&self, event: &Event<Self::Input>) -> bool {
async fn on_event(&self, event: Event<Self::Input>) -> bool {
let message = event.get_data();
let app_id = &message.ctx.uid.app_id;

let _ =
FlushingMetricsMonitor::new(app_id, message.size, message.get_candidate_storage_type());

let result = self.store.flush_storage_for_buffer(message).await;
let result = if result.is_ok() {
// release resource
handle_spill_success(message, self.store.clone()).await;
true
} else {
if let Err(err) = result {
match result {
Ok(_) => {
handle_spill_success(message, self.store.clone()).await;
}
Err(err) => {
message.inc_retry_counter();
let could_be_retried = handle_spill_failure(err, message, self.store.clone()).await;
!could_be_retried
} else {
true
if could_be_retried {
if let Err(e) = &self.store.event_bus.publish(event).await {
error!(
"Errors on resending the event into parent event bus. err: {:#?}",
e
);
}
}
}
};

result
}
true
}
}
26 changes: 15 additions & 11 deletions src/store/spill/storage_select_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@ unsafe impl Sync for StorageSelectHandler {}
impl Subscriber for StorageSelectHandler {
type Input = SpillMessage;

async fn on_event(&self, event: &Event<Self::Input>) -> bool {
async fn on_event(&self, event: Event<Self::Input>) -> bool {
let msg = event.get_data();
let select_result = self.store.select_storage_for_buffer(msg).await;

if select_result.is_ok() {
msg.set_candidate_storage_type(select_result.unwrap());
return true;
}

if let Err(err) = select_result {
error!("Errors on the selecting storage for app: {:?} and then drop this event. error: {:?}", &msg.ctx.uid, &err);
handle_spill_failure_whatever_error(msg, self.store.clone(), err).await;
let upstream_event_bus = &self.store.event_bus;
match select_result {
Ok(storage) => {
if let Some(event_bus) = upstream_event_bus.children.get(&storage) {
msg.set_candidate_storage_type(storage);
let _ = event_bus.publish(event).await;
}
true
}
Err(e) => {
error!("Errors on the selecting storage for app: {:?} and then drop this event. error: {:?}", &msg.ctx.uid, &e);
handle_spill_failure_whatever_error(msg, self.store.clone(), e).await;
false
}
}
false
}
}

0 comments on commit 6f8e2b5

Please sign in to comment.