From 999865c661c045fd8bec166c451d35c2bae6ee3d Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Fri, 22 Jul 2022 16:33:52 +0200 Subject: [PATCH 1/6] Fix github action build fail due to: https://stackoverflow.com/questions/71673404/importerror-cannot-import-name-unicodefun-from-click --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 59a607f..71bda72 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ "pylint==2.12.1", "pylama==8.3.7", "pylama-pylint==3.1.1", - "black==21.12b0", + "black==22.6.0", "pytest-black==0.3.12", ] From 85d0b28666a9533aa9dcc3a40ed8de4a38ee0ca1 Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Fri, 22 Jul 2022 20:10:09 +0200 Subject: [PATCH 2/6] Added partner setting to force canonicalize binary. --- pyas2lib/as2.py | 5 ++++- pyas2lib/utils.py | 9 +++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index 9eadd4e..cd2d915 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -177,6 +177,8 @@ class Partner: :param mdn_confirm_text: The text to be used in the MDN for successfully processed messages received from this partner. + :param canonicalize_as_binary: force binary canonicalization for this partner + """ as2_name: str @@ -194,6 +196,7 @@ class Partner: mdn_digest_alg: str = None mdn_confirm_text: str = MDN_CONFIRM_TEXT ignore_self_signed: bool = True + canonicalize_as_binary: bool = False def __post_init__(self): """Run the post initialisation checks for this class.""" @@ -638,7 +641,7 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) # Verify the message, first using raw message and if it fails # then convert to canonical form and try again - mic_content = canonicalize(self.payload) + mic_content = canonicalize(self.payload, canonicalize_as_binary=self.sender.canonicalize_as_binary) verify_cert = self.sender.load_verify_cert() self.digest_alg = verify_message(mic_content, signature, verify_cert) diff --git a/pyas2lib/utils.py b/pyas2lib/utils.py index 5947c55..5a2e999 100644 --- a/pyas2lib/utils.py +++ b/pyas2lib/utils.py @@ -48,8 +48,8 @@ def _handle_text(self, msg): newline replacements. """ if ( - msg.get_content_type() == "application/octet-stream" - or msg.get("Content-Transfer-Encoding") == "binary" + msg.get_content_type() == "application/octet-stream" + or msg.get("Content-Transfer-Encoding") == "binary" ): payload = msg.get_payload(decode=True) if payload is None: @@ -75,15 +75,16 @@ def mime_to_bytes(msg: message.Message, email_policy: policy.Policy = policy.HTT return fp.getvalue() -def canonicalize(email_message: message.Message): +def canonicalize(email_message: message.Message, canonicalize_as_binary: bool = False): """ Function to convert an email Message to standard format string/ :param email_message: email.message.Message to be converted to standard string + :param canonicalize_as_binary: force binary canonicalization :return: the standard representation of the email message in bytes """ - if email_message.get("Content-Transfer-Encoding") == "binary": + if email_message.get("Content-Transfer-Encoding") == "binary" or canonicalize_as_binary: message_header = "" message_body = email_message.get_payload(decode=True) for k, v in email_message.items(): From 0b3aeae7f4efd20c1e1a130cf094166255fc7bfb Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Fri, 22 Jul 2022 20:13:02 +0200 Subject: [PATCH 3/6] Formatted with black --- pyas2lib/as2.py | 5 ++++- pyas2lib/utils.py | 9 ++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index cd2d915..cb058ad 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -641,7 +641,10 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) # Verify the message, first using raw message and if it fails # then convert to canonical form and try again - mic_content = canonicalize(self.payload, canonicalize_as_binary=self.sender.canonicalize_as_binary) + mic_content = canonicalize( + self.payload, + canonicalize_as_binary=self.sender.canonicalize_as_binary, + ) verify_cert = self.sender.load_verify_cert() self.digest_alg = verify_message(mic_content, signature, verify_cert) diff --git a/pyas2lib/utils.py b/pyas2lib/utils.py index 5a2e999..48ad8dd 100644 --- a/pyas2lib/utils.py +++ b/pyas2lib/utils.py @@ -48,8 +48,8 @@ def _handle_text(self, msg): newline replacements. """ if ( - msg.get_content_type() == "application/octet-stream" - or msg.get("Content-Transfer-Encoding") == "binary" + msg.get_content_type() == "application/octet-stream" + or msg.get("Content-Transfer-Encoding") == "binary" ): payload = msg.get_payload(decode=True) if payload is None: @@ -84,7 +84,10 @@ def canonicalize(email_message: message.Message, canonicalize_as_binary: bool = :return: the standard representation of the email message in bytes """ - if email_message.get("Content-Transfer-Encoding") == "binary" or canonicalize_as_binary: + if ( + email_message.get("Content-Transfer-Encoding") == "binary" + or canonicalize_as_binary + ): message_header = "" message_body = email_message.get_payload(decode=True) for k, v in email_message.items(): From c4be1f057562f576bc63639ee475b8a9687b63a9 Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Sat, 4 May 2024 00:20:54 +0200 Subject: [PATCH 4/6] Option to send a partnership callback option. --- pyas2lib/as2.py | 50 ++++++++++++++++++++++++++++-------- pyas2lib/tests/test_basic.py | 27 +++++++++++++++++++ 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index 9c74e54..e65c288 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -564,7 +564,14 @@ def _decompress_data(self, payload): return False, payload - def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None): + def parse( + self, + raw_content, + find_org_cb=None, + find_partner_cb=None, + find_message_cb=None, + find_org_partner_cb=None, + ): """Function parses the RAW AS2 message; decrypts, verifies and decompresses it and extracts the payload. @@ -572,18 +579,24 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) A byte string of the received HTTP headers followed by the body. :param find_org_cb: - A callback the returns an Organization object if exists. The + A conditional callback the returns an Organization object if exists. The as2-to header value is passed as an argument to it. :param find_partner_cb: - A callback the returns an Partner object if exists. The + A conditional callback the returns a Partner object if exists. The as2-from header value is passed as an argument to it. :param find_message_cb: - An optional callback the returns an Message object if exists in + An optional callback the returns a Message object if exists in order to check for duplicates. The message id and partner id is passed as arguments to it. + :param find_org_partner_cb: + A conditional callback that return Organization object and + Partner object if exist. The as2-to and as2-from header value + are passed as an argument to it. Must be provided + when find_org_cb and find_org_partner_cb is None. + :return: A three element tuple containing (status, (exception, traceback) , mdn). The status is a string indicating the status of the @@ -592,6 +605,18 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) the partner did not request it. """ + # Validate passed arguments + if not any( + [ + find_org_cb and find_partner_cb and not find_org_partner_cb, + find_org_partner_cb and not find_partner_cb and not find_org_cb, + ] + ): + raise TypeError( + "Incorrect arguments passed: either find_org_cb and find_partner_cb " + "or only find_org_partner_cb must be passed." + ) + # Parse the raw MIME message and extract its content and headers status, detailed_status, exception, mdn = "processed", None, (None, None), None self.payload = parse_mime(raw_content) @@ -605,14 +630,17 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) try: # Get the organization and partner for this transmission org_id = unquote_as2name(as2_headers["as2-to"]) - self.receiver = find_org_cb(org_id) - if not self.receiver: - raise PartnerNotFound(f"Unknown AS2 organization with id {org_id}") - partner_id = unquote_as2name(as2_headers["as2-from"]) - self.sender = find_partner_cb(partner_id) - if not self.sender: - raise PartnerNotFound(f"Unknown AS2 partner with id {partner_id}") + if find_org_partner_cb: + self.receiver, self.sender = find_org_partner_cb(org_id, partner_id) + elif find_org_cb and find_partner_cb: + self.receiver = find_org_cb(org_id) + if not self.receiver: + raise PartnerNotFound(f"Unknown AS2 organization with id {org_id}") + + self.sender = find_partner_cb(partner_id) + if not self.sender: + raise PartnerNotFound(f"Unknown AS2 partner with id {partner_id}") if find_message_cb and find_message_cb(self.message_id, partner_id): raise DuplicateDocument( diff --git a/pyas2lib/tests/test_basic.py b/pyas2lib/tests/test_basic.py index fed94b9..87f09cb 100644 --- a/pyas2lib/tests/test_basic.py +++ b/pyas2lib/tests/test_basic.py @@ -184,6 +184,30 @@ def test_encrypted_signed_compressed_message(self): self.assertEqual(out_message.mic, in_message.mic) self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) + def test_encrypted_signed_message_partnership(self): + """Test Encrypted Signed Uncompressed Message with Partnership""" + + # Build an As2 message to be transmitted to partner + self.partner.sign = True + self.partner.encrypt = True + out_message = as2.Message(self.org, self.partner) + out_message.build(self.test_data) + raw_out_message = out_message.headers_str + b"\r\n" + out_message.content + + # Parse the generated AS2 message as the partner + in_message = as2.Message() + status, _, _ = in_message.parse( + raw_out_message, + find_org_partner_cb=self.find_org_partner, + ) + + # Compare the mic contents of the input and output messages + self.assertEqual(status, "processed") + self.assertTrue(in_message.signed) + self.assertTrue(in_message.encrypted) + self.assertEqual(out_message.mic, in_message.mic) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) + def test_plain_message_with_domain(self): """Test Message building with an org domain""" @@ -229,3 +253,6 @@ def find_org(self, as2_id): def find_partner(self, as2_id): return self.partner + + def find_org_partner(self, as2_org, as2_partner): + return self.org, self.partner From 29af788f835dc32f8e88142a83162f0107e34b9c Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Sat, 4 May 2024 23:52:48 +0200 Subject: [PATCH 5/6] Raise missing partner/org errors also for partnership --- pyas2lib/as2.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index e65c288..2e03331 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -635,12 +635,13 @@ def parse( self.receiver, self.sender = find_org_partner_cb(org_id, partner_id) elif find_org_cb and find_partner_cb: self.receiver = find_org_cb(org_id) - if not self.receiver: - raise PartnerNotFound(f"Unknown AS2 organization with id {org_id}") - self.sender = find_partner_cb(partner_id) - if not self.sender: - raise PartnerNotFound(f"Unknown AS2 partner with id {partner_id}") + + if not self.receiver: + raise PartnerNotFound(f"Unknown AS2 organization with id {org_id}") + + if not self.sender: + raise PartnerNotFound(f"Unknown AS2 partner with id {partner_id}") if find_message_cb and find_message_cb(self.message_id, partner_id): raise DuplicateDocument( From 894c087fdcbfd6b9763109c1d5fb7d127f4d2a6e Mon Sep 17 00:00:00 2001 From: Wassilios Lytras Date: Sat, 4 May 2024 23:53:10 +0200 Subject: [PATCH 6/6] Adding tests for invalid callback function combination --- pyas2lib/tests/test_basic.py | 41 ++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/pyas2lib/tests/test_basic.py b/pyas2lib/tests/test_basic.py index 87f09cb..e1dc83a 100644 --- a/pyas2lib/tests/test_basic.py +++ b/pyas2lib/tests/test_basic.py @@ -248,6 +248,47 @@ def test_invalid_message_id_length_raises_error(self): in str(excinfo.value) ) + def test_invalid_cb_function_passed(self): + """Checking allowed combination of CB functions""" + + # Create AS2 message and parse with wrong combination of callback functions + + as2_message = as2.Message() + with pytest.raises( + TypeError, + match="Incorrect arguments passed: either find_org_cb and find_partner_cb " + "or only find_org_partner_cb must be passed.", + ): + + _, _, _ = as2_message.parse( + "abc", + find_org_partner_cb=self.find_org_partner, + find_partner_cb=self.find_partner, + ) + + with pytest.raises( + TypeError, + match="Incorrect arguments passed: either find_org_cb and find_partner_cb " + "or only find_org_partner_cb must be passed.", + ): + _, _, _ = as2_message.parse( + "abc", + find_org_partner_cb=self.find_org_partner, + find_org_cb=self.find_org, + ) + + with pytest.raises( + TypeError, + match="Incorrect arguments passed: either find_org_cb and find_partner_cb " + "or only find_org_partner_cb must be passed.", + ): + _, _, _ = as2_message.parse( + "abc", + find_org_partner_cb=self.find_org_partner, + find_org_cb=self.find_org, + find_partner_cb=self.find_partner, + ) + def find_org(self, as2_id): return self.org