From 95ba60e5f40e40afada377f868cf499de4c99f87 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti <37232625+jcferretti@users.noreply.github.com> Date: Wed, 8 Dec 2021 20:59:18 -0500 Subject: [PATCH] Added tests for avro mappings in consume. Reworked somewhat the impl. (#1665) --- Integrations/python/deephaven/ConsumeKafka.py | 6 +- Integrations/python/deephaven/ProduceKafka.py | 2 +- .../python/deephaven/conversion_utils.py | 13 +-- Integrations/python/test/testConsumeKafka.py | 88 ++++++++++++++----- 4 files changed, 78 insertions(+), 31 deletions(-) diff --git a/Integrations/python/deephaven/ConsumeKafka.py b/Integrations/python/deephaven/ConsumeKafka.py index d0c47f37dd1..9137166266d 100644 --- a/Integrations/python/deephaven/ConsumeKafka.py +++ b/Integrations/python/deephaven/ConsumeKafka.py @@ -11,7 +11,7 @@ import deephaven.Types as dh from deephaven.conversion_utils import \ - _dictToFun, _dictToMap, _dictToProperties, IDENTITY, _isStr + _dictToFunWithIdentity, _dictToFunWithDefault, _dictToMap, _dictToProperties, _isStr from deephaven.Types import _jclassFromType @@ -224,11 +224,11 @@ def avro(schema, schema_version:str = None, mapping:dict = None, mapping_only:di if mapping is not None: have_mapping = True # when providing 'mapping', fields names not given are mapped as identity - mapping = _dictToFun(mapping, default_value=IDENTITY) + mapping = _dictToFunWithIdentity(mapping) elif mapping_only is not None: have_mapping = True # when providing 'mapping_only', fields not given are ignored. - mapping = _dictToFun(mapping_only, default_value=None) + mapping = _dictToFunWithDefault(mapping_only, None) else: have_mapping = False if _isStr(schema): diff --git a/Integrations/python/deephaven/ProduceKafka.py b/Integrations/python/deephaven/ProduceKafka.py index c96f3810bcf..8730650bc70 100644 --- a/Integrations/python/deephaven/ProduceKafka.py +++ b/Integrations/python/deephaven/ProduceKafka.py @@ -9,7 +9,7 @@ import deephaven.Types as dh from deephaven.conversion_utils import _isJavaType, _isStr, \ - _typeFromName, _dictToProperties, _dictToMap, _seqToSet, IDENTITY + _typeFromName, _dictToProperties, _dictToMap, _seqToSet # None until the first _defineSymbols() call _java_type_ = None diff --git a/Integrations/python/deephaven/conversion_utils.py b/Integrations/python/deephaven/conversion_utils.py index 37887b8f2c2..2281492d5ee 100644 --- a/Integrations/python/deephaven/conversion_utils.py +++ b/Integrations/python/deephaven/conversion_utils.py @@ -30,13 +30,12 @@ def _isStr(input): _jmap_ = None _jset_ = None _python_tools_ = None -IDENTITY = None def _defineSymbols(): if not jpy.has_jvm(): raise SystemError("No java functionality can be used until the JVM has been initialized through the jpy module") - global _table_tools_, _col_def_, _jprops_, _jmap_, _jset_, _python_tools_, IDENTITY + global _table_tools_, _col_def_, _jprops_, _jmap_, _jset_, _python_tools_ if _table_tools_ is None: # This will raise an exception if the desired object is not the classpath _table_tools_ = jpy.get_type("io.deephaven.engine.util.TableTools") @@ -45,7 +44,6 @@ def _defineSymbols(): _jmap_ = jpy.get_type("java.util.HashMap") _jset_ = jpy.get_type("java.util.HashSet") _python_tools_ = jpy.get_type("io.deephaven.integrations.python.PythonTools") - IDENTITY = object() # Ensure IDENTITY is unique. # every method that depends on symbols defined via _defineSymbols() should be decorated with @_passThrough @@ -1107,8 +1105,11 @@ def _seqToSet(s): return r @_passThrough -def _dictToFun(dict_mapping, default_value): +def _dictToFunWithIdentity(dict_mapping): + java_map = _dictToMap(dict_mapping) + return _python_tools_.functionFromMapWithIdentityDefaults(java_map) + +@_passThrough +def _dictToFunWithDefault(dict_mapping, default_value): java_map = _dictToMap(dict_mapping) - if default_value is IDENTITY: - return _python_tools_.functionFromMapWithIdentityDefaults(java_map) return _python_tools_.functionFromMapWithDefault(java_map, default_value) diff --git a/Integrations/python/test/testConsumeKafka.py b/Integrations/python/test/testConsumeKafka.py index c5be6c3bf7d..2b017b0f641 100644 --- a/Integrations/python/test/testConsumeKafka.py +++ b/Integrations/python/test/testConsumeKafka.py @@ -142,25 +142,71 @@ def testAvro(self): r = os.system(sys_str) self.assertEquals(0, r) - t = ck.consumeToTable( - { 'bootstrap.servers' : 'redpanda:29092', - 'schema.registry.url' : 'http://redpanda:8081' }, - 'share_price', - key = ck.IGNORE, - value = ck.avro('share_price_record', schema_version='1'), - table_type='append' - ) - - cols = t.getDefinition().getColumns() - self.assertEquals(7, len(cols)) - self._assertCommonCols(cols) + with self.subTest(msg='straight schema, no mapping'): + t = ck.consumeToTable( + { 'bootstrap.servers' : 'redpanda:29092', + 'schema.registry.url' : 'http://redpanda:8081' }, + 'share_price', + key = ck.IGNORE, + value = ck.avro('share_price_record', schema_version='1'), + table_type='append' + ) + + cols = t.getDefinition().getColumns() + self.assertEquals(7, len(cols)) + self._assertCommonCols(cols) + + self.assertEquals("Symbol", cols[3].getName()) + self.assertEquals(dh.string.clazz(), cols[3].getDataType()) + self.assertEquals("Side", cols[4].getName()) + self.assertEquals(dh.string.clazz(), cols[4].getDataType()) + self.assertEquals("Qty", cols[5].getName()) + self.assertEquals(dh.int_.clazz(), cols[5].getDataType()) + self.assertEquals("Price", cols[6].getName()) + self.assertEquals(dh.double.clazz(), cols[6].getDataType()) + + with self.subTest( + msg='mapping_only (filter out some schema fields)'): + m = {'Symbol' : 'Ticker', 'Price' : 'Dollars'} + t = ck.consumeToTable( + { 'bootstrap.servers' : 'redpanda:29092', + 'schema.registry.url' : 'http://redpanda:8081' }, + 'share_price', + key = ck.IGNORE, + value = ck.avro('share_price_record', mapping_only=m), + table_type='append' + ) + + cols = t.getDefinition().getColumns() + self.assertEquals(5, len(cols)) + self._assertCommonCols(cols) + + self.assertEquals("Ticker", cols[3].getName()) + self.assertEquals(dh.string.clazz(), cols[3].getDataType()) + self.assertEquals("Dollars", cols[4].getName()) + self.assertEquals(dh.double.clazz(), cols[4].getDataType()) + + with self.subTest(msg='mapping (rename some fields)'): + m = {'Symbol' : 'Ticker', 'Qty' : 'Quantity'} + t = ck.consumeToTable( + { 'bootstrap.servers' : 'redpanda:29092', + 'schema.registry.url' : 'http://redpanda:8081' }, + 'share_price', + key = ck.IGNORE, + value = ck.avro('share_price_record', mapping=m), + table_type='append' + ) + + cols = t.getDefinition().getColumns() + self.assertEquals(7, len(cols)) + self._assertCommonCols(cols) + + self.assertEquals("Ticker", cols[3].getName()) + self.assertEquals(dh.string.clazz(), cols[3].getDataType()) + self.assertEquals("Side", cols[4].getName()) + self.assertEquals(dh.string.clazz(), cols[4].getDataType()) + self.assertEquals("Quantity", cols[5].getName()) + self.assertEquals(dh.int_.clazz(), cols[5].getDataType()) + self.assertEquals("Price", cols[6].getName()) + self.assertEquals(dh.double.clazz(), cols[6].getDataType()) - self.assertEquals("Symbol", cols[3].getName()) - self.assertEquals(dh.string.clazz(), cols[3].getDataType()) - self.assertEquals("Side", cols[4].getName()) - self.assertEquals(dh.string.clazz(), cols[4].getDataType()) - self.assertEquals("Qty", cols[5].getName()) - self.assertEquals(dh.int_.clazz(), cols[5].getDataType()) - self.assertEquals("Price", cols[6].getName()) - self.assertEquals(dh.double.clazz(), cols[6].getDataType()) -