Skip to content

Commit

Permalink
Use pylava as the linter and freeze versions of test libs
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekram committed Nov 1, 2020
1 parent 0a33440 commit 29a84d8
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ python:
install:
- pip install -e ".[tests]"
script:
- pytest --cov=pyas2lib --cov-config .coveragerc --black --pylama
- pytest --cov=pyas2lib --cov-config .coveragerc --black --pylava
after_success:
- pip install codecov
- codecov
85 changes: 43 additions & 42 deletions pyas2lib/as2.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Define the core functions/classes of the pyas2 package."""
import logging
import hashlib
import binascii
Expand Down Expand Up @@ -54,7 +55,7 @@


@dataclass
class Organization(object):
class Organization:
"""
Class represents an AS2 organization and defines the certificates and
settings to be used when sending and receiving messages.
Expand Down Expand Up @@ -124,7 +125,7 @@ def load_key(key_str: bytes, key_pass: str):


@dataclass
class Partner(object):
class Partner:
"""
Class represents an AS2 partner and defines the certificates and
settings to be used when sending and receiving messages.
Expand Down Expand Up @@ -217,6 +218,7 @@ def __post_init__(self):
)

def load_verify_cert(self):
"""Load the verification certificate of the partner and returned the parsed cert."""
if self.validate_certs:
# Convert the certificate to DER format
cert = pem_to_der(self.verify_cert, return_multiple=False)
Expand All @@ -235,6 +237,7 @@ def load_verify_cert(self):
return asymmetric.load_certificate(self.verify_cert)

def load_encrypt_cert(self):
"""Load the encryption certificate of the partner and returned the parsed cert."""
if self.validate_certs:
# Convert the certificate to DER format
cert = pem_to_der(self.encrypt_cert, return_multiple=False)
Expand All @@ -253,7 +256,7 @@ def load_encrypt_cert(self):
return asymmetric.load_certificate(self.encrypt_cert)


class Message(object):
class Message:
"""Class for handling AS2 messages. Includes functions for both
parsing and building messages.
"""
Expand Down Expand Up @@ -291,19 +294,20 @@ def content(self):
temp = message_bytes.split(boundary)
temp.pop(0)
return boundary + boundary.join(temp)
else:
content = self.payload.get_payload(decode=True)
return content

content = self.payload.get_payload(decode=True)
return content

@property
def headers(self):
"""Return the headers in the payload as a dictionary."""
if self.payload:
return dict(self.payload.items())
else:
return {}
return {}

@property
def headers_str(self):
"""Return the headers in the payload as a string."""
message_header = ""
if self.payload:
for k, v in self.headers.items():
Expand Down Expand Up @@ -346,10 +350,10 @@ def build(
"""

# Validations
assert type(data) is bytes, "Parameter data must be of bytes type."
assert isinstance(data, bytes), "Parameter data must be of bytes type."

additional_headers = additional_headers if additional_headers else {}
assert type(additional_headers) is dict
assert isinstance(additional_headers, dict)

if self.receiver.sign and not self.sender.sign_key:
raise ImproperlyConfigured(
Expand Down Expand Up @@ -641,43 +645,42 @@ def parse(self, raw_content, find_org_cb, find_partner_cb, find_message_cb=None)
if not self.compressed:
self.compressed, self.payload = self._decompress_data(self.payload)

except Exception as e:
except Exception as e: # pylint: disable=W0703
status = getattr(e, "disposition_type", "processed/Error")
detailed_status = getattr(
e, "disposition_modifier", "unexpected-processing-error"
)
exception = (e, traceback.format_exc())
logger.error(f"Failed to parse AS2 message\n: {traceback.format_exc()}")
finally:

# Update the payload headers with the original headers
for k, v in as2_headers.items():
if self.payload.get(k) and k.lower() != "content-disposition":
del self.payload[k]
self.payload.add_header(k, v)
# Update the payload headers with the original headers
for k, v in as2_headers.items():
if self.payload.get(k) and k.lower() != "content-disposition":
del self.payload[k]
self.payload.add_header(k, v)

if as2_headers.get("disposition-notification-to"):
mdn_mode = SYNCHRONOUS_MDN
if as2_headers.get("disposition-notification-to"):
mdn_mode = SYNCHRONOUS_MDN

mdn_url = as2_headers.get("receipt-delivery-option")
if mdn_url:
mdn_mode = ASYNCHRONOUS_MDN
mdn_url = as2_headers.get("receipt-delivery-option")
if mdn_url:
mdn_mode = ASYNCHRONOUS_MDN

digest_alg = as2_headers.get("disposition-notification-options")
if digest_alg:
digest_alg = digest_alg.split(";")[-1].split(",")[-1].strip()
digest_alg = as2_headers.get("disposition-notification-options")
if digest_alg:
digest_alg = digest_alg.split(";")[-1].split(",")[-1].strip()

logger.debug(
f"Building the MDN for message {self.message_id} with status {status} "
f"and detailed-status {detailed_status}."
)
mdn = Mdn(mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg)
mdn.build(message=self, status=status, detailed_status=detailed_status)
logger.debug(
f"Building the MDN for message {self.message_id} with status {status} "
f"and detailed-status {detailed_status}."
)
mdn = Mdn(mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg)
mdn.build(message=self, status=status, detailed_status=detailed_status)

return status, exception, mdn
return status, exception, mdn


class Mdn(object):
class Mdn:
"""Class for handling AS2 MDNs. Includes functions for both
parsing and building them.
"""
Expand All @@ -700,18 +703,18 @@ def content(self):
temp = message_bytes.split(boundary)
temp.pop(0)
return boundary + boundary.join(temp)
else:
return ""
return ""

@property
def headers(self):
"""Return the headers in the payload as a dictionary."""
if self.payload:
return dict(self.payload.items())
else:
return {}
return {}

@property
def headers_str(self):
"""Return the headers in the payload as a string."""
message_header = ""
if self.payload:
for k, v in self.headers.items():
Expand Down Expand Up @@ -947,16 +950,14 @@ def parse(self, raw_content, find_message_cb):
except MDNNotFound:
status = "failed/Failure"
detailed_status = "mdn-not-found"
except Exception as e:
except Exception as e: # pylint: disable=W0703
status = "failed/Failure"
detailed_status = f"Failed to parse received MDN. {e}"
logger.error(f"Failed to parse AS2 MDN\n: {traceback.format_exc()}")

finally:
return status, detailed_status
return status, detailed_status

def detect_mdn(self):
""" Function checks if the received raw message is an AS2 MDN or not.
"""Function checks if the received raw message is an AS2 MDN or not.
:raises MDNNotFound: If the received payload is not an MDN then this
exception is raised.
Expand Down
17 changes: 11 additions & 6 deletions pyas2lib/cms.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Define functions related to the CMS operations such as encrypting, signature, etc."""
import hashlib
import zlib
from collections import OrderedDict
Expand Down Expand Up @@ -57,8 +58,7 @@ def decompress_message(compressed_data):
cms_content = cms.ContentInfo.load(compressed_data)
if cms_content["content_type"].native == "compressed_data":
return cms_content["content"].decompressed
else:
raise DecompressionError("Compressed data not found in ASN.1 ")
raise DecompressionError("Compressed data not found in ASN.1 ")

except Exception as e:
raise DecompressionError("Decompression failed with cause: {}".format(e))
Expand Down Expand Up @@ -103,7 +103,11 @@ def encrypt_message(data_to_encrypt, enc_alg, encryption_cert):
elif cipher == "rc4":
algorithm_id = "1.2.840.113549.3.4"
encrypted_content = symmetric.rc4_encrypt(key, data_to_encrypt)
enc_alg_asn1 = algos.EncryptionAlgorithm({"algorithm": algorithm_id,})
enc_alg_asn1 = algos.EncryptionAlgorithm(
{
"algorithm": algorithm_id,
}
)

elif cipher == "aes":
if key_length == "128":
Expand Down Expand Up @@ -271,6 +275,8 @@ def sign_message(
message_digest = digest_func.digest()

class SmimeCapability(core.Sequence):
""""Define the possible list of Smime Capability."""

_fields = [
("0", core.Any, {"optional": True}),
("1", core.Any, {"optional": True}),
Expand All @@ -280,6 +286,8 @@ class SmimeCapability(core.Sequence):
]

class SmimeCapabilities(core.Sequence):
""""Define the Smime Capabilities supported by pyas2."""

_fields = [
("0", SmimeCapability),
("1", SmimeCapability, {"optional": True}),
Expand Down Expand Up @@ -504,9 +512,6 @@ def verify_message(data_to_verify, signature, verify_cert):
else:
raise AS2Exception("Unsupported Signature Algorithm")
except Exception as e:
import traceback

traceback.print_exc()
raise IntegrityError("Failed to verify message signature: {}".format(e))
else:
raise IntegrityError("Signed data not found in ASN.1 ")
Expand Down
2 changes: 1 addition & 1 deletion pyas2lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class IntegrityError(AS2Exception):

class UnexpectedError(AS2Exception):
"""A catch all exception to be raised for any error found while parsing
a received AS2 message"""
a received AS2 message"""

disposition_type = "processed/Error"
disposition_modifier = "unexpected-processing-error"
Expand Down
4 changes: 3 additions & 1 deletion pyas2lib/tests/test_cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@


INVALID_DATA = cms.cms.ContentInfo(
{"content_type": cms.cms.ContentType("data"),}
{
"content_type": cms.cms.ContentType("data"),
}
).dump()


Expand Down
4 changes: 2 additions & 2 deletions pyas2lib/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ def test_cert_verification():


def test_extract_certificate_info():
""" Test case that extracts data from private and public certificates
in PEM or DER format"""
"""Test case that extracts data from private and public certificates
in PEM or DER format"""

cert_info = {
"valid_from": datetime.datetime(
Expand Down
34 changes: 16 additions & 18 deletions pyas2lib/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""Define utility functions used by the pyas2-lib package."""

import email
import random
import re
import sys
from OpenSSL import crypto
from asn1crypto import pem
from email import policy
from datetime import datetime, timezone
from email import message
from email import policy
from email.generator import BytesGenerator
from io import BytesIO

from OpenSSL import crypto
from asn1crypto import pem

from pyas2lib.exceptions import AS2Exception
from datetime import datetime, timezone


def unquote_as2name(quoted_name: str):
Expand All @@ -33,8 +36,7 @@ def quote_as2name(unquoted_name: str):

if re.search(r'[\\" ]', unquoted_name, re.M):
return '"' + email.utils.quote(unquoted_name) + '"'
else:
return unquoted_name
return unquoted_name


class BinaryBytesGenerator(BytesGenerator):
Expand All @@ -52,8 +54,7 @@ def _handle_text(self, msg):
payload = msg.get_payload(decode=True)
if payload is None:
return
else:
self._fp.write(payload)
self._fp.write(payload)
else:
super()._handle_text(msg)

Expand Down Expand Up @@ -89,8 +90,8 @@ def canonicalize(email_message: message.Message):
message_header += "{}: {}\r\n".format(k, v)
message_header += "\r\n"
return message_header.encode("utf-8") + message_body
else:
return mime_to_bytes(email_message)

return mime_to_bytes(email_message)


def make_mime_boundary(text: str = None):
Expand All @@ -117,9 +118,9 @@ def make_mime_boundary(text: str = None):
return b


def extract_first_part(message: bytes, boundary: bytes):
"""Function to extract the first part of a multipart message."""
first_message = message.split(boundary)[1].lstrip()
def extract_first_part(message_content: bytes, boundary: bytes):
"""Extract the first part of a multipart message."""
first_message = message_content.split(boundary)[1].lstrip()
if first_message.endswith(b"\r\n"):
first_message = first_message[:-2]
else:
Expand All @@ -128,8 +129,7 @@ def extract_first_part(message: bytes, boundary: bytes):


def pem_to_der(cert: bytes, return_multiple: bool = True):
"""Converts a given certificate or list to PEM format."""

"""Convert a given certificate or list to PEM format."""
# initialize the certificate array
cert_list = []

Expand All @@ -143,8 +143,7 @@ def pem_to_der(cert: bytes, return_multiple: bool = True):
# return multiple if return_multiple is set else first element
if return_multiple:
return cert_list
else:
return cert_list.pop()
return cert_list.pop()


def split_pem(pem_bytes: bytes):
Expand Down Expand Up @@ -219,7 +218,6 @@ def extract_certificate_info(cert: bytes):
issuer (list of name, value tuples)
serial (int)
"""

# initialize the cert_info dictionary
cert_info = {
"valid_from": None,
Expand Down
Loading

0 comments on commit 29a84d8

Please sign in to comment.