fix: Fix performance regression for DataFrame serialization/pickling #20641

Merged on Jan 9, 2025 (6 commits)
Changes from all commits
7 changes: 2 additions & 5 deletions crates/polars-arrow/src/io/ipc/read/stream.rs
@@ -29,7 +29,7 @@ pub struct StreamMetadata {
}

/// Reads the metadata of the stream
pub fn read_stream_metadata<R: Read>(reader: &mut R) -> PolarsResult<StreamMetadata> {
pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
@@ -48,10 +48,7 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> PolarsResult<StreamMetadata> {

let mut buffer = vec![];
buffer.try_reserve(length)?;
reader
.by_ref()
.take(length as u64)
.read_to_end(&mut buffer)?;
reader.take(length as u64).read_to_end(&mut buffer)?;

deserialize_stream_metadata(&buffer)
}
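The hunk above replaces the generic reader parameter with a trait object. A minimal caller sketch under the new signature (assuming a dependency on the polars-arrow crate; `peek_stream` and `ipc_bytes` are illustrative names, not part of this PR):

```rust
use std::io::Cursor;

use polars_arrow::io::ipc::read::read_stream_metadata;
use polars_error::PolarsResult;

// Illustrative helper: any `Read` implementor still works with the new
// signature, it simply coerces to `&mut dyn std::io::Read` at the call site.
fn peek_stream(ipc_bytes: &[u8]) -> PolarsResult<()> {
    let mut reader = Cursor::new(ipc_bytes);
    let _metadata = read_stream_metadata(&mut reader)?;
    Ok(())
}
```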
70 changes: 69 additions & 1 deletion crates/polars-core/src/frame/chunks.rs
@@ -2,7 +2,7 @@ use arrow::record_batch::RecordBatch;
use rayon::prelude::*;

use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::utils::{_split_offsets, accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use crate::POOL;

impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame {
@@ -51,3 +51,71 @@ impl DataFrame {
}
}
}

Review comment (author): Code is moved from polars-io.

/// Split DataFrame into chunks in preparation for writing. The chunks have a
/// maximum number of rows per chunk to ensure reasonable memory efficiency when
/// reading the resulting file, and a minimum size per chunk to ensure
/// reasonable performance when writing.
pub fn chunk_df_for_writing(
df: &mut DataFrame,
row_group_size: usize,
) -> PolarsResult<std::borrow::Cow<DataFrame>> {
// ensures all chunks are aligned.
df.align_chunks_par();

// Accumulate many small chunks to the row group size.
// See: #16403
if !df.get_columns().is_empty()
&& df.get_columns()[0]
.as_materialized_series()
.chunk_lengths()
.take(5)
.all(|len| len < row_group_size)
{
fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
new.as_single_chunk_par();
new_chunks.push(new);
}

let mut new_chunks = Vec::with_capacity(df.first_col_n_chunks()); // upper limit;
let mut scratch = vec![];
let mut remaining = row_group_size;

for df in df.split_chunks() {
remaining = remaining.saturating_sub(df.height());
scratch.push(df);

if remaining == 0 {
remaining = row_group_size;
finish(&mut scratch, &mut new_chunks);
}
}
if !scratch.is_empty() {
finish(&mut scratch, &mut new_chunks);
}
return Ok(std::borrow::Cow::Owned(
accumulate_dataframes_vertical_unchecked(new_chunks),
));
}

let n_splits = df.height() / row_group_size;
let result = if n_splits > 0 {
let mut splits = split_df_as_ref(df, n_splits, false);

for df in splits.iter_mut() {
// If the chunks are small enough, writing many small chunks
// leads to slow writing performance, so in that case we
// merge them.
let n_chunks = df.first_col_n_chunks();
if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
df.as_single_chunk_par();
}
}

std::borrow::Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
} else {
std::borrow::Cow::Borrowed(df)
};
Ok(result)
}
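A minimal usage sketch of the function added above (assuming the polars-core crate; the frame contents and the row-group size of 2 are purely illustrative):

```rust
use polars_core::df;
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;

fn rechunk_for_writing() -> PolarsResult<()> {
    let mut df = df!(
        "a" => &[1i64, 2, 3, 4],
        "b" => &["w", "x", "y", "z"]
    )?;

    // Returns a Cow: borrowed when no re-chunking was needed,
    // owned when chunks had to be merged or split first.
    let chunked = chunk_df_for_writing(&mut df, 2)?;
    assert_eq!(chunked.height(), 4);

    Ok(())
}
```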
13 changes: 13 additions & 0 deletions crates/polars-core/src/frame/column/mod.rs
@@ -871,6 +871,19 @@ impl Column {
}
}

/// Returns whether the flags were set
pub fn set_flags(&mut self, flags: StatisticsFlags) -> bool {
match self {
Column::Series(s) => {
s.set_flags(flags);
true
},
// @partition-opt
Column::Partitioned(_) => false,
Column::Scalar(_) => false,
}
}

pub fn vec_hash(&self, build_hasher: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
// @scalar-opt?
self.as_materialized_series().vec_hash(build_hasher, buf)
38 changes: 1 addition & 37 deletions crates/polars-core/src/frame/mod.rs
@@ -20,6 +20,7 @@ use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
mod chunks;
pub use chunks::chunk_df_for_writing;
pub mod column;
pub mod explode;
mod from;
@@ -3578,41 +3579,4 @@
assert_eq!(df.get_column_names(), &["a", "b", "c"]);
Ok(())
}

#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test no longer works as serialization directly errors now on mismatching height

// Construct an invalid directly from the inner fields as the `new_unchecked_*` functions
// have debug assertions

use polars_utils::pl_serialize;

let df = DataFrame {
height: 2,
columns: vec![
Int64Chunked::full("a".into(), 1, 2).into_column(),
Int64Chunked::full("b".into(), 1, 1).into_column(),
],
cached_schema: OnceLock::new(),
};

// We rely on the fact that the serialization doesn't check the heights of all columns
let serialized = serde_json::to_string(&df).unwrap();
let err = serde_json::from_str::<DataFrame>(&serialized).unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));

let serialized = pl_serialize::SerializeOptions::default()
.serialize_to_bytes(&df)
.unwrap();
let err = pl_serialize::SerializeOptions::default()
.deserialize_from_reader::<DataFrame, _>(serialized.as_slice())
.unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));
}
}
188 changes: 157 additions & 31 deletions crates/polars-core/src/serde/df.rs
@@ -1,35 +1,144 @@
use polars_error::PolarsError;
use std::sync::Arc;

use arrow::datatypes::Metadata;
use arrow::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};
use arrow::io::ipc::write::WriteOptions;
use polars_error::{polars_err, to_compute_err, PolarsResult};
use polars_utils::format_pl_smallstr;
use polars_utils::pl_serialize::deserialize_map_bytes;
use polars_utils::pl_str::PlSmallStr;
use serde::de::Error;
use serde::*;

use crate::prelude::{Column, DataFrame};

// utility to ensure we serde to a struct
// {
// columns: Vec<Series>
// }
// that ensures it differentiates between Vec<Series>
// and is backwards compatible
#[derive(Deserialize)]
struct Util {
columns: Vec<Column>,
}
use crate::chunked_array::flags::StatisticsFlags;
use crate::config;
use crate::frame::chunk_df_for_writing;
use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
use crate::utils::accumulate_dataframes_vertical_unchecked;

#[derive(Serialize)]
struct UtilBorrowed<'a> {
columns: &'a [Column],
}
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");

impl<'de> Deserialize<'de> for DataFrame {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let parsed = <Util>::deserialize(deserializer)?;
DataFrame::new(parsed.columns).map_err(|e| {
let e = PolarsError::ComputeError(format!("successful parse invalid data: {e}").into());
D::Error::custom::<PolarsError>(e)
})
Review comment (@nameexhaustion, Jan 9, 2025): Serialization logic moved to impl DataFrame rather than on impl Series.

impl DataFrame {
pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
let schema = self.schema();

if schema.iter_values().any(|x| x.is_object()) {
return Err(polars_err!(
ComputeError:
"serializing data of type Object is not supported",
));
}

let mut ipc_writer =
arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });

ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
self.get_columns().iter().map(|c| {
(
format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
PlSmallStr::from(c.get_flags().bits().to_string()),
)
}),
)));

ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
FLAGS_KEY,
serde_json::to_string(
&self
.iter()
.map(|s| s.get_flags().bits())
.collect::<Vec<u32>>(),
)
.map_err(to_compute_err)?
.into(),
)])));

ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;

for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
{
ipc_writer.write(&batch, None)?;
}

ipc_writer.finish()?;

Ok(())
}

pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
let mut buf = vec![];
self.serialize_into_writer(&mut buf)?;

Ok(buf)
}

pub fn deserialize_from_reader(reader: &mut dyn std::io::Read) -> PolarsResult<Self> {
let mut md = read_stream_metadata(reader)?;
let arrow_schema = md.schema.clone();

let custom_metadata = md.custom_schema_metadata.take();

let reader = StreamReader::new(reader, md, None);
let dfs = reader
.into_iter()
.map_while(|batch| match batch {
Ok(StreamState::Some(batch)) => Some(DataFrame::try_from((batch, &arrow_schema))),
Ok(StreamState::Waiting) => None,
Err(e) => Some(Err(e)),
})
.collect::<PolarsResult<Vec<DataFrame>>>()?;

let mut df = accumulate_dataframes_vertical_unchecked(dfs);

// Set custom metadata (fallible)
(|| {
let custom_metadata = custom_metadata?;
let flags = custom_metadata.get(&FLAGS_KEY)?;

let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);

let verbose = config::verbose();

if let Err(e) = &flags {
if verbose {
eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {}", e);
}
}

let flags = flags.ok()?;

if flags.len() != df.width() {
if verbose {
eprintln!(
"DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
flags.len(),
df.width()
);
}

return None;
}

let mut n_set = 0;

for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
if let Some(flags) = StatisticsFlags::from_bits(v) {
n_set += c.set_flags(flags) as usize;
}
}

if verbose {
eprintln!(
"DataFrame::read_ipc: Loaded metadata for {} / {} columns",
n_set,
df.width()
);
}

Some(())
})();

Ok(df)
}
}

@@ -38,9 +38,26 @@ impl Serialize for DataFrame {
where
S: Serializer,
{
UtilBorrowed {
columns: &self.columns,
}
.serialize(serializer)
use serde::ser::Error;

let mut bytes = vec![];
self.clone()
.serialize_into_writer(&mut bytes)
.map_err(S::Error::custom)?;

serializer.serialize_bytes(bytes.as_slice())
}
}

Review comment (@nameexhaustion, Jan 9, 2025): This is where the extra memcopy happens when going through serde - we are calling Serializer::serialize_bytes(bytes: &[u8]).
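A sketch of the distinction noted above, assuming the polars-core crate and an illustrative output path: the serde route has to materialize the whole IPC stream in a Vec<u8> before handing it to serialize_bytes, while calling serialize_into_writer directly streams into any std::io::Write:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};

use polars_core::df;
use polars_core::prelude::*;

fn write_frame(path: &str) -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2, 3])?;

    // Stream straight into the file; no intermediate Vec<u8> is allocated.
    let mut writer = BufWriter::new(File::create(path)?);
    df.serialize_into_writer(&mut writer)?;
    writer.flush()?;

    Ok(())
}
```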

impl<'de> Deserialize<'de> for DataFrame {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserialize_map_bytes(deserializer, &mut |b| {
let v = &mut b.as_ref();
Self::deserialize_from_reader(v)
})?
.map_err(D::Error::custom)
}
}
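A minimal round-trip sketch of the new direct methods (assuming the polars-core crate; frame contents are illustrative):

```rust
use polars_core::df;
use polars_core::prelude::*;

fn roundtrip() -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2, 3], "b" => &["x", "y", "z"])?;

    // DataFrame -> Arrow IPC stream bytes; column flags ride along as custom
    // schema metadata under the `_PL_FLAGS` key.
    let bytes = df.serialize_to_bytes()?;

    // Bytes -> DataFrame; any `std::io::Read` can be the source.
    let restored = DataFrame::deserialize_from_reader(&mut bytes.as_slice())?;
    assert!(df.equals(&restored));

    Ok(())
}
```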