From 2f92564f939b7d8b68f0383465aed86200008a0f Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 9 May 2024 11:29:03 +1000 Subject: [PATCH 01/13] wip --- crates/polars-io/src/csv/read/mod.rs | 4 +- crates/polars-io/src/csv/read/options.rs | 319 +++++++++++-- crates/polars-io/src/csv/read/read_impl.rs | 22 +- .../src/csv/read/read_impl/batched_mmap.rs | 21 +- .../src/csv/read/read_impl/batched_read.rs | 23 +- crates/polars-io/src/csv/read/reader.rs | 434 ++++++------------ crates/polars-io/src/ipc/ipc_reader_async.rs | 2 +- crates/polars-io/src/options.rs | 4 +- crates/polars-io/src/parquet/read/reader.rs | 3 +- crates/polars-lazy/src/frame/mod.rs | 2 +- .../src/physical_plan/executors/scan/csv.rs | 48 +- crates/polars-lazy/src/scan/csv.rs | 13 +- crates/polars-lazy/src/tests/io.rs | 4 +- crates/polars-lazy/src/tests/mod.rs | 14 +- .../polars-pipe/src/executors/sources/csv.rs | 47 +- .../src/logical_plan/builder_dsl.rs | 48 +- .../src/logical_plan/conversion/dsl_to_ir.rs | 2 +- .../src/logical_plan/conversion/scans.rs | 22 +- .../polars-plan/src/logical_plan/file_scan.rs | 4 +- .../src/logical_plan/functions/count.rs | 9 +- .../optimizer/predicate_pushdown/mod.rs | 9 +- .../optimizer/projection_pushdown/mod.rs | 2 +- py-polars/src/batched_csv.rs | 47 +- py-polars/src/dataframe/io.rs | 66 ++- py-polars/src/lazyframe/mod.rs | 30 +- py-polars/src/lazyframe/visitor/nodes.rs | 2 +- 26 files changed, 650 insertions(+), 551 deletions(-) diff --git a/crates/polars-io/src/csv/read/mod.rs b/crates/polars-io/src/csv/read/mod.rs index 5f5b93948f02..91c95fc70143 100644 --- a/crates/polars-io/src/csv/read/mod.rs +++ b/crates/polars-io/src/csv/read/mod.rs @@ -13,7 +13,7 @@ //! fn example() -> PolarsResult { //! // Prefer `from_path` over `new` as it is faster. //! CsvReader::from_path("example.csv")? -//! .has_header(true) +//! .with_has_header(true) //! .finish() //! } //! ``` @@ -26,7 +26,7 @@ mod reader; mod splitfields; mod utils; -pub use options::{CommentPrefix, CsvEncoding, CsvReaderOptions, NullValues}; +pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues}; pub use parser::count_rows; pub use read_impl::batched_mmap::{BatchedCsvReaderMmap, OwnedBatchedCsvReaderMmap}; pub use read_impl::batched_read::{BatchedCsvReaderRead, OwnedBatchedCsvReader}; diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs index 2764d085d093..0dad82a0d9b1 100644 --- a/crates/polars-io/src/csv/read/options.rs +++ b/crates/polars-io/src/csv/read/options.rs @@ -1,58 +1,314 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use polars_core::datatypes::DataType; use polars_core::schema::{IndexOfSchema, Schema, SchemaRef}; use polars_error::PolarsResult; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use crate::RowIndex; + #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct CsvReaderOptions { +pub struct CsvReadOptions { + pub path: Option, + // Performance related options + pub rechunk: bool, + pub n_threads: Option, + pub low_memory: bool, + // Row-wise options + pub n_rows: Option, + pub row_index: Option, + // Column-wise options + pub columns: Option>>, + pub projection: Option>>, + pub schema: Option, + pub schema_overwrite: Option, + pub dtype_overwrite: Option>>, + // CSV-specific options + pub parse_options: Arc, pub has_header: bool, + pub sample_size: usize, + pub chunk_size: usize, + pub skip_rows: usize, + pub skip_rows_after_header: usize, + pub infer_schema_length: Option, + pub raise_if_empty: bool, + // TODO: Ask if we should rename this to `ignore_parse_errors` to make it + // clear this targets parsing errors. + pub ignore_errors: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct CsvParseOptions { pub separator: u8, pub quote_char: Option, - pub comment_prefix: Option, pub eol_char: u8, pub encoding: CsvEncoding, - pub skip_rows: usize, - pub skip_rows_after_header: usize, - pub schema: Option, - pub schema_overwrite: Option, - pub infer_schema_length: Option, - pub try_parse_dates: bool, pub null_values: Option, - pub ignore_errors: bool, - pub raise_if_empty: bool, + pub missing_is_null: bool, pub truncate_ragged_lines: bool, + pub comment_prefix: Option, + pub try_parse_dates: bool, pub decimal_comma: bool, - pub n_threads: Option, - pub low_memory: bool, } -impl Default for CsvReaderOptions { +impl Default for CsvReadOptions { fn default() -> Self { Self { + path: None, + + rechunk: true, + n_threads: None, + low_memory: false, + + n_rows: None, + row_index: None, + + columns: None, + projection: None, + schema: None, + schema_overwrite: None, + dtype_overwrite: None, + + parse_options: Default::default(), has_header: true, - separator: b',', - quote_char: Some(b'"'), - comment_prefix: None, - eol_char: b'\n', - encoding: CsvEncoding::default(), + sample_size: 1024, + chunk_size: 1 << 18, skip_rows: 0, skip_rows_after_header: 0, - schema: None, - schema_overwrite: None, infer_schema_length: Some(100), - try_parse_dates: false, - null_values: None, - ignore_errors: false, raise_if_empty: true, + ignore_errors: false, + } + } +} + +/// Options related to parsing the CSV format. +impl Default for CsvParseOptions { + fn default() -> Self { + Self { + separator: b',', + quote_char: Some(b'"'), + eol_char: b'\n', + encoding: Default::default(), + null_values: None, + missing_is_null: true, + truncate_ragged_lines: false, + comment_prefix: None, + try_parse_dates: false, decimal_comma: false, - n_threads: None, - low_memory: false, } } } +impl CsvReadOptions { + pub fn get_parse_options(&self) -> Arc { + self.parse_options.clone() + } + + pub fn with_path>(mut self, path: Option

) -> Self { + self.path = path.map(|p| p.into()); + self + } + + /// Whether to makes the columns contiguous in memory. + pub fn with_rechunk(mut self, rechunk: bool) -> Self { + self.rechunk = rechunk; + self + } + + /// Number of threads to use for reading. Defaults to the size of the polars + /// thread pool. + pub fn with_n_threads(mut self, n_threads: Option) -> Self { + self.n_threads = n_threads; + self + } + + /// Reduce memory consumption at the expense of performance + pub fn with_low_memory(mut self, low_memory: bool) -> Self { + self.low_memory = low_memory; + self + } + + /// Limits the number of rows to read. + pub fn with_n_rows(mut self, n_rows: Option) -> Self { + self.n_rows = n_rows; + self + } + + /// Adds a row index column. + pub fn with_row_index(mut self, row_index: Option) -> Self { + self.row_index = row_index; + self + } + + /// Which columns to select. + pub fn with_columns(mut self, columns: Option>>) -> Self { + self.columns = columns; + self + } + + /// Which columns to select denoted by their index. The index starts from 0 + /// (i.e. [0, 4] would select the 1st and 5th column). + pub fn with_projection(mut self, projection: Option>>) -> Self { + self.projection = projection; + self + } + + /// Set the schema to use for CSV file. The length of the schema must match + /// the number of columns in the file. If this is [None], the schema is + /// inferred from the file. + pub fn with_schema(mut self, schema: Option) -> Self { + self.schema = schema; + self + } + + /// Overwrites the data types in the schema by column name. + pub fn with_schema_overwrite(mut self, schema_overwrite: Option) -> Self { + self.schema_overwrite = schema_overwrite; + self + } + + /// Overwrite the dtypes in the schema in the order of the slice that's given. + /// This is useful if you don't know the column names beforehand + pub fn with_dtype_overwrite(mut self, dtype_overwrite: Option>>) -> Self { + self.dtype_overwrite = dtype_overwrite; + self + } + + /// Sets the CSV parsing options. + pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self { + self.parse_options = Arc::new(parse_options); + self + } + + /// Sets whether the CSV file has a header row. + pub fn with_has_header(mut self, has_header: bool) -> Self { + self.has_header = has_header; + self + } + + /// Sets the number of rows sampled from the file to determine approximately + /// how much memory to use for the initial allocation. + pub fn with_sample_size(mut self, sample_size: usize) -> Self { + self.sample_size = sample_size; + self + } + + /// Sets the chunk size used by the parser. This influences performance. + pub fn with_chunk_size(mut self, chunk_size: usize) -> Self { + self.chunk_size = chunk_size; + self + } + + /// Number of rows to skip before the header row. + pub fn with_skip_rows(mut self, skip_rows: usize) -> Self { + self.skip_rows = skip_rows; + self + } + + /// Number of rows to skip after the header row. + pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self { + self.skip_rows_after_header = skip_rows_after_header; + self + } + + /// Number of rows to use for schema inference. Pass [None] to use all rows. + pub fn with_infer_schema_length(mut self, infer_schema_length: Option) -> Self { + self.infer_schema_length = infer_schema_length; + self + } + + /// Whether to raise an error if the frame is empty. By default an empty + /// DataFrame is returned. + pub fn with_raise_if_empty(mut self, raise_if_empty: bool) -> Self { + self.raise_if_empty = raise_if_empty; + self + } + + /// Continue with next batch when a ParserError is encountered. + pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self { + self.ignore_errors = ignore_errors; + self + } +} + +impl CsvParseOptions { + /// The character used to separate fields in the CSV file. This + /// is most often a comma ','. + pub fn with_separator(mut self, separator: u8) -> Self { + self.separator = separator; + self + } + + /// Set the character used for field quoting. This is most often double + /// quotes '"'. Set this to [None] to disable quote parsing. + pub fn with_quote_char(mut self, quote_char: Option) -> Self { + self.quote_char = quote_char; + self + } + + /// Set the character used to indicate an end-of-line (eol). + pub fn with_eol_char(mut self, eol_char: u8) -> Self { + self.eol_char = eol_char; + self + } + + /// Set the encoding used by the file. + pub fn with_encoding(mut self, encoding: CsvEncoding) -> Self { + self.encoding = encoding; + self + } + + /// Set values that will be interpreted as missing/null. + /// + /// Note: These values are matched before quote-parsing, so if the null values + /// are quoted then those quotes also need to be included here. + pub fn with_null_values(mut self, null_values: Option) -> Self { + self.null_values = null_values; + self + } + + /// Treat missing fields as null. + pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self { + self.missing_is_null = missing_is_null; + self + } + + /// Truncate lines that are longer than the schema. + pub fn with_truncate_ragged_lines(mut self, truncate_ragged_lines: bool) -> Self { + self.truncate_ragged_lines = truncate_ragged_lines; + self + } + + /// Sets the comment prefix for this instance. Lines starting with this + /// prefix will be ignored. + pub fn with_comment_prefix>( + mut self, + comment_prefix: Option, + ) -> Self { + self.comment_prefix = comment_prefix.map(Into::into); + self + } + + /// Automatically try to parse dates/datetimes and time. If parsing fails, + /// columns remain of dtype `[DataType::String]`. + pub fn with_try_parse_dates(mut self, try_parse_dates: bool) -> Self { + self.try_parse_dates = try_parse_dates; + self + } + + /// Parse floats with a comma as decimal separator. + pub fn with_decimal_comma(mut self, decimal_comma: bool) -> Self { + self.decimal_comma = decimal_comma; + self + } +} + #[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum CsvEncoding { @@ -70,7 +326,7 @@ pub enum CommentPrefix { Single(u8), /// A string that indicates the start of a comment line. /// This allows for multiple characters to be used as a comment identifier. - Multi(String), + Multi(Arc), } impl CommentPrefix { @@ -81,7 +337,7 @@ impl CommentPrefix { /// Creates a new `CommentPrefix` for the `Multi` variant. pub fn new_multi(prefix: String) -> Self { - CommentPrefix::Multi(prefix) + CommentPrefix::Multi(Arc::from(prefix.as_str())) } /// Creates a new `CommentPrefix` from a `&str`. @@ -90,11 +346,17 @@ impl CommentPrefix { let c = prefix.as_bytes()[0]; CommentPrefix::Single(c) } else { - CommentPrefix::Multi(prefix.to_string()) + CommentPrefix::Multi(Arc::from(prefix)) } } } +impl From<&str> for CommentPrefix { + fn from(value: &str) -> Self { + Self::new_from_str(value) + } +} + #[derive(Clone, Debug, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum NullValues { @@ -123,6 +385,7 @@ impl NullValues { } } +#[derive(Debug, Clone)] pub(super) enum NullValuesCompiled { /// A single value that's used for all columns AllColumnsSingle(String), diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 5805d1898fdc..3331d453f765 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -96,7 +96,7 @@ pub(crate) struct CoreReader<'a> { /// Optional projection for which columns to load (zero-based column indices) projection: Option>, /// Current line number, used in error reporting - line_number: usize, + current_line: usize, ignore_errors: bool, skip_rows_before_header: usize, // after the header, we need to take embedded lines into account @@ -126,7 +126,7 @@ impl<'a> fmt::Debug for CoreReader<'a> { f.debug_struct("Reader") .field("schema", &self.schema) .field("projection", &self.projection) - .field("line_number", &self.line_number) + .field("current_line", &self.current_line) .finish() } } @@ -143,11 +143,11 @@ impl<'a> CoreReader<'a> { has_header: bool, ignore_errors: bool, schema: Option, - columns: Option>, + columns: Option>>, encoding: CsvEncoding, mut n_threads: Option, schema_overwrite: Option, - dtype_overwrite: Option<&'a [DataType]>, + dtype_overwrite: Option>>, sample_size: usize, chunk_size: usize, low_memory: bool, @@ -165,7 +165,9 @@ impl<'a> CoreReader<'a> { truncate_ragged_lines: bool, decimal_comma: bool, ) -> PolarsResult> { - check_decimal_comma(decimal_comma, separator.unwrap_or(b','))?; + let separator = separator.unwrap_or(b','); + + check_decimal_comma(decimal_comma, separator)?; #[cfg(any(feature = "decompress", feature = "decompress-fast"))] let mut reader_bytes = reader_bytes; @@ -176,10 +178,6 @@ impl<'a> CoreReader<'a> { compile with feature 'decompress' or 'decompress-fast'" ); } - - // check if schema should be inferred - let separator = separator.unwrap_or(b','); - // We keep track of the inferred schema bool // In case the file is compressed this schema inference is wrong and has to be done // again after decompression. @@ -229,8 +227,8 @@ impl<'a> CoreReader<'a> { if let Some(cols) = columns { let mut prj = Vec::with_capacity(cols.len()); - for col in cols { - let i = schema.try_index_of(&col)?; + for col in cols.as_ref() { + let i = schema.try_index_of(col)?; prj.push(i); } projection = Some(prj); @@ -240,7 +238,7 @@ impl<'a> CoreReader<'a> { reader_bytes: Some(reader_bytes), schema, projection, - line_number: usize::from(has_header), + current_line: usize::from(has_header), ignore_errors, skip_rows_before_header: skip_rows, skip_rows_after_header, diff --git a/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs b/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs index e3e5e592e34a..637c68a9e349 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs @@ -285,7 +285,7 @@ pub struct OwnedBatchedCsvReaderMmap { #[allow(dead_code)] // this exist because we need to keep ownership schema: SchemaRef, - reader: *mut CsvReader<'static, Box>, + reader: *mut CsvReader>, batched_reader: *mut BatchedCsvReaderMmap<'static>, } @@ -310,23 +310,12 @@ impl Drop for OwnedBatchedCsvReaderMmap { } pub fn to_batched_owned_mmap( - reader: CsvReader<'_, Box>, - schema: SchemaRef, + reader: CsvReader>, ) -> OwnedBatchedCsvReaderMmap { - // make sure that the schema is bound to the schema we have - // we will keep ownership of the schema so that the lifetime remains bound to ourselves - let reader = reader.with_schema(Some(schema.clone())); - // extend the lifetime - // the lifetime was bound to schema, which we own and will store on the heap - let reader = unsafe { - std::mem::transmute::< - CsvReader<'_, Box>, - CsvReader<'static, Box>, - >(reader) - }; - let reader = Box::new(reader); + let schema = reader.get_schema().unwrap(); - let reader = Box::leak(reader) as *mut CsvReader<'static, Box>; + let reader = Box::new(reader); + let reader = Box::leak(reader) as *mut CsvReader>; let batched_reader = unsafe { Box::new((*reader).batched_borrowed_mmap().unwrap()) }; let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderMmap; diff --git a/crates/polars-io/src/csv/read/read_impl/batched_read.rs b/crates/polars-io/src/csv/read/read_impl/batched_read.rs index 64e165844e7a..db87593750f9 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched_read.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched_read.rs @@ -391,7 +391,7 @@ pub struct OwnedBatchedCsvReader { #[allow(dead_code)] // this exist because we need to keep ownership schema: SchemaRef, - reader: *mut CsvReader<'static, Box>, + reader: *mut CsvReader>, batched_reader: *mut BatchedCsvReaderRead<'static>, } @@ -415,24 +415,11 @@ impl Drop for OwnedBatchedCsvReader { } } -pub fn to_batched_owned_read( - reader: CsvReader<'_, Box>, - schema: SchemaRef, -) -> OwnedBatchedCsvReader { - // make sure that the schema is bound to the schema we have - // we will keep ownership of the schema so that the lifetime remains bound to ourselves - let reader = reader.with_schema(Some(schema.clone())); - // extend the lifetime - // the lifetime was bound to schema, which we own and will store on the heap - let reader = unsafe { - std::mem::transmute::< - CsvReader<'_, Box>, - CsvReader<'static, Box>, - >(reader) - }; - let reader = Box::new(reader); +pub fn to_batched_owned_read(reader: CsvReader>) -> OwnedBatchedCsvReader { + let schema = reader.get_schema().unwrap(); - let reader = Box::leak(reader) as *mut CsvReader<'static, Box>; + let reader = Box::new(reader); + let reader = Box::leak(reader) as *mut CsvReader>; let batched_reader = unsafe { Box::new((*reader).batched_borrowed_read().unwrap()) }; let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderRead; diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index 65fcff4b3c47..447c43f471dc 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -8,7 +8,7 @@ use polars_time::prelude::*; use rayon::prelude::*; use super::infer_file_schema; -use super::options::{CommentPrefix, CsvEncoding, CsvReaderOptions, NullValues}; +use super::options::CsvReadOptions; use super::read_impl::batched_mmap::{ to_batched_owned_mmap, BatchedCsvReaderMmap, OwnedBatchedCsvReaderMmap, }; @@ -20,7 +20,6 @@ use crate::mmap::MmapBytesReader; use crate::predicates::PhysicalIoExpr; use crate::shared::SerReader; use crate::utils::{get_reader_bytes, resolve_homedir}; -use crate::RowIndex; /// Create a new DataFrame by reading a csv file. /// @@ -33,307 +32,138 @@ use crate::RowIndex; /// /// fn example() -> PolarsResult { /// CsvReader::from_path("iris.csv")? -/// .has_header(true) +/// .with_has_header(true) /// .finish() /// } /// ``` #[must_use] -pub struct CsvReader<'a, R> +pub struct CsvReader where R: MmapBytesReader, { /// File or Stream object. reader: R, /// Options for the CSV reader. - options: CsvReaderOptions, - /// Stop reading from the csv after this number of rows is reached - n_rows: Option, - /// Optional indexes of the columns to project - projection: Option>, - /// Optional column names to project/ select. - columns: Option>, - path: Option, - dtype_overwrite: Option<&'a [DataType]>, - sample_size: usize, - chunk_size: usize, + options: CsvReadOptions, predicate: Option>, - row_index: Option, - /// Aggregates chunk afterwards to a single chunk. - rechunk: bool, - missing_is_null: bool, } -impl<'a, R> CsvReader<'a, R> +impl CsvReader where - R: 'a + MmapBytesReader, + R: MmapBytesReader, { - /// Skip these rows after the header - pub fn with_options(mut self, options: CsvReaderOptions) -> Self { - self.options = options; - self - } - - /// Sets whether the CSV file has headers - pub fn has_header(mut self, has_header: bool) -> Self { - self.options.has_header = has_header; - self - } - - /// Sets the CSV file's column separator as a byte character - pub fn with_separator(mut self, separator: u8) -> Self { - self.options.separator = separator; - self - } - - /// Sets the `char` used as quote char. The default is `b'"'`. If set to [`None`], quoting is disabled. - pub fn with_quote_char(mut self, quote_char: Option) -> Self { - self.options.quote_char = quote_char; - self - } - - /// Sets the comment prefix for this instance. Lines starting with this prefix will be ignored. - pub fn with_comment_prefix(mut self, comment_prefix: Option<&str>) -> Self { - self.options.comment_prefix = comment_prefix.map(CommentPrefix::new_from_str); - self - } - - /// Sets the comment prefix from `CsvParserOptions` for internal initialization. - pub fn _with_comment_prefix(mut self, comment_prefix: Option) -> Self { - self.options.comment_prefix = comment_prefix; - self - } - - /// Set the `char` used as end-of-line char. The default is `b'\n'`. - pub fn with_end_of_line_char(mut self, eol_char: u8) -> Self { - self.options.eol_char = eol_char; - self - } - - /// Set [`CsvEncoding`]. - pub fn with_encoding(mut self, encoding: CsvEncoding) -> Self { - self.options.encoding = encoding; - self - } - - /// Skip the first `n` rows during parsing. The header will be parsed at `n` lines. - pub fn with_skip_rows(mut self, n: usize) -> Self { - self.options.skip_rows = n; - self - } - - /// Skip these rows after the header - pub fn with_skip_rows_after_header(mut self, n: usize) -> Self { - self.options.skip_rows_after_header = n; - self - } - - /// Set the CSV file's schema. This only accepts datatypes that are implemented - /// in the csv parser and expects a complete Schema. - /// - /// It is recommended to use [with_dtypes](Self::with_dtypes) instead. - pub fn with_schema(mut self, schema: Option) -> Self { - self.options.schema = schema; - self - } - - /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset - /// of the total schema. - pub fn with_dtypes(mut self, schema: Option) -> Self { - self.options.schema_overwrite = schema; - self - } - - /// Set the CSV reader to infer the schema of the file - /// - /// # Arguments - /// * `n` - Maximum number of rows read for schema inference. - /// Setting this to `None` will do a full table scan (slow). - pub fn infer_schema(mut self, n: Option) -> Self { - // used by error ignore logic - self.options.infer_schema_length = n; - self - } - - /// Automatically try to parse dates/ datetimes and time. If parsing fails, columns remain of dtype `[DataType::String]`. - pub fn with_try_parse_dates(mut self, toggle: bool) -> Self { - self.options.try_parse_dates = toggle; - self - } - - /// Set values that will be interpreted as missing/null. - /// - /// Note: any value you set as null value will not be escaped, so if quotation marks - /// are part of the null value you should include them. - pub fn with_null_values(mut self, null_values: Option) -> Self { - self.options.null_values = null_values; - self - } - - /// Continue with next batch when a ParserError is encountered. - pub fn with_ignore_errors(mut self, toggle: bool) -> Self { - self.options.ignore_errors = toggle; - self - } - - /// Raise an error if CSV is empty (otherwise return an empty frame) - pub fn raise_if_empty(mut self, toggle: bool) -> Self { - self.options.raise_if_empty = toggle; + pub fn _with_predicate(mut self, predicate: Option>) -> Self { + self.predicate = predicate; self } - /// Truncate lines that are longer than the schema. - pub fn truncate_ragged_lines(mut self, toggle: bool) -> Self { - self.options.truncate_ragged_lines = toggle; + // TODO: Investigate if we can remove this + pub(crate) fn with_schema(mut self, schema: SchemaRef) -> Self { + self.options.schema = Some(schema); self } - /// Parse floats with a comma as decimal separator. - pub fn with_decimal_comma(mut self, toggle: bool) -> Self { - self.options.decimal_comma = toggle; - self + // TODO: Investigate if we can remove this + pub(crate) fn get_schema(&self) -> Option { + self.options.schema.clone() } +} - /// Set the number of threads used in CSV reading. The default uses the number of cores of - /// your cpu. +impl CsvReadOptions { + /// Creates a CSV reader using a file path. /// - /// Note that this only works if this is initialized with `CsvReader::from_path`. - /// Note that the number of cores is the maximum allowed number of threads. - pub fn with_n_threads(mut self, n: Option) -> Self { - self.options.n_threads = n; - self - } - - /// Reduce memory consumption at the expense of performance - pub fn low_memory(mut self, toggle: bool) -> Self { - self.options.low_memory = toggle; - self - } - - /// Add a row index column. - pub fn with_row_index(mut self, row_index: Option) -> Self { - self.row_index = row_index; - self - } - - /// Sets the chunk size used by the parser. This influences performance - pub fn with_chunk_size(mut self, chunk_size: usize) -> Self { - self.chunk_size = chunk_size; - self - } - - /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot - /// be guaranteed. - pub fn with_n_rows(mut self, num_rows: Option) -> Self { - self.n_rows = num_rows; - self - } - - /// Rechunk the DataFrame to contiguous memory after the CSV is parsed. - pub fn with_rechunk(mut self, rechunk: bool) -> Self { - self.rechunk = rechunk; - self - } - - /// Treat missing fields as null. - pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self { - self.missing_is_null = missing_is_null; - self - } - - /// Overwrite the dtypes in the schema in the order of the slice that's given. - /// This is useful if you don't know the column names beforehand - pub fn with_dtypes_slice(mut self, dtypes: Option<&'a [DataType]>) -> Self { - self.dtype_overwrite = dtypes; - self - } - - /// Set the reader's column projection. This counts from 0, meaning that - /// `vec![0, 4]` would select the 1st and 5th column. - pub fn with_projection(mut self, projection: Option>) -> Self { - self.projection = projection; - self - } + /// # Panics + /// If both self.path and the path parameter are non-null. Only one of them is + /// to be non-null. + pub fn try_into_reader_with_file_path( + mut self, + path: Option, + ) -> PolarsResult> { + if self.path.is_some() { + assert!( + path.is_none(), + "impl error: only 1 of self.path or the path parameter is to be non-null" + ); + } else { + self.path = path; + }; - /// Columns to select/ project - pub fn with_columns(mut self, columns: Option>) -> Self { - self.columns = columns; - self - } + assert!( + self.path.is_some(), + "impl error: either one of self.path or the path parameter is to be non-null" + ); - /// The preferred way to initialize this builder. This allows the CSV file to be memory mapped - /// and thereby greatly increases parsing performance. - pub fn with_path>(mut self, path: Option

) -> Self { - self.path = path.map(|p| p.into()); - self - } + let path = resolve_homedir(self.path.as_ref().unwrap()); + let reader = polars_utils::open_file(path)?; + let options = self; - /// Sets the size of the sample taken from the CSV file. The sample is used to get statistic about - /// the file. These statistics are used to try to optimally allocate up front. Increasing this may - /// improve performance. - pub fn sample_size(mut self, size: usize) -> Self { - self.sample_size = size; - self + Ok(CsvReader { + reader, + options, + predicate: None, + }) } - pub fn with_predicate(mut self, predicate: Option>) -> Self { - self.predicate = predicate; - self - } -} + /// Creates a CSV reader using a file handle. + pub fn into_reader_with_file_handle(self, reader: R) -> CsvReader { + let options = self; -impl<'a> CsvReader<'a, File> { - /// This is the recommended way to create a csv reader as this allows for fastest parsing. - pub fn from_path>(path: P) -> PolarsResult { - let path = resolve_homedir(&path.into()); - let f = polars_utils::open_file(&path)?; - Ok(Self::new(f).with_path(Some(path))) + CsvReader { + reader, + options, + predicate: Default::default(), + } } } -impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> { - fn core_reader<'b>( - &'b mut self, +impl CsvReader { + fn core_reader( + &mut self, schema: Option, to_cast: Vec, - ) -> PolarsResult> - where - 'a: 'b, - { + ) -> PolarsResult { let reader_bytes = get_reader_bytes(&mut self.reader)?; + + let parse_options = self.options.get_parse_options(); + CoreReader::new( reader_bytes, - self.n_rows, + self.options.n_rows, self.options.skip_rows, - std::mem::take(&mut self.projection), + self.options.projection.clone().map(|x| x.as_ref().clone()), self.options.infer_schema_length, - Some(self.options.separator), + Some(parse_options.separator), self.options.has_header, self.options.ignore_errors, self.options.schema.clone(), - std::mem::take(&mut self.columns), - self.options.encoding, + self.options.columns.clone(), + parse_options.encoding, self.options.n_threads, schema, - self.dtype_overwrite, - self.sample_size, - self.chunk_size, + self.options.dtype_overwrite.clone(), + self.options.sample_size, + self.options.chunk_size, self.options.low_memory, - std::mem::take(&mut self.options.comment_prefix), - self.options.quote_char, - self.options.eol_char, - std::mem::take(&mut self.options.null_values), - self.missing_is_null, - std::mem::take(&mut self.predicate), + parse_options.comment_prefix.clone(), + parse_options.quote_char, + parse_options.eol_char, + parse_options.null_values.clone(), + parse_options.missing_is_null, + self.predicate.clone(), to_cast, self.options.skip_rows_after_header, - std::mem::take(&mut self.row_index), - self.options.try_parse_dates, + self.options.row_index.clone(), + parse_options.try_parse_dates, self.options.raise_if_empty, - self.options.truncate_ragged_lines, - self.options.decimal_comma, + parse_options.truncate_ragged_lines, + parse_options.decimal_comma, ) } + // TODO: + // * Move this step outside of the reader so that we don't do it multiple times + // when we read a file list. + // * See if we can avoid constructing a filtered schema. fn prepare_schema_overwrite( &self, overwriting_schema: &Schema, @@ -387,7 +217,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> { } } - pub fn batched_borrowed_mmap(&'a mut self) -> PolarsResult> { + pub fn batched_borrowed_mmap(&mut self) -> PolarsResult { if let Some(schema) = self.options.schema_overwrite.as_deref() { let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema)?; let schema = Arc::new(schema); @@ -399,7 +229,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> { csv_reader.batched_mmap(false) } } - pub fn batched_borrowed_read(&'a mut self) -> PolarsResult> { + pub fn batched_borrowed_read(&mut self) -> PolarsResult { if let Some(schema) = self.options.schema_overwrite.as_deref() { let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema)?; let schema = Arc::new(schema); @@ -413,35 +243,36 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> { } } -impl<'a> CsvReader<'a, Box> { +impl CsvReader> { pub fn batched_mmap( mut self, schema: Option, ) -> PolarsResult { match schema { - Some(schema) => Ok(to_batched_owned_mmap(self, schema)), + Some(schema) => Ok(to_batched_owned_mmap(self.with_schema(schema))), None => { + let parse_options = self.options.get_parse_options(); let reader_bytes = get_reader_bytes(&mut self.reader)?; let (inferred_schema, _, _) = infer_file_schema( &reader_bytes, - self.options.separator, + parse_options.separator, self.options.infer_schema_length, self.options.has_header, None, &mut self.options.skip_rows, self.options.skip_rows_after_header, - self.options.comment_prefix.as_ref(), - self.options.quote_char, - self.options.eol_char, - self.options.null_values.as_ref(), - self.options.try_parse_dates, + parse_options.comment_prefix.as_ref(), + parse_options.quote_char, + parse_options.eol_char, + parse_options.null_values.as_ref(), + parse_options.try_parse_dates, self.options.raise_if_empty, &mut self.options.n_threads, - self.options.decimal_comma, + parse_options.decimal_comma, )?; let schema = Arc::new(inferred_schema); - Ok(to_batched_owned_mmap(self, schema)) + Ok(to_batched_owned_mmap(self.with_schema(schema))) }, } } @@ -450,60 +281,52 @@ impl<'a> CsvReader<'a, Box> { schema: Option, ) -> PolarsResult { match schema { - Some(schema) => Ok(to_batched_owned_read(self, schema)), + Some(schema) => Ok(to_batched_owned_read(self.with_schema(schema))), None => { let reader_bytes = get_reader_bytes(&mut self.reader)?; + let parse_options = self.options.get_parse_options(); + let (inferred_schema, _, _) = infer_file_schema( &reader_bytes, - self.options.separator, + parse_options.separator, self.options.infer_schema_length, self.options.has_header, None, &mut self.options.skip_rows, self.options.skip_rows_after_header, - self.options.comment_prefix.as_ref(), - self.options.quote_char, - self.options.eol_char, - self.options.null_values.as_ref(), - self.options.try_parse_dates, + parse_options.comment_prefix.as_ref(), + parse_options.quote_char, + parse_options.eol_char, + parse_options.null_values.as_ref(), + parse_options.try_parse_dates, self.options.raise_if_empty, &mut self.options.n_threads, - self.options.decimal_comma, + parse_options.decimal_comma, )?; let schema = Arc::new(inferred_schema); - Ok(to_batched_owned_read(self, schema)) + Ok(to_batched_owned_read(self.with_schema(schema))) }, } } } -impl<'a, R> SerReader for CsvReader<'a, R> +impl SerReader for CsvReader where - R: MmapBytesReader + 'a, + R: MmapBytesReader, { /// Create a new CsvReader from a file/stream. fn new(reader: R) -> Self { CsvReader { reader, - options: CsvReaderOptions::default(), - rechunk: true, - n_rows: None, - projection: None, - columns: None, - path: None, - dtype_overwrite: None, - sample_size: 1024, - chunk_size: 1 << 18, - missing_is_null: true, + options: Default::default(), predicate: None, - row_index: None, } } /// Read the file and create the DataFrame. fn finish(mut self) -> PolarsResult { - let rechunk = self.rechunk; + let rechunk = self.options.rechunk; let schema_overwrite = self.options.schema_overwrite.clone(); let low_memory = self.options.low_memory; @@ -552,24 +375,29 @@ where } #[cfg(feature = "temporal")] - // only needed until we also can parse time columns in place - if self.options.try_parse_dates { - // determine the schema that's given by the user. That should not be changed - let fixed_schema = match (schema_overwrite, self.dtype_overwrite) { - (Some(schema), _) => schema, - (None, Some(dtypes)) => { - let schema = dtypes - .iter() - .zip(df.get_column_names()) - .map(|(dtype, name)| Field::new(name, dtype.clone())) - .collect::(); - - Arc::new(schema) - }, - _ => Arc::default(), - }; - df = parse_dates(df, &fixed_schema) + { + let parse_options = self.options.get_parse_options(); + + // only needed until we also can parse time columns in place + if parse_options.try_parse_dates { + // determine the schema that's given by the user. That should not be changed + let fixed_schema = match (schema_overwrite, self.options.dtype_overwrite) { + (Some(schema), _) => schema, + (None, Some(dtypes)) => { + let schema = dtypes + .iter() + .zip(df.get_column_names()) + .map(|(dtype, name)| Field::new(name, dtype.clone())) + .collect::(); + + Arc::new(schema) + }, + _ => Arc::default(), + }; + df = parse_dates(df, &fixed_schema) + } } + Ok(df) } } diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index c2d526c4fb9a..dc3883e89a2a 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -143,7 +143,7 @@ impl IpcReaderAsync { Some(projection) => { fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema { if let Some(rc) = row_index { - let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); + let _ = schema.insert_at_index(0, rc.name.as_ref().into(), IDX_DTYPE); } schema } diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs index 995bf5ec2904..606e7b46536e 100644 --- a/crates/polars-io/src/options.rs +++ b/crates/polars-io/src/options.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use polars_core::schema::SchemaRef; use polars_utils::IdxSize; #[cfg(feature = "serde")] @@ -6,7 +8,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct RowIndex { - pub name: String, + pub name: Arc, pub offset: IdxSize, } diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 60760e2bb1af..97d96634be54 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -55,8 +55,7 @@ impl ParquetReader { self } - /// Stop parsing when `n` rows are parsed. By settings this parameter the csv will be parsed - /// sequentially. + /// Stop reading at `num_rows` rows. pub fn with_n_rows(mut self, num_rows: Option) -> Self { self.n_rows = num_rows; self diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 68735a69052b..a71057859fb4 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -1602,7 +1602,7 @@ impl LazyFrame { .. } if !matches!(scan_type, FileScan::Anonymous { .. }) => { options.row_index = Some(RowIndex { - name: name.to_string(), + name: Arc::from(name), offset: offset.unwrap_or(0), }); false diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs index 69a8df57c41c..9e568363be27 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs @@ -1,54 +1,52 @@ use std::path::PathBuf; +use std::sync::Arc; use super::*; pub struct CsvExec { pub path: PathBuf, pub file_info: FileInfo, - pub options: CsvReaderOptions, + pub options: CsvReadOptions, pub file_options: FileScanOptions, pub predicate: Option>, } impl CsvExec { - fn read(&mut self) -> PolarsResult { + fn read(&self) -> PolarsResult { let with_columns = self .file_options .with_columns - .take() + .clone() // Interpret selecting no columns as selecting all columns. - .filter(|columns| !columns.is_empty()) - .map(Arc::unwrap_or_clone); + .filter(|columns| !columns.is_empty()); let n_rows = _set_n_rows_for_scan(self.file_options.n_rows); let predicate = self.predicate.clone().map(phys_expr_to_io_expr); - CsvReader::from_path(&self.path) - .unwrap() - .has_header(self.options.has_header) + self.options + .clone() + .with_skip_rows_after_header( + // If we don't set it to 0 here, it will skip double the amount of rows. + // But if we set it to 0, it will still skip the requested amount of rows. + // The reason I currently cannot fathom. + // TODO: Find out why + 0, + ) .with_schema(Some( self.file_info.reader_schema.clone().unwrap().unwrap_right(), )) - .with_separator(self.options.separator) - .with_ignore_errors(self.options.ignore_errors) - .with_skip_rows(self.options.skip_rows) .with_n_rows(n_rows) .with_columns(with_columns) - .low_memory(self.options.low_memory) - .with_null_values(std::mem::take(&mut self.options.null_values)) - .with_predicate(predicate) - .with_encoding(CsvEncoding::LossyUtf8) - ._with_comment_prefix(std::mem::take(&mut self.options.comment_prefix)) - .with_quote_char(self.options.quote_char) - .with_end_of_line_char(self.options.eol_char) - .with_encoding(self.options.encoding) .with_rechunk(self.file_options.rechunk) - .with_row_index(std::mem::take(&mut self.file_options.row_index)) - .with_try_parse_dates(self.options.try_parse_dates) - .with_n_threads(self.options.n_threads) - .truncate_ragged_lines(self.options.truncate_ragged_lines) - .with_decimal_comma(self.options.decimal_comma) - .raise_if_empty(self.options.raise_if_empty) + .with_row_index(self.file_options.row_index.clone()) + .with_parse_options( + // TODO: Confirm why we set lossy utf8 here. + Arc::unwrap_or_clone(self.options.parse_options.clone()) + .with_encoding(CsvEncoding::LossyUtf8), + ) + .with_path(Some(self.path.clone())) + .try_into_reader_with_file_path(None)? + ._with_predicate(predicate) .finish() } } diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 86373dbfa0e3..fb0051fbc664 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -10,6 +10,7 @@ use crate::prelude::*; #[derive(Clone)] #[cfg(feature = "csv")] pub struct LazyCsvReader { + // TODO: Use CsvReadOptions here. path: PathBuf, paths: Arc<[PathBuf]>, separator: u8, @@ -139,7 +140,7 @@ impl LazyCsvReader { /// Set whether the CSV file has headers #[must_use] - pub fn has_header(mut self, has_header: bool) -> Self { + pub fn with_has_header(mut self, has_header: bool) -> Self { self.has_header = has_header; self } @@ -158,7 +159,7 @@ impl LazyCsvReader { if s.len() == 1 && s.chars().next().unwrap().is_ascii() { CommentPrefix::Single(s.as_bytes()[0]) } else { - CommentPrefix::Multi(s.to_string()) + CommentPrefix::Multi(Arc::from(s)) } }); self @@ -173,7 +174,7 @@ impl LazyCsvReader { /// Set the `char` used as end of line. The default is `b'\n'`. #[must_use] - pub fn with_end_of_line_char(mut self, eol_char: u8) -> Self { + pub fn with_eol_char(mut self, eol_char: u8) -> Self { self.eol_char = eol_char; self } @@ -200,7 +201,7 @@ impl LazyCsvReader { /// Reduce memory usage at the expense of performance #[must_use] - pub fn low_memory(mut self, toggle: bool) -> Self { + pub fn with_low_memory(mut self, toggle: bool) -> Self { self.low_memory = toggle; self } @@ -222,14 +223,14 @@ impl LazyCsvReader { /// Raise an error if CSV is empty (otherwise return an empty frame) #[must_use] - pub fn raise_if_empty(mut self, toggle: bool) -> Self { + pub fn with_raise_if_empty(mut self, toggle: bool) -> Self { self.raise_if_empty = toggle; self } /// Truncate lines that are longer than the schema. #[must_use] - pub fn truncate_ragged_lines(mut self, toggle: bool) -> Self { + pub fn with_truncate_ragged_lines(mut self, toggle: bool) -> Self { self.truncate_ragged_lines = toggle; self } diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs index 19095e44536f..de53c8e398a2 100644 --- a/crates/polars-lazy/src/tests/io.rs +++ b/crates/polars-lazy/src/tests/io.rs @@ -586,7 +586,7 @@ fn test_row_index_on_files() -> PolarsResult<()> { for offset in [0 as IdxSize, 10] { let lf = LazyCsvReader::new(FOODS_CSV) .with_row_index(Some(RowIndex { - name: "index".into(), + name: Arc::from("index"), offset, })) .finish()?; @@ -671,7 +671,7 @@ fn scan_small_dtypes() -> PolarsResult<()> { ]; for dt in small_dt { let df = LazyCsvReader::new(FOODS_CSV) - .has_header(true) + .with_has_header(true) .with_dtype_overwrite(Some(Arc::new(Schema::from_iter([Field::new( "sugars_g", dt.clone(), diff --git a/crates/polars-lazy/src/tests/mod.rs b/crates/polars-lazy/src/tests/mod.rs index 956fec468707..63126aebb584 100644 --- a/crates/polars-lazy/src/tests/mod.rs +++ b/crates/polars-lazy/src/tests/mod.rs @@ -82,7 +82,11 @@ fn init_files() { let out_path = path.replace(".csv", ext); if std::fs::metadata(&out_path).is_err() { - let mut df = CsvReader::from_path(path).unwrap().finish().unwrap(); + let mut df = CsvReadOptions::default() + .try_into_reader_with_file_path(Some(path.into())) + .unwrap() + .finish() + .unwrap(); let f = std::fs::File::create(&out_path).unwrap(); match ext { @@ -175,10 +179,10 @@ pub(crate) fn get_df() -> DataFrame { let file = Cursor::new(s); - let df = CsvReader::new(file) - // we also check if infer schema ignores errors - .infer_schema(Some(3)) - .has_header(true) + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(3)) + .with_has_header(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); df diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index 8d49bcb9ea41..da134df9a7fb 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -1,10 +1,11 @@ use std::fs::File; use std::path::PathBuf; +use std::sync::Arc; use polars_core::export::arrow::Either; use polars_core::POOL; use polars_io::csv::read::{ - BatchedCsvReaderMmap, BatchedCsvReaderRead, CsvEncoding, CsvReader, CsvReaderOptions, + BatchedCsvReaderMmap, BatchedCsvReaderRead, CsvEncoding, CsvReadOptions, CsvReader, }; use polars_plan::global::_set_n_rows_for_scan; use polars_plan::prelude::FileScanOptions; @@ -17,12 +18,12 @@ pub(crate) struct CsvSource { #[allow(dead_code)] // this exist because we need to keep ownership schema: SchemaRef, - reader: Option<*mut CsvReader<'static, File>>, + reader: Option<*mut CsvReader>, batched_reader: Option, *mut BatchedCsvReaderRead<'static>>>, n_threads: usize, path: Option, - options: Option, + options: Option, file_options: Option, verbose: bool, } @@ -60,36 +61,26 @@ impl CsvSource { eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows") } - let reader = CsvReader::from_path(&path) - .unwrap() - .has_header(options.has_header) - .with_dtypes(Some(self.schema.clone())) - .with_separator(options.separator) - .with_ignore_errors(options.ignore_errors) - .with_skip_rows(options.skip_rows) + let low_memory = options.low_memory; + let parse_options = Arc::unwrap_or_clone(options.clone().parse_options); + + let reader: CsvReader = options + .with_schema_overwrite(Some(self.schema.clone())) .with_n_rows(n_rows) - .with_columns(with_columns.map(|mut cols| std::mem::take(Arc::make_mut(&mut cols)))) - .low_memory(options.low_memory) - .with_null_values(options.null_values) - .with_encoding(CsvEncoding::LossyUtf8) - ._with_comment_prefix(options.comment_prefix) - .with_quote_char(options.quote_char) - .with_end_of_line_char(options.eol_char) - .with_encoding(options.encoding) - // never rechunk in streaming + .with_columns(with_columns) .with_rechunk(false) - .with_chunk_size(chunk_size) .with_row_index(file_options.row_index) - .with_n_threads(options.n_threads) - .with_try_parse_dates(options.try_parse_dates) - .truncate_ragged_lines(options.truncate_ragged_lines) - .with_decimal_comma(options.decimal_comma) - .raise_if_empty(options.raise_if_empty); + .with_parse_options(parse_options.with_encoding( + // TODO: Confirm why we set lossy utf8 here. + CsvEncoding::LossyUtf8, + )) + .with_path(Some(path)) + .try_into_reader_with_file_path(None)?; let reader = Box::new(reader); - let reader = Box::leak(reader) as *mut CsvReader<'static, File>; + let reader = Box::leak(reader) as *mut CsvReader; - let batched_reader = if options.low_memory { + let batched_reader = if low_memory { let batched_reader = unsafe { Box::new((*reader).batched_borrowed_read()?) }; let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderRead; Either::Right(batched_reader) @@ -106,7 +97,7 @@ impl CsvSource { pub(crate) fn new( path: PathBuf, schema: SchemaRef, - options: CsvReaderOptions, + options: CsvReadOptions, file_options: FileScanOptions, verbose: bool, ) -> PolarsResult { diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs index fafcdfc4286f..cf0bd1aae92c 100644 --- a/crates/polars-plan/src/logical_plan/builder_dsl.rs +++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs @@ -2,7 +2,9 @@ use polars_core::prelude::*; #[cfg(feature = "parquet")] use polars_io::cloud::CloudOptions; #[cfg(feature = "csv")] -use polars_io::csv::read::{CommentPrefix, CsvEncoding, CsvReaderOptions, NullValues}; +use polars_io::csv::read::{ + CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, +}; #[cfg(feature = "ipc")] use polars_io::ipc::IpcScanOptions; #[cfg(feature = "parquet")] @@ -216,27 +218,29 @@ impl DslBuilder { file_options: options, predicate: None, scan_type: FileScan::Csv { - options: CsvReaderOptions { - has_header, - separator, - ignore_errors, - skip_rows, - low_memory, - comment_prefix, - quote_char, - eol_char, - null_values, - encoding, - try_parse_dates, - raise_if_empty, - truncate_ragged_lines, - n_threads, - schema, - schema_overwrite, - skip_rows_after_header, - infer_schema_length, - decimal_comma, - }, + options: CsvReadOptions::default() + .with_has_header(has_header) + .with_ignore_errors(ignore_errors) + .with_skip_rows(skip_rows) + .with_low_memory(low_memory) + .with_raise_if_empty(raise_if_empty) + .with_n_threads(n_threads) + .with_schema(schema) + .with_schema_overwrite(schema_overwrite) + .with_skip_rows_after_header(skip_rows_after_header) + .with_infer_schema_length(infer_schema_length) + .with_parse_options( + CsvParseOptions::default() + .with_separator(separator) + .with_comment_prefix(comment_prefix) + .with_quote_char(quote_char) + .with_eol_char(eol_char) + .with_null_values(null_values) + .with_encoding(encoding) + .with_try_parse_dates(try_parse_dates) + .with_truncate_ragged_lines(truncate_ragged_lines) + .with_decimal_comma(decimal_comma), + ), }, } .into()) diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index e2a2d514a4aa..9989ccff910c 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -128,7 +128,7 @@ pub fn to_alp_impl( if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); *schema = schema - .new_inserting_at_index(0, row_index.name.as_str().into(), IDX_DTYPE) + .new_inserting_at_index(0, row_index.name.as_ref().into(), IDX_DTYPE) .unwrap(); } diff --git a/crates/polars-plan/src/logical_plan/conversion/scans.rs b/crates/polars-plan/src/logical_plan/conversion/scans.rs index 7d03fa9b7d56..c70cf61382f8 100644 --- a/crates/polars-plan/src/logical_plan/conversion/scans.rs +++ b/crates/polars-plan/src/logical_plan/conversion/scans.rs @@ -20,7 +20,7 @@ fn get_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> { #[cfg(any(feature = "parquet", feature = "parquet_async",))] fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> SchemaRef { if let Some(rc) = row_index { - let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); + let _ = schema.insert_at_index(0, rc.name.as_ref().into(), IDX_DTYPE); } Arc::new(schema) } @@ -122,7 +122,7 @@ pub(super) fn ipc_file_info( pub(super) fn csv_file_info( paths: &[PathBuf], file_options: &FileScanOptions, - csv_options: &mut CsvReaderOptions, + csv_options: &mut CsvReadOptions, ) -> PolarsResult { use std::io::Seek; @@ -148,23 +148,25 @@ pub(super) fn csv_file_info( file.rewind()?; let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file"); + let parse_options = csv_options.get_parse_options(); + // this needs a way to estimated bytes/rows. let (inferred_schema, rows_read, bytes_read) = infer_file_schema( &reader_bytes, - csv_options.separator, + parse_options.separator, csv_options.infer_schema_length, csv_options.has_header, csv_options.schema_overwrite.as_deref(), &mut csv_options.skip_rows, csv_options.skip_rows_after_header, - csv_options.comment_prefix.as_ref(), - csv_options.quote_char, - csv_options.eol_char, - csv_options.null_values.as_ref(), - csv_options.try_parse_dates, + parse_options.comment_prefix.as_ref(), + parse_options.quote_char, + parse_options.eol_char, + parse_options.null_values.as_ref(), + parse_options.try_parse_dates, csv_options.raise_if_empty, &mut csv_options.n_threads, - csv_options.decimal_comma, + parse_options.decimal_comma, )?; let mut schema = csv_options @@ -175,7 +177,7 @@ pub(super) fn csv_file_info( let reader_schema = if let Some(rc) = &file_options.row_index { let reader_schema = schema.clone(); let mut output_schema = (*reader_schema).clone(); - output_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?; + output_schema.insert_at_index(0, rc.name.as_ref().into(), IDX_DTYPE)?; schema = Arc::new(output_schema); reader_schema } else { diff --git a/crates/polars-plan/src/logical_plan/file_scan.rs b/crates/polars-plan/src/logical_plan/file_scan.rs index 2777ad8a5e1b..94295b1c0db1 100644 --- a/crates/polars-plan/src/logical_plan/file_scan.rs +++ b/crates/polars-plan/src/logical_plan/file_scan.rs @@ -1,7 +1,7 @@ use std::hash::{Hash, Hasher}; #[cfg(feature = "csv")] -use polars_io::csv::read::CsvReaderOptions; +use polars_io::csv::read::CsvReadOptions; #[cfg(feature = "ipc")] use polars_io::ipc::IpcScanOptions; #[cfg(feature = "parquet")] @@ -15,7 +15,7 @@ use super::*; #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum FileScan { #[cfg(feature = "csv")] - Csv { options: CsvReaderOptions }, + Csv { options: CsvReadOptions }, #[cfg(feature = "parquet")] Parquet { options: ParquetOptions, diff --git a/crates/polars-plan/src/logical_plan/functions/count.rs b/crates/polars-plan/src/logical_plan/functions/count.rs index c1538aacfa64..7c8def715ef4 100644 --- a/crates/polars-plan/src/logical_plan/functions/count.rs +++ b/crates/polars-plan/src/logical_plan/functions/count.rs @@ -22,15 +22,16 @@ pub fn count_rows(paths: &Arc<[PathBuf]>, scan_type: &FileScan) -> PolarsResult< match scan_type { #[cfg(feature = "csv")] FileScan::Csv { options } => { + let parse_options = options.get_parse_options(); let n_rows: PolarsResult = paths .iter() .map(|path| { count_rows_csv( path, - options.separator, - options.quote_char, - options.comment_prefix.as_ref(), - options.eol_char, + parse_options.separator, + parse_options.quote_char, + parse_options.comment_prefix.as_ref(), + parse_options.eol_char, options.has_header, ) }) diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index 4477e1176d64..d6ec2e01ca64 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -342,12 +342,9 @@ impl<'a> PredicatePushDown<'a> { // not update the row index properly before applying the // predicate (e.g. FileScan::Csv doesn't). if let Some(ref row_index) = options.row_index { - let row_index_predicates = transfer_to_local_by_name( - expr_arena, - &mut acc_predicates, - |name| name.as_ref() == row_index.name, - ); - row_index_predicates + transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| { + name == row_index.name + }) } else { vec![] } diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index f39d46ab7452..56a3f74a1b37 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -47,7 +47,7 @@ fn get_scan_columns( // we shouldn't project the row-count column, as that is generated // in the scan let push = match row_index { - Some(rc) if name.as_ref() != rc.name.as_str() => true, + Some(rc) if name != rc.name => true, None => true, _ => false, }; diff --git a/py-polars/src/batched_csv.rs b/py-polars/src/batched_csv.rs index 3cd892f2d13e..c547da5dd94c 100644 --- a/py-polars/src/batched_csv.rs +++ b/py-polars/src/batched_csv.rs @@ -64,7 +64,10 @@ impl PyBatchedCsv { ) -> PyResult { let null_values = null_values.map(|w| w.0); let eol_char = eol_char.as_bytes()[0]; - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let quote_char = if let Some(s) = quote_char { if s.is_empty() { None @@ -94,33 +97,37 @@ impl PyBatchedCsv { let file = std::fs::File::open(path).map_err(PyPolarsErr::from)?; let reader = Box::new(file) as Box; - let reader = CsvReader::new(reader) - .infer_schema(infer_schema_length) - .has_header(has_header) + let reader = CsvReadOptions::default() + .with_infer_schema_length(infer_schema_length) + .with_has_header(has_header) .with_n_rows(n_rows) - .with_separator(separator.as_bytes()[0]) .with_skip_rows(skip_rows) .with_ignore_errors(ignore_errors) - .with_projection(projection) + .with_projection(projection.map(Arc::new)) .with_rechunk(rechunk) .with_chunk_size(chunk_size) - .with_encoding(encoding.0) - .with_columns(columns) + .with_columns(columns.map(Arc::new)) .with_n_threads(n_threads) - .with_dtypes_slice(overwrite_dtype_slice.as_deref()) - .with_missing_is_null(!missing_utf8_is_empty_string) - .low_memory(low_memory) - .with_comment_prefix(comment_prefix) - .with_null_values(null_values) - .with_try_parse_dates(try_parse_dates) - .with_quote_char(quote_char) - .with_end_of_line_char(eol_char) + .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new)) + .with_low_memory(low_memory) .with_skip_rows_after_header(skip_rows_after_header) .with_row_index(row_index) - .sample_size(sample_size) - .truncate_ragged_lines(truncate_ragged_lines) - .with_decimal_comma(decimal_comma) - .raise_if_empty(raise_if_empty); + .with_sample_size(sample_size) + .with_raise_if_empty(raise_if_empty) + .with_parse_options( + CsvParseOptions::default() + .with_separator(separator.as_bytes()[0]) + .with_encoding(encoding.0) + .with_missing_is_null(!missing_utf8_is_empty_string) + .with_comment_prefix(comment_prefix) + .with_null_values(null_values) + .with_try_parse_dates(try_parse_dates) + .with_quote_char(quote_char) + .with_eol_char(eol_char) + .with_truncate_ragged_lines(truncate_ragged_lines) + .with_decimal_comma(decimal_comma), + ) + .into_reader_with_file_handle(reader); let reader = if low_memory { let reader = reader diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index ad69143b042a..d462259d9be7 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -63,7 +63,10 @@ impl PyDataFrame { ) -> PyResult { let null_values = null_values.map(|w| w.0); let eol_char = eol_char.as_bytes()[0]; - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied()); let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| { @@ -86,36 +89,40 @@ impl PyDataFrame { py_f = read_if_bytesio(py_f); let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?; let df = py.allow_threads(move || { - CsvReader::new(mmap_bytes_r) - .infer_schema(infer_schema_length) - .has_header(has_header) + CsvReadOptions::default() + .with_path(path) + .with_infer_schema_length(infer_schema_length) + .with_has_header(has_header) .with_n_rows(n_rows) - .with_separator(separator.as_bytes()[0]) .with_skip_rows(skip_rows) .with_ignore_errors(ignore_errors) - .with_projection(projection) + .with_projection(projection.map(Arc::new)) .with_rechunk(rechunk) .with_chunk_size(chunk_size) - .with_encoding(encoding.0) - .with_columns(columns) + .with_columns(columns.map(Arc::new)) .with_n_threads(n_threads) - .with_path(path) - .with_dtypes(overwrite_dtype.map(Arc::new)) - .with_dtypes_slice(overwrite_dtype_slice.as_deref()) + .with_schema_overwrite(overwrite_dtype.map(Arc::new)) + .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new)) .with_schema(schema.map(|schema| Arc::new(schema.0))) - .low_memory(low_memory) - .with_null_values(null_values) - .with_missing_is_null(!missing_utf8_is_empty_string) - .with_comment_prefix(comment_prefix) - .with_try_parse_dates(try_parse_dates) - .with_quote_char(quote_char) - .with_end_of_line_char(eol_char) + .with_low_memory(low_memory) .with_skip_rows_after_header(skip_rows_after_header) .with_row_index(row_index) - .sample_size(sample_size) - .raise_if_empty(raise_if_empty) - .truncate_ragged_lines(truncate_ragged_lines) - .with_decimal_comma(decimal_comma) + .with_sample_size(sample_size) + .with_raise_if_empty(raise_if_empty) + .with_parse_options( + CsvParseOptions::default() + .with_separator(separator.as_bytes()[0]) + .with_encoding(encoding.0) + .with_missing_is_null(!missing_utf8_is_empty_string) + .with_comment_prefix(comment_prefix) + .with_null_values(null_values) + .with_try_parse_dates(try_parse_dates) + .with_quote_char(quote_char) + .with_eol_char(eol_char) + .with_truncate_ragged_lines(truncate_ragged_lines) + .with_decimal_comma(decimal_comma), + ) + .into_reader_with_file_handle(mmap_bytes_r) .finish() .map_err(PyPolarsErr::from) })?; @@ -139,7 +146,10 @@ impl PyDataFrame { ) -> PyResult { use EitherRustPythonFile::*; - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let result = match get_either_file(py_f, false)? { Py(f) => { let buf = f.as_buffer(); @@ -264,7 +274,10 @@ impl PyDataFrame { row_index: Option<(String, IdxSize)>, memory_map: bool, ) -> PyResult { - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); py_f = read_if_bytesio(py_f); let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?; let df = py.allow_threads(move || { @@ -292,7 +305,10 @@ impl PyDataFrame { row_index: Option<(String, IdxSize)>, rechunk: bool, ) -> PyResult { - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); py_f = read_if_bytesio(py_f); let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?; let df = py.allow_threads(move || { diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 46a6283927a2..3c521e473f07 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -116,7 +116,10 @@ impl PyLazyFrame { row_index: Option<(String, IdxSize)>, ignore_errors: bool, ) -> PyResult { - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let r = if let Some(path) = &path { LazyJsonLineReader::new(path) @@ -180,7 +183,10 @@ impl PyLazyFrame { let quote_char = quote_char.map(|s| s.as_bytes()[0]); let separator = separator.as_bytes()[0]; let eol_char = eol_char.as_bytes()[0]; - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| { overwrite_dtype @@ -198,17 +204,17 @@ impl PyLazyFrame { let mut r = r .with_infer_schema_length(infer_schema_length) .with_separator(separator) - .has_header(has_header) + .with_has_header(has_header) .with_ignore_errors(ignore_errors) .with_skip_rows(skip_rows) .with_n_rows(n_rows) .with_cache(cache) .with_dtype_overwrite(overwrite_dtype.map(Arc::new)) .with_schema(schema.map(|schema| Arc::new(schema.0))) - .low_memory(low_memory) + .with_low_memory(low_memory) .with_comment_prefix(comment_prefix) .with_quote_char(quote_char) - .with_end_of_line_char(eol_char) + .with_eol_char(eol_char) .with_rechunk(rechunk) .with_skip_rows_after_header(skip_rows_after_header) .with_encoding(encoding.0) @@ -216,10 +222,10 @@ impl PyLazyFrame { .with_try_parse_dates(try_parse_dates) .with_null_values(null_values) .with_missing_is_null(!missing_utf8_is_empty_string) - .truncate_ragged_lines(truncate_ragged_lines) + .with_truncate_ragged_lines(truncate_ragged_lines) .with_decimal_comma(decimal_comma) .with_glob(glob) - .raise_if_empty(raise_if_empty); + .with_raise_if_empty(raise_if_empty); if let Some(lambda) = with_schema_modify { let f = |schema: Schema| { @@ -292,7 +298,10 @@ impl PyLazyFrame { options }); } - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); let hive_options = HiveOptions { enabled: hive_partitioning, schema: hive_schema, @@ -334,7 +343,10 @@ impl PyLazyFrame { cloud_options: Option>, retries: usize, ) -> PyResult { - let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let row_index = row_index.map(|(name, offset)| RowIndex { + name: Arc::from(name.as_str()), + offset, + }); #[cfg(feature = "cloud")] let cloud_options = { diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index 219ec49aa6b2..ba37cc3f4fd8 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -71,7 +71,7 @@ impl PyFileOptions { .inner .row_index .as_ref() - .map_or_else(|| py.None(), |n| (&n.name, n.offset).to_object(py))) + .map_or_else(|| py.None(), |n| (n.name.as_ref(), n.offset).to_object(py))) } #[getter] fn rechunk(&self, _py: Python<'_>) -> PyResult { From 2248b730143e47c49219cb0ec9f211fb707c0c25 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 9 May 2024 20:17:13 +1000 Subject: [PATCH 02/13] update todos --- crates/polars-io/src/csv/read/options.rs | 2 -- crates/polars-lazy/src/physical_plan/executors/scan/csv.rs | 5 +++-- crates/polars-pipe/src/executors/sources/csv.rs | 3 ++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs index 0dad82a0d9b1..0934c2ed562a 100644 --- a/crates/polars-io/src/csv/read/options.rs +++ b/crates/polars-io/src/csv/read/options.rs @@ -35,8 +35,6 @@ pub struct CsvReadOptions { pub skip_rows_after_header: usize, pub infer_schema_length: Option, pub raise_if_empty: bool, - // TODO: Ask if we should rename this to `ignore_parse_errors` to make it - // clear this targets parsing errors. pub ignore_errors: bool, } diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs index 9e568363be27..27a8f744b32a 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs @@ -29,7 +29,7 @@ impl CsvExec { // If we don't set it to 0 here, it will skip double the amount of rows. // But if we set it to 0, it will still skip the requested amount of rows. // The reason I currently cannot fathom. - // TODO: Find out why + // TODO: Find out why. Maybe has something to do with schema inference. 0, ) .with_schema(Some( @@ -40,7 +40,8 @@ impl CsvExec { .with_rechunk(self.file_options.rechunk) .with_row_index(self.file_options.row_index.clone()) .with_parse_options( - // TODO: Confirm why we set lossy utf8 here. + // TODO: We don't know why LossyUtf8 is set here, so remove it + // to see if it breaks anything. Arc::unwrap_or_clone(self.options.parse_options.clone()) .with_encoding(CsvEncoding::LossyUtf8), ) diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index da134df9a7fb..c06465786d56 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -71,7 +71,8 @@ impl CsvSource { .with_rechunk(false) .with_row_index(file_options.row_index) .with_parse_options(parse_options.with_encoding( - // TODO: Confirm why we set lossy utf8 here. + // TODO: We don't know why LossyUtf8 is set here, so remove it + // to see if it breaks anything. CsvEncoding::LossyUtf8, )) .with_path(Some(path)) From a2783758b215d411ec597a2096d4259dd1d07181 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 9 May 2024 21:11:25 +1000 Subject: [PATCH 03/13] wip [skip ci] --- crates/polars/tests/it/io/csv.rs | 149 +++++++++++++++++++------------ 1 file changed, 94 insertions(+), 55 deletions(-) diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index 9732cda02a06..f29f9b396f0b 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -128,8 +128,8 @@ fn write_dates() { #[test] fn test_read_csv_file() { let file = std::fs::File::open(FOODS_CSV).unwrap(); - let df = CsvReader::new(file) - .with_path(Some(FOODS_CSV.to_string())) + let df = CsvReadOptions::default() + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -138,7 +138,9 @@ fn test_read_csv_file() { #[test] fn test_read_csv_filter() -> PolarsResult<()> { - let df = CsvReader::from_path(FOODS_CSV)?.finish()?; + let df = CsvReadOptions::default() + .try_into_reader_with_file_path(Some(FOODS_CSV.into()))? + .finish()?; let out = df.filter(&df.column("fats_g")?.gt(4)?)?; @@ -162,10 +164,11 @@ fn test_parser() -> PolarsResult<()> { "#; let file = Cursor::new(s); - CsvReader::new(file) - .infer_schema(Some(100)) - .has_header(true) + CsvReadOptions::default() + .with_infer_schema_length(Some(100)) + .with_has_header(true) .with_ignore_errors(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -178,11 +181,12 @@ fn test_parser() -> PolarsResult<()> { let file = Cursor::new(s); // just checks if unwrap doesn't panic - CsvReader::new(file) + CsvReadOptions::default() // we also check if infer schema ignores errors - .infer_schema(Some(10)) - .has_header(true) + .with_infer_schema_length(Some(10)) + .with_has_header(true) .with_ignore_errors(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -197,9 +201,10 @@ fn test_parser() -> PolarsResult<()> { "#; let file = Cursor::new(s); - let df = CsvReader::new(file) - .infer_schema(Some(100)) - .has_header(true) + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(100)) + .with_has_header(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -215,9 +220,10 @@ fn test_parser() -> PolarsResult<()> { let s = "head_1,head_2\r\n1,2\r\n1,2\r\n1,2\r\n"; let file = Cursor::new(s); - let df = CsvReader::new(file) - .infer_schema(Some(100)) - .has_header(true) + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(100)) + .with_has_header(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -228,9 +234,10 @@ fn test_parser() -> PolarsResult<()> { let s = "head_1\r\n1\r\n2\r\n3"; let file = Cursor::new(s); - let df = CsvReader::new(file) - .infer_schema(Some(100)) - .has_header(true) + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(100)) + .with_has_header(true) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -252,11 +259,12 @@ fn test_tab_sep() { "#.as_ref(); let file = Cursor::new(csv); - let df = CsvReader::new(file) - .infer_schema(Some(100)) - .with_separator(b'\t') - .has_header(false) + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(100)) + .with_has_header(false) .with_ignore_errors(true) + .with_parse_options(CsvParseOptions::default().with_separator(b'\t')) + .into_reader_with_file_handle(file) .finish() .unwrap(); assert_eq!(df.shape(), (8, 26)) @@ -348,8 +356,9 @@ fn test_newline_in_custom_quote_char() { "#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_quote_char(Some(b'\'')) + let df = CsvReadOptions::default() + .with_parse_options(CsvParseOptions::default().with_quote_char(Some(b'\''))) + .into_reader_with_file_handle(file) .finish() .unwrap(); assert_eq!(df.shape(), (2, 2)); @@ -370,9 +379,10 @@ hello,","," ",world,"!" hello,","," ",world,"!" "#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) + let df = CsvReadOptions::default() + .with_has_header(false) .with_n_threads(Some(1)) + .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -403,7 +413,10 @@ and more recently with desktop publishing software like Aldus PageMaker includin versions of Lorem Ipsum.",11 "#; let file = Cursor::new(csv); - let df = CsvReader::new(file).finish().unwrap(); + let df = CsvReadOptions::default() + .into_reader_with_file_handle(file) + .finish() + .unwrap(); assert!(df.column("column_2").unwrap().equals(&Series::new( "column_2", @@ -447,7 +460,11 @@ fn test_new_line_escape() { "#; let file = Cursor::new(s); - let _df = CsvReader::new(file).has_header(true).finish().unwrap(); + CsvReadOptions::default() + .with_has_header(true) + .into_reader_with_file_handle(file) + .finish() + .unwrap(); } #[test] @@ -457,7 +474,11 @@ new line character","width" 5.1,3.5,1.4 "#; let file: Cursor<&str> = Cursor::new(s); - let df: DataFrame = CsvReader::new(file).has_header(true).finish().unwrap(); + let df = CsvReadOptions::default() + .with_has_header(true) + .into_reader_with_file_handle(file) + .finish() + .unwrap(); assert_eq!(df.shape(), (1, 3)); assert_eq!( df.get_column_names(), @@ -474,7 +495,11 @@ fn test_quoted_numeric() { "#; let file = Cursor::new(s); - let df = CsvReader::new(file).has_header(true).finish().unwrap(); + let df = CsvReadOptions::default() + .with_has_header(true) + .into_reader_with_file_handle(file) + .finish() + .unwrap(); assert_eq!(df.column("bar").unwrap().dtype(), &DataType::Int64); assert_eq!(df.column("foo").unwrap().dtype(), &DataType::Float64); } @@ -485,11 +510,15 @@ fn test_empty_bytes_to_dataframe() { let schema = Schema::from_iter(fields); let file = Cursor::new(vec![]); - let result = CsvReader::new(file) - .has_header(false) - .with_columns(Some(schema.iter_names().map(|s| s.to_string()).collect())) + let result = CsvReadOptions::default() + .with_has_header(false) + .with_columns(Some(Arc::new( + schema.iter_names().map(|s| s.to_string()).collect(), + ))) .with_schema(Some(Arc::new(schema))) + .into_reader_with_file_handle(file) .finish(); + assert!(result.is_ok()) } @@ -498,9 +527,10 @@ fn test_carriage_return() { let csv = "\"foo\",\"bar\"\r\n\"158252579.00\",\"7.5800\"\r\n\"158252579.00\",\"7.5800\"\r\n"; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(true) + let df = CsvReadOptions::default() + .with_has_header(true) .with_n_threads(Some(1)) + .into_reader_with_file_handle(file) .finish() .unwrap(); assert_eq!(df.shape(), (2, 2)); @@ -515,13 +545,14 @@ fn test_missing_value() { "#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(true) + let df = CsvReadOptions::default() + .with_has_header(true) .with_schema(Some(Arc::new(Schema::from_iter([ Field::new("foo", DataType::UInt32), Field::new("bar", DataType::UInt32), Field::new("ham", DataType::UInt32), ])))) + .into_reader_with_file_handle(file) .finish() .unwrap(); assert_eq!(df.column("ham").unwrap().len(), 3) @@ -537,13 +568,14 @@ AUDCAD,1616455920,0.92212,0.95556,1 AUDCAD,1616455921,0.96212,0.95666,1 "#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(true) - .with_dtypes(Some(Arc::new(Schema::from_iter([Field::new( + let df = CsvReadOptions::default() + .with_has_header(true) + .with_schema_overwrite(Some(Arc::new(Schema::from_iter([Field::new( "b", DataType::Datetime(TimeUnit::Nanoseconds, None), )])))) .with_ignore_errors(true) + .into_reader_with_file_handle(file) .finish()?; assert_eq!( @@ -570,10 +602,11 @@ fn test_skip_rows() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) + let df = CsvReadOptions::default() + .with_has_header(false) .with_skip_rows(3) - .with_separator(b' ') + .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.height(), 3); @@ -588,20 +621,22 @@ fn test_projection_idx() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) - .with_projection(Some(vec![4, 5])) - .with_separator(b' ') + let df = CsvReadOptions::default() + .with_has_header(false) + .with_projection(Some(Arc::new(vec![4, 5]))) + .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.width(), 2); // this should give out of bounds error let file = Cursor::new(csv); - let out = CsvReader::new(file) - .has_header(false) - .with_projection(Some(vec![4, 6])) - .with_separator(b' ') + let out = CsvReadOptions::default() + .with_has_header(false) + .with_projection(Some(Arc::new(vec![4, 6]))) + .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .into_reader_with_file_handle(file) .finish(); assert!(out.is_err()); @@ -617,7 +652,10 @@ fn test_missing_fields() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; use polars_core::df; let expect = df![ @@ -641,9 +679,10 @@ fn test_comment_lines() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) - .with_comment_prefix(Some("#")) + let df = CsvReadOptions::default() + .with_has_header(false) + .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("#"))) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); From 801135124ea806afb84c0031452b8019e2db648f Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 9 May 2024 21:12:32 +1000 Subject: [PATCH 04/13] wip [skip ci] --- crates/polars/tests/it/io/csv.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index f29f9b396f0b..d033ccb5b8f9 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -694,9 +694,10 @@ fn test_comment_lines() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) - .with_comment_prefix(Some("!#&")) + let df = CsvReadOptions::default() + .with_has_header(false) + .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("#"))) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); @@ -709,9 +710,10 @@ fn test_comment_lines() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(true) - .with_comment_prefix(Some("%")) + let df = CsvReadOptions::default() + .with_has_header(true) + .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("%"))) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); From 07ca55e6b716835e2d118adffaeea964cdce995f Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:12:03 +1000 Subject: [PATCH 05/13] wip [skip ci] --- crates/polars-io/src/csv/read/options.rs | 10 +++++ crates/polars-io/src/csv/read/reader.rs | 4 +- .../src/physical_plan/executors/scan/csv.rs | 13 ++++--- .../polars-pipe/src/executors/sources/csv.rs | 14 +++---- crates/polars/tests/it/io/csv.rs | 37 +++++++++++++------ 5 files changed, 52 insertions(+), 26 deletions(-) diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs index 0934c2ed562a..1ea2b8262253 100644 --- a/crates/polars-io/src/csv/read/options.rs +++ b/crates/polars-io/src/csv/read/options.rs @@ -233,6 +233,16 @@ impl CsvReadOptions { self.ignore_errors = ignore_errors; self } + + /// Apply a function to the parse options. + pub fn map_parse_options CsvParseOptions>( + mut self, + map_func: F, + ) -> Self { + let parse_options = Arc::unwrap_or_clone(self.parse_options); + self.parse_options = Arc::new(map_func(parse_options)); + self + } } impl CsvParseOptions { diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index 447c43f471dc..cfac92132d07 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -31,8 +31,9 @@ use crate::utils::{get_reader_bytes, resolve_homedir}; /// use std::fs::File; /// /// fn example() -> PolarsResult { -/// CsvReader::from_path("iris.csv")? +/// CsvReadOptions::default() /// .with_has_header(true) +/// .try_into_reader_with_path("iris.csv")? /// .finish() /// } /// ``` @@ -315,7 +316,6 @@ impl SerReader for CsvReader where R: MmapBytesReader, { - /// Create a new CsvReader from a file/stream. fn new(reader: R) -> Self { CsvReader { reader, diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs index 27a8f744b32a..360b80179db8 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs @@ -39,12 +39,13 @@ impl CsvExec { .with_columns(with_columns) .with_rechunk(self.file_options.rechunk) .with_row_index(self.file_options.row_index.clone()) - .with_parse_options( - // TODO: We don't know why LossyUtf8 is set here, so remove it - // to see if it breaks anything. - Arc::unwrap_or_clone(self.options.parse_options.clone()) - .with_encoding(CsvEncoding::LossyUtf8), - ) + .map_parse_options(|parse_options| { + parse_options.with_encoding( + // TODO: We don't know why LossyUtf8 is set here, so remove it + // to see if it breaks anything. + CsvEncoding::LossyUtf8, + ) + }) .with_path(Some(self.path.clone())) .try_into_reader_with_file_path(None)? ._with_predicate(predicate) diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index c06465786d56..5bb368ae26bb 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -1,6 +1,5 @@ use std::fs::File; use std::path::PathBuf; -use std::sync::Arc; use polars_core::export::arrow::Either; use polars_core::POOL; @@ -62,7 +61,6 @@ impl CsvSource { } let low_memory = options.low_memory; - let parse_options = Arc::unwrap_or_clone(options.clone().parse_options); let reader: CsvReader = options .with_schema_overwrite(Some(self.schema.clone())) @@ -70,11 +68,13 @@ impl CsvSource { .with_columns(with_columns) .with_rechunk(false) .with_row_index(file_options.row_index) - .with_parse_options(parse_options.with_encoding( - // TODO: We don't know why LossyUtf8 is set here, so remove it - // to see if it breaks anything. - CsvEncoding::LossyUtf8, - )) + .map_parse_options(|parse_options| { + parse_options.with_encoding( + // TODO: We don't know why LossyUtf8 is set here, so remove it + // to see if it breaks anything. + CsvEncoding::LossyUtf8, + ) + }) .with_path(Some(path)) .try_into_reader_with_file_path(None)?; diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index d033ccb5b8f9..808aafedfa95 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -728,9 +728,12 @@ null-value,b,bar "; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(false) - .with_null_values(NullValues::AllColumnsSingle("null-value".to_string()).into()) + let df = CsvReadOptions::default() + .with_parse_options( + CsvParseOptions::default() + .with_null_values(Some(NullValues::AllColumnsSingle("null-value".to_string()))), + ) + .into_reader_with_file_handle(file) .finish()?; assert!(df.get_columns()[0].null_count() > 0); Ok(()) @@ -981,9 +984,13 @@ fn test_scientific_floats() -> PolarsResult<()> { fn test_tsv_header_offset() -> PolarsResult<()> { let csv = "foo\tbar\n\t1000011\t1\n\t1000026\t2\n\t1000949\t2"; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .truncate_ragged_lines(true) - .with_separator(b'\t') + let df = CsvReadOptions::default() + .with_parse_options( + CsvParseOptions::default() + .with_truncate_ragged_lines(true) + .with_separator(b'\t'), + ) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 2)); @@ -1002,8 +1009,12 @@ fn test_null_values_infer_schema() -> PolarsResult<()> { 3,NA 5,6"#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_null_values(Some(NullValues::AllColumnsSingle("NA".into()))) + let df = CsvReadOptions::default() + .with_parse_options( + CsvParseOptions::default() + .with_null_values(Some(NullValues::AllColumnsSingle("NA".into()))), + ) + .into_reader_with_file_handle(file) .finish()?; let expected = &[DataType::Int64, DataType::Int64]; assert_eq!(df.dtypes(), expected); @@ -1014,7 +1025,10 @@ fn test_null_values_infer_schema() -> PolarsResult<()> { fn test_comma_separated_field_in_tsv() -> PolarsResult<()> { let csv = "first\tsecond\n1\t2.3,2.4\n3\t4.5,4.6\n"; let file = Cursor::new(csv); - let df = CsvReader::new(file).with_separator(b'\t').finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_separator(b'\t')) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.dtypes(), &[DataType::Int64, DataType::String]); Ok(()) } @@ -1026,8 +1040,9 @@ a,"b",c,d,1 a,"b",c,d,1 a,b,c,d,1"#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_projection(Some(vec![1, 4])) + let df = CsvReadOptions::default() + .with_projection(Some(Arc::new(vec![1, 4]))) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 2)); From 010427d47dcff38ff115c2138c7a89a9364d6f79 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:18:33 +1000 Subject: [PATCH 06/13] wip [skip ci] --- crates/polars/tests/it/io/csv.rs | 34 ++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index 808aafedfa95..030e6f3797c8 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -1055,7 +1055,10 @@ fn test_last_line_incomplete() -> PolarsResult<()> { let csv = "b5bbf310dffe3372fd5d37a18339fea5,6a2752ffad059badb5f1f3c7b9e4905d,-2,0.033191,811.619 0.487341,16,GGTGTGAAATTTCACACC,TTTAATTATAATTAAG,+ b5bbf310dffe3372fd5d37a18339fea5,e3fd7b95be3453a34361da84f815687d,-2,0.0335936,821.465 0.490834,1"; let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (2, 9)); Ok(()) } @@ -1089,16 +1092,23 @@ foo,bar 5,6 "#; let file = Cursor::new(csv); - let df = CsvReader::new(file.clone()).with_skip_rows(2).finish()?; + let df = CsvReadOptions::default() + .with_skip_rows(2) + .into_reader_with_file_handle(file.clone()) + .finish()?; assert_eq!(df.get_column_names(), &["foo", "bar"]); assert_eq!(df.shape(), (3, 2)); - let df = CsvReader::new(file.clone()) + let df = CsvReadOptions::default() .with_skip_rows(2) .with_skip_rows_after_header(2) + .into_reader_with_file_handle(file.clone()) .finish()?; assert_eq!(df.get_column_names(), &["foo", "bar"]); assert_eq!(df.shape(), (1, 2)); - let df = CsvReader::new(file).truncate_ragged_lines(true).finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_truncate_ragged_lines(true)) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (5, 1)); Ok(()) @@ -1106,22 +1116,24 @@ foo,bar #[test] fn test_with_row_index() -> PolarsResult<()> { - let df = CsvReader::from_path(FOODS_CSV)? + let df = CsvReadOptions::default() .with_row_index(Some(RowIndex { name: "rc".into(), offset: 0, })) + .try_into_reader_with_file_path(Some(FOODS_CSV.into()))? .finish()?; let rc = df.column("rc")?; assert_eq!( rc.idx()?.into_no_null_iter().collect::>(), (0 as IdxSize..27).collect::>() ); - let df = CsvReader::from_path(FOODS_CSV)? + let df = CsvReadOptions::default() .with_row_index(Some(RowIndex { name: "rc_2".into(), offset: 10, })) + .try_into_reader_with_file_path(Some(FOODS_CSV.into()))? .finish()?; let rc = df.column("rc_2")?; assert_eq!( @@ -1135,7 +1147,10 @@ fn test_with_row_index() -> PolarsResult<()> { fn test_empty_string_cols() -> PolarsResult<()> { let csv = "\nabc\n\nxyz\n"; let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; let s = df.column("column_1")?; let ca = s.str()?; assert_eq!( @@ -1145,7 +1160,10 @@ fn test_empty_string_cols() -> PolarsResult<()> { let csv = ",\nabc,333\n,666\nxyz,999"; let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; let expected = df![ "column_1" => [None, Some("abc"), None, Some("xyz")], "column_2" => [None, Some(333i64), Some(666), Some(999)] From 1f02408a76a34ed85a0a2b6128ff29db2480ff15 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:29:45 +1000 Subject: [PATCH 07/13] wip [skip ci] --- crates/polars-io/src/csv/read/reader.rs | 3 ++ crates/polars/tests/it/io/csv.rs | 61 ++++++++++++++++--------- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index cfac92132d07..bf384322aa4a 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -316,6 +316,9 @@ impl SerReader for CsvReader where R: MmapBytesReader, { + /// Create a new CsvReader from a file/stream using default read options. To + /// use non-default read options, first contruct [CsvReadOptions] and then use + /// any of the `(try)_into_` methods. fn new(reader: R) -> Self { CsvReader { reader, diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index 030e6f3797c8..fac3a071842d 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -272,11 +272,10 @@ fn test_tab_sep() { #[test] fn test_projection() -> PolarsResult<()> { - let df = CsvReader::from_path(FOODS_CSV) - .unwrap() - .with_projection(Some(vec![0, 2])) - .finish() - .unwrap(); + let df = CsvReadOptions::default() + .with_projection(Some(vec![0, 2].into())) + .try_into_reader_with_file_path(Some(FOODS_CSV.into()))? + .finish()?; let col_1 = df.select_at_idx(0).unwrap(); assert_eq!(col_1.get(0)?, AnyValue::String("vegetables")); assert_eq!(col_1.get(1)?, AnyValue::String("seafood")); @@ -443,9 +442,10 @@ id090,id048,id0000067778,24,2,51862,4,9, "#; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .has_header(true) + let df = CsvReadOptions::default() + .with_has_header(true) .with_n_threads(Some(1)) + .into_reader_with_file_handle(file) .finish() .unwrap(); assert_eq!(df.shape(), (3, 9)); @@ -767,7 +767,10 @@ fn test_automatic_datetime_parsing() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file).with_try_parse_dates(true).finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .into_reader_with_file_handle(file) + .finish()?; let ts = df.column("timestamp")?; assert_eq!( @@ -790,7 +793,10 @@ fn test_automatic_datetime_parsing_default_formats() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let df = CsvReader::new(file).with_try_parse_dates(true).finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .into_reader_with_file_handle(file) + .finish()?; for col in df.get_column_names() { let ts = df.column(col)?; @@ -819,7 +825,10 @@ fn test_no_quotes() -> PolarsResult<()> { "#; let file = Cursor::new(rolling_stones); - let df = CsvReader::new(file).with_quote_char(None).finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_quote_char(None)) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (9, 3)); Ok(()) @@ -846,7 +855,10 @@ fn test_header_inference() -> PolarsResult<()> { 4,3,2,1 "#; let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.dtypes(), vec![DataType::String; 4]); Ok(()) } @@ -856,8 +868,9 @@ fn test_header_with_comments() -> PolarsResult<()> { let csv = "# ignore me\na,b,c\nd,e,f"; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_comment_prefix(Some("#")) + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_comment_prefix(Some("#"))) + .into_reader_with_file_handle(file) .finish()?; // 1 row. assert_eq!(df.shape(), (1, 3)); @@ -877,9 +890,10 @@ fn test_ignore_parse_dates() -> PolarsResult<()> { use DataType::*; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_try_parse_dates(true) - .with_dtypes_slice(Some(&[String, String, String])) + let df = CsvReadOptions::default() + .with_dtype_overwrite(Some(vec![String, String, String].into())) + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.dtypes(), &[String, String, String]); @@ -899,16 +913,18 @@ A3,\"B4_\"\"with_embedded_double_quotes\"\"\",C4,4"; assert_eq!(df.shape(), (4, 4)); let file = Cursor::new(csv); - let df = CsvReader::new(file) + let df = CsvReadOptions::default() .with_n_threads(Some(1)) - .with_projection(Some(vec![0, 2])) + .with_projection(Some(vec![0, 2].into())) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (4, 2)); let file = Cursor::new(csv); - let df = CsvReader::new(file) + let df = CsvReadOptions::default() .with_n_threads(Some(1)) - .with_projection(Some(vec![1])) + .with_projection(Some(vec![1].into())) + .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (4, 1)); @@ -922,7 +938,10 @@ fn test_infer_schema_0_rows() -> PolarsResult<()> { 1,a,1.0,false "#; let file = Cursor::new(csv); - let df = CsvReader::new(file).infer_schema(Some(0)).finish()?; + let df = CsvReadOptions::default() + .with_infer_schema_length(Some(0)) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!( df.dtypes(), &[ From d5df68c8f854e6758753539014effd7a041d5aa4 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:39:23 +1000 Subject: [PATCH 08/13] wip [skip ci] --- crates/polars/tests/it/io/csv.rs | 85 ++++++++++++------- .../src/rust/user-guide/expressions/window.rs | 6 +- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index fac3a071842d..bae32662fe51 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -263,7 +263,7 @@ fn test_tab_sep() { .with_infer_schema_length(Some(100)) .with_has_header(false) .with_ignore_errors(true) - .with_parse_options(CsvParseOptions::default().with_separator(b'\t')) + .map_parse_options(|parse_options| parse_options.with_separator(b'\t')) .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -356,7 +356,7 @@ fn test_newline_in_custom_quote_char() { let file = Cursor::new(csv); let df = CsvReadOptions::default() - .with_parse_options(CsvParseOptions::default().with_quote_char(Some(b'\''))) + .map_parse_options(|parse_options| parse_options.with_quote_char(Some(b'\''))) .into_reader_with_file_handle(file) .finish() .unwrap(); @@ -605,7 +605,7 @@ fn test_skip_rows() -> PolarsResult<()> { let df = CsvReadOptions::default() .with_has_header(false) .with_skip_rows(3) - .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .map_parse_options(|parse_options| parse_options.with_separator(b' ')) .into_reader_with_file_handle(file) .finish()?; @@ -624,7 +624,7 @@ fn test_projection_idx() -> PolarsResult<()> { let df = CsvReadOptions::default() .with_has_header(false) .with_projection(Some(Arc::new(vec![4, 5]))) - .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .map_parse_options(|parse_options| parse_options.with_separator(b' ')) .into_reader_with_file_handle(file) .finish()?; @@ -635,7 +635,7 @@ fn test_projection_idx() -> PolarsResult<()> { let out = CsvReadOptions::default() .with_has_header(false) .with_projection(Some(Arc::new(vec![4, 6]))) - .with_parse_options(CsvParseOptions::default().with_separator(b' ')) + .map_parse_options(|parse_options| parse_options.with_separator(b' ')) .into_reader_with_file_handle(file) .finish(); @@ -681,7 +681,7 @@ fn test_comment_lines() -> PolarsResult<()> { let file = Cursor::new(csv); let df = CsvReadOptions::default() .with_has_header(false) - .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("#"))) + .map_parse_options(|parse_options| parse_options.with_comment_prefix(Some("#"))) .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); @@ -696,7 +696,7 @@ fn test_comment_lines() -> PolarsResult<()> { let file = Cursor::new(csv); let df = CsvReadOptions::default() .with_has_header(false) - .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("#"))) + .map_parse_options(|parse_options| parse_options.with_comment_prefix(Some("!#&"))) .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); @@ -712,7 +712,7 @@ fn test_comment_lines() -> PolarsResult<()> { let file = Cursor::new(csv); let df = CsvReadOptions::default() .with_has_header(true) - .with_parse_options(CsvParseOptions::default().with_comment_prefix(Some("%"))) + .map_parse_options(|parse_options| parse_options.with_comment_prefix(Some("%"))) .into_reader_with_file_handle(file) .finish()?; assert_eq!(df.shape(), (3, 5)); @@ -729,10 +729,10 @@ null-value,b,bar let file = Cursor::new(csv); let df = CsvReadOptions::default() - .with_parse_options( - CsvParseOptions::default() - .with_null_values(Some(NullValues::AllColumnsSingle("null-value".to_string()))), - ) + .map_parse_options(|parse_options| { + parse_options + .with_null_values(Some(NullValues::AllColumnsSingle("null-value".to_string()))) + }) .into_reader_with_file_handle(file) .finish()?; assert!(df.get_columns()[0].null_count() > 0); @@ -976,7 +976,10 @@ fn test_whitespace_separators() -> PolarsResult<()> { for (content, sep) in contents { let file = Cursor::new(&content); - let df = CsvReader::new(file).with_separator(sep).finish()?; + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_separator(sep)) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (2, 4)); assert_eq!(df.get_column_names(), &["", "a", "b", "c"]); @@ -1004,11 +1007,11 @@ fn test_tsv_header_offset() -> PolarsResult<()> { let csv = "foo\tbar\n\t1000011\t1\n\t1000026\t2\n\t1000949\t2"; let file = Cursor::new(csv); let df = CsvReadOptions::default() - .with_parse_options( - CsvParseOptions::default() + .map_parse_options(|parse_options| { + parse_options .with_truncate_ragged_lines(true) - .with_separator(b'\t'), - ) + .with_separator(b'\t') + }) .into_reader_with_file_handle(file) .finish()?; @@ -1029,10 +1032,9 @@ fn test_null_values_infer_schema() -> PolarsResult<()> { 5,6"#; let file = Cursor::new(csv); let df = CsvReadOptions::default() - .with_parse_options( - CsvParseOptions::default() - .with_null_values(Some(NullValues::AllColumnsSingle("NA".into()))), - ) + .map_parse_options(|parse_options| { + parse_options.with_null_values(Some(NullValues::AllColumnsSingle("NA".into()))) + }) .into_reader_with_file_handle(file) .finish()?; let expected = &[DataType::Int64, DataType::Int64]; @@ -1277,13 +1279,19 @@ fn test_header_only() -> PolarsResult<()> { let file = Cursor::new(csv); // no header - let df = CsvReader::new(file).has_header(false).finish()?; + let df = CsvReadOptions::default() + .with_has_header(false) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (1, 3)); // has header for csv in &["x,y,z", "x,y,z\n"] { let file = Cursor::new(csv); - let df = CsvReader::new(file).has_header(true).finish()?; + let df = CsvReadOptions::default() + .with_has_header(true) + .into_reader_with_file_handle(file) + .finish()?; assert_eq!(df.shape(), (0, 3)); assert_eq!( @@ -1301,7 +1309,10 @@ fn test_empty_csv() { let file = Cursor::new(csv); for h in [true, false] { assert!(matches!( - CsvReader::new(file.clone()).has_header(h).finish(), + CsvReadOptions::default() + .with_has_header(h) + .into_reader_with_file_handle(file.clone()) + .finish(), Err(PolarsError::NoData(_)) )) } @@ -1319,9 +1330,13 @@ fn test_try_parse_dates() -> PolarsResult<()> { "; let file = Cursor::new(csv); - let out = CsvReader::new(file).with_try_parse_dates(true).finish()?; - assert_eq!(out.dtypes(), &[DataType::Date]); - assert_eq!(out.column("date")?.null_count(), 1); + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .into_reader_with_file_handle(file) + .finish()?; + + assert_eq!(df.dtypes(), &[DataType::Date]); + assert_eq!(df.column("date")?.null_count(), 1); Ok(()) } @@ -1331,10 +1346,15 @@ fn test_try_parse_dates_3380() -> PolarsResult<()> { 46.685;7.953;2022-05-10T07:07:12Z;6.1;0.00 46.685;7.953;2022-05-10T08:07:12Z;8.8;0.00"; let file = Cursor::new(csv); - let df = CsvReader::new(file) - .with_separator(b';') - .with_try_parse_dates(true) + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| { + parse_options + .with_separator(b';') + .with_try_parse_dates(true) + }) + .into_reader_with_file_handle(file) .finish()?; + assert_eq!(df.column("validdate")?.null_count(), 0); Ok(()) } @@ -1358,7 +1378,10 @@ fn test_leading_whitespace_with_quote() -> PolarsResult<()> { fn test_read_io_reader() { let path = "../../examples/datasets/foods1.csv"; let file = std::fs::File::open(path).unwrap(); - let mut reader = CsvReader::from_path(path).unwrap().with_chunk_size(5); + let mut reader = CsvReadOptions::default() + .with_chunk_size(5) + .try_into_reader_with_file_path(Some(path.into())) + .unwrap(); let mut reader = reader.batched_borrowed_read().unwrap(); let batches = reader.next_batches(5).unwrap().unwrap(); diff --git a/docs/src/rust/user-guide/expressions/window.rs b/docs/src/rust/user-guide/expressions/window.rs index b73e62b05490..6414bc984c09 100644 --- a/docs/src/rust/user-guide/expressions/window.rs +++ b/docs/src/rust/user-guide/expressions/window.rs @@ -10,8 +10,10 @@ fn main() -> Result<(), Box> { .bytes() .collect(); - let df = CsvReader::new(std::io::Cursor::new(data)) - .has_header(true) + let file = std::io::Cursor::new(data); + let df = CsvReadOptions::default() + .with_has_header(true) + .into_reader_with_file_handle(file) .finish()?; println!("{}", df); From 84c00eef5a413db5dbd539f6bb1b2181341f76ea Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:51:29 +1000 Subject: [PATCH 09/13] wip [skip ci] --- crates/polars-io/src/csv/read/options.rs | 3 ++- crates/polars-io/src/csv/read/reader.rs | 2 +- crates/polars-lazy/src/tests/mod.rs | 5 ++--- docs/src/rust/home/example.rs | 2 +- .../rust/user-guide/concepts/lazy-vs-eager.rs | 5 +++-- docs/src/rust/user-guide/concepts/streaming.rs | 2 +- .../rust/user-guide/expressions/aggregation.rs | 9 +++++---- .../getting-started/reading-writing.rs | 17 ++++++++++------- docs/src/rust/user-guide/io/csv.rs | 3 ++- .../transformations/time-series/filter.rs | 5 +++-- .../transformations/time-series/parsing.rs | 10 ++++++---- .../transformations/time-series/rolling.rs | 5 +++-- 12 files changed, 39 insertions(+), 29 deletions(-) diff --git a/crates/polars-io/src/csv/read/options.rs b/crates/polars-io/src/csv/read/options.rs index 1ea2b8262253..3338eec7cd49 100644 --- a/crates/polars-io/src/csv/read/options.rs +++ b/crates/polars-io/src/csv/read/options.rs @@ -178,7 +178,8 @@ impl CsvReadOptions { self } - /// Sets the CSV parsing options. + /// Sets the CSV parsing options. See [map_parse_options][Self::map_parse_options] + /// for an easier way to mutate them in-place. pub fn with_parse_options(mut self, parse_options: CsvParseOptions) -> Self { self.parse_options = Arc::new(parse_options); self diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index bf384322aa4a..b2bddd4817aa 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -317,7 +317,7 @@ where R: MmapBytesReader, { /// Create a new CsvReader from a file/stream using default read options. To - /// use non-default read options, first contruct [CsvReadOptions] and then use + /// use non-default read options, first construct [CsvReadOptions] and then use /// any of the `(try)_into_` methods. fn new(reader: R) -> Self { CsvReader { diff --git a/crates/polars-lazy/src/tests/mod.rs b/crates/polars-lazy/src/tests/mod.rs index 63126aebb584..3e689b2edd80 100644 --- a/crates/polars-lazy/src/tests/mod.rs +++ b/crates/polars-lazy/src/tests/mod.rs @@ -179,11 +179,10 @@ pub(crate) fn get_df() -> DataFrame { let file = Cursor::new(s); - let df = CsvReadOptions::default() + CsvReadOptions::default() .with_infer_schema_length(Some(3)) .with_has_header(true) .into_reader_with_file_handle(file) .finish() - .unwrap(); - df + .unwrap() } diff --git a/docs/src/rust/home/example.rs b/docs/src/rust/home/example.rs index 398b86cb46eb..6ede797f6758 100644 --- a/docs/src/rust/home/example.rs +++ b/docs/src/rust/home/example.rs @@ -3,7 +3,7 @@ fn main() -> Result<(), Box> { use polars::prelude::*; let q = LazyCsvReader::new("docs/data/iris.csv") - .has_header(true) + .with_has_header(true) .finish()? .filter(col("sepal_length").gt(lit(5))) .group_by(vec![col("species")]) diff --git a/docs/src/rust/user-guide/concepts/lazy-vs-eager.rs b/docs/src/rust/user-guide/concepts/lazy-vs-eager.rs index 54b16b5d894c..12cac8afab26 100644 --- a/docs/src/rust/user-guide/concepts/lazy-vs-eager.rs +++ b/docs/src/rust/user-guide/concepts/lazy-vs-eager.rs @@ -2,7 +2,8 @@ use polars::prelude::*; fn main() -> Result<(), Box> { // --8<-- [start:eager] - let df = CsvReader::from_path("docs/data/iris.csv") + let df = CsvReadOptions::default() + .try_into_reader_with_file_path(Some("docs/data/iris.csv".into())) .unwrap() .finish() .unwrap(); @@ -18,7 +19,7 @@ fn main() -> Result<(), Box> { // --8<-- [start:lazy] let q = LazyCsvReader::new("docs/data/iris.csv") - .has_header(true) + .with_has_header(true) .finish()? .filter(col("sepal_length").gt(lit(5))) .group_by(vec![col("species")]) diff --git a/docs/src/rust/user-guide/concepts/streaming.rs b/docs/src/rust/user-guide/concepts/streaming.rs index 700458fb635b..9c9ddec631cf 100644 --- a/docs/src/rust/user-guide/concepts/streaming.rs +++ b/docs/src/rust/user-guide/concepts/streaming.rs @@ -3,7 +3,7 @@ use polars::prelude::*; fn main() -> Result<(), Box> { // --8<-- [start:streaming] let q1 = LazyCsvReader::new("docs/data/iris.csv") - .has_header(true) + .with_has_header(true) .finish()? .filter(col("sepal_length").gt(lit(5))) .group_by(vec![col("species")]) diff --git a/docs/src/rust/user-guide/expressions/aggregation.rs b/docs/src/rust/user-guide/expressions/aggregation.rs index a0b6f7bf029d..fe5e13a38940 100644 --- a/docs/src/rust/user-guide/expressions/aggregation.rs +++ b/docs/src/rust/user-guide/expressions/aggregation.rs @@ -33,10 +33,11 @@ fn main() -> Result<(), Box> { let data: Vec = Client::new().get(url).send()?.text()?.bytes().collect(); - let dataset = CsvReader::new(Cursor::new(data)) - .has_header(true) - .with_dtypes(Some(Arc::new(schema))) - .with_try_parse_dates(true) + let dataset = CsvReadOptions::default() + .with_has_header(true) + .with_schema(Some(Arc::new(schema))) + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .into_reader_with_file_handle(Cursor::new(data)) .finish()?; println!("{}", &dataset); diff --git a/docs/src/rust/user-guide/getting-started/reading-writing.rs b/docs/src/rust/user-guide/getting-started/reading-writing.rs index 9f6eaacd9dbc..8fde957c373f 100644 --- a/docs/src/rust/user-guide/getting-started/reading-writing.rs +++ b/docs/src/rust/user-guide/getting-started/reading-writing.rs @@ -25,9 +25,10 @@ fn main() -> Result<(), Box> { .include_header(true) .with_separator(b',') .finish(&mut df)?; - let df_csv = CsvReader::from_path("docs/data/output.csv")? - .infer_schema(None) - .has_header(true) + let df_csv = CsvReadOptions::default() + .with_infer_schema_length(None) + .with_has_header(true) + .try_into_reader_with_file_path(Some("docs/data/output.csv".into()))? .finish()?; println!("{}", df_csv); // --8<-- [end:csv] @@ -38,11 +39,13 @@ fn main() -> Result<(), Box> { .include_header(true) .with_separator(b',') .finish(&mut df)?; - let df_csv = CsvReader::from_path("docs/data/output.csv")? - .infer_schema(None) - .has_header(true) - .with_try_parse_dates(true) + let df_csv = CsvReadOptions::default() + .with_infer_schema_length(None) + .with_has_header(true) + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .try_into_reader_with_file_path(Some("docs/data/output.csv".into()))? .finish()?; + println!("{}", df_csv); // --8<-- [end:csv2] diff --git a/docs/src/rust/user-guide/io/csv.rs b/docs/src/rust/user-guide/io/csv.rs index 5827913977c7..dc8b556a7faa 100644 --- a/docs/src/rust/user-guide/io/csv.rs +++ b/docs/src/rust/user-guide/io/csv.rs @@ -4,7 +4,8 @@ fn main() -> Result<(), Box> { // --8<-- [start:read] use polars::prelude::*; - let df = CsvReader::from_path("docs/data/path.csv") + let df = CsvReadOptions::default() + .try_into_reader_with_file_path(Some("docs/data/path.csv".into())) .unwrap() .finish() .unwrap(); diff --git a/docs/src/rust/user-guide/transformations/time-series/filter.rs b/docs/src/rust/user-guide/transformations/time-series/filter.rs index 06ce39eb0c5f..14eab6d4f95a 100644 --- a/docs/src/rust/user-guide/transformations/time-series/filter.rs +++ b/docs/src/rust/user-guide/transformations/time-series/filter.rs @@ -6,9 +6,10 @@ use polars::prelude::*; fn main() -> Result<(), Box> { // --8<-- [start:df] - let df = CsvReader::from_path("docs/data/apple_stock.csv") + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .try_into_reader_with_file_path(Some("docs/data/apple_stock.csv".into())) .unwrap() - .with_try_parse_dates(true) .finish() .unwrap(); println!("{}", &df); diff --git a/docs/src/rust/user-guide/transformations/time-series/parsing.rs b/docs/src/rust/user-guide/transformations/time-series/parsing.rs index 3462943d15af..a58b5cf2850e 100644 --- a/docs/src/rust/user-guide/transformations/time-series/parsing.rs +++ b/docs/src/rust/user-guide/transformations/time-series/parsing.rs @@ -5,18 +5,20 @@ use polars::prelude::*; fn main() -> Result<(), Box> { // --8<-- [start:df] - let df = CsvReader::from_path("docs/data/apple_stock.csv") + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .try_into_reader_with_file_path(Some("docs/data/apple_stock.csv".into())) .unwrap() - .with_try_parse_dates(true) .finish() .unwrap(); println!("{}", &df); // --8<-- [end:df] // --8<-- [start:cast] - let df = CsvReader::from_path("docs/data/apple_stock.csv") + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(false)) + .try_into_reader_with_file_path(Some("docs/data/apple_stock.csv".into())) .unwrap() - .with_try_parse_dates(false) .finish() .unwrap(); let df = df diff --git a/docs/src/rust/user-guide/transformations/time-series/rolling.rs b/docs/src/rust/user-guide/transformations/time-series/rolling.rs index f8849ddabe41..559bf0bc2fed 100644 --- a/docs/src/rust/user-guide/transformations/time-series/rolling.rs +++ b/docs/src/rust/user-guide/transformations/time-series/rolling.rs @@ -6,9 +6,10 @@ use polars::prelude::*; fn main() -> Result<(), Box> { // --8<-- [start:df] - let df = CsvReader::from_path("docs/data/apple_stock.csv") + let df = CsvReadOptions::default() + .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) + .try_into_reader_with_file_path(Some("docs/data/apple_stock.csv".into())) .unwrap() - .with_try_parse_dates(true) .finish() .unwrap() .sort( From a2492d23087f5fff8fe88ef6117458d7db0f086b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 14:54:42 +1000 Subject: [PATCH 10/13] wip [skip ci] --- crates/polars-io/src/csv/read/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/polars-io/src/csv/read/mod.rs b/crates/polars-io/src/csv/read/mod.rs index 91c95fc70143..7dba71f6546d 100644 --- a/crates/polars-io/src/csv/read/mod.rs +++ b/crates/polars-io/src/csv/read/mod.rs @@ -1,8 +1,5 @@ //! Functionality for reading CSV files. //! -//! Note: currently, `CsvReader::new` has an extra copy. If you want optimal performance, -//! it is advised to use [`CsvReader::from_path`] instead. -//! //! # Examples //! //! ``` @@ -12,8 +9,9 @@ //! //! fn example() -> PolarsResult { //! // Prefer `from_path` over `new` as it is faster. -//! CsvReader::from_path("example.csv")? +//! CsvReadOptions::default() //! .with_has_header(true) +//! .try_into_reader_with_file_path(Some("example.csv".into()))? //! .finish() //! } //! ``` From 4644ba934fa6654388058ca316aa1c1803efaf5c Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 15:08:51 +1000 Subject: [PATCH 11/13] fix rust doc --- crates/polars-io/src/csv/read/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index b2bddd4817aa..3786bfc87d2c 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -33,7 +33,7 @@ use crate::utils::{get_reader_bytes, resolve_homedir}; /// fn example() -> PolarsResult { /// CsvReadOptions::default() /// .with_has_header(true) -/// .try_into_reader_with_path("iris.csv")? +/// .try_into_reader_with_file_path(Some("iris.csv".into()))? /// .finish() /// } /// ``` From aefdb4fa52ca4b03a5b7f48b70aba4a490cae833 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 15:21:49 +1000 Subject: [PATCH 12/13] wip [skip ci] --- .../src/csv/read/read_impl/batched_read.rs | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/crates/polars-io/src/csv/read/read_impl/batched_read.rs b/crates/polars-io/src/csv/read/read_impl/batched_read.rs index db87593750f9..7c36ba7b2c2d 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched_read.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched_read.rs @@ -391,8 +391,7 @@ pub struct OwnedBatchedCsvReader { #[allow(dead_code)] // this exist because we need to keep ownership schema: SchemaRef, - reader: *mut CsvReader>, - batched_reader: *mut BatchedCsvReaderRead<'static>, + batched_reader: BatchedCsvReaderRead<'static>, } unsafe impl Send for OwnedBatchedCsvReader {} @@ -400,32 +399,20 @@ unsafe impl Sync for OwnedBatchedCsvReader {} impl OwnedBatchedCsvReader { pub fn next_batches(&mut self, n: usize) -> PolarsResult>> { - let reader = unsafe { &mut *self.batched_reader }; - reader.next_batches(n) + self.batched_reader.next_batches(n) } } -impl Drop for OwnedBatchedCsvReader { - fn drop(&mut self) { - // release heap allocated - unsafe { - let _to_drop = Box::from_raw(self.batched_reader); - let _to_drop = Box::from_raw(self.reader); - }; - } -} - -pub fn to_batched_owned_read(reader: CsvReader>) -> OwnedBatchedCsvReader { +pub fn to_batched_owned_read( + mut reader: CsvReader>, +) -> OwnedBatchedCsvReader { let schema = reader.get_schema().unwrap(); - - let reader = Box::new(reader); - let reader = Box::leak(reader) as *mut CsvReader>; - let batched_reader = unsafe { Box::new((*reader).batched_borrowed_read().unwrap()) }; - let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderRead; + let batched_reader = reader.batched_borrowed_read().unwrap(); + let batched_reader: BatchedCsvReaderRead<'static> = + unsafe { std::mem::transmute(batched_reader) }; OwnedBatchedCsvReader { schema, - reader, batched_reader, } } From b6b00c363e42bf92834e81cb8b22a1a40ea8be06 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Fri, 10 May 2024 15:29:15 +1000 Subject: [PATCH 13/13] dont box leak --- .../src/csv/read/read_impl/batched_mmap.rs | 36 +++++++------------ .../src/csv/read/read_impl/batched_read.rs | 9 +++-- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs b/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs index 637c68a9e349..cb8ce04947d8 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched_mmap.rs @@ -285,43 +285,31 @@ pub struct OwnedBatchedCsvReaderMmap { #[allow(dead_code)] // this exist because we need to keep ownership schema: SchemaRef, - reader: *mut CsvReader>, - batched_reader: *mut BatchedCsvReaderMmap<'static>, + batched_reader: BatchedCsvReaderMmap<'static>, + // keep ownership + _reader: CsvReader>, } -unsafe impl Send for OwnedBatchedCsvReaderMmap {} -unsafe impl Sync for OwnedBatchedCsvReaderMmap {} - impl OwnedBatchedCsvReaderMmap { pub fn next_batches(&mut self, n: usize) -> PolarsResult>> { - let reader = unsafe { &mut *self.batched_reader }; - reader.next_batches(n) - } -} - -impl Drop for OwnedBatchedCsvReaderMmap { - fn drop(&mut self) { - // release heap allocated - unsafe { - let _to_drop = Box::from_raw(self.batched_reader); - let _to_drop = Box::from_raw(self.reader); - }; + self.batched_reader.next_batches(n) } } pub fn to_batched_owned_mmap( - reader: CsvReader>, + mut reader: CsvReader>, ) -> OwnedBatchedCsvReaderMmap { let schema = reader.get_schema().unwrap(); - - let reader = Box::new(reader); - let reader = Box::leak(reader) as *mut CsvReader>; - let batched_reader = unsafe { Box::new((*reader).batched_borrowed_mmap().unwrap()) }; - let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderMmap; + let batched_reader = reader.batched_borrowed_mmap().unwrap(); + // If you put a drop(reader) here, rust will complain that reader is borrowed, + // so we presumably have to keep ownership of it to maintain the safety of the + // 'static transmute. + let batched_reader: BatchedCsvReaderMmap<'static> = + unsafe { std::mem::transmute(batched_reader) }; OwnedBatchedCsvReaderMmap { schema, - reader, batched_reader, + _reader: reader, } } diff --git a/crates/polars-io/src/csv/read/read_impl/batched_read.rs b/crates/polars-io/src/csv/read/read_impl/batched_read.rs index 7c36ba7b2c2d..b42be05f14b4 100644 --- a/crates/polars-io/src/csv/read/read_impl/batched_read.rs +++ b/crates/polars-io/src/csv/read/read_impl/batched_read.rs @@ -392,11 +392,10 @@ pub struct OwnedBatchedCsvReader { // this exist because we need to keep ownership schema: SchemaRef, batched_reader: BatchedCsvReaderRead<'static>, + // keep ownership + _reader: CsvReader>, } -unsafe impl Send for OwnedBatchedCsvReader {} -unsafe impl Sync for OwnedBatchedCsvReader {} - impl OwnedBatchedCsvReader { pub fn next_batches(&mut self, n: usize) -> PolarsResult>> { self.batched_reader.next_batches(n) @@ -408,11 +407,15 @@ pub fn to_batched_owned_read( ) -> OwnedBatchedCsvReader { let schema = reader.get_schema().unwrap(); let batched_reader = reader.batched_borrowed_read().unwrap(); + // If you put a drop(reader) here, rust will complain that reader is borrowed, + // so we presumably have to keep ownership of it to maintain the safety of the + // 'static transmute. let batched_reader: BatchedCsvReaderRead<'static> = unsafe { std::mem::transmute(batched_reader) }; OwnedBatchedCsvReader { schema, batched_reader, + _reader: reader, } }