Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async callback functions, partnership function, rsaes_oaep for KeyEncryptionAlgorithm #64

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AUTHORS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
- Abhishek Ram <[email protected]> @abhishek-ram
- Chad Gates @chadgates
- Wassilios Lytras @chadgates
- Bruno Ribeiro da Silva <[email protected]> @loop0
- Robin C Samuel @robincsamuel
- Brandon Joyce @brandonjoyce
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# Release History

## 1.4.4 - 2024-

* feat: added partnership lookup function
* feat: added support for async callback functions
* feat: added support for optional key encryption algorithm rsaes_oaep for encryption and decryption

## 1.4.3 - 2023-01-25

* fix: update pyopenssl version to resovle pyca/cryptography#7959
* fix: update pyopenssl version to resolve pyca/cryptography#7959

## 1.4.2 - 2022-12-11

Expand Down
100 changes: 86 additions & 14 deletions pyas2lib/as2.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""Define the core functions/classes of the pyas2 package."""
import logging
import hashlib
import asyncio
import binascii
import hashlib
import inspect
import logging
import traceback
from dataclasses import dataclass
from email import encoders
from email import message as email_message
from email import message_from_bytes as parse_mime
from email import utils as email_utils
from email.mime.multipart import MIMEMultipart

from oscrypto import asymmetric

from pyas2lib.cms import (
Expand Down Expand Up @@ -539,26 +542,41 @@

return False, payload

def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None):
async def aparse(
self,
raw_content,
find_org_cb=None,
find_partner_cb=None,
find_message_cb=None,
find_org_partner_cb=None,
):
"""Function parses the RAW AS2 message; decrypts, verifies and
decompresses it and extracts the payload.

:param raw_content:
A byte string of the received HTTP headers followed by the body.

:param find_org_cb:
A callback the returns an Organization object if exists. The
as2-to header value is passed as an argument to it.
A conditional callback the returns an Organization object if exists. The
as2-to header value is passed as an argument to it. Must be provided
when find_partner_cb is provided and find_org_partner_cb is None

:param find_partner_cb:
A callback the returns an Partner object if exists. The
as2-from header value is passed as an argument to it.
An conditional callback the returns an Partner object if exists. The
as2-from header value is passed as an argument to it. Must be provided
when find_org_cb is provided and find_org_partner_cb is None.

:param find_message_cb:
An optional callback the returns an Message object if exists in
order to check for duplicates. The message id and partner id is
passed as arguments to it.

:param find_org_partner_cb:
A conditional callback that return Organization object and
Partner object if exist. The as2-to and as2-from header value
are passed as an argument to it. Must be provided
when find_org_cb and find_org_partner_cb is None.

:return:
A three element tuple containing (status, (exception, traceback)
, mdn). The status is a string indicating the status of the
Expand All @@ -567,6 +585,18 @@
the partner did not request it.
"""

# Validate passed arguments
if not any(
[
find_org_cb and find_partner_cb and not find_org_partner_cb,
find_org_partner_cb and not find_partner_cb and not find_org_cb,
]
):
raise TypeError(
"Incorrect arguments passed: either find_org_cb and find_partner_cb "
"or only find_org_partner_cb must be passed."
)

# Parse the raw MIME message and extract its content and headers
status, detailed_status, exception, mdn = "processed", None, (None, None), None
self.payload = parse_mime(raw_content)
Expand All @@ -580,19 +610,44 @@
try:
# Get the organization and partner for this transmission
org_id = unquote_as2name(as2_headers["as2-to"])
self.receiver = find_org_cb(org_id)
partner_id = unquote_as2name(as2_headers["as2-from"])

if find_org_partner_cb:
if inspect.iscoroutinefunction(find_org_partner_cb):
self.receiver, self.sender = await find_org_partner_cb(

Check warning on line 617 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L617

Added line #L617 was not covered by tests
org_id, partner_id
)
else:
self.receiver, self.sender = find_org_partner_cb(org_id, partner_id)

else:
if find_org_cb:
if inspect.iscoroutinefunction(find_org_cb):
self.receiver = await find_org_cb(org_id)

Check warning on line 626 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L626

Added line #L626 was not covered by tests
else:
self.receiver = find_org_cb(org_id)

if find_partner_cb:
if inspect.iscoroutinefunction(find_partner_cb):
self.sender = await find_partner_cb(partner_id)

Check warning on line 632 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L632

Added line #L632 was not covered by tests
else:
self.sender = find_partner_cb(partner_id)

if not self.receiver:
raise PartnerNotFound(f"Unknown AS2 organization with id {org_id}")

partner_id = unquote_as2name(as2_headers["as2-from"])
self.sender = find_partner_cb(partner_id)
if not self.sender:
raise PartnerNotFound(f"Unknown AS2 partner with id {partner_id}")

if find_message_cb and find_message_cb(self.message_id, partner_id):
raise DuplicateDocument(
"Duplicate message received, message with this ID already processed."
)
if find_message_cb:
if inspect.iscoroutinefunction(find_message_cb):
message_exists = await find_message_cb(self.message_id, partner_id)

Check warning on line 644 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L644

Added line #L644 was not covered by tests
else:
message_exists = find_message_cb(self.message_id, partner_id)
if message_exists:
raise DuplicateDocument(
"Duplicate message received, message with this ID already processed."
)

if (
self.sender.encrypt
Expand Down Expand Up @@ -713,6 +768,18 @@

return status, exception, mdn

def parse(self, *args, **kwargs):
"""
A synchronous wrapper for the asynchronous parse method.
It runs the parse coroutine in an event loop and returns the result.
"""
loop = asyncio.get_event_loop()
if loop.is_running():
raise RuntimeError(

Check warning on line 778 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L778

Added line #L778 was not covered by tests
"Cannot run synchronous parse within an already running event loop."
)
return loop.run_until_complete(self.aparse(*args, **kwargs))


class Mdn:
"""Class for handling AS2 MDNs. Includes functions for both
Expand Down Expand Up @@ -915,6 +982,11 @@
# Call the find message callback which should return a Message instance
orig_message = find_message_cb(self.orig_message_id, orig_recipient)

if not orig_message:
status = "failed/Failure"
details_status = "original-message-not-found"
return status, details_status

Check warning on line 988 in pyas2lib/as2.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/as2.py#L986-L988

Added lines #L986 - L988 were not covered by tests

# Extract the headers and save it
mdn_headers = {}
for k, v in self.payload.items():
Expand Down
83 changes: 51 additions & 32 deletions pyas2lib/cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import zlib
from datetime import datetime, timezone

from asn1crypto import cms, core, algos
from asn1crypto import algos, cms, core
from asn1crypto.cms import SMIMECapabilityIdentifier
from oscrypto import asymmetric, symmetric, util

Expand Down Expand Up @@ -65,12 +65,15 @@
raise DecompressionError("Decompression failed with cause: {}".format(e)) from e


def encrypt_message(data_to_encrypt, enc_alg, encryption_cert):
def encrypt_message(
data_to_encrypt, enc_alg, encryption_cert, key_enc_alg="rsaes_pkcs1v15"
):
"""Function encrypts data and returns the generated ASN.1

:param data_to_encrypt: A byte string of the data to be encrypted
:param enc_alg: The algorithm to be used for encrypting the data
:param encryption_cert: The certificate to be used for encrypting the data
:param key_enc_alg: The algo for the key encryption: rsaes_pkcs1v15 (default) or rsaes_oaep

:return: A CMS ASN.1 byte string of the encrypted data.
"""
Expand Down Expand Up @@ -136,7 +139,12 @@
raise AS2Exception("Unsupported Encryption Algorithm")

# Encrypt the key and build the ASN.1 message
encrypted_key = asymmetric.rsa_pkcs1v15_encrypt(encryption_cert, key)
if key_enc_alg == "rsaes_pkcs1v15":
encrypted_key = asymmetric.rsa_pkcs1v15_encrypt(encryption_cert, key)
elif key_enc_alg == "rsaes_oaep":
encrypted_key = asymmetric.rsa_oaep_encrypt(encryption_cert, key)
else:
raise AS2Exception(f"Unsupported Key Encryption Scheme: {key_enc_alg}")

return cms.ContentInfo(
{
Expand All @@ -163,7 +171,11 @@
}
),
"key_encryption_algorithm": cms.KeyEncryptionAlgorithm(
{"algorithm": cms.KeyEncryptionAlgorithmId("rsa")}
{
"algorithm": cms.KeyEncryptionAlgorithmId(
key_enc_alg
)
}
),
"encrypted_key": cms.OctetString(encrypted_key),
}
Expand Down Expand Up @@ -200,46 +212,53 @@
encrypted_key = recipient_info["encrypted_key"].native

if cms.KeyEncryptionAlgorithmId(key_enc_alg) == cms.KeyEncryptionAlgorithmId(
"rsa"
"rsaes_pkcs1v15"
):
try:
key = asymmetric.rsa_pkcs1v15_decrypt(decryption_key[0], encrypted_key)
except Exception as e:
raise DecryptionError(
"Failed to decrypt the payload: Could not extract decryption key."
) from e

alg = cms_content["content"]["encrypted_content_info"][
"content_encryption_algorithm"
]
encapsulated_data = cms_content["content"]["encrypted_content_info"][
"encrypted_content"
].native

elif cms.KeyEncryptionAlgorithmId(key_enc_alg) == cms.KeyEncryptionAlgorithmId(
"rsaes_oaep"
):
try:
if alg["algorithm"].native == "rc4":
decrypted_content = symmetric.rc4_decrypt(key, encapsulated_data)
elif alg.encryption_cipher == "tripledes":
cipher = "tripledes_192_cbc"
decrypted_content = symmetric.tripledes_cbc_pkcs5_decrypt(
key, encapsulated_data, alg.encryption_iv
)
elif alg.encryption_cipher == "aes":
decrypted_content = symmetric.aes_cbc_pkcs7_decrypt(
key, encapsulated_data, alg.encryption_iv
)
elif alg.encryption_cipher == "rc2":
decrypted_content = symmetric.rc2_cbc_pkcs5_decrypt(
key, encapsulated_data, alg["parameters"]["iv"].native
)
else:
raise AS2Exception("Unsupported Encryption Algorithm")
key = asymmetric.rsa_oaep_decrypt(decryption_key[0], encrypted_key)
except Exception as e:
raise DecryptionError(
"Failed to decrypt the payload: {}".format(e)
"Failed to decrypt the payload: Could not extract decryption key."
) from e
else:
raise AS2Exception("Unsupported Encryption Algorithm")
raise AS2Exception(f"Unsupported Key Encryption Algorithm {key_enc_alg}")

Check warning on line 233 in pyas2lib/cms.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/cms.py#L233

Added line #L233 was not covered by tests

alg = cms_content["content"]["encrypted_content_info"][
"content_encryption_algorithm"
]
encapsulated_data = cms_content["content"]["encrypted_content_info"][
"encrypted_content"
].native

try:
if alg["algorithm"].native == "rc4":
decrypted_content = symmetric.rc4_decrypt(key, encapsulated_data)
elif alg.encryption_cipher == "tripledes":
cipher = "tripledes_192_cbc"
decrypted_content = symmetric.tripledes_cbc_pkcs5_decrypt(
key, encapsulated_data, alg.encryption_iv
)
elif alg.encryption_cipher == "aes":
decrypted_content = symmetric.aes_cbc_pkcs7_decrypt(

Check warning on line 251 in pyas2lib/cms.py

View check run for this annotation

Codecov / codecov/patch

pyas2lib/cms.py#L251

Added line #L251 was not covered by tests
key, encapsulated_data, alg.encryption_iv
)
elif alg.encryption_cipher == "rc2":
decrypted_content = symmetric.rc2_cbc_pkcs5_decrypt(
key, encapsulated_data, alg["parameters"]["iv"].native
)
else:
raise AS2Exception("Unsupported Encryption Algorithm")
except Exception as e:
raise DecryptionError("Failed to decrypt the payload: {}".format(e)) from e
else:
raise DecryptionError("Encrypted data not found in ASN.1 ")

Expand Down
Loading
Loading