Skip to content

Commit

Permalink
Moved the byte reading to the parser. If just the parser is used, it will find the BOM issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
DigitalTrustCenter committed Apr 9, 2024
1 parent ad85c74 commit 49f3872
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 56 deletions.
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
requests==2.31.0
python-dateutil==2.8.2
python-dateutil==2.9.0.post0
langcodes==3.3.0
pytest==7.4.3
requests-mock==1.11.0
pytest==8.1.1
requests-mock==1.12.1
PGPy==0.6.0
46 changes: 24 additions & 22 deletions sectxt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import dateutil.parser
import requests

__version__ = "0.9.2"
__version__ = "0.9.3"

s = requests.Session()

Expand Down Expand Up @@ -69,7 +69,7 @@ class Parser:

def __init__(
self,
content: str,
content: bytes,
urls: Optional[str] = None,
recommend_unknown_fields: bool = True,
is_local: bool = False
Expand All @@ -85,13 +85,15 @@ def __init__(
self._reading_sig = False
self._finished_sig = False
self._content = content
self._content_str = None
self.recommend_unknown_fields = recommend_unknown_fields
self.is_local = is_local
self._line_no: Optional[int] = None
self._process()

def _process(self) -> None:
lines = self._content.split("\n")
self._content_str = self._get_str(self._content)
lines = self._content_str.split("\n")
self._line_no = 1
for line in lines:
self._line_info.append(self._parse_line(line))
Expand Down Expand Up @@ -159,7 +161,7 @@ def _parse_line(self, line: str) -> LineDict:

# Check pgp formatting if signed
try:
pgpy.PGPMessage.from_blob(self._content)
pgpy.PGPMessage.from_blob(self._content_str)
except ValueError:
self._add_error(
"pgp_data_error",
Expand Down Expand Up @@ -363,6 +365,21 @@ def validate_contents(self) -> None:
# Report whether the content is valid: True when self._errors is empty.
def is_valid(self) -> bool:
return not self._errors

def _get_str(self, content: bytes) -> str:
    """Decode raw security.txt content to text, recording encoding errors.

    A leading UTF-8 byte-order mark is removed and reported as a
    ``bom_in_file`` error. Content that is not valid UTF-8 is reported as a
    ``utf8`` error and decoded with replacement characters so the remaining
    checks can still run.

    Args:
        content: The raw content. ``bytes`` is expected; a ``str`` is
            returned unchanged so callers that pass already-decoded text
            do not fail with a ``TypeError``.

    Returns:
        The content as text.
    """
    if isinstance(content, str):
        # Already decoded: there are no bytes left to validate.
        return content

    if content.startswith(codecs.BOM_UTF8):
        # Drop only the leading BOM; any later occurrence is left in place.
        content = content[len(codecs.BOM_UTF8):]
        self._add_error(
            "bom_in_file",
            "The Byte-Order Mark was found at the start of the file. "
            "Security.txt must be encoded using UTF-8 in Net-Unicode form, "
            "the BOM signature must not appear at the beginning."
        )

    # Only the strict decode can raise, so it is the only guarded statement.
    try:
        return content.decode('utf-8')
    except UnicodeError:
        self._add_error("utf8", "Content must be utf-8 encoded.")
        return content.decode('utf-8', errors="replace")

# Read-only accessor that returns the parser's self._errors list.
@property
def errors(self) -> List[ErrorDict]:
return self._errors
Expand Down Expand Up @@ -417,27 +434,12 @@ def __init__(self, url: str, recommend_unknown_fields: bool = True, is_local: bo
self._loc = loc
self._path: Optional[str] = None
self._url: Optional[str] = None
super().__init__("", recommend_unknown_fields=recommend_unknown_fields, is_local=is_local)

def _get_str(self, content: bytes) -> str:
    """Decode raw security.txt content to text, recording encoding errors.

    A leading UTF-8 byte-order mark is removed and reported as a
    ``bom_in_file`` error. Content that is not valid UTF-8 is reported as a
    ``utf8`` error and decoded with replacement characters so the remaining
    checks can still run.

    Args:
        content: The raw content. ``bytes`` is expected; a ``str`` is
            returned unchanged so already-decoded text does not fail with
            a ``TypeError``.

    Returns:
        The content as text.
    """
    if isinstance(content, str):
        # Already decoded: there are no bytes left to validate.
        return content

    if content.startswith(codecs.BOM_UTF8):
        # Drop only the leading BOM; any later occurrence is left in place.
        content = content[len(codecs.BOM_UTF8):]
        self._add_error(
            "bom_in_file",
            "The Byte-Order Mark was found at the start of the file. "
            "Security.txt must be encoded using UTF-8 in Net-Unicode form, "
            "the BOM signature must not appear at the beginning."
        )

    # Only the strict decode can raise, so it is the only guarded statement.
    try:
        return content.decode('utf-8')
    except UnicodeError:
        self._add_error("utf8", "Content must be utf-8 encoded.")
        return content.decode('utf-8', errors="replace")
super().__init__(b'', recommend_unknown_fields=recommend_unknown_fields, is_local=is_local)

def _process(self) -> None:
if self.is_local:
security_txt_file = open(self._loc, mode="rb")
self._content = self._get_str(security_txt_file.read())
self._content = security_txt_file.read()
security_txt_file.close()
super()._process()
else:
Expand Down Expand Up @@ -509,7 +511,7 @@ def _process(self) -> None:
"Charset parameter in Content-Type header must be "
"'utf-8' if present.",
)
self._content = self._get_str(resp.content)
self._content = resp.content
if resp.history:
self._urls = [resp.history[0].url, resp.url]
else:
Expand Down
70 changes: 39 additions & 31 deletions test/test_sectxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@
class SecTxtTestCase(TestCase):
def test_future_expires(self):
content = f"Expires: {date.today().year + 3}-01-01T12:00:00Z\n"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._recommendations[0]["code"], "long_expiry")

def test_invalid_expires(self):
content = "Expires: Nonsense\n"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "invalid_expiry")
content = "Expires: Thu, 15 Sep 2022 06:03:46 -0700\n"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "invalid_expiry")

def test_expired(self):
content = "Expires: 2020-01-01T12:00:00Z\n"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "expired")

def test_long_expiry(self):
content = "Expires: 2030-01-01T12:00:00Z\n# Wow"
p = Parser(content)
p = Parser(content.encode())
line_info = p._line_info[1]
self.assertEqual(line_info["type"], "comment")
self.assertEqual(line_info["value"], "# Wow")
Expand All @@ -79,65 +79,65 @@ def test_preferred_languages(self):

# Single invalid value.
content = static_content + "Preferred-Languages: English"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "invalid_lang")

# Mix of valid and invalid value.
content = static_content + "Preferred-Languages: nl, Invalid"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "invalid_lang")

# Both ISO 639-1 (2 char) and ISO 639-2 (3 char) should be valid.
# Case should be ignored.
content = static_content + "Preferred-Languages: En, dUT"
p = Parser(content)
p = Parser(content.encode())
self.assertFalse(any(error["code"] == "invalid_lang" for error in p._errors))

def test_prec_ws(self):
content = "Contact : mailto:[email protected]\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "prec_ws")

def test_empty_key(self):
content = ": mailto:[email protected]\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "empty_key")

def test_empty_key2(self):
content = " : mailto:[email protected]\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[1]["code"], "empty_key")

def test_missing_space(self):
content = "Contact:mailto:[email protected]\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "no_space")

def test_missing_value(self):
content = "Contact: \n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "empty_value")

def test_no_https(self):
content = "Contact: http://example.com/contact\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "no_https")

def test_no_uri(self):
content = "Contact: [email protected]\n# Wow"
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._errors[0]["code"], "no_uri")

def test_signed(self):
p = Parser(_signed_example)
p = Parser(_signed_example.encode())
self.assertTrue(p.is_valid())

def test_signed_invalid_pgp(self):
# Remove required pgp signature header for pgp data error
content = _signed_example.replace(
"-----BEGIN PGP SIGNATURE-----", ""
)
p1 = Parser(content)
p1 = Parser(content.encode())
self.assertFalse(p1.is_valid())
self.assertEqual(
len([1 for r in p1._errors if r["code"] == "pgp_data_error"]), 1
Expand All @@ -146,7 +146,7 @@ def test_signed_invalid_pgp(self):
content = _signed_example.replace(
"-----BEGIN PGP SIGNATURE-----", "-----BEGIN PGP SIGNATURE-----\n- \n"
)
p2 = Parser(content)
p2 = Parser(content.encode())
self.assertFalse(p2.is_valid())
self.assertEqual(
len([1 for r in p2._errors if r["code"] == "pgp_data_error"]), 1
Expand All @@ -157,7 +157,7 @@ def test_signed_invalid_pgp(self):
).replace(
"HHXU8bf222naeYJHgaHadLTJJ8YQIQ9N5fYF7K4BM0jPZc48aaUPaBdhNxw+", "HHXU8bf222naeYJHga"
)
p3 = Parser(content)
p3 = Parser(content.encode())
self.assertFalse(p3.is_valid())
self.assertEqual(
len([1 for r in p3._errors if r["code"] == "pgp_error"]), 1
Expand All @@ -167,17 +167,17 @@ def test_signed_no_canonical(self):
content = _signed_example.replace(
"Canonical: https://example.com/.well-known/security.txt", ""
)
p = Parser(content)
p = Parser(content.encode())
self.assertEqual(p._recommendations[0]["code"], "no_canonical")

def test_signed_dash_escaped(self):
content = _signed_example.replace("Expires", "- Expires")
p = Parser(content)
p = Parser(content.encode())
self.assertTrue(p.is_valid())

def test_pgp_signed_formatting(self):
content = "\r\n" + _signed_example
p = Parser(content)
p = Parser(content.encode())
self.assertFalse(p.is_valid())
self.assertTrue(any(d["code"] == "signed_format_issue" for d in p.errors))

Expand All @@ -194,14 +194,14 @@ def test_unknown_fields(self):
)

# By default, recommend that there are unknown fields.
p = Parser(content)
p = Parser(content.encode())
self.assertTrue(p.is_valid())
self.assertEqual(
len([1 for r in p._notifications if r["code"] == "unknown_field"]), 2
)

# When turned off, there should be no unknown_field recommendations.
p = Parser(content, recommend_unknown_fields=False)
p = Parser(content.encode(), recommend_unknown_fields=False)
self.assertTrue(p.is_valid())
self.assertEqual(
len([1 for r in p._notifications if r["code"] == "unknown_field"]), 0
Expand All @@ -213,7 +213,7 @@ def test_no_line_separators(self):
"Contact: mailto:[email protected] Expires: "
f"{expire_date}T18:37:07z # All on a single line"
)
p_line_separator = Parser(single_line_security_txt)
p_line_separator = Parser(single_line_security_txt.encode())
self.assertFalse(p_line_separator.is_valid())
self.assertEqual(
len([1 for r in p_line_separator._errors if r["code"] == "no_line_separators"]), 1
Expand All @@ -224,7 +224,7 @@ def test_no_line_separators(self):
"line 3\n"
"Contact: mailto:[email protected] Expires"
)
p_length_4 = Parser(line_length_4_no_carriage_feed)
p_length_4 = Parser(line_length_4_no_carriage_feed.encode())
self.assertFalse(p_length_4.is_valid())
self.assertEqual(
len([1 for r in p_length_4._errors if r["code"] == "no_line_separators"]), 1
Expand All @@ -238,7 +238,7 @@ def test_csaf_https_uri(self):
"CSAF: https://example.com/.well-known/csaf/provider-metadata.json",
"CSAF: http://example.com/.well-known/csaf/provider-metadata.json",
)
p = Parser(content)
p = Parser(content.encode())
self.assertFalse(p.is_valid())
self.assertEqual(len([1 for r in p._errors if r["code"] == "no_https"]), 1)

Expand All @@ -247,7 +247,7 @@ def test_csaf_provider_file(self):
"CSAF: https://example.com/.well-known/csaf/provider-metadata.json",
"CSAF: https://example.com/.well-known/csaf/other_provider_name.json",
)
p = Parser(content)
p = Parser(content.encode())
self.assertFalse(p.is_valid())
self.assertEqual(len([1 for r in p._errors if r["code"] == "no_csaf_file"]), 1)

Expand All @@ -257,7 +257,7 @@ def test_multiple_csaf_notification(self):
"# CSAF link\n"
"CSAF: https://example2.com/.well-known/csaf/provider-metadata.json",
)
p = Parser(content)
p = Parser(content.encode())
self.assertTrue(p.is_valid())
self.assertEqual(
len([1 for r in p._recommendations if r["code"] == "multiple_csaf_fields"]), 1
Expand Down Expand Up @@ -304,19 +304,27 @@ def test_invalid_uri_scheme(self):
if not any(d["code"] == "invalid_uri_scheme" for d in s.errors):
pytest.fail("invalid_uri_scheme error code should be given")

def test_byte_order_mark_parser(self):
# Pass raw bytes that start with UTF-8 byte-order marks directly to Parser
# (no HTTP request involved) and expect an invalid result that includes a
# "bom_in_file" error.
expires = f"Expires: {(date.today() + timedelta(days=10)).isoformat()}T18:37:07z\n"
byte_content_with_bom = b'\xef\xbb\xbf\xef\xbb\xbfContact: mailto:[email protected]\n' \
+ expires.encode()
p = Parser(byte_content_with_bom)
self.assertFalse(p.is_valid())
self.assertTrue(any(d["code"] == "bom_in_file" for d in p.errors))

# noinspection PyMethodMayBeStatic
def test_byte_order_mark(self):
with Mocker() as m:
expires = f"Expires: {(date.today() + timedelta(days=10)).isoformat()}T18:37:07z\n"
byte_content_with_bom = b'\xef\xbb\xbf\xef\xbb\xbfContact: mailto:[email protected]\n' \
+ bytes(expires, "utf-8")
+ expires.encode()
m.get(
"https://example.com/.well-known/security.txt",
headers={"content-type": "text/plain"},
content=byte_content_with_bom,
)
s = SecurityTXT("example.com")
assert(not s.is_valid())
assert (not s.is_valid())
if not any(d["code"] == "bom_in_file" for d in s.errors):
pytest.fail("bom_in_file error code should be given")

Expand Down

0 comments on commit 49f3872

Please sign in to comment.