Skip to content

Commit

Permalink
Added tests for avro mappings in consume. Reworked the impl somewhat.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcferretti authored Dec 9, 2021
1 parent 4b18d35 commit 95ba60e
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 31 deletions.
6 changes: 3 additions & 3 deletions Integrations/python/deephaven/ConsumeKafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import deephaven.Types as dh

from deephaven.conversion_utils import \
_dictToFun, _dictToMap, _dictToProperties, IDENTITY, _isStr
_dictToFunWithIdentity, _dictToFunWithDefault, _dictToMap, _dictToProperties, _isStr

from deephaven.Types import _jclassFromType

Expand Down Expand Up @@ -224,11 +224,11 @@ def avro(schema, schema_version:str = None, mapping:dict = None, mapping_only:di
if mapping is not None:
have_mapping = True
# when providing 'mapping', field names not given are mapped as identity
mapping = _dictToFun(mapping, default_value=IDENTITY)
mapping = _dictToFunWithIdentity(mapping)
elif mapping_only is not None:
have_mapping = True
# when providing 'mapping_only', fields not given are ignored.
mapping = _dictToFun(mapping_only, default_value=None)
mapping = _dictToFunWithDefault(mapping_only, None)
else:
have_mapping = False
if _isStr(schema):
Expand Down
2 changes: 1 addition & 1 deletion Integrations/python/deephaven/ProduceKafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import deephaven.Types as dh

from deephaven.conversion_utils import _isJavaType, _isStr, \
_typeFromName, _dictToProperties, _dictToMap, _seqToSet, IDENTITY
_typeFromName, _dictToProperties, _dictToMap, _seqToSet

# None until the first _defineSymbols() call
_java_type_ = None
Expand Down
13 changes: 7 additions & 6 deletions Integrations/python/deephaven/conversion_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ def _isStr(input):
_jmap_ = None
_jset_ = None
_python_tools_ = None
IDENTITY = None

def _defineSymbols():
if not jpy.has_jvm():
raise SystemError("No java functionality can be used until the JVM has been initialized through the jpy module")

global _table_tools_, _col_def_, _jprops_, _jmap_, _jset_, _python_tools_, IDENTITY
global _table_tools_, _col_def_, _jprops_, _jmap_, _jset_, _python_tools_
if _table_tools_ is None:
# This will raise an exception if the desired object is not on the classpath
_table_tools_ = jpy.get_type("io.deephaven.engine.util.TableTools")
Expand All @@ -45,7 +44,6 @@ def _defineSymbols():
_jmap_ = jpy.get_type("java.util.HashMap")
_jset_ = jpy.get_type("java.util.HashSet")
_python_tools_ = jpy.get_type("io.deephaven.integrations.python.PythonTools")
IDENTITY = object() # Ensure IDENTITY is unique.


# every method that depends on symbols defined via _defineSymbols() should be decorated with @_passThrough
Expand Down Expand Up @@ -1107,8 +1105,11 @@ def _seqToSet(s):
return r

@_passThrough
def _dictToFun(dict_mapping, default_value):
def _dictToFunWithIdentity(dict_mapping):
java_map = _dictToMap(dict_mapping)
return _python_tools_.functionFromMapWithIdentityDefaults(java_map)

@_passThrough
def _dictToFunWithDefault(dict_mapping, default_value):
java_map = _dictToMap(dict_mapping)
if default_value is IDENTITY:
return _python_tools_.functionFromMapWithIdentityDefaults(java_map)
return _python_tools_.functionFromMapWithDefault(java_map, default_value)
88 changes: 67 additions & 21 deletions Integrations/python/test/testConsumeKafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,71 @@ def testAvro(self):
r = os.system(sys_str)
self.assertEquals(0, r)

t = ck.consumeToTable(
{ 'bootstrap.servers' : 'redpanda:29092',
'schema.registry.url' : 'http://redpanda:8081' },
'share_price',
key = ck.IGNORE,
value = ck.avro('share_price_record', schema_version='1'),
table_type='append'
)

cols = t.getDefinition().getColumns()
self.assertEquals(7, len(cols))
self._assertCommonCols(cols)
with self.subTest(msg='straight schema, no mapping'):
t = ck.consumeToTable(
{ 'bootstrap.servers' : 'redpanda:29092',
'schema.registry.url' : 'http://redpanda:8081' },
'share_price',
key = ck.IGNORE,
value = ck.avro('share_price_record', schema_version='1'),
table_type='append'
)

cols = t.getDefinition().getColumns()
self.assertEquals(7, len(cols))
self._assertCommonCols(cols)

self.assertEquals("Symbol", cols[3].getName())
self.assertEquals(dh.string.clazz(), cols[3].getDataType())
self.assertEquals("Side", cols[4].getName())
self.assertEquals(dh.string.clazz(), cols[4].getDataType())
self.assertEquals("Qty", cols[5].getName())
self.assertEquals(dh.int_.clazz(), cols[5].getDataType())
self.assertEquals("Price", cols[6].getName())
self.assertEquals(dh.double.clazz(), cols[6].getDataType())

with self.subTest(
msg='mapping_only (filter out some schema fields)'):
m = {'Symbol' : 'Ticker', 'Price' : 'Dollars'}
t = ck.consumeToTable(
{ 'bootstrap.servers' : 'redpanda:29092',
'schema.registry.url' : 'http://redpanda:8081' },
'share_price',
key = ck.IGNORE,
value = ck.avro('share_price_record', mapping_only=m),
table_type='append'
)

cols = t.getDefinition().getColumns()
self.assertEquals(5, len(cols))
self._assertCommonCols(cols)

self.assertEquals("Ticker", cols[3].getName())
self.assertEquals(dh.string.clazz(), cols[3].getDataType())
self.assertEquals("Dollars", cols[4].getName())
self.assertEquals(dh.double.clazz(), cols[4].getDataType())

with self.subTest(msg='mapping (rename some fields)'):
m = {'Symbol' : 'Ticker', 'Qty' : 'Quantity'}
t = ck.consumeToTable(
{ 'bootstrap.servers' : 'redpanda:29092',
'schema.registry.url' : 'http://redpanda:8081' },
'share_price',
key = ck.IGNORE,
value = ck.avro('share_price_record', mapping=m),
table_type='append'
)

cols = t.getDefinition().getColumns()
self.assertEquals(7, len(cols))
self._assertCommonCols(cols)

self.assertEquals("Ticker", cols[3].getName())
self.assertEquals(dh.string.clazz(), cols[3].getDataType())
self.assertEquals("Side", cols[4].getName())
self.assertEquals(dh.string.clazz(), cols[4].getDataType())
self.assertEquals("Quantity", cols[5].getName())
self.assertEquals(dh.int_.clazz(), cols[5].getDataType())
self.assertEquals("Price", cols[6].getName())
self.assertEquals(dh.double.clazz(), cols[6].getDataType())

self.assertEquals("Symbol", cols[3].getName())
self.assertEquals(dh.string.clazz(), cols[3].getDataType())
self.assertEquals("Side", cols[4].getName())
self.assertEquals(dh.string.clazz(), cols[4].getDataType())
self.assertEquals("Qty", cols[5].getName())
self.assertEquals(dh.int_.clazz(), cols[5].getDataType())
self.assertEquals("Price", cols[6].getName())
self.assertEquals(dh.double.clazz(), cols[6].getDataType())

0 comments on commit 95ba60e

Please sign in to comment.