Skip to content
This repository has been archived by the owner on Jun 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request #132 from scikit-hep/issue-arrowbuffers-toobig
Browse files Browse the repository at this point in the history
Issue arrowbuffers toobig
  • Loading branch information
jpivarski authored May 23, 2019
2 parents 627c5c7 + baab846 commit 5f40a74
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 36 deletions.
3 changes: 0 additions & 3 deletions awkward/array/masked.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,6 @@ def lsborder(self, value):
def _valid(self):
if self.check_whole_valid:
if not self._isvalid:
if len(self._mask) != self._ceildiv8(len(self._content)):
raise ValueError("mask length ({0}) must be equal to ceil(content length / 8) ({1})".format(len(self._mask), self._ceildiv8(len(self._content))))

self._isvalid = True

def __iter__(self, checkiter=True):
Expand Down
72 changes: 40 additions & 32 deletions awkward/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,114 +218,122 @@ def fromarrow(obj, awkwardlib=None):
ARROW_TAGTYPE = awkwardlib.numpy.uint8
ARROW_CHARTYPE = awkwardlib.numpy.uint8

def popbuffers(tpe, buffers):
def popbuffers(tpe, buffers, length):
if isinstance(tpe, pyarrow.lib.DictionaryType):
index = popbuffers(tpe.index_type, buffers, length)
content = fromarrow(tpe.dictionary)
index = popbuffers(tpe.index_type, buffers)
if isinstance(index, awkwardlib.BitMaskedArray):
return awkwardlib.BitMaskedArray(index.mask, awkwardlib.IndexedArray(index.content, content), maskedwhen=index.maskedwhen, lsborder=index.lsborder)
else:
return awkwardlib.IndexedArray(index, content)

elif isinstance(tpe, pyarrow.lib.StructType):
mask = buffers.pop(0)
pairs = []
for i in range(tpe.num_children - 1, -1, -1):
pairs.insert(0, (tpe[i].name, popbuffers(tpe[i].type, buffers)))
for i in range(tpe.num_children):
pairs.append((tpe[i].name, popbuffers(tpe[i].type, buffers, length)))
out = awkwardlib.Table.frompairs(pairs, 0) # FIXME: better rowstart
mask = buffers.pop()
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif isinstance(tpe, pyarrow.lib.ListType):
content = popbuffers(tpe.value_type, buffers)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_INDEXTYPE)
out = awkwardlib.JaggedArray.fromoffsets(offsets, content[:offsets[-1]])
mask = buffers.pop()
mask = buffers.pop(0)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_INDEXTYPE)[:length + 1]
content = popbuffers(tpe.value_type, buffers, offsets[-1])
out = awkwardlib.JaggedArray.fromoffsets(offsets, content)
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif isinstance(tpe, pyarrow.lib.UnionType) and tpe.mode == "sparse":
contents = []
for i in range(tpe.num_children - 1, -1, -1):
contents.insert(0, popbuffers(tpe[i].type, buffers))
assert buffers.pop() is None
tags = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_TAGTYPE)
mask = buffers.pop(0)
tags = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_TAGTYPE)[:length]
assert buffers.pop(0) is None
index = awkwardlib.numpy.arange(len(tags), dtype=ARROW_INDEXTYPE)
contents = []
for i in range(tpe.num_children):
try:
sublength = index[tags == i][-1] + 1
except IndexError:
sublength = 0
contents.append(popbuffers(tpe[i].type, buffers, sublength))
for i in range(len(contents)):
these = index[tags == i]
if len(these) == 0:
contents[i] = contents[i][0:0]
else:
contents[i] = contents[i][: these[-1] + 1]
out = awkwardlib.UnionArray(tags, index, contents)
mask = buffers.pop()
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif isinstance(tpe, pyarrow.lib.UnionType) and tpe.mode == "dense":
mask = buffers.pop(0)
tags = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_TAGTYPE)[:length]
index = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_INDEXTYPE)[:length]
contents = []
for i in range(tpe.num_children - 1, -1, -1):
contents.insert(0, popbuffers(tpe[i].type, buffers))
index = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_INDEXTYPE)
tags = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_TAGTYPE)
for i in range(tpe.num_children):
try:
sublength = index[tags == i].max() + 1
except ValueError:
sublength = 0
contents.append(popbuffers(tpe[i].type, buffers, sublength))
for i in range(len(contents)):
these = index[tags == i]
if len(these) == 0:
contents[i] = contents[i][0:0]
else:
contents[i] = contents[i][: these.max() + 1]
out = awkwardlib.UnionArray(tags, index, contents)
mask = buffers.pop()
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif tpe == pyarrow.string():
content = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_CHARTYPE)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_INDEXTYPE)
mask = buffers.pop(0)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_INDEXTYPE)[:length + 1]
content = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_CHARTYPE)[:offsets[-1]]
out = awkwardlib.StringArray.fromoffsets(offsets, content[:offsets[-1]], encoding="utf-8")
mask = buffers.pop()
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif tpe == pyarrow.binary():
content = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_CHARTYPE)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_INDEXTYPE)
mask = buffers.pop(0)
offsets = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_INDEXTYPE)[:length + 1]
content = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_CHARTYPE)[:offsets[-1]]
out = awkwardlib.StringArray.fromoffsets(offsets, content[:offsets[-1]], encoding=None)
mask = buffers.pop()
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif tpe == pyarrow.bool_():
out = awkwardlib.numpy.unpackbits(awkwardlib.numpy.frombuffer(buffers.pop(), dtype=ARROW_CHARTYPE)).view(awkwardlib.MaskedArray.BOOLTYPE)
out = out.reshape(-1, 8)[:,::-1].reshape(-1) # lsborder=True
mask = buffers.pop()
mask = buffers.pop(0)
out = awkwardlib.numpy.unpackbits(awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=ARROW_CHARTYPE)).view(awkwardlib.MaskedArray.BOOLTYPE)
out = out.reshape(-1, 8)[:,::-1].reshape(-1)[:length] # lsborder=True
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
else:
return out

elif isinstance(tpe, pyarrow.lib.DataType):
out = awkwardlib.numpy.frombuffer(buffers.pop(), dtype=tpe.to_pandas_dtype())
mask = buffers.pop()
mask = buffers.pop(0)
out = awkwardlib.numpy.frombuffer(buffers.pop(0), dtype=tpe.to_pandas_dtype())[:length]
if mask is not None:
mask = awkwardlib.numpy.frombuffer(mask, dtype=ARROW_BITMASKTYPE)
return awkwardlib.BitMaskedArray(mask, out, maskedwhen=False, lsborder=True)
Expand All @@ -337,7 +345,7 @@ def popbuffers(tpe, buffers):

if isinstance(obj, pyarrow.lib.Array):
buffers = obj.buffers()
out = popbuffers(obj.type, buffers)[:len(obj)]
out = popbuffers(obj.type, buffers, len(obj))
assert len(buffers) == 0
return out

Expand Down
2 changes: 1 addition & 1 deletion awkward/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import re

__version__ = "0.10.1"
__version__ = "0.10.2"
version = __version__
version_info = tuple(re.split(r"[-\.]", __version__))

Expand Down
16 changes: 16 additions & 0 deletions tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,22 @@ def test_arrow_nonnullable_table(self):
table2 = table.add_column(1, pyarrow.column(pyarrow.field("y", y.type, False), numpy.array([1.1, 2.2, 3.3])))
assert awkward.arrow.fromarrow(table2).tolist() == [{"x": 1, "y": 1.1}, {"x": 2, "y": 2.2}, {"x": 3, "y": 3.3}]

def test_arrow_trailing_zero(self):
a = awkward.fromiter([[1.1, 2.2, 3.3], [], [4.4, 5.5], [6.6, 7.7, 8.8], [], [9.9]])
pa_table = awkward.toarrow(awkward.Table(a=a))

batches = pa_table.to_batches()
sink = pyarrow.BufferOutputStream()
writer = pyarrow.RecordBatchStreamWriter(sink, batches[0].schema)
writer.write_batch(batches[0])
writer.close()

buf = sink.getvalue()
reader = pyarrow.ipc.open_stream(buf)
for batch in reader:
b = awkward.fromarrow(batch)
assert a.tolist() == b["a"].tolist()

# def test_arrow_writeparquet(self):
# if pyarrow is None:
# pytest.skip("unable to import pyarrow")
Expand Down

0 comments on commit 5f40a74

Please sign in to comment.