Skip to content

Commit

Permalink
feat(python): Allow construction of Series from memory buffers (#13323
Browse files Browse the repository at this point in the history
)
  • Loading branch information
stinodego authored Dec 31, 2023
1 parent 2341372 commit 09820de
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 34 deletions.
3 changes: 0 additions & 3 deletions py-polars/polars/datatypes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
Datetime("ms"),
Datetime("us"),
Datetime("ns"),
Datetime("ms", "*"),
Datetime("us", "*"),
Datetime("ns", "*"),
]
)
DURATION_DTYPES: frozenset[PolarsDataType] = DataTypeGroup(
Expand Down
46 changes: 41 additions & 5 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
from polars import DataFrame, DataType, Expr
from polars.series._numpy import SeriesView
from polars.type_aliases import (
BufferInfo,
ClosedInterval,
ComparisonOperator,
FillNullStrategy,
Expand Down Expand Up @@ -361,7 +362,7 @@ def _from_pandas(
pandas_to_pyseries(name, values, nan_to_null=nan_to_null)
)

def _get_buffer_info(self) -> tuple[int, int, int]:
def _get_buffer_info(self) -> BufferInfo:
"""
Return pointer, offset, and length information about the underlying buffer.
Expand Down Expand Up @@ -419,7 +420,7 @@ def _get_buffer(self, index: Literal[0, 1, 2]) -> Self | None:

@classmethod
def _from_buffer(
self, dtype: PolarsDataType, buffer_info: tuple[int, int, int], base: Any
self, dtype: PolarsDataType, buffer_info: BufferInfo, owner: Any
) -> Self:
"""
Construct a Series from information about its underlying buffer.
Expand All @@ -429,15 +430,50 @@ def _from_buffer(
dtype
The data type of the buffer.
buffer_info
Tuple containing buffer information in the form (pointer, offset, length).
base
Tuple containing buffer information in the form `(pointer, offset, length)`.
owner
The object owning the buffer.
Returns
-------
Series
"""
return self._from_pyseries(PySeries._from_buffer(dtype, buffer_info, base))
return self._from_pyseries(PySeries._from_buffer(dtype, buffer_info, owner))

@classmethod
def _from_buffers(
self,
dtype: PolarsDataType,
data: Series | Sequence[Series],
validity: Series | None = None,
) -> Self:
"""
Construct a Series from information about its underlying buffers.
Parameters
----------
dtype
The data type of the resulting Series.
data
Buffers describing the data. For most data types, this is a single Series of
the physical data type of `dtype`. Some data types require multiple buffers:
- `String`: A data buffer of type `UInt8` and an offsets buffer
of type `Int64`.
validity
Validity buffer. If specified, must be a Series of data type `Boolean`.
Returns
-------
Series
"""
if isinstance(data, Series):
data = [data._s]
else:
data = [s._s for s in data]
if validity is not None:
validity = validity._s
return self._from_pyseries(PySeries._from_buffers(dtype, data, validity))

@property
def dtype(self) -> DataType:
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/type_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@
PolarsType = TypeVar("PolarsType", "DataFrame", "LazyFrame", "Series", "Expr")
FrameType = TypeVar("FrameType", "DataFrame", "LazyFrame")

BufferInfo: TypeAlias = Tuple[int, int, int]


# minimal protocol definitions that can reasonably represent
# an executable connection, cursor, or equivalent object
Expand Down
170 changes: 149 additions & 21 deletions py-polars/src/series/buffers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use polars::export::arrow;
use polars::export::arrow::array::Array;
use polars::export::arrow::array::{Array, BooleanArray, PrimitiveArray, Utf8Array};
use polars::export::arrow::bitmap::Bitmap;
use polars::export::arrow::buffer::Buffer;
use polars::export::arrow::offset::OffsetsBuffer;
use polars::export::arrow::types::NativeType;
use polars_core::export::arrow::array::PrimitiveArray;
use polars_rs::export::arrow::offset::OffsetsBuffer;
use pyo3::exceptions::{PyTypeError, PyValueError};

use super::*;
Expand Down Expand Up @@ -51,7 +52,7 @@ impl PySeries {
length: len,
})
},
dt if dt.is_numeric() => Ok(with_match_physical_numeric_polars_type!(s.dtype(), |$T| {
dt if dt.is_numeric() => Ok(with_match_physical_numeric_polars_type!(dt, |$T| {
let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
BufferInfo { pointer: get_pointer(ca), offset: 0, length: ca.len() }
})),
Expand Down Expand Up @@ -211,29 +212,24 @@ impl PySeries {
py: Python,
dtype: Wrap<DataType>,
buffer_info: BufferInfo,
base: &PyAny,
owner: &PyAny,
) -> PyResult<Self> {
let dtype = dtype.0;
let BufferInfo {
pointer,
offset,
length,
} = buffer_info;
let base = base.to_object(py);
let owner = owner.to_object(py);

let arr_boxed = match dtype {
DataType::Int8 => unsafe { from_buffer_impl::<i8>(pointer, length, base) },
DataType::Int16 => unsafe { from_buffer_impl::<i16>(pointer, length, base) },
DataType::Int32 => unsafe { from_buffer_impl::<i32>(pointer, length, base) },
DataType::Int64 => unsafe { from_buffer_impl::<i64>(pointer, length, base) },
DataType::UInt8 => unsafe { from_buffer_impl::<u8>(pointer, length, base) },
DataType::UInt16 => unsafe { from_buffer_impl::<u16>(pointer, length, base) },
DataType::UInt32 => unsafe { from_buffer_impl::<u32>(pointer, length, base) },
DataType::UInt64 => unsafe { from_buffer_impl::<u64>(pointer, length, base) },
DataType::Float32 => unsafe { from_buffer_impl::<f32>(pointer, length, base) },
DataType::Float64 => unsafe { from_buffer_impl::<f64>(pointer, length, base) },
dt if dt.is_numeric() => {
with_match_physical_numeric_type!(dt, |$T| unsafe {
from_buffer_impl::<$T>(pointer, length, owner)
})
},
DataType::Boolean => {
unsafe { from_buffer_boolean_impl(pointer, offset, length, base) }?
unsafe { from_buffer_boolean_impl(pointer, offset, length, owner) }?
},
dt => {
return Err(PyTypeError::new_err(format!(
Expand All @@ -250,24 +246,24 @@ impl PySeries {
unsafe fn from_buffer_impl<T: NativeType>(
pointer: usize,
length: usize,
base: Py<PyAny>,
owner: Py<PyAny>,
) -> Box<dyn Array> {
let pointer = pointer as *const T;
let slice = unsafe { std::slice::from_raw_parts(pointer, length) };
let arr = unsafe { arrow::ffi::mmap::slice_and_owner(slice, base) };
let arr = unsafe { arrow::ffi::mmap::slice_and_owner(slice, owner) };
arr.to_boxed()
}
unsafe fn from_buffer_boolean_impl(
pointer: usize,
offset: usize,
length: usize,
base: Py<PyAny>,
owner: Py<PyAny>,
) -> PyResult<Box<dyn Array>> {
let length_in_bytes = get_boolean_buffer_length_in_bytes(length, offset);

let pointer = pointer as *const u8;
let slice = unsafe { std::slice::from_raw_parts(pointer, length_in_bytes) };
let arr_result = unsafe { arrow::ffi::mmap::bitmap_and_owner(slice, offset, length, base) };
let arr_result = unsafe { arrow::ffi::mmap::bitmap_and_owner(slice, offset, length, owner) };
let arr = arr_result.map_err(PyPolarsErr::from)?;
Ok(arr.to_boxed())
}
Expand All @@ -281,3 +277,135 @@ fn get_boolean_buffer_length_in_bytes(length: usize, offset: usize) -> usize {
n_bytes + 1
}
}

#[pymethods]
impl PySeries {
/// Construct a PySeries from information about its underlying buffers.
#[staticmethod]
unsafe fn _from_buffers(
dtype: Wrap<DataType>,
data: Vec<PySeries>,
validity: Option<PySeries>,
) -> PyResult<Self> {
let dtype = dtype.0;
let mut data = data.to_series();

match data.len() {
0 => {
return Err(PyTypeError::new_err(
"`data` input to `from_buffers` must contain at least one buffer",
));
},
1 if validity.is_none() => {
let values = data.pop().unwrap();
let s = values.strict_cast(&dtype).map_err(PyPolarsErr::from)?;
return Ok(s.into());
},
_ => (),
}

let validity = match validity {
Some(s) => {
let dtype = s.series.dtype();
if !dtype.is_bool() {
return Err(PyTypeError::new_err(format!(
"validity buffer must have data type Boolean, got {:?}",
dtype
)));
}
Some(series_to_bitmap(s.series).unwrap())
},
None => None,
};

let s = match dtype.to_physical() {
dt if dt.is_numeric() => {
let values = data.into_iter().next().unwrap();
with_match_physical_numeric_polars_type!(dt, |$T| {
let values_buffer = series_to_buffer::<$T>(values);
from_buffers_num_impl::<<$T as PolarsNumericType>::Native>(values_buffer, validity)?
})
},
DataType::Boolean => {
let values = data.into_iter().next().unwrap();
let values_buffer = series_to_bitmap(values)?;
from_buffers_bool_impl(values_buffer, validity)?
},
DataType::String => {
let mut data_iter = data.into_iter();
let values = data_iter.next().unwrap();
let offsets = match data_iter.next() {
Some(s) => {
let dtype = s.dtype();
if !matches!(dtype, DataType::Int64) {
return Err(PyTypeError::new_err(format!(
"offsets buffer must have data type Int64, got {:?}",
dtype
)));
}
series_to_offsets(s)
},
None => return Err(PyTypeError::new_err(
"`from_buffers` cannot create a String column without an offsets buffer",
)),
};
let values = series_to_buffer::<UInt8Type>(values);
from_buffers_string_impl(values, validity, offsets)?
},
dt => {
return Err(PyTypeError::new_err(format!(
"`from_buffers` not implemented for `dtype` {dt}",
)))
},
};

let out = s.strict_cast(&dtype).map_err(PyPolarsErr::from)?;
Ok(out.into())
}
}

fn series_to_buffer<T>(s: Series) -> Buffer<T::Native>
where
T: PolarsNumericType,
{
let ca: &ChunkedArray<T> = s.as_ref().as_ref();
let arr = ca.downcast_iter().next().unwrap();
arr.values().clone()
}
fn series_to_bitmap(s: Series) -> PyResult<Bitmap> {
let ca_result = s.bool();
let ca = ca_result.map_err(PyPolarsErr::from)?;
let arr = ca.downcast_iter().next().unwrap();
let bitmap = arr.values().clone();
Ok(bitmap)
}
fn series_to_offsets(s: Series) -> OffsetsBuffer<i64> {
let buffer = series_to_buffer::<Int64Type>(s);
unsafe { OffsetsBuffer::new_unchecked(buffer) }
}

fn from_buffers_num_impl<T: NativeType>(
data: Buffer<T>,
validity: Option<Bitmap>,
) -> PyResult<Series> {
let arr = PrimitiveArray::new(T::PRIMITIVE.into(), data, validity);
let s_result = Series::from_arrow("", arr.to_boxed());
let s = s_result.map_err(PyPolarsErr::from)?;
Ok(s)
}
fn from_buffers_bool_impl(data: Bitmap, validity: Option<Bitmap>) -> PyResult<Series> {
let arr = BooleanArray::new(ArrowDataType::Boolean, data, validity);
let s_result = Series::from_arrow("", arr.to_boxed());
let s = s_result.map_err(PyPolarsErr::from)?;
Ok(s)
}
fn from_buffers_string_impl(
data: Buffer<u8>,
validity: Option<Bitmap>,
offsets: OffsetsBuffer<i64>,
) -> PyResult<Series> {
let arr = Utf8Array::new(ArrowDataType::LargeUtf8, offsets, data, validity);
let s_result = Series::from_arrow("", arr.to_boxed());
let s = s_result.map_err(PyPolarsErr::from)?;
Ok(s)
}
2 changes: 1 addition & 1 deletion py-polars/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::io::Cursor;

use polars_core::series::IsSorted;
use polars_core::utils::flatten::flatten_series;
use polars_core::with_match_physical_numeric_polars_type;
use polars_core::{with_match_physical_numeric_polars_type, with_match_physical_numeric_type};
use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
Expand Down
8 changes: 4 additions & 4 deletions py-polars/tests/unit/series/buffers/test_from_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
)
def test_series_from_buffer(s: pl.Series) -> None:
buffer_info = s._get_buffer_info()
result = pl.Series._from_buffer(s.dtype, buffer_info, base=s)
result = pl.Series._from_buffer(s.dtype, buffer_info, owner=s)
assert_series_equal(s, result)


def test_series_from_buffer_numeric() -> None:
s = pl.Series([1, 2, 3], dtype=pl.UInt16)
buffer_info = s._get_buffer_info()
result = pl.Series._from_buffer(s.dtype, buffer_info, base=s)
result = pl.Series._from_buffer(s.dtype, buffer_info, owner=s)
assert_series_equal(s, result)


def test_series_from_buffer_sliced_bitmask() -> None:
s = pl.Series([True] * 9, dtype=pl.Boolean)[5:]
buffer_info = s._get_buffer_info()
result = pl.Series._from_buffer(s.dtype, buffer_info, base=s)
result = pl.Series._from_buffer(s.dtype, buffer_info, owner=s)
assert_series_equal(s, result)


Expand All @@ -44,4 +44,4 @@ def test_series_from_buffer_unsupported() -> None:
TypeError,
match="`from_buffer` requires a physical type as input for `dtype`, got date",
):
pl.Series._from_buffer(pl.Date, buffer_info, base=s)
pl.Series._from_buffer(pl.Date, buffer_info, owner=s)
Loading

0 comments on commit 09820de

Please sign in to comment.