Commit 5b40065

Add Avro reader schema option

mtth committed Jul 3, 2019
1 parent c9e032a commit 5b40065

Showing 4 changed files with 42 additions and 14 deletions.
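In short, `AvroReader` now accepts a `reader_schema` argument which is forwarded to fastavro, enabling standard Avro schema resolution while reading. A minimal usage sketch (the WebHDFS URL and file name are placeholders, not part of this commit):

```python
from hdfs import InsecureClient
from hdfs.ext.avro import AvroReader

client = InsecureClient('http://namenode:50070')  # placeholder WebHDFS URL

# Reader schema projecting each record onto a single compatible field.
reader_schema = {
  'name': 'test.Weather',
  'type': 'record',
  'fields': [{'name': 'temp', 'type': 'int'}],
}

with AvroReader(client, 'weather.avro', reader_schema=reader_schema) as reader:
  for record in reader:
    print(record)  # e.g. {'temp': 42}
```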
2 changes: 1 addition & 1 deletion hdfs/__init__.py
@@ -9,7 +9,7 @@
 import logging as lg


-__version__ = '2.5.7'
+__version__ = '2.5.8'
 __license__ = 'MIT'
31 changes: 19 additions & 12 deletions hdfs/ext/avro/__init__.py
@@ -105,11 +105,9 @@ class _SeekableReader(object):
   """

-  sync_size = 16
-
   def __init__(self, reader, size=None):
     self._reader = reader
-    self._size = size or self.sync_size
+    self._size = size or SYNC_SIZE
     self._buffer = None
     self._saught = False
@@ -152,6 +150,8 @@ class AvroReader(object):
   :param parts: Part-files to read, when reading a distributed file. The
     default is to read all part-files in order. See
     :meth:`hdfs.client.Client.parts` for details.
+  :param reader_schema: Schema to read the data as. If specified, it must be
+    compatible with the writer's schema (the default).

   The contents of the file will be decoded in a streaming manner, as the data
   is transferred. This makes it possible to use on files of arbitrary size. As
@@ -163,17 +163,18 @@ class AvroReader(object):
   .. code-block:: python

     with AvroReader(client, 'foo.avro') as reader:
-      schema = reader.schema # The remote file's Avro schema.
+      schema = reader.writer_schema # The remote file's Avro schema.
       content = reader.content # Content metadata (e.g. size).
       for record in reader:
         pass # and its records

   """

-  def __init__(self, client, hdfs_path, parts=None):
+  def __init__(self, client, hdfs_path, parts=None, reader_schema=None):
     self.content = client.content(hdfs_path) #: Content summary of Avro file.
     self.metadata = None #: Avro header metadata.
-    self._schema = None
+    self.reader_schema = reader_schema #: Input reader schema.
+    self._writer_schema = None
     if self.content['directoryCount']:
       # This is a folder.
       self._paths = [
@@ -196,16 +197,19 @@ def _reader():
       """Record generator over all part-files."""
       for path in self._paths:
         with self._client.read(path) as bytes_reader:
-          reader = fastavro.reader(_SeekableReader(bytes_reader))
-          if not self._schema:
+          reader = fastavro.reader(
+            _SeekableReader(bytes_reader),
+            reader_schema=self.reader_schema
+          )
+          if not self._writer_schema:
             schema = reader.writer_schema
             _logger.debug('Read schema from %r.', path)
             yield (schema, reader.metadata)
           for record in reader:
             yield record

     self._records = _reader()
-    self._schema, self.metadata = next(self._records)
+    self._writer_schema, self.metadata = next(self._records)
     return self

   def __exit__(self, exc_type, exc_value, traceback):
@@ -218,16 +222,19 @@ def __iter__(self): # pylint: disable=non-iterator-returned
     return self._records

   @property
-  def schema(self):
+  def writer_schema(self):
     """Get the underlying file's schema.

     The schema will only be available after entering the reader's corresponding
     `with` block.

     """
-    if not self._schema:
+    if not self._writer_schema:
       raise HdfsError('Schema not yet inferred.')
-    return self._schema
+    return self._writer_schema
+
+  # Legacy property, preserved for backwards-compatibility.
+  schema = writer_schema


 class AvroWriter(object):
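Note that the `schema` property is renamed to `writer_schema` (matching fastavro's terminology), with the old name kept as an alias so existing callers keep working. Both spellings return the same object once the `with` block has been entered; a quick sketch (client and file name as in the example above):

```python
with AvroReader(client, 'weather.avro') as reader:
  assert reader.schema is reader.writer_schema  # legacy alias, same object
  records = list(reader)  # reading either property before entry raises HdfsError
```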
21 changes: 21 additions & 0 deletions test/test_ext_avro.py
@@ -148,6 +148,27 @@ def test_read(self):
     with AvroReader(self.client, 'weather.avro') as reader:
       eq_(list(reader), self.records)

+  def test_read_with_same_schema(self):
+    self.client.upload('w.avro', osp.join(self.dpath, 'weather.avro'))
+    with AvroReader(self.client, 'w.avro', reader_schema=self.schema) as reader:
+      eq_(list(reader), self.records)
+
+  def test_read_with_compatible_schema(self):
+    self.client.upload('w.avro', osp.join(self.dpath, 'weather.avro'))
+    schema = {
+      'name': 'test.Weather',
+      'type': 'record',
+      'fields': [
+        {'name': 'temp', 'type': 'int'},
+        {'name': 'tag', 'type': 'string', 'default': ''},
+      ],
+    }
+    with AvroReader(self.client, 'w.avro', reader_schema=schema) as reader:
+      eq_(
+        list(reader),
+        [{'temp': r['temp'], 'tag': ''} for r in self.records]
+      )


 class TestWriter(_AvroIntegrationTest):
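The second test exercises standard Avro schema resolution: writer fields absent from the reader schema are dropped, and reader-only fields are filled from their defaults. The same behaviour can be checked with fastavro alone, no cluster required (the writer schema below is illustrative, mirroring the test above):

```python
import io

import fastavro

writer_schema = {
  'name': 'test.Weather',
  'type': 'record',
  'fields': [
    {'name': 'temp', 'type': 'int'},
    {'name': 'station', 'type': 'string'},
  ],
}
reader_schema = {
  'name': 'test.Weather',
  'type': 'record',
  'fields': [
    {'name': 'temp', 'type': 'int'},
    {'name': 'tag', 'type': 'string', 'default': ''},
  ],
}

buf = io.BytesIO()
fastavro.writer(buf, writer_schema, [{'temp': 3, 'station': 'LGA'}])
buf.seek(0)

# 'station' is dropped; 'tag' comes from the reader schema's default.
assert list(fastavro.reader(buf, reader_schema=reader_schema)) == [
  {'temp': 3, 'tag': ''}
]
```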
2 changes: 1 addition & 1 deletion test/util.py
@@ -71,7 +71,7 @@ def setup(self):
       self.client.delete('', recursive=True)
       # Wrapped inside a `ConnectionError` block because this causes failures
       # when trying to reuse some streamed connections when they aren't fully
-      # read (even though it is closed explicitely, it acts differently than
+      # read (even though it is closed explicitly, it acts differently than
       # when all its content has been read), but only on HttpFS. A test which
       # needs this for example is `test_ext_avro.py:TestMain.test_schema`.
       # This seems related to this issue:
