Skip to content

Commit

Permalink
Push column projections down to the file IO layer (#568)
Browse files Browse the repository at this point in the history
Part of #567

---------

Co-authored-by: Robert Kruszewski <[email protected]>
  • Loading branch information
AdamGS and robert3005 authored Aug 8, 2024
1 parent 79d4df2 commit 9fb3327
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
10 changes: 5 additions & 5 deletions vortex-serde/src/layouts/reader/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl BatchReader {
}

pub fn read(&mut self) -> VortexResult<Option<ReadResult>> {
let mut rr1 = Vec::new();
let mut messages = Vec::new();
for (i, child_array) in self
.arrays
.iter_mut()
Expand All @@ -34,8 +34,8 @@ impl BatchReader {
{
match self.children[i].read()? {
Some(rr) => match rr {
ReadResult::GetMsgs(r1) => {
rr1.extend(r1);
ReadResult::GetMsgs(message) => {
messages.extend(message);
}
ReadResult::Batch(a) => *child_array = Some(a),
},
Expand All @@ -49,7 +49,7 @@ impl BatchReader {
}
}

if rr1.is_empty() {
if messages.is_empty() {
let child_arrays = mem::replace(&mut self.arrays, vec![None; self.children.len()])
.into_iter()
.map(|a| a.unwrap());
Expand All @@ -58,7 +58,7 @@ impl BatchReader {
.into_array(),
)));
} else {
Ok(Some(ReadResult::GetMsgs(rr1)))
Ok(Some(ReadResult::GetMsgs(messages)))
}
}
}
50 changes: 33 additions & 17 deletions vortex-serde/src/layouts/reader/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::collections::VecDeque;
use std::sync::Arc;

use bytes::Bytes;
use flatbuffers::{ForwardsUOffset, Vector};
use vortex::Context;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_flatbuffers::footer as fb;

use super::projections::Projection;
use crate::layouts::reader::batch::BatchReader;
use crate::layouts::reader::buffered::BufferedReader;
use crate::layouts::reader::context::{LayoutDeserializer, LayoutId, LayoutSpec};
Expand Down Expand Up @@ -129,7 +131,6 @@ impl ColumnLayout {
fb_bytes: Bytes,
fb_loc: usize,
scan: Scan,

layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Self {
Expand All @@ -150,6 +151,26 @@ impl ColumnLayout {
};
fb_layout.layout_as_nested_layout().expect("must be nested")
}

fn read_child(
&self,
idx: usize,
children: Vector<ForwardsUOffset<fb::Layout>>,
st_dtype: &Arc<[DType]>,
) -> VortexResult<Box<dyn Layout>> {
let layout = children.get(idx);
let dtype = st_dtype[idx].clone();
// TODO: Figure out complex nested schema projections
let mut child_scan = self.scan.clone();
child_scan.projection = Projection::All;

self.layout_serde.read_layout(
self.fb_bytes.clone(),
layout._tab.loc(),
child_scan,
self.message_cache.relative(idx as u16, dtype),
)
}
}

impl Layout for ColumnLayout {
Expand All @@ -162,24 +183,19 @@ impl Layout for ColumnLayout {

let fb_children = self.flatbuffer().children().expect("must have children");

let columns = fb_children
.into_iter()
.enumerate()
.zip(s.dtypes().iter().cloned())
.map(|((idx, child), dtype)| {
self.layout_serde.read_layout(
self.fb_bytes.clone(),
child._tab.loc(),
self.scan.clone(),
self.message_cache.relative(idx as u16, dtype),
)
})
.collect::<VortexResult<Vec<_>>>()?;
let column_layouts = match self.scan.projection {
Projection::All => (0..fb_children.len())
.map(|idx| self.read_child(idx, fb_children, s.dtypes()))
.collect::<VortexResult<Vec<_>>>()?,
Projection::Partial(ref v) => v
.iter()
.map(|&idx| self.read_child(idx, fb_children, s.dtypes()))
.collect::<VortexResult<Vec<_>>>()?,
};

let mut reader = BatchReader::new(s.names().clone(), columns);
let rr = reader.read()?;
let reader = BatchReader::new(s.names().clone(), column_layouts);
self.state = ColumnLayoutState::ReadColumns(reader);
Ok(rr)
self.read()
}
ColumnLayoutState::ReadColumns(br) => br.read(),
}
Expand Down
15 changes: 3 additions & 12 deletions vortex-serde/src/layouts/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use bytes::{Bytes, BytesMut};
use futures::Stream;
use futures_util::future::BoxFuture;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use vortex::array::StructArray;
use vortex::compute::unary::subtract_scalar;
use vortex::compute::{filter, filter_indices, search_sorted, slice, take, SearchSortedSide};
use vortex::{Array, IntoArray, IntoArrayVariant};
Expand All @@ -16,7 +15,6 @@ use vortex_error::{VortexError, VortexResult};
use vortex_scalar::Scalar;

use crate::io::VortexReadAt;
use crate::layouts::reader::projections::Projection;
use crate::layouts::reader::schema::Schema;
use crate::layouts::reader::{Layout, LayoutMessageCache, MessageId, ReadResult, Scan};
use crate::writer::ByteRange;
Expand Down Expand Up @@ -94,10 +92,10 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexLayoutBatchStrea
StreamingState::Init => {
if let Some(read) = self.layout.read()? {
match read {
ReadResult::GetMsgs(r1) => {
ReadResult::GetMsgs(messages) => {
let reader =
mem::take(&mut self.reader).expect("Invalid state transition");
let read_future = read_ranges(reader, r1).boxed();
let read_future = read_ranges(reader, messages).boxed();
self.state = StreamingState::Reading(read_future);
}
ReadResult::Batch(a) => self.state = StreamingState::Decoding(a),
Expand All @@ -117,15 +115,8 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexLayoutBatchStrea
batch = filter(&batch, &mask)?;
}

let projected = match &self.scan.projection {
Projection::All => batch,
Projection::Partial(indices) => StructArray::try_from(batch.clone())?
.project(indices.as_ref())?
.into_array(),
};

self.state = StreamingState::Init;
return Poll::Ready(Some(Ok(projected)));
return Poll::Ready(Some(Ok(batch)));
}
StreamingState::Reading(f) => match ready!(f.poll_unpin(cx)) {
Ok((read, buffers)) => {
Expand Down

0 comments on commit 9fb3327

Please sign in to comment.