diff --git a/.travis.yml b/.travis.yml index a9c61f4..63dcf98 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ python: install: - pip install -e ".[tests]" script: - - pytest --cov=pyas2lib --cov-config .coveragerc --black --pylama + - pytest --cov=pyas2lib --cov-config .coveragerc --black --pylava after_success: - pip install codecov - codecov \ No newline at end of file diff --git a/pyas2lib/as2.py b/pyas2lib/as2.py index 4a42abc..7eaba35 100644 --- a/pyas2lib/as2.py +++ b/pyas2lib/as2.py @@ -1,3 +1,4 @@ +"""Define the core functions/classes of the pyas2 package.""" import logging import hashlib import binascii @@ -54,7 +55,7 @@ @dataclass -class Organization(object): +class Organization: """ Class represents an AS2 organization and defines the certificates and settings to be used when sending and receiving messages. @@ -124,7 +125,7 @@ def load_key(key_str: bytes, key_pass: str): @dataclass -class Partner(object): +class Partner: """ Class represents an AS2 partner and defines the certificates and settings to be used when sending and receiving messages. @@ -217,6 +218,7 @@ def __post_init__(self): ) def load_verify_cert(self): + """Load the verification certificate of the partner and returned the parsed cert.""" if self.validate_certs: # Convert the certificate to DER format cert = pem_to_der(self.verify_cert, return_multiple=False) @@ -235,6 +237,7 @@ def load_verify_cert(self): return asymmetric.load_certificate(self.verify_cert) def load_encrypt_cert(self): + """Load the encryption certificate of the partner and returned the parsed cert.""" if self.validate_certs: # Convert the certificate to DER format cert = pem_to_der(self.encrypt_cert, return_multiple=False) @@ -253,7 +256,7 @@ def load_encrypt_cert(self): return asymmetric.load_certificate(self.encrypt_cert) -class Message(object): +class Message: """Class for handling AS2 messages. Includes functions for both parsing and building messages. """ @@ -291,19 +294,20 @@ def content(self): temp = message_bytes.split(boundary) temp.pop(0) return boundary + boundary.join(temp) - else: - content = self.payload.get_payload(decode=True) - return content + + content = self.payload.get_payload(decode=True) + return content @property def headers(self): + """Return the headers in the payload as a dictionary.""" if self.payload: return dict(self.payload.items()) - else: - return {} + return {} @property def headers_str(self): + """Return the headers in the payload as a string.""" message_header = "" if self.payload: for k, v in self.headers.items(): @@ -346,10 +350,10 @@ def build( """ # Validations - assert type(data) is bytes, "Parameter data must be of bytes type." + assert isinstance(data, bytes), "Parameter data must be of bytes type." additional_headers = additional_headers if additional_headers else {} - assert type(additional_headers) is dict + assert isinstance(additional_headers, dict) if self.receiver.sign and not self.sender.sign_key: raise ImproperlyConfigured( @@ -641,43 +645,42 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None) if not self.compressed: self.compressed, self.payload = self._decompress_data(self.payload) - except Exception as e: + except Exception as e: # pylint: disable=W0703 status = getattr(e, "disposition_type", "processed/Error") detailed_status = getattr( e, "disposition_modifier", "unexpected-processing-error" ) exception = (e, traceback.format_exc()) logger.error(f"Failed to parse AS2 message\n: {traceback.format_exc()}") - finally: - # Update the payload headers with the original headers - for k, v in as2_headers.items(): - if self.payload.get(k) and k.lower() != "content-disposition": - del self.payload[k] - self.payload.add_header(k, v) + # Update the payload headers with the original headers + for k, v in as2_headers.items(): + if self.payload.get(k) and k.lower() != "content-disposition": + del self.payload[k] + self.payload.add_header(k, v) - if as2_headers.get("disposition-notification-to"): - mdn_mode = SYNCHRONOUS_MDN + if as2_headers.get("disposition-notification-to"): + mdn_mode = SYNCHRONOUS_MDN - mdn_url = as2_headers.get("receipt-delivery-option") - if mdn_url: - mdn_mode = ASYNCHRONOUS_MDN + mdn_url = as2_headers.get("receipt-delivery-option") + if mdn_url: + mdn_mode = ASYNCHRONOUS_MDN - digest_alg = as2_headers.get("disposition-notification-options") - if digest_alg: - digest_alg = digest_alg.split(";")[-1].split(",")[-1].strip() + digest_alg = as2_headers.get("disposition-notification-options") + if digest_alg: + digest_alg = digest_alg.split(";")[-1].split(",")[-1].strip() - logger.debug( - f"Building the MDN for message {self.message_id} with status {status} " - f"and detailed-status {detailed_status}." - ) - mdn = Mdn(mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg) - mdn.build(message=self, status=status, detailed_status=detailed_status) + logger.debug( + f"Building the MDN for message {self.message_id} with status {status} " + f"and detailed-status {detailed_status}." + ) + mdn = Mdn(mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg) + mdn.build(message=self, status=status, detailed_status=detailed_status) - return status, exception, mdn + return status, exception, mdn -class Mdn(object): +class Mdn: """Class for handling AS2 MDNs. Includes functions for both parsing and building them. """ @@ -700,18 +703,18 @@ def content(self): temp = message_bytes.split(boundary) temp.pop(0) return boundary + boundary.join(temp) - else: - return "" + return "" @property def headers(self): + """Return the headers in the payload as a dictionary.""" if self.payload: return dict(self.payload.items()) - else: - return {} + return {} @property def headers_str(self): + """Return the headers in the payload as a string.""" message_header = "" if self.payload: for k, v in self.headers.items(): @@ -947,16 +950,14 @@ def parse(self, raw_content, find_message_cb): except MDNNotFound: status = "failed/Failure" detailed_status = "mdn-not-found" - except Exception as e: + except Exception as e: # pylint: disable=W0703 status = "failed/Failure" detailed_status = f"Failed to parse received MDN. {e}" logger.error(f"Failed to parse AS2 MDN\n: {traceback.format_exc()}") - - finally: - return status, detailed_status + return status, detailed_status def detect_mdn(self): - """ Function checks if the received raw message is an AS2 MDN or not. + """Function checks if the received raw message is an AS2 MDN or not. :raises MDNNotFound: If the received payload is not an MDN then this exception is raised. diff --git a/pyas2lib/cms.py b/pyas2lib/cms.py index 9ffbc60..02fe8c7 100644 --- a/pyas2lib/cms.py +++ b/pyas2lib/cms.py @@ -1,3 +1,4 @@ +"""Define functions related to the CMS operations such as encrypting, signature, etc.""" import hashlib import zlib from collections import OrderedDict @@ -57,8 +58,7 @@ def decompress_message(compressed_data): cms_content = cms.ContentInfo.load(compressed_data) if cms_content["content_type"].native == "compressed_data": return cms_content["content"].decompressed - else: - raise DecompressionError("Compressed data not found in ASN.1 ") + raise DecompressionError("Compressed data not found in ASN.1 ") except Exception as e: raise DecompressionError("Decompression failed with cause: {}".format(e)) @@ -103,7 +103,11 @@ def encrypt_message(data_to_encrypt, enc_alg, encryption_cert): elif cipher == "rc4": algorithm_id = "1.2.840.113549.3.4" encrypted_content = symmetric.rc4_encrypt(key, data_to_encrypt) - enc_alg_asn1 = algos.EncryptionAlgorithm({"algorithm": algorithm_id,}) + enc_alg_asn1 = algos.EncryptionAlgorithm( + { + "algorithm": algorithm_id, + } + ) elif cipher == "aes": if key_length == "128": @@ -271,6 +275,8 @@ def sign_message( message_digest = digest_func.digest() class SmimeCapability(core.Sequence): + """"Define the possible list of Smime Capability.""" + _fields = [ ("0", core.Any, {"optional": True}), ("1", core.Any, {"optional": True}), @@ -280,6 +286,8 @@ class SmimeCapability(core.Sequence): ] class SmimeCapabilities(core.Sequence): + """"Define the Smime Capabilities supported by pyas2.""" + _fields = [ ("0", SmimeCapability), ("1", SmimeCapability, {"optional": True}), @@ -504,9 +512,6 @@ def verify_message(data_to_verify, signature, verify_cert): else: raise AS2Exception("Unsupported Signature Algorithm") except Exception as e: - import traceback - - traceback.print_exc() raise IntegrityError("Failed to verify message signature: {}".format(e)) else: raise IntegrityError("Signed data not found in ASN.1 ") diff --git a/pyas2lib/exceptions.py b/pyas2lib/exceptions.py index 3edccb8..1b1c2d6 100644 --- a/pyas2lib/exceptions.py +++ b/pyas2lib/exceptions.py @@ -82,7 +82,7 @@ class IntegrityError(AS2Exception): class UnexpectedError(AS2Exception): """A catch all exception to be raised for any error found while parsing - a received AS2 message""" + a received AS2 message""" disposition_type = "processed/Error" disposition_modifier = "unexpected-processing-error" diff --git a/pyas2lib/tests/test_cms.py b/pyas2lib/tests/test_cms.py index 7a898e8..34655db 100644 --- a/pyas2lib/tests/test_cms.py +++ b/pyas2lib/tests/test_cms.py @@ -16,7 +16,9 @@ INVALID_DATA = cms.cms.ContentInfo( - {"content_type": cms.cms.ContentType("data"),} + { + "content_type": cms.cms.ContentType("data"), + } ).dump() diff --git a/pyas2lib/tests/test_utils.py b/pyas2lib/tests/test_utils.py index 72a9e7a..f1607e5 100644 --- a/pyas2lib/tests/test_utils.py +++ b/pyas2lib/tests/test_utils.py @@ -114,8 +114,8 @@ def test_cert_verification(): def test_extract_certificate_info(): - """ Test case that extracts data from private and public certificates - in PEM or DER format""" + """Test case that extracts data from private and public certificates + in PEM or DER format""" cert_info = { "valid_from": datetime.datetime( diff --git a/pyas2lib/utils.py b/pyas2lib/utils.py index dd2bbe6..a76ade9 100644 --- a/pyas2lib/utils.py +++ b/pyas2lib/utils.py @@ -1,16 +1,19 @@ +"""Define utility functions used by the pyas2-lib package.""" + import email import random import re import sys -from OpenSSL import crypto -from asn1crypto import pem -from email import policy +from datetime import datetime, timezone from email import message +from email import policy from email.generator import BytesGenerator from io import BytesIO +from OpenSSL import crypto +from asn1crypto import pem + from pyas2lib.exceptions import AS2Exception -from datetime import datetime, timezone def unquote_as2name(quoted_name: str): @@ -33,8 +36,7 @@ def quote_as2name(unquoted_name: str): if re.search(r'[\\" ]', unquoted_name, re.M): return '"' + email.utils.quote(unquoted_name) + '"' - else: - return unquoted_name + return unquoted_name class BinaryBytesGenerator(BytesGenerator): @@ -52,8 +54,7 @@ def _handle_text(self, msg): payload = msg.get_payload(decode=True) if payload is None: return - else: - self._fp.write(payload) + self._fp.write(payload) else: super()._handle_text(msg) @@ -89,8 +90,8 @@ def canonicalize(email_message: message.Message): message_header += "{}: {}\r\n".format(k, v) message_header += "\r\n" return message_header.encode("utf-8") + message_body - else: - return mime_to_bytes(email_message) + + return mime_to_bytes(email_message) def make_mime_boundary(text: str = None): @@ -117,9 +118,9 @@ def make_mime_boundary(text: str = None): return b -def extract_first_part(message: bytes, boundary: bytes): - """Function to extract the first part of a multipart message.""" - first_message = message.split(boundary)[1].lstrip() +def extract_first_part(message_content: bytes, boundary: bytes): + """Extract the first part of a multipart message.""" + first_message = message_content.split(boundary)[1].lstrip() if first_message.endswith(b"\r\n"): first_message = first_message[:-2] else: @@ -128,8 +129,7 @@ def extract_first_part(message: bytes, boundary: bytes): def pem_to_der(cert: bytes, return_multiple: bool = True): - """Converts a given certificate or list to PEM format.""" - + """Convert a given certificate or list to PEM format.""" # initialize the certificate array cert_list = [] @@ -143,8 +143,7 @@ def pem_to_der(cert: bytes, return_multiple: bool = True): # return multiple if return_multiple is set else first element if return_multiple: return cert_list - else: - return cert_list.pop() + return cert_list.pop() def split_pem(pem_bytes: bytes): @@ -219,7 +218,6 @@ def extract_certificate_info(cert: bytes): issuer (list of name, value tuples) serial (int) """ - # initialize the cert_info dictionary cert_info = { "valid_from": None, diff --git a/setup.cfg b/setup.cfg index 029bd22..927df06 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,23 +1,22 @@ [aliases] test=pytest -[pylama:pycodestyle] +[pylava:pycodestyle] max_line_length = 100 -[pylama:pylint] +[pylava:pylint] max_line_length = 100 ignore = E1101,R0902,R0903,W1203,C0103 -[pylama:pydocstyle] +[pylava:pydocstyle] convention = numpy ignore = D202 -[pylama:pep8] +[pylava:pep8] max_line_length = 100 -[pylama] +[pylava] format = pep8 -skip = venv/*,.tox/* -linters= pycodestyle,pydocstyle,pyflakes,pylint,pep8 -ignore = D203,D212,E231 - +skip = venv/*,.tox/*,*/tests/*,setup.py +linters= pycodestyle,pyflakes,pylint,pep8 +ignore = D203,D212,E231,C0330,R0912,R0914,W1202,R1702 diff --git a/setup.py b/setup.py index 5a79a02..0670b7c 100644 --- a/setup.py +++ b/setup.py @@ -8,11 +8,13 @@ ] tests_require = [ - "pytest==5.4.1", + "pytest==6.1.2", "pytest-cov==2.8.1", "coverage==5.0.4", - "pylama==7.7.1", - "pytest-black==0.3.8", + "pylava-pylint==0.0.3", + "pylint==2.4.4", + "black==20.8b1", + "pytest-black==0.3.12", ] setup( @@ -42,5 +44,7 @@ setup_requires=["pytest-runner"], install_requires=install_requires, tests_require=tests_require, - extras_require={"tests": tests_require,}, + extras_require={ + "tests": tests_require, + }, )