Skip to content

Commit

Permalink
use binary encoding for encryption and signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekram committed Apr 30, 2019
1 parent 7fac38a commit 0fcf070
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 66 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
## 1.0.4 - 2019-04-26

* Handle cases where compression is done before signing.
* Add support for additional encryption algorithms
* Add support for additional encryption algorithms.
* Use binary encoding for encryption and signatures.
* Remove support for Python 2.

## 1.0.3 - 2018-05-01
Expand Down
69 changes: 34 additions & 35 deletions pyas2lib/as2.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,13 @@ def content(self):
return ''

if self.payload.is_multipart():
message_bytes = mime_to_bytes(
self.payload, 0).replace(b'\n', b'\r\n')
message_bytes = mime_to_bytes(self.payload, 0)
boundary = b'--' + self.payload.get_boundary().encode('utf-8')
temp = message_bytes.split(boundary)
temp.pop(0)
return boundary + boundary.join(temp)
else:
content = self.payload.get_payload()
content = self.payload.get_payload(decode=True)
if isinstance(content, str):
content = content.encode('utf-8')
return content
Expand Down Expand Up @@ -382,15 +381,15 @@ def build(self, data, filename=None, subject='AS2 Message',
compressed_message.set_param('smime-type', 'compressed-data')
compressed_message.add_header(
'Content-Disposition', 'attachment', filename='smime.p7z')
# compressed_message['Content-Transfer-Encoding'] = 'binary'
compressed_message.add_header(
'Content-Transfer-Encoding', 'binary')
compressed_message.set_payload(
compress_message(canonicalize(self.payload)))
compress_message(mime_to_bytes(self.payload, 0)))

encoders.encode_base64(compressed_message)
self.payload = compressed_message

# logger.debug(b'Compressed message %s payload as:\n%s' % (
# self.message_id, self.payload.as_string()))
logger.debug('Compressed message %s payload as:\n%s' % (
self.message_id, self.payload.as_string()))

if self.receiver.sign:
self.signed, self.digest_alg = True, self.receiver.digest_alg
Expand Down Expand Up @@ -421,8 +420,8 @@ def build(self, data, filename=None, subject='AS2 Message',
signed_message.attach(signature)
self.payload = signed_message

# logger.debug(b'Signed message %s payload as:\n%s' % (
# self.message_id, self.payload.as_string()))
logger.debug('Signed message %s payload as:\n%s' % (
self.message_id, mime_to_bytes(self.payload, 0)))

if self.receiver.encrypt:
self.encrypted, self.enc_alg = True, self.receiver.enc_alg
Expand All @@ -432,16 +431,14 @@ def build(self, data, filename=None, subject='AS2 Message',
encrypted_message.set_param('smime-type', 'enveloped-data')
encrypted_message.add_header(
'Content-Disposition', 'attachment', filename='smime.p7m')
encrypted_message.add_header('Content-Transfer-Encoding', 'binary')
encrypt_cert = self.receiver.load_encrypt_cert()
encrypted_message.set_payload(encrypt_message(
canonicalize(self.payload),
self.enc_alg,
encrypt_cert
))
encoders.encode_base64(encrypted_message)
mime_to_bytes(self.payload, 0), self.enc_alg, encrypt_cert))

self.payload = encrypted_message
# logger.debug(b'Encrypted message %s payload as:\n%s' % (
# self.message_id, self.payload.as_string()))
logger.debug('Encrypted message %s payload as:\n%s' % (
self.message_id, self.payload.as_string()))

if self.receiver.mdn_mode:
as2_headers['disposition-notification-to'] = '[email protected]'
Expand All @@ -468,7 +465,7 @@ def build(self, data, filename=None, subject='AS2 Message',
self.payload.set_boundary(make_mime_boundary())

@staticmethod
def decompress_data(payload):
def _decompress_data(payload):
if payload.get_content_type() == 'application/pkcs7-mime' \
and payload.get_param('smime-type') == 'compressed-data':
compressed_data = payload.get_payload(decode=True)
Expand Down Expand Up @@ -562,7 +559,7 @@ def parse(self, raw_content, find_org_cb, find_partner_cb,
self.payload.set_type('application/edi-consent')

# Check for compressed data here
self.compressed, self.payload = self.decompress_data(self.payload)
self.compressed, self.payload = self._decompress_data(self.payload)

if self.sender.sign and \
self.payload.get_content_type() != 'multipart/signed':
Expand All @@ -575,8 +572,8 @@ def parse(self, raw_content, find_org_cb, find_partner_cb,
# self.payload.as_string()))
self.signed = True
signature = None
message_boundary = (
'--' + self.payload.get_boundary()).encode('utf-8')
message_boundary = ('--' + self.payload.get_boundary()).\
encode('utf-8')
for part in self.payload.walk():
if part.get_content_type() == "application/pkcs7-signature":
signature = part.get_payload(decode=True)
Expand All @@ -591,8 +588,8 @@ def parse(self, raw_content, find_org_cb, find_partner_cb,
self.digest_alg = verify_message(
mic_content, signature, verify_cert)
except IntegrityError:
mic_content = raw_content.split(message_boundary)[
1].replace(b'\n', b'\r\n')
mic_content = raw_content.split(message_boundary)[1].\
replace(b'\n', b'\r\n')
self.digest_alg = verify_message(
mic_content, signature, verify_cert)

Expand All @@ -603,15 +600,17 @@ def parse(self, raw_content, find_org_cb, find_partner_cb,

# Check for compressed data here
if not self.compressed:
self.compressed, self.payload = self.decompress_data(self.payload)
self.compressed, self.payload = self._decompress_data(self.payload)

except Exception as e:
status = getattr(e, 'disposition_type', 'processed/Error')
detailed_status = getattr(
e, 'disposition_modifier', 'unexpected-processing-error')
exception = (e, traceback.format_exc())
logger.error('Failed to parse AS2 message\n: %s' % traceback.format_exc())
logger.error('Failed to parse AS2 message\n: %s' %
traceback.format_exc())
finally:

# Update the payload headers with the original headers
for k, v in as2_headers.items():
if self.payload.get(k) and k.lower() != 'content-disposition':
Expand All @@ -627,13 +626,12 @@ def parse(self, raw_content, find_org_cb, find_partner_cb,

digest_alg = as2_headers.get('disposition-notification-options')
if digest_alg:
digest_alg = digest_alg.split(';')[-1].split(',')[
-1].strip()
digest_alg = digest_alg.split(';')[-1].\
split(',')[-1].strip()
mdn = Mdn(
mdn_mode=mdn_mode, mdn_url=mdn_url, digest_alg=digest_alg)
mdn.build(message=self,
status=status,
detailed_status=detailed_status)
mdn.build(
message=self, status=status, detailed_status=detailed_status)

return status, exception, mdn

Expand Down Expand Up @@ -867,11 +865,12 @@ def parse(self, raw_content, find_message_cb):
# orig_message.message_id, part.as_string()))

mdn = part.get_payload()[-1]
mdn_status = mdn['Disposition'].split(
';').pop().strip().split(':')
mdn_status = mdn['Disposition'].split(';').\
pop().strip().split(':')
status = mdn_status[0]
if status == 'processed':
mdn_mic = mdn.get('Received-Content-MIC', '').split(',')[0]
mdn_mic = mdn.get('Received-Content-MIC', '').\
split(',')[0]

# TODO: Check MIC for all cases
if mdn_mic and orig_message.mic \
Expand Down Expand Up @@ -910,6 +909,6 @@ def detect_mdn(self):
if part.get_content_type() == 'message/disposition-notification':
mdn = part.get_payload()[0]
message_id = mdn.get('Original-Message-ID').strip('<>')
message_recipient = mdn.get(
'Original-Recipient').split(';')[1].strip()
message_recipient = mdn.get('Original-Recipient').\
split(';')[1].strip()
return message_id, message_recipient
13 changes: 5 additions & 8 deletions pyas2lib/cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,10 @@ def encrypt_message(data_to_encrypt, enc_alg, encryption_cert):

enc_alg_list = enc_alg.split('_')
cipher, key_length, mode = enc_alg_list[0], enc_alg_list[1], enc_alg_list[2]
enc_alg_asn1, key, encrypted_content = None, None, None
algorithm_id, iv, encrypted_content = None, None, None

# Generate the symmetric encryption key and encrypt the message
key = util.rand_bytes(int(key_length) // 8)
algorithm_id = None
iv, encrypted_content = None, None

if cipher == 'tripledes':
algorithm_id = '1.2.840.113549.3.7'
iv, encrypted_content = symmetric.tripledes_cbc_pkcs5_encrypt(
Expand Down Expand Up @@ -228,12 +225,12 @@ def sign_message(data_to_sign, digest_alg, sign_key,
:return: A CMS ASN.1 byte string of the signed data.
"""

if use_signed_attributes:
digest_func = hashlib.new(digest_alg)
digest_func.update(data_to_sign)
message_digest = digest_func.digest()

print(data_to_sign)
print(digest_alg, message_digest)
class SmimeCapability(core.Sequence):
_fields = [
('0', core.Any, {'optional': True}),
Expand Down Expand Up @@ -358,7 +355,6 @@ def verify_message(data_to_verify, signature, verify_cert):

cms_content = cms.ContentInfo.load(signature)
digest_alg = None

if cms_content['content_type'].native == 'signed_data':
for signer in cms_content['content']['signer_infos']:

Expand All @@ -382,9 +378,10 @@ def verify_message(data_to_verify, signature, verify_cert):
message_digest += d

digest_func = hashlib.new(digest_alg)
print(data_to_verify)
digest_func.update(data_to_verify)
calc_message_digest = digest_func.digest()

print(digest_alg, calc_message_digest)
if message_digest != calc_message_digest:
raise IntegrityError('Failed to verify message signature: '
'Message Digest does not match.')
Expand Down
25 changes: 18 additions & 7 deletions pyas2lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ def quote_as2name(unquoted_name):
return unquoted_name


class BinaryBytesGenerator(BytesGenerator):
""" Override the bytes generator to better handle binary data """

def _handle_application_pkcs7_mime(self, msg):
""" Handle writing the binary messages to prevent default behaviour of
newline replacements """
payload = msg.get_payload(decode=True)
if payload is None:
return
else:
self._fp.write(payload)


def mime_to_bytes(msg, header_len):
"""
Function to convert and email Message to flat string format
Expand All @@ -41,7 +54,7 @@ def mime_to_bytes(msg, header_len):
:return: the byte string representation of the email message
"""
fp = BytesIO()
g = BytesGenerator(fp, maxheaderlen=header_len)
g = BinaryBytesGenerator(fp, maxheaderlen=header_len)
g.flatten(msg)
return fp.getvalue()

Expand All @@ -54,18 +67,16 @@ def canonicalize(message):
:return: the standard representation of the email message in bytes
"""

if message.is_multipart() \
or message.get('Content-Transfer-Encoding') != 'binary':

return mime_to_bytes(message, 0).replace(
b'\r\n', b'\n').replace(b'\r', b'\n').replace(b'\n', b'\r\n')
else:
if message.get('Content-Transfer-Encoding') == 'binary':
message_header = ''
message_body = message.get_payload(decode=True)
for k, v in message.items():
message_header += '{}: {}\r\n'.format(k, v)
message_header += '\r\n'
return message_header.encode('utf-8') + message_body
else:
return mime_to_bytes(message, 0).replace(
b'\r\n', b'\n').replace(b'\r', b'\n').replace(b'\n', b'\r\n')


def make_mime_boundary(text=None):
Expand Down
32 changes: 17 additions & 15 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ def test_plain_message(self):

# Parse the generated AS2 message as the partner
in_message = as2.Message()
in_message.parse(
status, _, _ = in_message.parse(
raw_out_message,
find_org_cb=self.find_org,
find_partner_cb=self.find_partner
)

# Compare contents of the input and output messages
self.assertEqual(status, 'processed')
self.assertEqual(self.test_data, in_message.content)

def test_compressed_message(self):
Expand All @@ -49,15 +50,16 @@ def test_compressed_message(self):

# Parse the generated AS2 message as the partner
in_message = as2.Message()
in_message.parse(
status, _, _ = in_message.parse(
raw_out_message,
find_org_cb=self.find_org,
find_partner_cb=self.find_partner
)

# Compare the mic contents of the input and output messages
self.assertEqual(
self.test_data.replace(b'\n', b'\r\n'), in_message.content)
self.assertEqual(status, 'processed')
self.assertTrue(in_message.compressed)
self.assertEqual(self.test_data, in_message.content)

def test_encrypted_message(self):
""" Test Encrypted Unsigned Uncompressed Message """
Expand All @@ -70,15 +72,16 @@ def test_encrypted_message(self):

# Parse the generated AS2 message as the partner
in_message = as2.Message()
in_message.parse(
status, _, _ = in_message.parse(
raw_out_message,
find_org_cb=self.find_org,
find_partner_cb=self.find_partner
)

# Compare the mic contents of the input and output messages
self.assertEqual(
self.test_data.replace(b'\n', b'\r\n'), in_message.content)
self.assertEqual(status, 'processed')
self.assertTrue(in_message.encrypted)
self.assertEqual(self.test_data, in_message.content)

def test_signed_message(self):
""" Test Unencrypted Signed Uncompressed Message """
Expand All @@ -91,15 +94,15 @@ def test_signed_message(self):

# Parse the generated AS2 message as the partner
in_message = as2.Message()
in_message.parse(
status, _, _ = in_message.parse(
raw_out_message,
find_org_cb=self.find_org,
find_partner_cb=self.find_partner
)

# Compare the mic contents of the input and output messages
self.assertEqual(
self.test_data.replace(b'\n', b'\r\n'), in_message.content)
self.assertEqual(status, 'processed')
self.assertEqual(self.test_data, in_message.content)
self.assertTrue(in_message.signed)
self.assertEqual(out_message.mic, in_message.mic)

Expand All @@ -115,15 +118,15 @@ def test_encrypted_signed_message(self):

# Parse the generated AS2 message as the partner
in_message = as2.Message()
in_message.parse(
status, _, _ = in_message.parse(
raw_out_message,
find_org_cb=self.find_org,
find_partner_cb=self.find_partner
)

# Compare the mic contents of the input and output messages
self.assertEqual(
self.test_data.replace(b'\n', b'\r\n'), in_message.content)
self.assertEqual(status, 'processed')
self.assertEqual(self.test_data, in_message.content)
self.assertTrue(in_message.signed)
self.assertTrue(in_message.encrypted)
self.assertEqual(out_message.mic, in_message.mic)
Expand All @@ -149,8 +152,7 @@ def test_encrypted_signed_compressed_message(self):

# Compare the mic contents of the input and output messages
self.assertEqual(status, 'processed')
self.assertEqual(
self.test_data.replace(b'\n', b'\r\n'), in_message.content)
self.assertEqual(self.test_data, in_message.content)
self.assertTrue(in_message.signed)
self.assertTrue(in_message.encrypted)
self.assertTrue(in_message.compressed)
Expand Down

0 comments on commit 0fcf070

Please sign in to comment.