diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index 8a05223ca3..be7ef2b471 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -19,6 +19,7 @@
 pub mod error;
 mod manifest;
+mod read;
 mod sst;
 pub mod storage;
 pub mod types;
diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs
index aceac3daba..0d865a040c 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -23,7 +23,7 @@
 use tokio::sync::RwLock;

 use crate::{
     sst::{FileId, FileMeta, SstFile},
-    types::ObjectStoreRef,
+    types::{ObjectStoreRef, TimeRange},
     AnyhowError, Error, Result,
 };
@@ -56,6 +56,18 @@ impl TryFrom<pb_types::Manifest> for Payload {
     }
 }

+impl From<Payload> for pb_types::Manifest {
+    fn from(value: Payload) -> Self {
+        pb_types::Manifest {
+            files: value
+                .files
+                .into_iter()
+                .map(pb_types::SstFile::from)
+                .collect(),
+        }
+    }
+}
+
 impl Manifest {
     pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
         let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
@@ -97,20 +109,7 @@
         let new_sst = SstFile { id, meta };
         tmp_ssts.push(new_sst.clone());
         let pb_manifest = pb_types::Manifest {
-            files: tmp_ssts
-                .into_iter()
-                .map(|f| pb_types::SstFile {
-                    id: f.id,
-                    meta: Some(pb_types::SstMeta {
-                        max_sequence: f.meta.max_sequence,
-                        num_rows: f.meta.num_rows,
-                        time_range: Some(pb_types::TimeRange {
-                            start: f.meta.time_range.start,
-                            end: f.meta.time_range.end,
-                        }),
-                    }),
-                })
-                .collect::<Vec<_>>(),
+            files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
         };

         let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
@@ -130,4 +129,15 @@
         Ok(())
     }
+
+    pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
+        let payload = self.payload.read().await;
+
+        payload
+            .files
+            .iter()
+            .filter(move |f| f.meta.time_range.overlaps(time_range))
+            .cloned()
+            .collect()
+    }
 }
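The new `find_ssts` reads the cached payload behind the async `RwLock` and returns every file whose time range overlaps the query range. A minimal in-crate sketch of the flow (the store, path, ids and metadata values are made up for illustration; it assumes `try_new` starts from an empty snapshot when none exists yet):

```rust
use std::sync::Arc;

use object_store::memory::InMemory;

use crate::{
    manifest::Manifest,
    sst::FileMeta,
    types::{ObjectStoreRef, TimeRange, Timestamp},
};

// Hedged sketch: persist one file entry, then query by an overlapping range.
async fn find_ssts_demo() -> crate::Result<()> {
    let store: ObjectStoreRef = Arc::new(InMemory::new());
    let manifest = Manifest::try_new("test_prefix".to_string(), store).await?;

    let meta = FileMeta {
        max_sequence: 1,
        num_rows: 100,
        size: 1024,
        time_range: TimeRange::new(Timestamp(10), Timestamp(20)), // half-open [10, 20)
    };
    manifest.add_file(1, meta).await?;

    // [15, 30) overlaps [10, 20), so the file is returned.
    let hits = manifest
        .find_ssts(&TimeRange::new(Timestamp(15), Timestamp(30)))
        .await;
    assert_eq!(hits.len(), 1);
    Ok(())
}
```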
diff --git a/horaedb/metric_engine/src/read.rs b/horaedb/metric_engine/src/read.rs
new file mode 100644
index 0000000000..88522fe9cf
--- /dev/null
+++ b/horaedb/metric_engine/src/read.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+    datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+    error::Result as DfResult,
+    parquet::arrow::async_reader::AsyncFileReader,
+    physical_plan::metrics::ExecutionPlanMetricsSet,
+};
+use parquet::arrow::async_reader::ParquetObjectReader;
+
+use crate::types::ObjectStoreRef;
+
+#[derive(Debug, Clone)]
+pub struct DefaultParquetFileReaderFactory {
+    object_store: ObjectStoreRef,
+}
+
+/// Returns an AsyncFileReader factory
+impl DefaultParquetFileReaderFactory {
+    pub fn new(object_store: ObjectStoreRef) -> Self {
+        Self { object_store }
+    }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        _partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        _metrics: &ExecutionPlanMetricsSet,
+    ) -> DfResult<Box<dyn AsyncFileReader + Send>> {
+        let object_store = self.object_store.clone();
+        let mut reader = ParquetObjectReader::new(object_store, file_meta.object_meta);
+        if let Some(size) = metadata_size_hint {
+            reader = reader.with_footer_size_hint(size);
+        }
+        Ok(Box::new(reader))
+    }
+}
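`DefaultParquetFileReaderFactory` just wraps the shared object store in a `ParquetObjectReader`, optionally seeded with a footer size hint so the parquet metadata can be fetched with fewer range requests. A hedged sketch of driving it directly, outside of `ParquetExec` (it assumes DataFusion's `From<ObjectMeta> for FileMeta` conversion and the `get_metadata` signature of the pinned parquet version; `peek_num_rows` is a hypothetical helper):

```rust
use anyhow::Result;
use datafusion::{
    datasource::physical_plan::ParquetFileReaderFactory,
    parquet::arrow::async_reader::AsyncFileReader,
    physical_plan::metrics::ExecutionPlanMetricsSet,
};
use object_store::path::Path;

use crate::{read::DefaultParquetFileReaderFactory, types::ObjectStoreRef};

// Hedged sketch: open one parquet object through the factory and read its footer.
async fn peek_num_rows(store: ObjectStoreRef, path: &Path) -> Result<i64> {
    let object_meta = store.head(path).await?;
    let factory = DefaultParquetFileReaderFactory::new(store.clone());
    let mut reader = factory.create_reader(
        0,                               // partition index is unused by this factory
        object_meta.into(),              // FileMeta built from the object metadata
        None,                            // no footer size hint
        &ExecutionPlanMetricsSet::new(),
    )?;
    let metadata = reader.get_metadata().await?;
    Ok(metadata.file_metadata().num_rows())
}
```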
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 5eb96867ad..703b4bc41f 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -49,10 +49,20 @@ impl TryFrom<pb_types::SstFile> for SstFile {
     }
 }

+impl From<SstFile> for pb_types::SstFile {
+    fn from(value: SstFile) -> Self {
+        pb_types::SstFile {
+            id: value.id,
+            meta: Some(value.meta.into()),
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct FileMeta {
     pub max_sequence: u64,
     pub num_rows: u32,
+    pub size: u32,
     pub time_range: TimeRange,
 }

@@ -66,14 +76,26 @@ impl TryFrom<pb_types::SstMeta> for FileMeta {
         Ok(Self {
             max_sequence: value.max_sequence,
             num_rows: value.num_rows,
-            time_range: TimeRange {
-                start: time_range.start,
-                end: time_range.end,
-            },
+            size: value.size,
+            time_range: TimeRange::new(time_range.start.into(), time_range.end.into()),
         })
     }
 }

+impl From<FileMeta> for pb_types::SstMeta {
+    fn from(value: FileMeta) -> Self {
+        pb_types::SstMeta {
+            max_sequence: value.max_sequence,
+            num_rows: value.num_rows,
+            size: value.size,
+            time_range: Some(pb_types::TimeRange {
+                start: *value.time_range.start,
+                end: *value.time_range.end,
+            }),
+        }
+    }
+}
+
 // Used for sst file id allocation.
 // This number mustn't go backwards on restarts, otherwise file id
 // collisions are possible. So don't change time on the server
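With the new `size` field threaded through both `FileMeta` and `pb_types::SstMeta`, the conversions in and out of protobuf are now symmetric. A small in-crate round-trip sketch (the concrete values are made up):

```rust
use crate::{
    sst::{FileMeta, SstFile},
    types::{TimeRange, Timestamp},
};

// Hedged sketch: `size` and the time range should survive a protobuf round trip.
fn pb_round_trip_demo() {
    let file = SstFile {
        id: 42,
        meta: FileMeta {
            max_sequence: 42,
            num_rows: 1000,
            size: 4096,
            time_range: TimeRange::new(Timestamp(0), Timestamp(100)),
        },
    };

    let pb = pb_types::SstFile::from(file.clone());
    assert_eq!(pb.meta.as_ref().unwrap().size, 4096);

    let back = SstFile::try_from(pb).expect("valid pb payload");
    assert_eq!(back.meta.size, 4096);
    assert_eq!(*back.meta.time_range.start, 0);
}
```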
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index 3a8d42f0fd..b64e7a8d55 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -25,10 +25,13 @@ use arrow::{
 use async_trait::async_trait;
 use datafusion::{
     common::DFSchema,
-    execution::{
-        context::ExecutionProps, SendableRecordBatchStream as DFSendableRecordBatchStream,
+    datasource::{
+        listing::PartitionedFile,
+        physical_plan::{FileScanConfig, ParquetExec},
     },
-    logical_expr::Expr,
+    execution::{context::ExecutionProps, object_store::ObjectStoreUrl, SendableRecordBatchStream},
+    logical_expr::{utils::conjunction, Expr},
+    physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, SessionContext},
@@ -43,8 +46,9 @@ use parquet::{

 use crate::{
     manifest::Manifest,
+    read::DefaultParquetFileReaderFactory,
     sst::{allocate_id, FileId, FileMeta},
-    types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp},
+    types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
     Result,
 };

@@ -55,7 +59,7 @@ pub struct WriteRequest {

 pub struct ScanRequest {
     range: TimeRange,
-    predicate: Expr,
+    predicate: Vec<Expr>,
     /// `None` means all columns.
     projections: Option<Vec<usize>>,
 }
@@ -84,6 +88,8 @@ pub struct CloudObjectStorage {
     num_primary_key: usize,
     timestamp_index: usize,
     manifest: Manifest,
+
+    df_schema: DFSchema,
 }

 /// It will organize the data in the following way:
@@ -107,6 +113,7 @@ impl CloudObjectStorage {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
         let manifest =
             Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
+        let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
         Ok(Self {
             path: root_path,
             num_primary_key,
@@ -114,6 +121,7 @@ impl CloudObjectStorage {
             store,
             arrow_schema,
             manifest,
+            df_schema,
         })
     }

@@ -123,11 +131,11 @@ impl CloudObjectStorage {
         format!("{root}/{prefix}/{id}")
     }

-    async fn write_batch(&self, req: WriteRequest) -> Result<FileId> {
+    async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
         let file_id = allocate_id();
         let file_path = self.build_file_path(file_id);
-        let object_store_writer =
-            ParquetObjectWriter::new(self.store.clone(), Path::from(file_path));
+        let file_path = Path::from(file_path);
+        let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
         let mut writer =
             AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props)
                 .context("create arrow writer")?;
@@ -139,27 +147,38 @@ impl CloudObjectStorage {
             writer.write(&batch).await.context("write arrow batch")?;
         }
         writer.close().await.context("close arrow writer")?;
-
-        Ok(file_id)
+        let object_meta = self
+            .store
+            .head(&file_path)
+            .await
+            .context("get object meta")?;
+
+        Ok(WriteResult {
+            id: file_id,
+            size: object_meta.size,
+        })
     }

-    async fn sort_batch(&self, batch: RecordBatch) -> Result<DFSendableRecordBatchStream> {
-        let ctx = SessionContext::default();
-        let schema = batch.schema();
-        let df_schema = DFSchema::try_from(schema.clone()).context("build DFSchema")?;
-
+    fn build_sort_exprs(&self) -> Result<LexOrdering> {
         let sort_exprs = (0..self.num_primary_key)
             .collect::<Vec<_>>()
             .iter()
-            .map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
+            .map(|i| ident(self.schema().field(*i).name()).sort(true, true))
            .collect::<Vec<_>>();
-        let physical_sort_exprs =
-            create_physical_sort_exprs(&sort_exprs, &df_schema, &ExecutionProps::default())
+        let sort_exprs =
+            create_physical_sort_exprs(&sort_exprs, &self.df_schema, &ExecutionProps::default())
                 .context("create physical sort exprs")?;
+        Ok(sort_exprs)
+    }
+
+    async fn sort_batch(&self, batch: RecordBatch) -> Result<SendableRecordBatchStream> {
+        let ctx = SessionContext::default();
+        let schema = batch.schema();
+        let sort_exprs = self.build_sort_exprs()?;
         let batch_plan =
             MemoryExec::try_new(&[vec![batch]], schema, None).context("build batch plan")?;
-        let physical_plan = Arc::new(SortExec::new(physical_sort_exprs, Arc::new(batch_plan)));
+        let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(batch_plan)));

         let res =
             execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;
@@ -187,17 +206,18 @@ impl TimeMergeStorage for CloudObjectStorage {
         let mut start = Timestamp::MAX;
         let mut end = Timestamp::MIN;
         for v in time_column.values() {
-            start = start.min(*v);
-            end = end.max(*v);
+            start = start.min(Timestamp(*v));
+            end = end.max(Timestamp(*v));
         }
-        let time_range = TimeRange {
-            start,
-            end: end + 1,
-        };
-        let file_id = self.write_batch(req).await?;
+        let time_range = TimeRange::new(start, end + 1);
+        let WriteResult {
+            id: file_id,
+            size: file_size,
+        } = self.write_batch(req).await?;
         let file_meta = FileMeta {
             max_sequence: file_id, // Since file_id is in increasing order, we can use it as sequence.
             num_rows: num_rows as u32,
+            size: file_size as u32,
             time_range,
         };
         self.manifest.add_file(file_id, file_meta).await?;
@@ -206,7 +226,39 @@
     }

     async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream> {
-        todo!()
+        let ssts = self.manifest.find_ssts(&req.range).await;
+        // We won't use the url to select the object_store.
+        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+        // TODO: we could group ssts based on time range.
+        // TODO: fetch using multiple threads since reading parquet will incur CPU cost
+        // when converting between arrow and parquet.
+        let file_groups = ssts
+            .iter()
+            .map(|f| PartitionedFile::new(self.build_file_path(f.id), f.meta.size as u64))
+            .collect::<Vec<_>>();
+        let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
+            .with_file_group(file_groups)
+            .with_projection(req.projections);
+
+        let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+        );
+        if let Some(expr) = conjunction(req.predicate) {
+            let filters = create_physical_expr(&expr, &self.df_schema, &ExecutionProps::new())
+                .context("create physical expr")?;
+            builder = builder.with_predicate(filters);
+        }
+
+        let parquet_exec = builder.build();
+        let sort_exprs = self.build_sort_exprs()?;
+        let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(parquet_exec)));
+
+        let ctx = SessionContext::default();
+        // TODO: dedup record batch based on primary keys and sequence number.
+        let res =
+            execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;
+
+        Ok(res)
     }

     async fn compact(&self, req: CompactRequest) -> Result<()> {
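In `scan`, the predicate now arrives as a `Vec<Expr>`; `conjunction` folds the list into one AND-ed expression (returning `None` for an empty list), which is then lowered with `create_physical_expr` and pushed into `ParquetExec` for pruning. A hedged sketch of just the folding step, with made-up column names:

```rust
use datafusion::logical_expr::{col, lit, utils::conjunction};

// Hedged sketch: an empty filter list yields None, otherwise the filters are AND-ed.
fn fold_filters_demo() {
    let filters = vec![col("metric").eq(lit("cpu")), col("value").gt(lit(0.5))];

    match conjunction(filters) {
        Some(expr) => println!("pushdown predicate: {expr}"), // metric = cpu AND value > 0.5
        None => println!("no predicate, scan everything"),
    }
}
```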
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 96a4b74ad4..e6b518a503 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -15,23 +15,83 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::{ops::Range, pin::Pin, sync::Arc};
+use std::{
+    ops::{Add, Deref, Range},
+    sync::Arc,
+};

-use arrow::{array::RecordBatch, datatypes::Schema};
-use futures::Stream;
 use object_store::ObjectStore;

-use crate::error::Result;
+use crate::sst::FileId;

-pub type Timestamp = i64;
-pub type TimeRange = Range<Timestamp>;
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Timestamp(pub i64);

-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+impl Add for Timestamp {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self::Output {
+        Self(self.0 + rhs.0)
+    }
+}
+
+impl Add<i64> for Timestamp {
+    type Output = Self;
+
+    fn add(self, rhs: i64) -> Self::Output {
+        Self(self.0 + rhs)
+    }
+}

-/// Trait for types that stream [arrow::record_batch::RecordBatch]
-pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
-    fn schema(&self) -> &Schema;
+impl From<i64> for Timestamp {
+    fn from(value: i64) -> Self {
+        Self(value)
+    }
 }

-/// Trait for a [`Stream`] of [`RecordBatch`]es
-pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream>>;
+impl Deref for Timestamp {
+    type Target = i64;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl Timestamp {
+    pub const MAX: Timestamp = Timestamp(i64::MAX);
+    pub const MIN: Timestamp = Timestamp(i64::MIN);
+}
+
+#[derive(Clone, Debug)]
+pub struct TimeRange(Range<Timestamp>);
+
+impl From<Range<Timestamp>> for TimeRange {
+    fn from(value: Range<Timestamp>) -> Self {
+        Self(value)
+    }
+}
+
+impl Deref for TimeRange {
+    type Target = Range<Timestamp>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl TimeRange {
+    pub fn new(start: Timestamp, end: Timestamp) -> Self {
+        Self(start..end)
+    }
+
+    pub fn overlaps(&self, other: &TimeRange) -> bool {
+        self.0.start < other.0.end && other.0.start < self.0.end
+    }
+}
+
+pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+
+pub struct WriteResult {
+    pub id: FileId,
+    pub size: usize,
+}
diff --git a/horaedb/pb_types/protos/sst.proto b/horaedb/pb_types/protos/sst.proto
index ce3db30169..5312ffa70c 100644
--- a/horaedb/pb_types/protos/sst.proto
+++ b/horaedb/pb_types/protos/sst.proto
@@ -32,7 +32,8 @@ message TimeRange {
 message SstMeta {
   uint64 max_sequence = 1;
   uint32 num_rows = 2;
-  TimeRange time_range = 3;
+  uint32 size = 3;
+  TimeRange time_range = 4;
 }

 message SstFile {
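`TimeRange` is half-open (`start..end`), which is why `write` stores `end + 1` and why `overlaps` uses strict comparisons on both bounds. A few illustrative cases of the new semantics:

```rust
use crate::types::{TimeRange, Timestamp};

// Half-open ranges: touching ranges do not overlap.
fn overlaps_demo() {
    let a = TimeRange::new(Timestamp(0), Timestamp(10));
    let b = TimeRange::new(Timestamp(10), Timestamp(20)); // starts exactly where `a` ends
    let c = TimeRange::new(Timestamp(5), Timestamp(15));

    assert!(!a.overlaps(&b));
    assert!(a.overlaps(&c));
    assert!(c.overlaps(&b));
}
```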