diff --git a/hdfs/__init__.py b/hdfs/__init__.py index 8a7f099..88af8cc 100644 --- a/hdfs/__init__.py +++ b/hdfs/__init__.py @@ -9,7 +9,7 @@ import logging as lg -__version__ = '2.5.7' +__version__ = '2.5.8' __license__ = 'MIT' diff --git a/hdfs/ext/avro/__init__.py b/hdfs/ext/avro/__init__.py index d1e6df9..4589626 100644 --- a/hdfs/ext/avro/__init__.py +++ b/hdfs/ext/avro/__init__.py @@ -105,11 +105,9 @@ class _SeekableReader(object): """ - sync_size = 16 - def __init__(self, reader, size=None): self._reader = reader - self._size = size or self.sync_size + self._size = size or SYNC_SIZE self._buffer = None self._saught = False @@ -152,6 +150,8 @@ class AvroReader(object): :param parts: Part-files to read, when reading a distributed file. The default is to read all part-files in order. See :meth:`hdfs.client.Client.parts` for details. + :param reader_schema: Schema to read the data as. If specified, it must be + compatible with the writer's schema (the default). The contents of the file will be decoded in a streaming manner, as the data is transferred. This makes it possible to use on files of arbitrary size. As @@ -163,17 +163,18 @@ class AvroReader(object): .. code-block:: python with AvroReader(client, 'foo.avro') as reader: - schema = reader.schema # The remote file's Avro schema. + schema = reader.writer_schema # The remote file's Avro schema. content = reader.content # Content metadata (e.g. size). for record in reader: pass # and its records """ - def __init__(self, client, hdfs_path, parts=None): + def __init__(self, client, hdfs_path, parts=None, reader_schema=None): self.content = client.content(hdfs_path) #: Content summary of Avro file. self.metadata = None #: Avro header metadata. - self._schema = None + self.reader_schema = reader_schema #: Input reader schema. + self._writer_schema = None if self.content['directoryCount']: # This is a folder. self._paths = [ @@ -196,8 +197,11 @@ def _reader(): """Record generator over all part-files.""" for path in self._paths: with self._client.read(path) as bytes_reader: - reader = fastavro.reader(_SeekableReader(bytes_reader)) - if not self._schema: + reader = fastavro.reader( + _SeekableReader(bytes_reader), + reader_schema=self.reader_schema + ) + if not self._writer_schema: schema = reader.writer_schema _logger.debug('Read schema from %r.', path) yield (schema, reader.metadata) @@ -205,7 +209,7 @@ def _reader(): yield record self._records = _reader() - self._schema, self.metadata = next(self._records) + self._writer_schema, self.metadata = next(self._records) return self def __exit__(self, exc_type, exc_value, traceback): @@ -218,16 +222,19 @@ def __iter__(self): # pylint: disable=non-iterator-returned return self._records @property - def schema(self): + def writer_schema(self): """Get the underlying file's schema. The schema will only be available after entering the reader's corresponding `with` block. """ - if not self._schema: + if not self._writer_schema: raise HdfsError('Schema not yet inferred.') - return self._schema + return self._writer_schema + + # Legacy property, preserved for backwards-compatibility. + schema = writer_schema class AvroWriter(object): diff --git a/test/test_ext_avro.py b/test/test_ext_avro.py index 939b899..7553540 100644 --- a/test/test_ext_avro.py +++ b/test/test_ext_avro.py @@ -148,6 +148,27 @@ def test_read(self): with AvroReader(self.client, 'weather.avro') as reader: eq_(list(reader), self.records) + def test_read_with_same_schema(self): + self.client.upload('w.avro', osp.join(self.dpath, 'weather.avro')) + with AvroReader(self.client, 'w.avro', reader_schema=self.schema) as reader: + eq_(list(reader), self.records) + + def test_read_with_compatible_schema(self): + self.client.upload('w.avro', osp.join(self.dpath, 'weather.avro')) + schema = { + 'name': 'test.Weather', + 'type': 'record', + 'fields': [ + {'name': 'temp', 'type': 'int'}, + {'name': 'tag', 'type': 'string', 'default': ''}, + ], + } + with AvroReader(self.client, 'w.avro', reader_schema=schema) as reader: + eq_( + list(reader), + [{'temp': r['temp'], 'tag': ''} for r in self.records] + ) + class TestWriter(_AvroIntegrationTest): diff --git a/test/util.py b/test/util.py index beb0c7b..ed673a9 100644 --- a/test/util.py +++ b/test/util.py @@ -71,7 +71,7 @@ def setup(self): self.client.delete('', recursive=True) # Wrapped inside a `ConnectionError` block because this causes failures # when trying to reuse some streamed connections when they aren't fully - # read (even though it is closed explicitely, it acts differently than + # read (even though it is closed explicitly, it acts differently than # when all its content has been read), but only on HttpFS. A test which # needs this for example is `test_ext_avro.py:TestMain.test_schema`. # This seems related to this issue: