Skip to content

Commit

Permalink
support persist manifest efficientlly
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Nov 15, 2024
1 parent e65c504 commit c346c9f
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 16 deletions.
1 change: 1 addition & 0 deletions horaedb/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions horaedb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ itertools = "0.3"
lazy_static = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
log = "0.4"

# This profile optimizes for good runtime performance.
[profile.release]
Expand Down
1 change: 1 addition & 0 deletions horaedb/metric_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
macros = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true, features = ["object_store"] }
Expand Down
233 changes: 217 additions & 16 deletions horaedb/metric_engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use futures::StreamExt;
use log::{error, info};
use object_store::{path::Path, PutPayload};
use prost::Message;
use tokio::sync::RwLock;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
RwLock,
},
time,
};

use crate::{
sst::{FileId, FileMeta, SstFile},
Expand All @@ -29,13 +39,16 @@ use crate::{

pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const SST_PREFIX: &str = "sst";

pub struct Manifest {
path: String,
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,

payload: RwLock<Payload>,
payload: Arc<RwLock<Payload>>,
tx: Sender<SstsMergeTask>,
}

pub struct Payload {
Expand Down Expand Up @@ -71,6 +84,18 @@ impl From<Payload> for pb_types::Manifest {
impl Manifest {
pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
let sst_path = Path::from(format!("{path}/{SST_PREFIX}"));

let (tx, rx) = mpsc::channel(100);
let mut merge_sst = SstsMerge::try_new(
snapshot_path.clone(),
sst_path.clone(),
store.clone(),
rx,
SstsMergeOptions::default(),
)
.await?;

let payload = match store.get(&snapshot_path).await {
Ok(v) => {
let bytes = v
Expand All @@ -90,46 +115,55 @@ impl Manifest {
}
}
};
let payload = Arc::new(RwLock::new(payload));

tokio::spawn(async move {
merge_sst.run().await;
});

Ok(Self {
path,
snapshot_path,
sst_path,
store,
payload: RwLock::new(payload),
payload: payload,
tx,
})
}

// TODO: Now this functions is poorly implemented, we concat new_sst to
// snapshot, and upload it back in a whole.
// In more efficient way, we can create a new diff file, and do compaction in
// background to merge them to `snapshot`.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
let mut payload = self.payload.write().await;
let mut tmp_ssts = payload.files.clone();
let new_sst_path = Path::from(format!("{}/{id}", self.sst_path));
let new_sst = SstFile { id, meta };
tmp_ssts.push(new_sst.clone());
let pb_manifest = pb_types::Manifest {
files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
};
let new_sst_payload = pb_types::SstFile::from(new_sst.clone());

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
let mut buf: Vec<u8> = Vec::with_capacity(new_sst_payload.encoded_len());
new_sst_payload
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the snapshot
self.store
.put(&self.snapshot_path, put_payload)
.put(&new_sst_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Update cached payload
let mut payload = self.payload.write().await;
payload.files.push(new_sst);

// 3. Schedule merge
self.sender(SstsMergeTask::MergeSsts(1)).await;

Ok(())
}

async fn sender(&self, task: SstsMergeTask) {
if let Err(err) = self.tx.send(task).await {
error!("Failed to send merge ssts task, err: {:?}", err);
}
}

pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
let payload = self.payload.read().await;

Expand All @@ -141,3 +175,170 @@ impl Manifest {
.collect()
}
}

enum SstsMergeTask {
ForceMergeSsts,
MergeSsts(usize),
}

struct SstsMergeOptions {
max_interval_seconds: usize,
merge_threshold: usize,
}

impl Default for SstsMergeOptions {
fn default() -> Self {
Self {
max_interval_seconds: 10,
merge_threshold: 50,
}
}
}

struct SstsMerge {
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
receiver: Receiver<SstsMergeTask>,
sst_num: RwLock<usize>,
merge_options: SstsMergeOptions,
}

impl SstsMerge {
async fn try_new(
snapshot_path: Path,
sst_path: Path,
store: ObjectStoreRef,
rx: Receiver<SstsMergeTask>,
merge_options: SstsMergeOptions,
) -> Result<Self> {
let ssts_merge = Self {
snapshot_path,
sst_path,
store,
receiver: rx,
sst_num: RwLock::new(0),
merge_options,
};

// Merge all sst files when start
ssts_merge.merge_ssts().await?;
Ok(ssts_merge)
}

async fn run(&mut self) {
let interval = time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
loop {
match time::timeout(interval, self.receiver.recv()).await {
Ok(Some(SstsMergeTask::ForceMergeSsts)) => match self.merge_ssts().await {
Ok(_) => {
*self.sst_num.write().await = 0;
}
Err(err) => {
error!("Failed to force merge ssts, err: {:?}", err);
}
},
Ok(Some(SstsMergeTask::MergeSsts(num))) => {
let mut sst_num = self.sst_num.write().await;
*sst_num += num;
if *sst_num >= self.merge_options.merge_threshold {
match self.merge_ssts().await {
Ok(_) => {
*sst_num = 0;
}
Err(err) => {
error!("Failed to merge ssts, err: {:?}", err);
}
}
}
}
Ok(None) => {
// The channel is disconnected.
info!("Channel disconnected, merge ssts task exit");
break;
}
Err(_) => {
info!("Timeout receive merge ssts task");
}
}
}
}

async fn merge_ssts(&self) -> Result<()> {
let meta_infos = self
.store
.list(Some(&self.sst_path))
.collect::<Vec<_>>()
.await;
if meta_infos.len() == 0 {
return Ok(());
}

let mut sst_files = Vec::new();
let mut paths = Vec::with_capacity(meta_infos.len());
for meta_info in meta_infos {
let path = meta_info
.context("failed to get path of manifest sst")?
.location;
paths.push(path);
}

for path in &paths {
let bytes = self
.store
.get(path)
.await
.context("failed to read sst file")?
.bytes()
.await
.context("failed to read sst file")?;
let pb_sst = pb_types::SstFile::decode(bytes).context("failed to decode sst file")?;
sst_files.push(SstFile::try_from(pb_sst)?);
}

let mut payload = self
.store
.get(&self.snapshot_path)
.await
.context("Failed to get manifest snapshot")?
.bytes()
.await
.map(|v| {
let pb_payload =
pb_types::Manifest::decode(v).context("failed to decode manifest snapshot")?;
Payload::try_from(pb_payload)
})
.context("Failed to get manifest snapshot")??;

payload.files.extend(sst_files.clone());
let pb_manifest = pb_types::Manifest {
files: payload
.files
.into_iter()
.map(|f| f.into())
.collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
pb_manifest
.encode(&mut buf)
.context("failed to encode manifest")?;
let put_payload = PutPayload::from_bytes(Bytes::from(buf));

// 1. Persist the snapshot
self.store
.put(&self.snapshot_path, put_payload)
.await
.context("Failed to update manifest")?;

// 2. Delete the old sst files
for path in &paths {
self.store
.delete(path)
.await
.context("failed to delete sst file")?;
}

Ok(())
}
}

0 comments on commit c346c9f

Please sign in to comment.