diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..feb1b19 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,22 @@ +[run] +branch = True +omit = + */site-packages/* + */tests/* + +[report] +exclude_lines = + + # Don't complain about missing debug-only code: + def __repr__ + def __str__ + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + assert + + # Don't complain if non-runnable code isn't run: + if 0: + pass + if __name__ == .__main__.: diff --git a/.gitignore b/.gitignore index 3ecab8f..934b7fb 100644 --- a/.gitignore +++ b/.gitignore @@ -91,5 +91,4 @@ ENV/ # IDEA .idea .pytest_cache/ -.coverage .DS_Store \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index d5b2585..90461ee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,13 @@ dist: xenial language: python python: - - '3.5' - '3.6' - '3.7' install: - python setup.py install - pip install pytest-cov script: - - pytest --cov=pyas2lib + - pytest --cov=pyas2lib --cov-config .coveragerc after_success: - pip install codecov - codecov \ No newline at end of file diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index 23c3dbc..8173f5d 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -2,6 +2,7 @@ import hashlib import binascii import traceback +from dataclasses import dataclass from email import encoders from email import message as email_message from email import message_from_bytes as parse_mime @@ -9,84 +10,73 @@ from email.mime.multipart import MIMEMultipart from oscrypto import asymmetric -from pyas2lib.cms import DIGEST_ALGORITHMS -from pyas2lib.cms import ENCRYPTION_ALGORITHMS -from pyas2lib.cms import compress_message -from pyas2lib.cms import decompress_message -from pyas2lib.cms import decrypt_message -from pyas2lib.cms import encrypt_message -from pyas2lib.cms import sign_message -from pyas2lib.cms import verify_message +from pyas2lib.cms import ( + compress_message, + decompress_message, + decrypt_message, + encrypt_message, + sign_message, + verify_message +) +from pyas2lib.constants import * from pyas2lib.exceptions import * -from pyas2lib.utils import canonicalize -from pyas2lib.utils import extract_first_part -from pyas2lib.utils import make_mime_boundary -from pyas2lib.utils import mime_to_bytes -from pyas2lib.utils import pem_to_der -from pyas2lib.utils import quote_as2name -from pyas2lib.utils import split_pem -from pyas2lib.utils import unquote_as2name -from pyas2lib.utils import verify_certificate_chain - -logger = logging.getLogger('pyas2lib') - -AS2_VERSION = '1.2' - -EDIINT_FEATURES = 'CMS' - -IGNORE_SELF_SIGNED_CERTS = True - -SYNCHRONOUS_MDN = 'SYNC' -ASYNCHRONOUS_MDN = 'ASYNC' - -MDN_MODES = ( - SYNCHRONOUS_MDN, - ASYNCHRONOUS_MDN +from pyas2lib.utils import ( + canonicalize, + extract_first_part, + make_mime_boundary, + mime_to_bytes, + pem_to_der, + quote_as2name, + split_pem, + unquote_as2name, + verify_certificate_chain ) -MDN_CONFIRM_TEXT = 'The AS2 message has been successfully processed. ' \ - 'Thank you for exchanging AS2 messages with pyAS2.' - -MDN_FAILED_TEXT = 'The AS2 message could not be processed. The ' \ - 'disposition-notification report has additional details.' +logger = logging.getLogger('pyas2lib') +@dataclass class Organization(object): - """Class represents an AS2 organization and defines the certificates and - settings to be used when sending and receiving messages. """ + """ + Class represents an AS2 organization and defines the certificates and + settings to be used when sending and receiving messages. - def __init__(self, as2_name, sign_key=None, sign_key_pass=None, - decrypt_key=None, decrypt_key_pass=None, mdn_url=None, - mdn_confirm_text=MDN_CONFIRM_TEXT): - """ - :param as2_name: The unique AS2 name for this organization + :param as2_name: The unique AS2 name for this organization - :param sign_key: A byte string of the pkcs12 encoded key pair - used for signing outbound messages and MDNs. + :param sign_key: A byte string of the pkcs12 encoded key pair + used for signing outbound messages and MDNs. - :param sign_key_pass: The password for decrypting the `sign_key` + :param sign_key_pass: The password for decrypting the `sign_key` - :param decrypt_key: A byte string of the pkcs12 encoded key pair - used for decrypting inbound messages. + :param decrypt_key: A byte string of the pkcs12 encoded key pair + used for decrypting inbound messages. - :param decrypt_key_pass: The password for decrypting the `decrypt_key` + :param decrypt_key_pass: The password for decrypting the `decrypt_key` - :param mdn_url: The URL where the receiver is expected to post - asynchronous MDNs. - """ - self.sign_key = self.load_key( - sign_key, sign_key_pass) if sign_key else None + :param mdn_url: The URL where the receiver is expected to post + asynchronous MDNs. + """ - self.decrypt_key = self.load_key( - decrypt_key, decrypt_key_pass) if decrypt_key else None + as2_name: str + sign_key: bytes = None + sign_key_pass: str = None + decrypt_key: bytes = None + decrypt_key_pass: str = None + mdn_url: str = None + mdn_confirm_text: str = MDN_CONFIRM_TEXT - self.as2_name = as2_name - self.mdn_url = mdn_url - self.mdn_confirm_text = mdn_confirm_text + def __post_init__(self): + """Run the post initialisation checks for this class.""" + # Load the signature and decryption keys + if self.sign_key: + self.sign_key = self.load_key(self.sign_key, self.sign_key_pass) + + if self.decrypt_key: + self.decrypt_key = self.load_key(self.decrypt_key, self.decrypt_key_pass) @staticmethod - def load_key(key_str, key_pass): - """ Function to load password protected key file in p12 or pem format""" + def load_key(key_str: bytes, key_pass: str): + """Function to load password protected key file in p12 or pem format.""" try: # First try to parse as a p12 file @@ -115,99 +105,95 @@ def load_key(key_str, key_pass): return key, cert +@dataclass class Partner(object): - """Class represents an AS2 partner and defines the certificates and - settings to be used when sending and receiving messages.""" - - def __init__(self, as2_name, verify_cert=None, verify_cert_ca=None, - encrypt_cert=None, encrypt_cert_ca=None, validate_certs=True, - compress=False, sign=False, digest_alg='sha256', encrypt=False, - enc_alg='tripledes_192_cbc', mdn_mode=None, - mdn_digest_alg=None, mdn_confirm_text=MDN_CONFIRM_TEXT): - """ - :param as2_name: The unique AS2 name for this partner. + """ + Class represents an AS2 partner and defines the certificates and + settings to be used when sending and receiving messages. - :param verify_cert: A byte string of the certificate to be used for - verifying signatures of inbound messages and MDNs. + :param as2_name: The unique AS2 name for this partner. - :param verify_cert_ca: A byte string of the ca certificate if any of - the verification cert + :param verify_cert: A byte string of the certificate to be used for + verifying signatures of inbound messages and MDNs. - :param encrypt_cert: A byte string of the certificate to be used for - encrypting outbound message. + :param verify_cert_ca: A byte string of the ca certificate if any of + the verification cert - :param encrypt_cert_ca: A byte string of the ca certificate if any of - the encryption cert + :param encrypt_cert: A byte string of the certificate to be used for + encrypting outbound message. - :param validate_certs: Set this flag to `False` to disable validations of - the encryption and verification certificates. (default `True`) + :param encrypt_cert_ca: A byte string of the ca certificate if any of + the encryption cert - :param compress: Set this flag to `True` to compress outgoing - messages. (default `False`) + :param validate_certs: Set this flag to `False` to disable validations of + the encryption and verification certificates. (default `True`) - :param sign: Set this flag to `True` to sign outgoing - messages. (default `False`) + :param compress: Set this flag to `True` to compress outgoing + messages. (default `False`) - :param digest_alg: The digest algorithm to be used for generating the - signature. (default "sha256") + :param sign: Set this flag to `True` to sign outgoing + messages. (default `False`) - :param encrypt: Set this flag to `True` to encrypt outgoing - messages. (default `False`) + :param digest_alg: The digest algorithm to be used for generating the + signature. (default "sha256") - :param enc_alg: - The encryption algorithm to be used. (default `"tripledes_192_cbc"`) + :param encrypt: Set this flag to `True` to encrypt outgoing + messages. (default `False`) - :param mdn_mode: The mode to be used for receiving the MDN. - Set to `None` for no MDN, `'SYNC'` for synchronous and `'ASYNC'` - for asynchronous. (default `None`) + :param enc_alg: + The encryption algorithm to be used. (default `"tripledes_192_cbc"`) - :param mdn_digest_alg: The digest algorithm to be used by the receiver - for signing the MDN. Use `None` for unsigned MDN. (default `None`) + :param mdn_mode: The mode to be used for receiving the MDN. + Set to `None` for no MDN, `'SYNC'` for synchronous and `'ASYNC'` + for asynchronous. (default `None`) - :param mdn_confirm_text: The text to be used in the MDN for successfully - processed messages received from this partner. + :param mdn_digest_alg: The digest algorithm to be used by the receiver + for signing the MDN. Use `None` for unsigned MDN. (default `None`) - """ + :param mdn_confirm_text: The text to be used in the MDN for successfully + processed messages received from this partner. + + """ + + as2_name: str + verify_cert: bytes = None + verify_cert_ca: bytes = None + encrypt_cert: bytes = None + encrypt_cert_ca: bytes = None + validate_certs: bool = True + compress: bool = False + encrypt: bool = False + enc_alg: str = 'tripledes_192_cbc' + sign: bool = False + digest_alg: str = 'sha256' + mdn_mode: str = None + mdn_digest_alg: str = None + mdn_confirm_text: str = MDN_CONFIRM_TEXT + ignore_self_signed: bool = True + + def __post_init__(self): + """Run the post initialisation checks for this class.""" # Validations - if digest_alg and digest_alg not in DIGEST_ALGORITHMS: + if self.digest_alg and self.digest_alg not in DIGEST_ALGORITHMS: raise ImproperlyConfigured( - 'Unsupported Digest Algorithm {}, must be ' - 'one of {}'.format(digest_alg, DIGEST_ALGORITHMS)) + f'Unsupported Digest Algorithm {self.digest_alg}, must be ' + f'one of {DIGEST_ALGORITHMS}') - if enc_alg and enc_alg not in ENCRYPTION_ALGORITHMS: + if self.enc_alg and self.enc_alg not in ENCRYPTION_ALGORITHMS: raise ImproperlyConfigured( - 'Unsupported Encryption Algorithm {}, must be ' - 'one of {}'.format(enc_alg, ENCRYPTION_ALGORITHMS)) + f'Unsupported Encryption Algorithm {self.enc_alg}, must be ' + f'one of {ENCRYPTION_ALGORITHMS}') - if mdn_mode and mdn_mode not in MDN_MODES: + if self.mdn_mode and self.mdn_mode not in MDN_MODES: raise ImproperlyConfigured( - 'Unsupported MDN Mode {}, must be ' - 'one of {}'.format(digest_alg, MDN_MODES)) - - # if mdn_mode == 'ASYNC' and not mdn_url: - # raise ImproperlyConfigured( - # 'mdn_url is mandatory when mdn_mode is set to ASYNC ') + f'Unsupported MDN Mode {self.mdn_mode}, must be ' + f'one of {MDN_MODES}') - if mdn_digest_alg and mdn_digest_alg not in DIGEST_ALGORITHMS: + if self.mdn_digest_alg and self.mdn_digest_alg not in DIGEST_ALGORITHMS: raise ImproperlyConfigured( - 'Unsupported MDN Digest Algorithm {}, must be ' - 'one of {}'.format(mdn_digest_alg, DIGEST_ALGORITHMS)) - - self.as2_name = as2_name - self.compress = compress - self.sign = sign - self.digest_alg = digest_alg - self.encrypt = encrypt - self.enc_alg = enc_alg - self.mdn_mode = mdn_mode - self.mdn_digest_alg = mdn_digest_alg - self.mdn_confirm_text = mdn_confirm_text - self.verify_cert = verify_cert - self.verify_cert_ca = verify_cert_ca - self.encrypt_cert = encrypt_cert - self.encrypt_cert_ca = encrypt_cert_ca - self.validate_certs = validate_certs + f'Unsupported MDN Digest Algorithm {self.mdn_digest_alg}, ' + f'must be one of {DIGEST_ALGORITHMS}') def load_verify_cert(self): if self.validate_certs: @@ -222,7 +208,7 @@ def load_verify_cert(self): # Verify the certificate against the trusted roots verify_certificate_chain( - cert, trust_roots, ignore_self_signed=IGNORE_SELF_SIGNED_CERTS) + cert, trust_roots, ignore_self_signed=self.ignore_self_signed) return asymmetric.load_certificate(self.verify_cert) @@ -239,7 +225,7 @@ def load_encrypt_cert(self): # Verify the certificate against the trusted roots verify_certificate_chain( - cert, trust_roots, ignore_self_signed=IGNORE_SELF_SIGNED_CERTS) + cert, trust_roots, ignore_self_signed=self.ignore_self_signed) return asymmetric.load_certificate(self.encrypt_cert) @@ -273,20 +259,17 @@ def __init__(self, sender=None, receiver=None): @property def content(self): """Function returns the body of the as2 payload as a bytes object""" - - if not self.payload: + if self.payload is None: return '' if self.payload.is_multipart(): - message_bytes = mime_to_bytes(self.payload, 0) + message_bytes = mime_to_bytes(self.payload) boundary = b'--' + self.payload.get_boundary().encode('utf-8') temp = message_bytes.split(boundary) temp.pop(0) return boundary + boundary.join(temp) else: content = self.payload.get_payload(decode=True) - if isinstance(content, str): - content = content.encode('utf-8') return content @property @@ -301,7 +284,7 @@ def headers_str(self): message_header = '' if self.payload: for k, v in self.headers.items(): - message_header += '{}: {}\r\n'.format(k, v) + message_header += f'{k}: {v}\r\n' return message_header.encode('utf-8') def build(self, data, filename=None, subject='AS2 Message', @@ -334,21 +317,18 @@ def build(self, data, filename=None, subject='AS2 Message', """ # Validations - assert type(data) is bytes, \ - 'Parameter data must be of bytes type.' + assert type(data) is bytes, 'Parameter data must be of bytes type.' additional_headers = additional_headers if additional_headers else {} assert type(additional_headers) is dict if self.receiver.sign and not self.sender.sign_key: raise ImproperlyConfigured( - 'Signing of messages is enabled but sign key is not set ' - 'for the sender.') + 'Signing of messages is enabled but sign key is not set for the sender.') if self.receiver.encrypt and not self.receiver.encrypt_cert: raise ImproperlyConfigured( - 'Encryption of messages is enabled but encrypt key is not set ' - 'for the receiver.') + 'Encryption of messages is enabled but encrypt key is not set for the receiver.') # Generate message id using UUID 1 as it uses both hostname and time self.message_id = email_utils.make_msgid().lstrip('<').rstrip('>') @@ -357,12 +337,11 @@ def build(self, data, filename=None, subject='AS2 Message', as2_headers = { 'AS2-Version': AS2_VERSION, 'ediint-features': EDIINT_FEATURES, - 'Message-ID': '<{}>'.format(self.message_id), + 'Message-ID': f'<{self.message_id}>', 'AS2-From': quote_as2name(self.sender.as2_name), 'AS2-To': quote_as2name(self.receiver.as2_name), 'Subject': subject, - 'Date': email_utils.formatdate(localtime=True), - # 'recipient-address': message.partner.target_url, + 'Date': email_utils.formatdate(localtime=True) } as2_headers.update(additional_headers) @@ -374,8 +353,7 @@ def build(self, data, filename=None, subject='AS2 Message', encoders.encode_7or8bit(self.payload) if filename: - self.payload.add_header( - 'Content-Disposition', 'attachment', filename=filename) + self.payload.add_header('Content-Disposition', 'attachment', filename=filename) del self.payload['MIME-Version'] if self.receiver.compress: @@ -384,22 +362,17 @@ def build(self, data, filename=None, subject='AS2 Message', compressed_message.set_type('application/pkcs7-mime') compressed_message.set_param('name', 'smime.p7z') compressed_message.set_param('smime-type', 'compressed-data') - compressed_message.add_header( - 'Content-Disposition', 'attachment', filename='smime.p7z') - compressed_message.add_header( - 'Content-Transfer-Encoding', 'binary') - compressed_message.set_payload( - compress_message(mime_to_bytes(self.payload, 0))) - + compressed_message.add_header('Content-Disposition', 'attachment', filename='smime.p7z') + compressed_message.add_header('Content-Transfer-Encoding', 'binary') + compressed_message.set_payload(compress_message(mime_to_bytes(self.payload))) self.payload = compressed_message - logger.debug('Compressed message %s payload as:\n%s' % ( - self.message_id, self.payload.as_string())) + logger.debug( + f'Compressed message {self.message_id} payload as:\n{self.payload.as_string()}') if self.receiver.sign: self.signed, self.digest_alg = True, self.receiver.digest_alg - signed_message = MIMEMultipart( - 'signed', protocol="application/pkcs7-signature") + signed_message = MIMEMultipart('signed', protocol="application/pkcs7-signature") del signed_message['MIME-Version'] signed_message.attach(self.payload) @@ -414,19 +387,18 @@ def build(self, data, filename=None, subject='AS2 Message', signature.set_type('application/pkcs7-signature') signature.set_param('name', 'smime.p7s') signature.set_param('smime-type', 'signed-data') - signature.add_header( - 'Content-Disposition', 'attachment', filename='smime.p7s') + signature.add_header('Content-Disposition', 'attachment', filename='smime.p7s') del signature['MIME-Version'] - signature.set_payload(sign_message( - mic_content, self.digest_alg, self.sender.sign_key)) + signature_data = sign_message(mic_content, self.digest_alg, self.sender.sign_key) + signature.set_payload(signature_data) encoders.encode_base64(signature) signed_message.set_param('micalg', self.digest_alg) signed_message.attach(signature) self.payload = signed_message - logger.debug('Signed message %s payload as:\n%s' % ( - self.message_id, mime_to_bytes(self.payload, 0))) + logger.debug( + f'Signed message {self.message_id} payload as:\n{mime_to_bytes(self.payload)}') if self.receiver.encrypt: self.encrypted, self.enc_alg = True, self.receiver.enc_alg @@ -434,29 +406,27 @@ def build(self, data, filename=None, subject='AS2 Message', encrypted_message.set_type('application/pkcs7-mime') encrypted_message.set_param('name', 'smime.p7m') encrypted_message.set_param('smime-type', 'enveloped-data') - encrypted_message.add_header( - 'Content-Disposition', 'attachment', filename='smime.p7m') + encrypted_message.add_header('Content-Disposition', 'attachment', filename='smime.p7m') encrypted_message.add_header('Content-Transfer-Encoding', 'binary') encrypt_cert = self.receiver.load_encrypt_cert() - encrypted_message.set_payload(encrypt_message( - mime_to_bytes(self.payload, 0), self.enc_alg, encrypt_cert)) + encrypted_data = encrypt_message( + mime_to_bytes(self.payload), self.enc_alg, encrypt_cert) + encrypted_message.set_payload(encrypted_data) self.payload = encrypted_message - logger.debug('Encrypted message %s payload as:\n%s' % ( - self.message_id, self.payload.as_string())) + logger.debug( + f'Encrypted message {self.message_id} payload as:\n{self.payload.as_string()}') if self.receiver.mdn_mode: as2_headers['disposition-notification-to'] = disposition_notification_to if self.receiver.mdn_digest_alg: as2_headers['disposition-notification-options'] = \ - 'signed-receipt-protocol=required, pkcs7-signature; ' \ - 'signed-receipt-micalg=optional, {}'.format( - self.receiver.mdn_digest_alg) + f'signed-receipt-protocol=required, pkcs7-signature; ' \ + f'signed-receipt-micalg=optional, {self.receiver.mdn_digest_alg}' if self.receiver.mdn_mode == 'ASYNC': if not self.sender.mdn_url: raise ImproperlyConfigured( - 'MDN URL must be set in the organization when MDN mode ' - 'is set to ASYNC') + 'MDN URL must be set in the organization when MDN mode is set to ASYNC') as2_headers['receipt-delivery-option'] = self.sender.mdn_url # Update the headers of the final payload and set its boundary @@ -524,31 +494,27 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, org_id = unquote_as2name(as2_headers['as2-to']) self.receiver = find_org_cb(org_id) if not self.receiver: - raise PartnerNotFound( - 'Unknown AS2 organization with id {}'.format(org_id)) + raise PartnerNotFound(f'Unknown AS2 organization with id {org_id}') partner_id = unquote_as2name(as2_headers['as2-from']) self.sender = find_partner_cb(partner_id) if not self.sender: - raise PartnerNotFound( - 'Unknown AS2 partner with id {}'.format(partner_id)) + raise PartnerNotFound(f'Unknown AS2 partner with id {partner_id}') - if find_message_cb and \ - find_message_cb(self.message_id, partner_id): + if find_message_cb and find_message_cb(self.message_id, partner_id): raise DuplicateDocument( - 'Duplicate message received, message with this ID ' - 'already processed.') + 'Duplicate message received, message with this ID already processed.') if self.sender.encrypt and \ self.payload.get_content_type() != 'application/pkcs7-mime': raise InsufficientSecurityError( - 'Incoming messages from partner {} are must be encrypted' - ' but encrypted message not found.'.format(partner_id)) + f'Incoming messages from partner {partner_id} are must be encrypted ' + f'but encrypted message not found.') if self.payload.get_content_type() == 'application/pkcs7-mime' \ and self.payload.get_param('smime-type') == 'enveloped-data': - logger.debug('Decrypting message %s payload :\n%s' % ( - self.message_id, self.payload.as_string())) + logger.debug( + f'Decrypting message {self.message_id} payload :\n{self.payload.as_string()}') self.encrypted = True encrypted_data = self.payload.get_payload(decode=True) @@ -565,21 +531,20 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, # Check for compressed data here self.compressed, self.payload = self._decompress_data(self.payload) - if self.sender.sign and \ - self.payload.get_content_type() != 'multipart/signed': + if self.sender.sign and self.payload.get_content_type() != 'multipart/signed': raise InsufficientSecurityError( - 'Incoming messages from partner {} are must be signed ' - 'but signed message not found.'.format(partner_id)) + f'Incoming messages from partner {partner_id} are must be signed ' + f'but signed message not found.') if self.payload.get_content_type() == 'multipart/signed': - logger.debug('Verifying signed message %s payload: \n%s' % ( - self.message_id, self.payload.as_string())) + logger.debug( + f'Verifying signed message {self.message_id} ' + f'payload: \n{self.payload.as_string()}') self.signed = True # Split the message into signature and signed message signature = None - signature_types = ['application/pkcs7-signature', - 'application/x-pkcs7-signature'] + signature_types = ['application/pkcs7-signature', 'application/x-pkcs7-signature'] for part in self.payload.walk(): if part.get_content_type() in signature_types: signature = part.get_payload(decode=True) @@ -604,11 +569,9 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, except Exception as e: status = getattr(e, 'disposition_type', 'processed/Error') - detailed_status = getattr( - e, 'disposition_modifier', 'unexpected-processing-error') + detailed_status = getattr(e, 'disposition_modifier', 'unexpected-processing-error') exception = (e, traceback.format_exc()) - logger.error('Failed to parse AS2 message\n: %s' % - traceback.format_exc()) + logger.error(f'Failed to parse AS2 message\n: {traceback.format_exc()}') finally: # Update the payload headers with the original headers @@ -626,12 +589,9 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, digest_alg = as2_headers.get('disposition-notification-options') if digest_alg: - digest_alg = digest_alg.split(';')[-1].\ - split(',')[-1].strip() - mdn = Mdn( - mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg) - mdn.build( - message=self, status=status, detailed_status=detailed_status) + digest_alg = digest_alg.split(';')[-1].split(',')[-1].strip() + mdn = Mdn(mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg) + mdn.build(message=self, status=status, detailed_status=detailed_status) return status, exception, mdn @@ -653,9 +613,8 @@ def __init__(self, mdn_mode=None, digest_alg=None, mdn_url=None): def content(self): """Function returns the body of the mdn message as a byte string""" - if self.payload: - message_bytes = mime_to_bytes( - self.payload, 0).replace(b'\n', b'\r\n') + if self.payload is not None: + message_bytes = mime_to_bytes(self.payload) boundary = b'--' + self.payload.get_boundary().encode('utf-8') temp = message_bytes.split(boundary) temp.pop(0) @@ -675,20 +634,23 @@ def headers_str(self): message_header = '' if self.payload: for k, v in self.headers.items(): - message_header += '{}: {}\r\n'.format(k, v) + message_header += f'{k}: {v}\r\n' return message_header.encode('utf-8') - def build(self, message, status, detailed_status=None): + def build(self, message, status, detailed_status=None, confirmation_text=MDN_CONFIRM_TEXT, + failed_text=MDN_FAILED_TEXT): """Function builds and signs an AS2 MDN message. :param message: The received AS2 message for which this is an MDN. :param status: The status of processing of the received AS2 message. - :param detailed_status: - The optional detailed status of processing of the received AS2 - message. Used to give additional error info (default "None") + :param detailed_status: The optional detailed status of processing of the received AS2 + message. Used to give additional error info (default "None") + + :param confirmation_text: The confirmation message sent in the first part of the MDN. + :param failed_text: The failure message sent in the first part of the failed MDN. """ # Generate message id using UUID 1 as it uses both hostname and time @@ -699,7 +661,7 @@ def build(self, message, status, detailed_status=None): mdn_headers = { 'AS2-Version': AS2_VERSION, 'ediint-features': EDIINT_FEATURES, - 'Message-ID': '<{}>'.format(self.message_id), + 'Message-ID': f'<{self.message_id}>', 'AS2-From': quote_as2name(message.headers.get('as2-to')), 'AS2-To': quote_as2name(message.headers.get('as2-from')), 'Date': email_utils.formatdate(localtime=True), @@ -707,8 +669,6 @@ def build(self, message, status, detailed_status=None): } # Set the confirmation text message here - confirmation_text = MDN_CONFIRM_TEXT - # overwrite with organization specific message if message.receiver and message.receiver.mdn_confirm_text: confirmation_text = message.receiver.mdn_confirm_text @@ -718,14 +678,13 @@ def build(self, message, status, detailed_status=None): confirmation_text = message.sender.mdn_confirm_text if status != 'processed': - confirmation_text = MDN_FAILED_TEXT + confirmation_text = failed_text - self.payload = MIMEMultipart( - 'report', report_type='disposition-notification') + self.payload = MIMEMultipart('report', report_type='disposition-notification') # Create and attach the MDN Text Message mdn_text = email_message.Message() - mdn_text.set_payload('%s\n' % confirmation_text) + mdn_text.set_payload(f'{confirmation_text}\r\n') mdn_text.set_type('text/plain') del mdn_text['MIME-Version'] encoders.encode_7or8bit(mdn_text) @@ -734,36 +693,31 @@ def build(self, message, status, detailed_status=None): # Create and attache the MDN Report Message mdn_base = email_message.Message() mdn_base.set_type('message/disposition-notification') - mdn_report = 'Reporting-UA: pyAS2 Open Source AS2 Software\n' - mdn_report += 'Original-Recipient: rfc822; {}\n'.format( - message.headers.get('as2-to')) - mdn_report += 'Final-Recipient: rfc822; {}\n'.format( - message.headers.get('as2-to')) - mdn_report += 'Original-Message-ID: <{}>\n'.format(message.message_id) - mdn_report += 'Disposition: automatic-action/' \ - 'MDN-sent-automatically; {}'.format(status) + mdn_report = 'Reporting-UA: pyAS2 Open Source AS2 Software\r\n' + mdn_report += f'Original-Recipient: rfc822; {message.headers.get("as2-to")}\r\n' + mdn_report += f'Final-Recipient: rfc822; {message.headers.get("as2-to")}\r\n' + mdn_report += f'Original-Message-ID: <{message.message_id}>\r\n' + mdn_report += f'Disposition: automatic-action/MDN-sent-automatically; {status}' if detailed_status: - mdn_report += ': {}'.format(detailed_status) - mdn_report += '\n' + mdn_report += f': {detailed_status}' + mdn_report += '\r\n' if message.mic: - mdn_report += 'Received-content-MIC: {}, {}\n'.format( - message.mic.decode(), message.digest_alg) + mdn_report += f'Received-content-MIC: {message.mic.decode()}, {message.digest_alg}\r\n' mdn_base.set_payload(mdn_report) del mdn_base['MIME-Version'] encoders.encode_7or8bit(mdn_base) self.payload.attach(mdn_base) - logger.debug('MDN for message %s created:\n%s' % ( - message.message_id, mdn_base.as_string())) + logger.debug( + f'MDN for message {message.message_id} created:\n{mdn_base.as_string()}') # Sign the MDN if it is requested by the sender if message.headers.get('disposition-notification-options') and \ message.receiver and message.receiver.sign_key: self.digest_alg = \ - message.headers['disposition-notification-options'].split( - ';')[-1].split(',')[-1].strip().replace('-', '') - signed_mdn = MIMEMultipart( - 'signed', protocol="application/pkcs7-signature") + message.headers['disposition-notification-options'].\ + split(';')[-1].split(',')[-1].strip().replace('-', '') + signed_mdn = MIMEMultipart('signed', protocol="application/pkcs7-signature") del signed_mdn['MIME-Version'] signed_mdn.attach(self.payload) @@ -772,23 +726,21 @@ def build(self, message, status, detailed_status=None): signature.set_type('application/pkcs7-signature') signature.set_param('name', 'smime.p7s') signature.set_param('smime-type', 'signed-data') - signature.add_header( - 'Content-Disposition', 'attachment', filename='smime.p7s') + signature.add_header('Content-Disposition', 'attachment', filename='smime.p7s') del signature['MIME-Version'] - signature.set_payload(sign_message( - canonicalize(self.payload), - self.digest_alg, - message.receiver.sign_key - )) + signed_data = sign_message( + canonicalize(self.payload), self.digest_alg, message.receiver.sign_key + ) + signature.set_payload(signed_data) encoders.encode_base64(signature) signed_mdn.set_param('micalg', self.digest_alg) signed_mdn.attach(signature) self.payload = signed_mdn - logger.debug('Signature for MDN %s created:\n%s' % ( - message.message_id, signature.as_string())) + logger.debug( + f'Signature for MDN {message.message_id} created:\n{signature.as_string()}') # Update the headers of the final payload and set message boundary for k, v in mdn_headers.items(): @@ -796,8 +748,7 @@ def build(self, message, status, detailed_status=None): self.payload.replace_header(k, v) else: self.payload.add_header(k, v) - if self.payload.is_multipart(): - self.payload.set_boundary(make_mime_boundary()) + self.payload.set_boundary(make_mime_boundary()) def parse(self, raw_content, find_message_cb): """Function parses the RAW AS2 MDN, verifies it and extracts the @@ -840,8 +791,7 @@ def parse(self, raw_content, find_message_cb): return status, detailed_status if self.payload.get_content_type() == 'multipart/signed': - message_boundary = ('--' + self.payload.get_boundary()).\ - encode('utf-8') + message_boundary = ('--' + self.payload.get_boundary()).encode('utf-8') # Extract the signature and the signed payload signature = None @@ -858,29 +808,24 @@ def parse(self, raw_content, find_message_cb): mic_content = extract_first_part(raw_content, message_boundary) verify_cert = orig_message.receiver.load_verify_cert() try: - self.digest_alg = verify_message( - mic_content, signature, verify_cert) + self.digest_alg = verify_message(mic_content, signature, verify_cert) except IntegrityError: mic_content = canonicalize(self.payload) - self.digest_alg = verify_message( - mic_content, signature, verify_cert) + self.digest_alg = verify_message(mic_content, signature, verify_cert) for part in self.payload.walk(): if part.get_content_type() == 'message/disposition-notification': - logger.debug('Found MDN report for message %s:\n%s' % ( - orig_message.message_id, part.as_string())) + logger.debug( + f'Found MDN report for message {orig_message.message_id}:\n{part.as_string()}') mdn = part.get_payload()[-1] - mdn_status = mdn['Disposition'].split(';').\ - pop().strip().split(':') + mdn_status = mdn['Disposition'].split(';').pop().strip().split(':') status = mdn_status[0] if status == 'processed': - mdn_mic = mdn.get('Received-Content-MIC', '').\ - split(',')[0] + mdn_mic = mdn.get('Received-Content-MIC', '').split(',')[0] # TODO: Check MIC for all cases - if mdn_mic and orig_message.mic \ - and mdn_mic != orig_message.mic.decode(): + if mdn_mic and orig_message.mic and mdn_mic != orig_message.mic.decode(): status = 'processed/warning' detailed_status = 'Message Integrity check failed.' else: diff --git a/pyas2lib/cms.py b/pyas2lib/cms.py index b40df8b..0c37763 100644 --- a/pyas2lib/cms.py +++ b/pyas2lib/cms.py @@ -6,23 +6,7 @@ from oscrypto import asymmetric, symmetric, util from pyas2lib.exceptions import * - -DIGEST_ALGORITHMS = ( - 'md5', - 'sha1', - 'sha224', - 'sha256', - 'sha384', - 'sha512' -) -ENCRYPTION_ALGORITHMS = ( - 'tripledes_192_cbc', - 'rc2_128_cbc', - 'rc4_128_cbc' - 'aes_128_cbc', - 'aes_192_cbc', - 'aes_256_cbc', -) +from pyas2lib.constants import DIGEST_ALGORITHMS def compress_message(data_to_compress): @@ -67,8 +51,7 @@ def decompress_message(compressed_data): raise DecompressionError('Compressed data not found in ASN.1 ') except Exception as e: - raise DecompressionError( - 'Decompression failed with cause: {}'.format(e)) + raise DecompressionError('Decompression failed with cause: {}'.format(e)) def encrypt_message(data_to_encrypt, enc_alg, encryption_cert): @@ -85,39 +68,46 @@ def encrypt_message(data_to_encrypt, enc_alg, encryption_cert): enc_alg_list = enc_alg.split('_') cipher, key_length, mode = enc_alg_list[0], enc_alg_list[1], enc_alg_list[2] - algorithm_id, iv, encrypted_content = None, None, None + enc_alg_asn1, encrypted_content = None, None # Generate the symmetric encryption key and encrypt the message key = util.rand_bytes(int(key_length) // 8) if cipher == 'tripledes': algorithm_id = '1.2.840.113549.3.7' - iv, encrypted_content = symmetric.tripledes_cbc_pkcs5_encrypt( - key, data_to_encrypt, None) + iv, encrypted_content = symmetric.tripledes_cbc_pkcs5_encrypt(key, data_to_encrypt, None) + enc_alg_asn1 = algos.EncryptionAlgorithm({ + 'algorithm': algorithm_id, + 'parameters': cms.OctetString(iv) + }) elif cipher == 'rc2': algorithm_id = '1.2.840.113549.3.2' - iv, encrypted_content = symmetric.rc2_cbc_pkcs5_encrypt( - key, data_to_encrypt, None) + iv, encrypted_content = symmetric.rc2_cbc_pkcs5_encrypt(key, data_to_encrypt, None) + enc_alg_asn1 = algos.EncryptionAlgorithm({ + 'algorithm': algorithm_id, + 'parameters': algos.Rc2Params({'iv': cms.OctetString(iv)}) + }) elif cipher == 'rc4': algorithm_id = '1.2.840.113549.3.4' encrypted_content = symmetric.rc4_encrypt(key, data_to_encrypt) + enc_alg_asn1 = algos.EncryptionAlgorithm({ + 'algorithm': algorithm_id, + }) elif cipher == 'aes': if key_length == '128': algorithm_id = '2.16.840.1.101.3.4.1.2' elif key_length == '192': algorithm_id = '2.16.840.1.101.3.4.1.22' - elif key_length == '256': + else: algorithm_id = '2.16.840.1.101.3.4.1.42' - iv, encrypted_content = symmetric.aes_cbc_pkcs7_encrypt( - key, data_to_encrypt, None) - - enc_alg_asn1 = algos.EncryptionAlgorithm({ - 'algorithm': algorithm_id, - 'parameters': cms.OctetString(iv) - }) + iv, encrypted_content = symmetric.aes_cbc_pkcs7_encrypt(key, data_to_encrypt, None) + enc_alg_asn1 = algos.EncryptionAlgorithm({ + 'algorithm': algorithm_id, + 'parameters': cms.OctetString(iv) + }) # Encrypt the key and build the ASN.1 message encrypted_key = asymmetric.rsa_pkcs1v15_encrypt(encryption_cert, key) @@ -175,36 +165,36 @@ def decrypt_message(encrypted_data, decryption_key): if key_enc_alg == 'rsa': try: - key = asymmetric.rsa_pkcs1v15_decrypt( - decryption_key[0], encrypted_key) - except Exception as e: - raise DecryptionError('Failed to decrypt the payload: ' - 'Could not extract decryption key.') - alg = cms_content['content']['encrypted_content_info'][ - 'content_encryption_algorithm'] + key = asymmetric.rsa_pkcs1v15_decrypt(decryption_key[0], encrypted_key) + except Exception: + raise DecryptionError( + 'Failed to decrypt the payload: Could not extract decryption key.') + alg = cms_content['content']['encrypted_content_info']['content_encryption_algorithm'] encapsulated_data = cms_content['content'][ 'encrypted_content_info']['encrypted_content'].native try: - if alg.encryption_cipher == 'tripledes': + if alg['algorithm'].native == '1.2.840.113549.3.4': # This is RC4 + decrypted_content = symmetric.rc4_decrypt(key, encapsulated_data) + elif alg.encryption_cipher == 'tripledes': cipher = 'tripledes_192_cbc' decrypted_content = symmetric.tripledes_cbc_pkcs5_decrypt( key, encapsulated_data, alg.encryption_iv) elif alg.encryption_cipher == 'aes': decrypted_content = symmetric.aes_cbc_pkcs7_decrypt( key, encapsulated_data, alg.encryption_iv) - elif alg.encryption_cipher == 'rc4': - decrypted_content = symmetric.rc2_cbc_pkcs5_decrypt( - key, encapsulated_data, alg.encryption_iv) elif alg.encryption_cipher == 'rc2': - decrypted_content = symmetric.rc2_cbc_pkcs5_encrypt( - key, encapsulated_data, alg.encryption_iv) + decrypted_content = symmetric.rc2_cbc_pkcs5_decrypt( + key, encapsulated_data, alg['parameters']['iv'].native) else: raise AS2Exception('Unsupported Encryption Algorithm') except Exception as e: - raise DecryptionError( - 'Failed to decrypt the payload: {}'.format(e)) + raise DecryptionError('Failed to decrypt the payload: {}'.format(e)) + else: + raise AS2Exception('Unsupported Encryption Algorithm') + else: + raise DecryptionError('Encrypted data not found in ASN.1 ') return cipher, decrypted_content @@ -288,12 +278,10 @@ class SmimeCapabilities(core.Sequence): ]) }), ]) - signature = asymmetric.rsa_pkcs1v15_sign( - sign_key[0], signed_attributes.dump(), digest_alg) + signature = asymmetric.rsa_pkcs1v15_sign(sign_key[0], signed_attributes.dump(), digest_alg) else: signed_attributes = None - signature = asymmetric.rsa_pkcs1v15_sign( - sign_key[0], data_to_sign, digest_alg) + signature = asymmetric.rsa_pkcs1v15_sign(sign_key[0], data_to_sign, digest_alg) return cms.ContentInfo({ 'content_type': cms.ContentType('signed_data'), @@ -380,22 +368,22 @@ def verify_message(data_to_verify, signature, verify_cert): digest_func.update(data_to_verify) calc_message_digest = digest_func.digest() if message_digest != calc_message_digest: - raise IntegrityError('Failed to verify message signature: ' - 'Message Digest does not match.') + raise IntegrityError( + 'Failed to verify message signature: Message Digest does not match.') signed_data = signed_attributes.untag().dump() try: if sig_alg == 'rsassa_pkcs1v15': - asymmetric.rsa_pkcs1v15_verify( - verify_cert, sig, signed_data, digest_alg) + asymmetric.rsa_pkcs1v15_verify(verify_cert, sig, signed_data, digest_alg) elif sig_alg == 'rsassa_pss': - asymmetric.rsa_pss_verify( - verify_cert, sig, signed_data, digest_alg) + asymmetric.rsa_pss_verify(verify_cert, sig, signed_data, digest_alg) else: raise AS2Exception('Unsupported Signature Algorithm') except Exception as e: raise IntegrityError( 'Failed to verify message signature: {}'.format(e)) + else: + raise IntegrityError('Signed data not found in ASN.1 ') return digest_alg diff --git a/pyas2lib/constants.py b/pyas2lib/constants.py new file mode 100644 index 0000000..0c2edd4 --- /dev/null +++ b/pyas2lib/constants.py @@ -0,0 +1,36 @@ +"""Module for defining the constants used by pyas2lib""" + +AS2_VERSION = '1.2' + +EDIINT_FEATURES = 'CMS' + +SYNCHRONOUS_MDN = 'SYNC' +ASYNCHRONOUS_MDN = 'ASYNC' + +MDN_MODES = ( + SYNCHRONOUS_MDN, + ASYNCHRONOUS_MDN +) + +MDN_CONFIRM_TEXT = 'The AS2 message has been successfully processed. ' \ + 'Thank you for exchanging AS2 messages with pyAS2.' + +MDN_FAILED_TEXT = 'The AS2 message could not be processed. The ' \ + 'disposition-notification report has additional details.' + +DIGEST_ALGORITHMS = ( + 'md5', + 'sha1', + 'sha224', + 'sha256', + 'sha384', + 'sha512' +) +ENCRYPTION_ALGORITHMS = ( + 'tripledes_192_cbc', + 'rc2_128_cbc', + 'rc4_128_cbc', + 'aes_128_cbc', + 'aes_192_cbc', + 'aes_256_cbc', +) diff --git a/pyas2lib/tests/__init__.py b/pyas2lib/tests/__init__.py new file mode 100644 index 0000000..73e66d8 --- /dev/null +++ b/pyas2lib/tests/__init__.py @@ -0,0 +1,30 @@ +import unittest +import os + +TEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'fixtures') + + +class Pyas2TestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + """Perform the setup actions for the test case.""" + file_list = { + 'test_data': 'payload.txt', + 'test_data_dos': 'payload_dos.txt', + 'private_key': 'cert_test.p12', + 'public_key': 'cert_test_public.pem', + 'mecas2_public_key': 'cert_mecas2_public.pem', + 'oldpyas2_public_key': 'cert_oldpyas2_public.pem', + 'oldpyas2_private_key': 'cert_oldpyas2_private.pem', + 'sb2bi_public_key': 'cert_sb2bi_public.pem', + 'sb2bi_public_ca': 'cert_sb2bi_public.ca', + 'private_cer': 'cert_extract_private.cer', + 'private_pem': 'cert_extract_private.pem', + + } + + # Load the files to the attrs + for attr, filename in file_list.items(): + with open(os.path.join(TEST_DIR, filename), 'rb') as fp: + setattr(cls, attr, fp.read()) diff --git a/tests/fixtures/cert_extract_private.cer b/pyas2lib/tests/fixtures/cert_extract_private.cer similarity index 100% rename from tests/fixtures/cert_extract_private.cer rename to pyas2lib/tests/fixtures/cert_extract_private.cer diff --git a/tests/fixtures/cert_extract_private.pem b/pyas2lib/tests/fixtures/cert_extract_private.pem similarity index 100% rename from tests/fixtures/cert_extract_private.pem rename to pyas2lib/tests/fixtures/cert_extract_private.pem diff --git a/tests/fixtures/cert_extract_public.cer b/pyas2lib/tests/fixtures/cert_extract_public.cer similarity index 100% rename from tests/fixtures/cert_extract_public.cer rename to pyas2lib/tests/fixtures/cert_extract_public.cer diff --git a/tests/fixtures/cert_extract_public.pem b/pyas2lib/tests/fixtures/cert_extract_public.pem similarity index 100% rename from tests/fixtures/cert_extract_public.pem rename to pyas2lib/tests/fixtures/cert_extract_public.pem diff --git a/tests/fixtures/cert_mecas2_public.pem b/pyas2lib/tests/fixtures/cert_mecas2_public.pem similarity index 100% rename from tests/fixtures/cert_mecas2_public.pem rename to pyas2lib/tests/fixtures/cert_mecas2_public.pem diff --git a/tests/fixtures/cert_oldpyas2_private.pem b/pyas2lib/tests/fixtures/cert_oldpyas2_private.pem similarity index 100% rename from tests/fixtures/cert_oldpyas2_private.pem rename to pyas2lib/tests/fixtures/cert_oldpyas2_private.pem diff --git a/tests/fixtures/cert_oldpyas2_public.pem b/pyas2lib/tests/fixtures/cert_oldpyas2_public.pem similarity index 100% rename from tests/fixtures/cert_oldpyas2_public.pem rename to pyas2lib/tests/fixtures/cert_oldpyas2_public.pem diff --git a/tests/fixtures/cert_sb2bi_public.ca b/pyas2lib/tests/fixtures/cert_sb2bi_public.ca similarity index 100% rename from tests/fixtures/cert_sb2bi_public.ca rename to pyas2lib/tests/fixtures/cert_sb2bi_public.ca diff --git a/tests/fixtures/cert_sb2bi_public.pem b/pyas2lib/tests/fixtures/cert_sb2bi_public.pem similarity index 100% rename from tests/fixtures/cert_sb2bi_public.pem rename to pyas2lib/tests/fixtures/cert_sb2bi_public.pem diff --git a/tests/fixtures/cert_test.p12 b/pyas2lib/tests/fixtures/cert_test.p12 similarity index 100% rename from tests/fixtures/cert_test.p12 rename to pyas2lib/tests/fixtures/cert_test.p12 diff --git a/tests/fixtures/cert_test.pem b/pyas2lib/tests/fixtures/cert_test.pem similarity index 100% rename from tests/fixtures/cert_test.pem rename to pyas2lib/tests/fixtures/cert_test.pem diff --git a/tests/fixtures/cert_test_public.pem b/pyas2lib/tests/fixtures/cert_test_public.pem similarity index 100% rename from tests/fixtures/cert_test_public.pem rename to pyas2lib/tests/fixtures/cert_test_public.pem diff --git a/tests/fixtures/mecas2_compressed.as2 b/pyas2lib/tests/fixtures/mecas2_compressed.as2 similarity index 100% rename from tests/fixtures/mecas2_compressed.as2 rename to pyas2lib/tests/fixtures/mecas2_compressed.as2 diff --git a/tests/fixtures/mecas2_compressed_signed_encrypted.as2 b/pyas2lib/tests/fixtures/mecas2_compressed_signed_encrypted.as2 similarity index 100% rename from tests/fixtures/mecas2_compressed_signed_encrypted.as2 rename to pyas2lib/tests/fixtures/mecas2_compressed_signed_encrypted.as2 diff --git a/tests/fixtures/mecas2_encrypted.as2 b/pyas2lib/tests/fixtures/mecas2_encrypted.as2 similarity index 100% rename from tests/fixtures/mecas2_encrypted.as2 rename to pyas2lib/tests/fixtures/mecas2_encrypted.as2 diff --git a/tests/fixtures/mecas2_signed.as2 b/pyas2lib/tests/fixtures/mecas2_signed.as2 similarity index 100% rename from tests/fixtures/mecas2_signed.as2 rename to pyas2lib/tests/fixtures/mecas2_signed.as2 diff --git a/tests/fixtures/mecas2_signed.mdn b/pyas2lib/tests/fixtures/mecas2_signed.mdn similarity index 100% rename from tests/fixtures/mecas2_signed.mdn rename to pyas2lib/tests/fixtures/mecas2_signed.mdn diff --git a/tests/fixtures/mecas2_signed_encrypted.as2 b/pyas2lib/tests/fixtures/mecas2_signed_encrypted.as2 similarity index 100% rename from tests/fixtures/mecas2_signed_encrypted.as2 rename to pyas2lib/tests/fixtures/mecas2_signed_encrypted.as2 diff --git a/tests/fixtures/mecas2_unsigned.mdn b/pyas2lib/tests/fixtures/mecas2_unsigned.mdn similarity index 100% rename from tests/fixtures/mecas2_unsigned.mdn rename to pyas2lib/tests/fixtures/mecas2_unsigned.mdn diff --git a/tests/fixtures/payload.binary b/pyas2lib/tests/fixtures/payload.binary similarity index 100% rename from tests/fixtures/payload.binary rename to pyas2lib/tests/fixtures/payload.binary diff --git a/tests/fixtures/payload.txt b/pyas2lib/tests/fixtures/payload.txt similarity index 100% rename from tests/fixtures/payload.txt rename to pyas2lib/tests/fixtures/payload.txt diff --git a/pyas2lib/tests/fixtures/payload_dos.txt b/pyas2lib/tests/fixtures/payload_dos.txt new file mode 100644 index 0000000..379bf8e --- /dev/null +++ b/pyas2lib/tests/fixtures/payload_dos.txt @@ -0,0 +1,28 @@ +UNB+UNOA:2+:14+:14+140407:0910+5++++1+EANCOM' +UNH+1+ORDERS:D:96A:UN:EAN008' +BGM+220+1AA1TEST+9' +DTM+137:20140407:102' +DTM+63:20140421:102' +DTM+64:20140414:102' +RFF+ADE:1234' +RFF+PD:1704' +NAD+BY+5450534000024::9' +NAD+SU+::9' +NAD+DP+5450534000109::9+++++++GB' +NAD+IV+5450534000055::9++AMAZON EU SARL:5 RUE PLAETIS LUXEMBOURG+CO PO BOX 4558+SLOUGH++SL1 0TX+GB' +RFF+VA:GB727255821' +CUX+2:EUR:9' +LIN+1++9783898307529:EN' +QTY+21:5' +PRI+AAA:27.5' +LIN+2++390787706322:UP' +QTY+21:1' +PRI+AAA:10.87' +LIN+3' +PIA+5+3899408268X-39:SA' +QTY+21:3' +PRI+AAA:3.85' +UNS+S' +CNT+2:3' +UNT+26+1' +UNZ+1+5' diff --git a/tests/fixtures/sb2bi_signed.mdn b/pyas2lib/tests/fixtures/sb2bi_signed.mdn similarity index 100% rename from tests/fixtures/sb2bi_signed.mdn rename to pyas2lib/tests/fixtures/sb2bi_signed.mdn diff --git a/tests/fixtures/sb2bi_signed_cmp.msg b/pyas2lib/tests/fixtures/sb2bi_signed_cmp.msg similarity index 100% rename from tests/fixtures/sb2bi_signed_cmp.msg rename to pyas2lib/tests/fixtures/sb2bi_signed_cmp.msg diff --git a/tests/fixtures/verify_cert_test1.pem b/pyas2lib/tests/fixtures/verify_cert_test1.pem similarity index 100% rename from tests/fixtures/verify_cert_test1.pem rename to pyas2lib/tests/fixtures/verify_cert_test1.pem diff --git a/tests/fixtures/verify_cert_test2.cer b/pyas2lib/tests/fixtures/verify_cert_test2.cer similarity index 100% rename from tests/fixtures/verify_cert_test2.cer rename to pyas2lib/tests/fixtures/verify_cert_test2.cer diff --git a/tests/fixtures/verify_cert_test3.ca b/pyas2lib/tests/fixtures/verify_cert_test3.ca similarity index 100% rename from tests/fixtures/verify_cert_test3.ca rename to pyas2lib/tests/fixtures/verify_cert_test3.ca diff --git a/tests/fixtures/verify_cert_test3.pem b/pyas2lib/tests/fixtures/verify_cert_test3.pem similarity index 100% rename from tests/fixtures/verify_cert_test3.pem rename to pyas2lib/tests/fixtures/verify_cert_test3.pem diff --git a/tests/livetest_with_mecas2.py b/pyas2lib/tests/livetest_with_mecas2.py similarity index 95% rename from tests/livetest_with_mecas2.py rename to pyas2lib/tests/livetest_with_mecas2.py index f33928b..e415073 100644 --- a/tests/livetest_with_mecas2.py +++ b/pyas2lib/tests/livetest_with_mecas2.py @@ -1,8 +1,11 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import as2, Pyas2TestCase -import requests +"""Module for testing with a live mecas2 server.""" import os +import requests + +from pyas2lib import as2 +from . import Pyas2TestCase + TEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata') @@ -12,9 +15,9 @@ def setUp(self): self.org = as2.Organization( as2_name='pyas2lib', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( diff --git a/tests/livetest_with_oldpyas2.py b/pyas2lib/tests/livetest_with_oldpyas2.py similarity index 95% rename from tests/livetest_with_oldpyas2.py rename to pyas2lib/tests/livetest_with_oldpyas2.py index 4a9fb64..016e976 100644 --- a/tests/livetest_with_oldpyas2.py +++ b/pyas2lib/tests/livetest_with_oldpyas2.py @@ -1,8 +1,11 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import as2, Pyas2TestCase -import requests +"""Module for testing with a live old pyas2 server.""" import os +import requests + +from pyas2lib import as2 +from . import Pyas2TestCase + TEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata') @@ -12,9 +15,9 @@ def setUp(self): self.org = as2.Organization( as2_name='pyas2lib', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( diff --git a/tests/test_advanced.py b/pyas2lib/tests/test_advanced.py similarity index 72% rename from tests/test_advanced.py rename to pyas2lib/tests/test_advanced.py index f5fd1ac..fd15f24 100644 --- a/tests/test_advanced.py +++ b/pyas2lib/tests/test_advanced.py @@ -1,8 +1,11 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import Pyas2TestCase, as2, utils -import os +"""Module for testing the advanced features of pyas2lib.""" import base64 -import datetime +import os +from email import message + +from pyas2lib import as2 +from pyas2lib.exceptions import ImproperlyConfigured +from pyas2lib.tests import Pyas2TestCase, TEST_DIR class TestAdvanced(Pyas2TestCase): @@ -11,9 +14,9 @@ def setUp(self): self.org = as2.Organization( as2_name='some_organization', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( as2_name='some_partner', @@ -29,7 +32,7 @@ def test_binary_message(self): self.partner.encrypt = True self.partner.compress = True out_message = as2.Message(self.org, self.partner) - test_message_path = os.path.join(self.TEST_DIR, 'payload.binary') + test_message_path = os.path.join(TEST_DIR, 'payload.binary') with open(test_message_path, 'rb') as bin_file: original_message = bin_file.read() out_message.build( @@ -82,6 +85,7 @@ def test_partner_not_found(self): mdn.headers_str + b'\r\n' + mdn.content, find_message_cb=self.find_message ) + self.assertEqual(status, 'processed/Error') self.assertEqual(detailed_status, 'unknown-trading-partner') @@ -167,25 +171,31 @@ def test_insufficient_security(self): self.out_message.build(self.test_data) # Parse the generated AS2 message as the partner - self.partner.sign = True self.partner.encrypt = True raw_out_message = \ self.out_message.headers_str + b'\r\n' + self.out_message.content in_message = as2.Message() - _, _, mdn = in_message.parse( + status, (exc, _), mdn = in_message.parse( raw_out_message, find_org_cb=self.find_org, find_partner_cb=self.find_partner, find_message_cb=lambda x, y: False ) + self.assertEqual(status, 'processed/Error') + self.assertEqual(exc.disposition_modifier, 'insufficient-message-security') - out_mdn = as2.Mdn() - status, detailed_status = out_mdn.parse( - mdn.headers_str + b'\r\n' + mdn.content, - find_message_cb=self.find_message + # Try again for signing check + self.partner.encrypt = False + self.partner.sign = True + in_message = as2.Message() + status, (exc, _), mdn = in_message.parse( + raw_out_message, + find_org_cb=self.find_org, + find_partner_cb=self.find_partner, + find_message_cb=lambda x, y: False ) self.assertEqual(status, 'processed/Error') - self.assertEqual(detailed_status, 'insufficient-message-security') + self.assertEqual(exc.disposition_modifier, 'insufficient-message-security') def test_failed_decryption(self): """ Test case where message decryption has failed """ @@ -251,7 +261,7 @@ def test_verify_certificate(self): """ Test case where we have try to load an expired cert """ # First test with a certificate with invalid root - cert_path = os.path.join(self.TEST_DIR, 'verify_cert_test1.pem') + cert_path = os.path.join(TEST_DIR, 'verify_cert_test1.pem') with open(cert_path, 'rb') as cert_file: try: as2.Partner( @@ -263,7 +273,7 @@ def test_verify_certificate(self): 'unable to get local issuer certificate', str(e)) # Test with an expired certificate - cert_path = os.path.join(self.TEST_DIR, 'verify_cert_test2.cer') + cert_path = os.path.join(TEST_DIR, 'verify_cert_test2.cer') with open(cert_path, 'rb') as cert_file: try: as2.Partner( @@ -275,7 +285,7 @@ def test_verify_certificate(self): 'certificate has expired', str(e)) # Test with a chain certificate - cert_path = os.path.join(self.TEST_DIR, 'verify_cert_test3.pem') + cert_path = os.path.join(TEST_DIR, 'verify_cert_test3.pem') with open(cert_path, 'rb') as cert_file: try: as2.Partner( @@ -287,7 +297,7 @@ def test_verify_certificate(self): 'unable to get local issuer certificate', str(e)) # Test chain certificate with the ca - cert_ca_path = os.path.join(self.TEST_DIR, 'verify_cert_test3.ca') + cert_ca_path = os.path.join(TEST_DIR, 'verify_cert_test3.ca') with open(cert_path, 'rb') as cert_file: with open(cert_ca_path, 'rb') as cert_ca_file: try: @@ -303,60 +313,104 @@ def test_load_private_key(self): """ Test case where we have try to load keys in different formats """ # First test with a pkcs12 key file - cert_path = os.path.join(self.TEST_DIR, 'cert_test.p12') + cert_path = os.path.join(TEST_DIR, 'cert_test.p12') with open(cert_path, 'rb') as cert_file: try: as2.Organization( as2_name='some_org', sign_key=cert_file.read(), - sign_key_pass=b'test' + sign_key_pass='test' ) except as2.AS2Exception as e: self.fail('Failed to load p12 private key: %s' % e) # Now test with a pem encoded key file - cert_path = os.path.join(self.TEST_DIR, 'cert_test.pem') + cert_path = os.path.join(TEST_DIR, 'cert_test.pem') with open(cert_path, 'rb') as cert_file: try: as2.Organization( as2_name='some_org', sign_key=cert_file.read(), - sign_key_pass=b'test' + sign_key_pass='test' ) except as2.AS2Exception as e: self.fail('Failed to load pem private key: %s' % e) - def test_extract_certificate_info(self): - """ Test case that extracts data from private and public certificates - in PEM or DER format""" - - cert_info = { - 'valid_from': datetime.datetime(2019, 6, 3, 11, 32, 57, tzinfo=datetime.timezone.utc), - 'valid_to': datetime.datetime(2029, 5, 31, 11, 32, 57, tzinfo=datetime.timezone.utc), - 'subject': [('C', 'AU'), ('ST', 'Some-State'), - ('O', 'pyas2lib'), ('CN', 'test')], - 'issuer': [('C', 'AU'), ('ST', 'Some-State'), - ('O', 'pyas2lib'), ('CN', 'test')], - 'serial': 13747137503594840569 - } - cert_empty = { - 'valid_from': None, - 'valid_to': None, - 'subject': None, - 'issuer': None, - 'serial': None - } - - # compare result of function with cert_info dict. - self.assertEqual( - utils.extract_certificate_info(self.private_pem), cert_info) - self.assertEqual( - utils.extract_certificate_info(self.private_cer), cert_info) - self.assertEqual( - utils.extract_certificate_info(self.public_pem), cert_info) - self.assertEqual( - utils.extract_certificate_info(self.public_cer), cert_info) - self.assertEqual(utils.extract_certificate_info(b''), cert_empty) + def test_partner_checks(self): + """Test the checks for the partner on initialization.""" + with self.assertRaises(ImproperlyConfigured): + as2.Partner('a partner', digest_alg='xyz') + + with self.assertRaises(ImproperlyConfigured): + as2.Partner('a partner', enc_alg='xyz') + + with self.assertRaises(ImproperlyConfigured): + as2.Partner('a partner', mdn_mode='xyz') + + with self.assertRaises(ImproperlyConfigured): + as2.Partner('a partner', mdn_digest_alg='xyz') + + def test_message_checks(self): + """Test the checks and other features of Message.""" + msg = as2.Message() + assert msg.content == '' + assert msg.headers == {} + assert msg.headers_str == b'' + + msg.payload = message.Message() + msg.payload.set_payload(b'data') + assert msg.content == b'data' + + org = as2.Organization(as2_name='AS2 Server') + partner = as2.Partner(as2_name='AS2 Partner', sign=True) + msg = as2.Message(sender=org, receiver=partner) + with self.assertRaises(ImproperlyConfigured): + msg.build(b'data') + + msg.receiver.sign = False + msg.receiver.encrypt = True + with self.assertRaises(ImproperlyConfigured): + msg.build(b'data') + + msg.receiver.encrypt = False + msg.receiver.mdn_mode = 'ASYNC' + with self.assertRaises(ImproperlyConfigured): + msg.build(b'data') + + msg.sender.mdn_url = 'http://localhost/pyas2/as2receive' + msg.build(b'data') + + def test_mdn_checks(self): + """Test the checks and other features of MDN.""" + mdn = as2.Mdn() + assert mdn.content == '' + assert mdn.headers == {} + assert mdn.headers_str == b'' + + def test_all_encryption_algos(self): + """Test all the available encryption algorithms.""" + algos = ['rc2_128_cbc', 'rc4_128_cbc', 'aes_128_cbc', 'aes_192_cbc', 'aes_256_cbc'] + + for algo in algos: + # Build an As2 message to be transmitted to partner + self.partner.encrypt = True + self.partner.enc_alg = algo + out_message = as2.Message(self.org, self.partner) + out_message.build(self.test_data) + raw_out_message = out_message.headers_str + b'\r\n' + out_message.content + + # Parse the generated AS2 message as the partner + in_message = as2.Message() + status, _, _ = in_message.parse( + raw_out_message, + find_org_cb=self.find_org, + find_partner_cb=self.find_partner + ) + + # Compare the mic contents of the input and output messages + self.assertEqual(status, 'processed') + self.assertTrue(in_message.encrypted) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) def find_org(self, headers): return self.org @@ -374,9 +428,9 @@ def setUp(self): self.org = as2.Organization( as2_name='AS2 Server', sign_key=self.oldpyas2_private_key, - sign_key_pass='password'.encode('utf-8'), + sign_key_pass='password', decrypt_key=self.oldpyas2_private_key, - decrypt_key_pass='password'.encode('utf-8') + decrypt_key_pass='password' ) self.partner = as2.Partner( as2_name='Sterling B2B Integrator', @@ -385,11 +439,12 @@ def setUp(self): encrypt_cert=self.sb2bi_public_key, encrypt_cert_ca=self.sb2bi_public_ca, ) + self.partner.load_verify_cert() + self.partner.load_encrypt_cert() def xtest_process_message(self): """ Test processing message received from Sterling Integrator""" - with open(os.path.join( - self.TEST_DIR, 'sb2bi_signed_cmp.msg'), 'rb') as msg: + with open(os.path.join( TEST_DIR, 'sb2bi_signed_cmp.msg'), 'rb') as msg: as2message = as2.Message() status, exception, as2mdn = as2message.parse( msg.read(), @@ -401,14 +456,11 @@ def xtest_process_message(self): def test_process_mdn(self): """ Test processing mdn received from Sterling Integrator""" - message = as2.Message(sender=self.org, receiver=self.partner) - message.message_id = '151694007918.24690.7052273208458909245@' \ - 'ip-172-31-14-209.ec2.internal' + msg = as2.Message(sender=self.org, receiver=self.partner) + msg.message_id = '151694007918.24690.7052273208458909245@ip-172-31-14-209.ec2.internal' as2mdn = as2.Mdn() # Parse the mdn and get the message status - with open(os.path.join( - self.TEST_DIR, 'sb2bi_signed.mdn'), 'rb') as mdn: - status, detailed_status = as2mdn.parse( - mdn.read(), lambda x, y: message) + with open(os.path.join(TEST_DIR, 'sb2bi_signed.mdn'), 'rb') as mdn: + status, detailed_status = as2mdn.parse(mdn.read(), lambda x, y: msg) self.assertEqual(status, 'processed') diff --git a/tests/test_basic.py b/pyas2lib/tests/test_basic.py similarity index 77% rename from tests/test_basic.py rename to pyas2lib/tests/test_basic.py index 5eadb53..38fc540 100644 --- a/tests/test_basic.py +++ b/pyas2lib/tests/test_basic.py @@ -1,5 +1,7 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import as2, Pyas2TestCase +"""Module for testing the basic features of pyas2.""" + +from pyas2lib import as2 +from . import Pyas2TestCase class TestBasic(Pyas2TestCase): @@ -8,9 +10,9 @@ def setUp(self): self.org = as2.Organization( as2_name='some_organization', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( as2_name='some_partner', @@ -59,7 +61,7 @@ def test_compressed_message(self): # Compare the mic contents of the input and output messages self.assertEqual(status, 'processed') self.assertTrue(in_message.compressed) - self.assertEqual(self.test_data, in_message.content) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) def test_encrypted_message(self): """ Test Encrypted Unsigned Uncompressed Message """ @@ -81,7 +83,7 @@ def test_encrypted_message(self): # Compare the mic contents of the input and output messages self.assertEqual(status, 'processed') self.assertTrue(in_message.encrypted) - self.assertEqual(self.test_data, in_message.content) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) def test_signed_message(self): """ Test Unencrypted Signed Uncompressed Message """ @@ -102,7 +104,7 @@ def test_signed_message(self): # Compare the mic contents of the input and output messages self.assertEqual(status, 'processed') - self.assertEqual(self.test_data, in_message.content) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) self.assertTrue(in_message.signed) self.assertEqual(out_message.mic, in_message.mic) @@ -126,10 +128,35 @@ def test_encrypted_signed_message(self): # Compare the mic contents of the input and output messages self.assertEqual(status, 'processed') - self.assertEqual(self.test_data, in_message.content) self.assertTrue(in_message.signed) self.assertTrue(in_message.encrypted) self.assertEqual(out_message.mic, in_message.mic) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) + + def test_encrypted_signed_message_dos(self): + """ Test Encrypted Signed Uncompressed Message with DOS line endings. """ + + # Build an As2 message to be transmitted to partner + self.partner.sign = True + self.partner.encrypt = True + out_message = as2.Message(self.org, self.partner) + out_message.build(self.test_data_dos) + raw_out_message = out_message.headers_str + b'\r\n' + out_message.content + + # Parse the generated AS2 message as the partner + in_message = as2.Message() + status, _, _ = in_message.parse( + raw_out_message, + find_org_cb=self.find_org, + find_partner_cb=self.find_partner + ) + + # Compare the mic contents of the input and output messages + self.assertEqual(status, 'processed') + self.assertTrue(in_message.signed) + self.assertTrue(in_message.encrypted) + self.assertEqual(out_message.mic, in_message.mic) + self.assertEqual(self.test_data_dos, in_message.content) def test_encrypted_signed_compressed_message(self): """ Test Encrypted Signed Compressed Message """ @@ -152,11 +179,11 @@ def test_encrypted_signed_compressed_message(self): # Compare the mic contents of the input and output messages self.assertEqual(status, 'processed') - self.assertEqual(self.test_data, in_message.content) self.assertTrue(in_message.signed) self.assertTrue(in_message.encrypted) self.assertTrue(in_message.compressed) self.assertEqual(out_message.mic, in_message.mic) + self.assertEqual(self.test_data.splitlines(), in_message.content.splitlines()) def find_org(self, as2_id): return self.org diff --git a/pyas2lib/tests/test_cms.py b/pyas2lib/tests/test_cms.py new file mode 100644 index 0000000..319c8d7 --- /dev/null +++ b/pyas2lib/tests/test_cms.py @@ -0,0 +1,35 @@ +"""Module to test cms related features of pyas2lib.""" +import pytest + +from pyas2lib import cms +from pyas2lib.exceptions import ( + DecompressionError, + DecryptionError, + IntegrityError +) + + +INVALID_DATA = cms.cms.ContentInfo({ + 'content_type': cms.cms.ContentType('data'), +}).dump() + + +def test_compress(): + """Test the compression and decompression functions.""" + compressed_data = cms.compress_message(b'data') + assert cms.decompress_message(compressed_data) == b'data' + + with pytest.raises(DecompressionError): + cms.decompress_message(INVALID_DATA) + + +def test_signing(): + """Test the signing and verification functions.""" + with pytest.raises(IntegrityError): + cms.verify_message(b'data', INVALID_DATA, None) + + +def test_encryption(): + """Test the encryption and decryption functions.""" + with pytest.raises(DecryptionError): + cms.decrypt_message(INVALID_DATA, None) diff --git a/tests/test_mdn.py b/pyas2lib/tests/test_mdn.py similarity index 92% rename from tests/test_mdn.py rename to pyas2lib/tests/test_mdn.py index 96aa5a9..49f0f7a 100644 --- a/tests/test_mdn.py +++ b/pyas2lib/tests/test_mdn.py @@ -1,5 +1,6 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import as2, Pyas2TestCase +"""Module for testing the MDN related features of pyas2lib""" +from pyas2lib import as2 +from . import Pyas2TestCase class TestMDN(Pyas2TestCase): @@ -9,9 +10,9 @@ def setUp(self): self.org = as2.Organization( as2_name='some_organization', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( as2_name='some_partner', diff --git a/pyas2lib/tests/test_utils.py b/pyas2lib/tests/test_utils.py new file mode 100644 index 0000000..8222af2 --- /dev/null +++ b/pyas2lib/tests/test_utils.py @@ -0,0 +1,82 @@ +"""Module to test the utility functions of pyas2lib.""" +import datetime +import os +import pytest +from email.message import Message + +from pyas2lib import utils +from pyas2lib.exceptions import AS2Exception +from pyas2lib.tests import TEST_DIR + + +def test_quoting(): + """Test the function for quoting and as2 name.""" + assert utils.quote_as2name('PYAS2LIB') == 'PYAS2LIB' + assert utils.quote_as2name('PYAS2 LIB') == '"PYAS2 LIB"' + + +def test_bytes_generator(): + """Test the email bytes generator class.""" + message = Message() + message.set_type('application/pkcs7-mime') + assert utils.mime_to_bytes(message) == b'MIME-Version: 1.0\r\n' \ + b'Content-Type: application/pkcs7-mime\r\n\r\n' + + +def test_make_boundary(): + """Test the function for creating a boundary for multipart messages.""" + assert utils.make_mime_boundary(text='123456') is not None + + +def test_extract_first_part(): + """Test the function for extracting the first part of a multipart message.""" + message = b'header----first_part\n----second_part\n' + assert utils.extract_first_part(message, b'----') == b'first_part' + + message = b'header----first_part\r\n----second_part\r\n' + assert utils.extract_first_part(message, b'----') == b'first_part' + + +def test_cert_verification(): + """Test the verification of a certificate chain.""" + with open(os.path.join(TEST_DIR, 'cert_sb2bi_public.pem'), 'rb') as fp: + certificate = utils.pem_to_der(fp.read(), return_multiple=False) + + with pytest.raises(AS2Exception): + utils.verify_certificate_chain( + certificate, trusted_certs=[], ignore_self_signed=False) + + +def test_extract_certificate_info(): + """ Test case that extracts data from private and public certificates + in PEM or DER format""" + + cert_info = { + 'valid_from': datetime.datetime(2019, 6, 3, 11, 32, 57, tzinfo=datetime.timezone.utc), + 'valid_to': datetime.datetime(2029, 5, 31, 11, 32, 57, tzinfo=datetime.timezone.utc), + 'subject': [('C', 'AU'), ('ST', 'Some-State'), ('O', 'pyas2lib'), ('CN', 'test')], + 'issuer': [('C', 'AU'), ('ST', 'Some-State'), ('O', 'pyas2lib'), ('CN', 'test')], + 'serial': 13747137503594840569 + } + cert_empty = { + 'valid_from': None, + 'valid_to': None, + 'subject': None, + 'issuer': None, + 'serial': None + } + + # compare result of function with cert_info dict. + with open(os.path.join(TEST_DIR, 'cert_extract_private.cer'), 'rb') as fp: + assert utils.extract_certificate_info(fp.read()) == cert_info + + with open(os.path.join(TEST_DIR, 'cert_extract_private.pem'), 'rb') as fp: + assert utils.extract_certificate_info(fp.read()) == cert_info + + with open(os.path.join(TEST_DIR, 'cert_extract_public.cer'), 'rb') as fp: + assert utils.extract_certificate_info(fp.read()) == cert_info + + with open(os.path.join(TEST_DIR, 'cert_extract_public.pem'), 'rb') as fp: + assert utils.extract_certificate_info(fp.read()) == cert_info + + assert utils.extract_certificate_info(b'') == cert_empty diff --git a/tests/test_with_mecas2.py b/pyas2lib/tests/test_with_mecas2.py similarity index 87% rename from tests/test_with_mecas2.py rename to pyas2lib/tests/test_with_mecas2.py index 3c48dd5..bcf58d8 100644 --- a/tests/test_with_mecas2.py +++ b/pyas2lib/tests/test_with_mecas2.py @@ -1,7 +1,9 @@ -from __future__ import unicode_literals, absolute_import, print_function -from . import Pyas2TestCase, as2 +"""Module for testing with files generated by mendelson as2 server.""" import os +from pyas2lib import as2 +from pyas2lib.tests import Pyas2TestCase, TEST_DIR + class TestMecAS2(Pyas2TestCase): @@ -9,9 +11,9 @@ def setUp(self): self.org = as2.Organization( as2_name='some_organization', sign_key=self.private_key, - sign_key_pass='test'.encode('utf-8'), + sign_key_pass='test', decrypt_key=self.private_key, - decrypt_key_pass='test'.encode('utf-8') + decrypt_key_pass='test' ) self.partner = as2.Partner( as2_name='mecas2', @@ -24,7 +26,7 @@ def test_compressed_message(self): """ Test Compressed Message received from Mendelson AS2""" # Parse the generated AS2 message as the partner - received_file = os.path.join(self.TEST_DIR, 'mecas2_compressed.as2') + received_file = os.path.join(TEST_DIR, 'mecas2_compressed.as2') with open(received_file, 'rb') as fp: in_message = as2.Message() in_message.parse( @@ -41,7 +43,7 @@ def test_encrypted_message(self): """ Test Encrypted Message received from Mendelson AS2""" # Parse the generated AS2 message as the partner - received_file = os.path.join(self.TEST_DIR, 'mecas2_encrypted.as2') + received_file = os.path.join(TEST_DIR, 'mecas2_encrypted.as2') with open(received_file, 'rb') as fp: in_message = as2.Message() in_message.parse( @@ -58,7 +60,7 @@ def test_encrypted_message(self): def test_signed_message(self): """ Test Unencrypted Signed Uncompressed Message from Mendelson AS2""" # Parse the generated AS2 message as the partner - received_file = os.path.join(self.TEST_DIR, 'mecas2_signed.as2') + received_file = os.path.join(TEST_DIR, 'mecas2_signed.as2') with open(received_file, 'rb') as fp: in_message = as2.Message() in_message.parse( @@ -77,7 +79,7 @@ def test_encrypted_signed_message(self): # Parse the generated AS2 message as the partner received_file = os.path.join( - self.TEST_DIR, 'mecas2_signed_encrypted.as2') + TEST_DIR, 'mecas2_signed_encrypted.as2') with open(received_file, 'rb') as fp: in_message = as2.Message() in_message.parse( @@ -98,7 +100,7 @@ def test_encrypted_signed_compressed_message(self): # Parse the generated AS2 message as the partner received_file = os.path.join( - self.TEST_DIR, 'mecas2_compressed_signed_encrypted.as2') + TEST_DIR, 'mecas2_compressed_signed_encrypted.as2') with open(received_file, 'rb') as fp: in_message = as2.Message() in_message.parse( @@ -118,7 +120,7 @@ def test_unsigned_mdn(self): """ Test Unsigned MDN received from Mendelson AS2""" # Parse the generated AS2 message as the partner - received_file = os.path.join(self.TEST_DIR, 'mecas2_unsigned.mdn') + received_file = os.path.join(TEST_DIR, 'mecas2_unsigned.mdn') with open(received_file, 'rb') as fp: in_message = as2.Mdn() status, detailed_status = in_message.parse( @@ -131,7 +133,7 @@ def test_signed_mdn(self): """ Test Signed MDN received from Mendelson AS2""" # Parse the generated AS2 message as the partner - received_file = os.path.join(self.TEST_DIR, 'mecas2_signed.mdn') + received_file = os.path.join(TEST_DIR, 'mecas2_signed.mdn') with open(received_file, 'rb') as fp: in_message = as2.Mdn() in_message.parse(fp.read(), find_message_cb=self.find_message) diff --git a/pyas2lib/utils.py b/pyas2lib/utils.py index 22eca06..72b33f0 100644 --- a/pyas2lib/utils.py +++ b/pyas2lib/utils.py @@ -4,6 +4,8 @@ import sys from OpenSSL import crypto from asn1crypto import pem +from email import policy +from email import message from email.generator import BytesGenerator from io import BytesIO @@ -11,9 +13,9 @@ from datetime import datetime, timezone -def unquote_as2name(quoted_name): +def unquote_as2name(quoted_name: str): """ - Function converts as2 name from quoted to unquoted format + Function converts as2 name from quoted to unquoted format. :param quoted_name: the as2 name in quoted format :return: the as2 name in unquoted format @@ -21,9 +23,10 @@ def unquote_as2name(quoted_name): return email.utils.unquote(quoted_name) -def quote_as2name(unquoted_name): +def quote_as2name(unquoted_name: str): """ - Function converts as2 name from unquoted to quoted format + Function converts as2 name from unquoted to quoted format. + :param unquoted_name: the as2 name in unquoted format :return: the as2 name in unquoted format """ @@ -35,11 +38,13 @@ def quote_as2name(unquoted_name): class BinaryBytesGenerator(BytesGenerator): - """ Override the bytes generator to better handle binary data """ + """Override the bytes generator to better handle binary data.""" - def _handle_application_pkcs7_mime(self, msg): - """ Handle writing the binary messages to prevent default behaviour of - newline replacements """ + def _handle_application_pkcs7_mime(self, msg: email.message.Message): + """ + Handle writing the binary messages to prevent default behaviour of + newline replacements. + """ payload = msg.get_payload(decode=True) if payload is None: return @@ -47,42 +52,44 @@ def _handle_application_pkcs7_mime(self, msg): self._fp.write(payload) -def mime_to_bytes(msg, header_len): +def mime_to_bytes(msg: message.Message, email_policy: policy.Policy = policy.HTTP): """ - Function to convert and email Message to flat string format + Function to convert and email Message to flat string format. + :param msg: email.Message to be converted to string - :param header_len: the msx length of the header per line + :param email_policy: the policy to be used for flattening the message. :return: the byte string representation of the email message """ fp = BytesIO() - g = BinaryBytesGenerator(fp, maxheaderlen=header_len) + g = BinaryBytesGenerator(fp, policy=email_policy) g.flatten(msg) return fp.getvalue() -def canonicalize(message): +def canonicalize(email_message: message.Message): """ - Function to convert an email Message to standard format string + Function to convert an email Message to standard format string/ - :param message: email.Message to be converted to standard string + :param email_message: email.message.Message to be converted to standard string :return: the standard representation of the email message in bytes """ - if message.get('Content-Transfer-Encoding') == 'binary': + if email_message.get('Content-Transfer-Encoding') == 'binary': message_header = '' - message_body = message.get_payload(decode=True) - for k, v in message.items(): + message_body = email_message.get_payload(decode=True) + for k, v in email_message.items(): message_header += '{}: {}\r\n'.format(k, v) message_header += '\r\n' return message_header.encode('utf-8') + message_body else: - return mime_to_bytes(message, 0).replace( - b'\r\n', b'\n').replace(b'\r', b'\n').replace(b'\n', b'\r\n') + return mime_to_bytes(email_message) -def make_mime_boundary(text=None): - # Craft a random boundary. If text is given, ensure that the chosen - # boundary doesn't appear in the text. +def make_mime_boundary(text: str = None): + """ + Craft a random boundary. If text is given, ensure that the chosen + boundary doesn't appear in the text. + """ width = len(repr(sys.maxsize - 1)) fmt = '%%0%dd' % width @@ -102,8 +109,8 @@ def make_mime_boundary(text=None): return b -def extract_first_part(message, boundary): - """ Function to extract the first part of a multipart message""" +def extract_first_part(message: bytes, boundary: bytes): + """Function to extract the first part of a multipart message.""" first_message = message.split(boundary)[1].lstrip() if first_message.endswith(b'\r\n'): first_message = first_message[:-2] @@ -112,8 +119,8 @@ def extract_first_part(message, boundary): return first_message -def pem_to_der(cert, return_multiple=True): - """ Converts a given certificate or list to PEM format""" +def pem_to_der(cert: bytes, return_multiple: bool = True): + """Converts a given certificate or list to PEM format.""" # initialize the certificate array cert_list = [] @@ -132,9 +139,10 @@ def pem_to_der(cert, return_multiple=True): return cert_list.pop() -def split_pem(pem_bytes): +def split_pem(pem_bytes: bytes): """ - Split a give PEM file with multiple certificates + Split a give PEM file with multiple certificates. + :param pem_bytes: The pem data in bytes with multiple certs :return: yields a list of certificates contained in the pem file """ @@ -158,11 +166,11 @@ def split_pem(pem_bytes): pem_data = pem_data + line + b'\r\n' -def verify_certificate_chain(cert_str, trusted_certs, ignore_self_signed=True): - """ Verify a given certificate against a trust store""" +def verify_certificate_chain(cert_bytes, trusted_certs, ignore_self_signed=True): + """Verify a given certificate against a trust store.""" # Load the certificate - certificate = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_str) + certificate = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_bytes) # Create a certificate store and add your trusted certs try: @@ -185,10 +193,11 @@ def verify_certificate_chain(cert_str, trusted_certs, ignore_self_signed=True): return True except crypto.X509StoreContextError as e: - raise AS2Exception('Partner Certificate Invalid: %s' % e.args[-1][-1]) + raise AS2Exception( + 'Partner Certificate Invalid: %s' % e.args[-1][-1], 'invalid-certificate') -def extract_certificate_info(cert): +def extract_certificate_info(cert: bytes): """ Extract validity information from the certificate and return a dictionary. diff --git a/setup.py b/setup.py index b7efad9..e58394b 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,4 @@ +import sys from setuptools import setup, find_packages install_requires = [ @@ -6,6 +7,11 @@ 'pyOpenSSL==17.5.0', ] +if sys.version_info.minor == 6: + install_requires += [ + 'dataclasses==0.6' + ] + tests_require = [ 'pytest==3.4.0', 'pytest-cov==2.5.1', @@ -34,7 +40,6 @@ "Intended Audience :: Developers", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Topic :: Security :: Cryptography", diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e851695..0000000 --- a/tests/__init__.py +++ /dev/null @@ -1,64 +0,0 @@ -import unittest -import os -import sys -sys.path.insert(0, os.path.abspath('..')) - -from pyas2lib import as2, exceptions, utils - - -class Pyas2TestCase(unittest.TestCase): - TEST_DIR = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'fixtures') - - @classmethod - def setUpClass(cls): - with open(os.path.join(cls.TEST_DIR, 'payload.txt'), 'rb') as t_file: - cls.test_data = t_file.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_test.p12'), 'rb') as fp: - cls.private_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_test_public.pem'), 'rb') as fp: - cls.public_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_mecas2_public.pem'), 'rb') as fp: - cls.mecas2_public_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_oldpyas2_public.pem'), 'rb') as fp: - cls.oldpyas2_public_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_oldpyas2_public.pem'), 'rb') as fp: - cls.oldpyas2_public_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_oldpyas2_private.pem'), 'rb') as fp: - cls.oldpyas2_private_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_sb2bi_public.pem'), 'rb') as fp: - cls.sb2bi_public_key = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_sb2bi_public.ca'), 'rb') as fp: - cls.sb2bi_public_ca = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_extract_private.cer'), 'rb') as fp: - cls.private_cer = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_extract_private.pem'), 'rb') as fp: - cls.private_pem = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_extract_public.cer'), 'rb') as fp: - cls.public_pem = fp.read() - - with open(os.path.join( - cls.TEST_DIR, 'cert_extract_public.cer'), 'rb') as fp: - cls.public_cer = fp.read()