Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add associated document id field to ChangeMeta #33807

Merged
merged 10 commits into from
Dec 5, 2023
9 changes: 6 additions & 3 deletions corehq/ex-submodules/pillowtop/feed/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ChangeMeta(jsonobject.JsonObject):
# track when first published (will not get updated on retry, unlike publish_timestamp)
original_publication_datetime = jsonobject.DateTimeProperty(default=datetime.utcnow)

# available to hold any associated document. For cases, this is the form ID responsible for the change
associated_document_id = jsonobject.StringProperty()


class Change(object):
"""
Expand Down Expand Up @@ -119,10 +122,10 @@ def __getitem__(self, key):
return self._dict[key]

def __setitem__(self, key, value):
raise NotImplemented('This is a read-only dictionary!')
raise NotImplementedError('This is a read-only dictionary!')

def __delitem__(self, key, value):
raise NotImplemented('This is a read-only dictionary!')
raise NotImplementedError('This is a read-only dictionary!')

def __iter__(self):
return iter(self._dict)
Expand All @@ -134,7 +137,7 @@ def get(self, key, default=None):
return self._dict.get(key, default)

def pop(self, key, default):
raise NotImplemented('This is a read-only dictionary!')
raise NotImplementedError('This is a read-only dictionary!')

def to_dict(self):
return self._dict
Expand Down
6 changes: 5 additions & 1 deletion corehq/form_processor/backends/sql/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ def publish_changes_to_kafka(processed_forms, cases, stock_result):
publish_form_saved(processed_forms.submitted)
cases = cases or []
for case in cases:
publish_case_saved(case, send_post_save_signal=False)
publish_case_saved(
case,
associated_form_id=processed_forms.submitted.form_id,
send_post_save_signal=False
)

if stock_result:
for ledger in stock_result.models_to_save:
Expand Down
7 changes: 4 additions & 3 deletions corehq/form_processor/change_publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ def publish_form_deleted(domain, form_id):
))


def publish_case_saved(case, send_post_save_signal=True):
def publish_case_saved(case, associated_form_id=None, send_post_save_signal=True):
"""
Publish the change to kafka and run case post-save signals.
"""
producer.send_change(topics.CASE_SQL, change_meta_from_sql_case(case))
producer.send_change(topics.CASE_SQL, change_meta_from_sql_case(case, associated_form_id))
if send_post_save_signal:
sql_case_post_save.send(case.__class__, case=case)


def change_meta_from_sql_case(case):
def change_meta_from_sql_case(case, associated_form_id=None):
return ChangeMeta(
document_id=case.case_id,
data_source_type=data_sources.SOURCE_SQL,
Expand All @@ -49,6 +49,7 @@ def change_meta_from_sql_case(case):
document_subtype=case.type,
domain=case.domain,
is_deletion=case.is_deleted,
associated_document_id=associated_form_id
)


Expand Down
Loading