Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
neoformit committed Nov 12, 2024
1 parent 727a974 commit 200d9e9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 31 deletions.
24 changes: 10 additions & 14 deletions files/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,31 @@ def tearDown(self):

def test_contains_new_entry(self):
jwd = "unique_id_1"
self.assertFalse(self.record.contains(jwd),
"New entry should initially return False")
self.assertTrue(self.record.contains(jwd),
"Duplicate entry should return True")
self.assertFalse(
self.record.contains(jwd), "New entry should initially return False"
)
self.assertTrue(self.record.contains(jwd), "Duplicate entry should return True")

def test_contains_existing_entry(self):
jwd = "existing_id"
with open(self.temp_file.name, "a") as f:
f.write(f"{datetime.now()}\t{jwd}\n")
self.assertTrue(self.record.contains(jwd),
"Existing entry should return True")
self.assertTrue(self.record.contains(jwd), "Existing entry should return True")

@patch("walle.SLACK_NOTIFY_PERIOD_DAYS", new=SLACK_NOTIFY_PERIOD_DAYS)
def test_truncate_old_records(self):
old_jwd = "old_entry"
recent_jwd = "recent_entry"
old_date = datetime.now() - timedelta(
days=SLACK_NOTIFY_PERIOD_DAYS + 1)
old_date = datetime.now() - timedelta(days=SLACK_NOTIFY_PERIOD_DAYS + 1)
recent_date = datetime.now()

with open(self.temp_file.name, "a") as f:
f.write(f"{old_date.isoformat()}\t{old_jwd}\n")
f.write(f"{recent_date.isoformat()}\t{recent_jwd}\n")

self.record._truncate_records()
self.assertFalse(self.record.contains(old_jwd),
"Old entry should be purged")
self.assertTrue(self.record.contains(recent_jwd),
"Recent entry should remain")
self.assertFalse(self.record.contains(old_jwd), "Old entry should be purged")
self.assertTrue(self.record.contains(recent_jwd), "Recent entry should remain")

def test_purge_invalid_records(self):
with open(self.temp_file.name, "w") as f:
Expand All @@ -57,8 +53,8 @@ def test_purge_invalid_records(self):
self.record._read_records()
mock_warning.assert_called()

self.assertFalse(self.record._get_jwds(),
"Invalid records should be purged")
self.assertFalse(self.record._get_jwds(), "Invalid records should be purged")


if __name__ == "__main__":
unittest.main()
30 changes: 13 additions & 17 deletions files/walle.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@
)
logger = logging.getLogger(__name__)
GXADMIN_PATH = os.getenv("GXADMIN_PATH", "/usr/local/bin/gxadmin")
NOTIFICATION_HISTORY_FILE = os.getenv("WALLE_NOTIFICATION_HISTORY_FILE",
"/tmp/walle-notifications.txt")
NOTIFICATION_HISTORY_FILE = os.getenv(
"WALLE_NOTIFICATION_HISTORY_FILE", "/tmp/walle-notifications.txt"
)


def convert_arg_to_byte(mb: str) -> int:
Expand All @@ -72,16 +73,12 @@ def __init__(self, record_file: str) -> None:
self._truncate_records()

def _get_jwds(self) -> List[str]:
return [
line[1] for line in self._read_records()
]
return [line[1] for line in self._read_records()]

def _read_records(self) -> List[str]:
with open(self.record_file, "r") as f:
records = [
line.strip().split('\t')
for line in f.readlines()
if line.strip()
line.strip().split("\t") for line in f.readlines() if line.strip()
]
return self._validate(records)

Check failure on line 83 in files/walle.py

View workflow job for this annotation

GitHub Actions / pyright

Type "List[List[str]]" is not assignable to return type "List[str]"   "List[List[str]]" is not assignable to "List[str]"     Type parameter "_T@list" is invariant, but "List[str]" is not the same as "str"     Consider switching from "list" to "Sequence" which is covariant (reportReturnType)

Expand All @@ -95,27 +92,27 @@ def _validate(self, records: List[List[str]]) -> List[List[str]]:
logger.warning(
f"Invalid records found in {self.record_file}. The"
" file will be purged. This may result in duplicate Slack"
" notifications.")
" notifications."
)
self._purge_records()
return []
return records

def _write_jwd(self, jwd: str):
def _write_jwd(self, jwd: str) -> None:
with open(self.record_file, "a") as f:
f.write(f"{datetime.now()}\t{jwd}\n")

def _purge_records(self):
def _purge_records(self) -> None:
self.record_file.unlink()
self.record_file.touch()

def _truncate_records(self):
def _truncate_records(self) -> None:
"""Truncate older records."""
records = self._read_records()
with open(self.record_file, "w") as f:
for datestr, jwd_path in records:
if (
datetime.fromisoformat(datestr)
> datetime.now() - timedelta(days=SLACK_NOTIFY_PERIOD_DAYS)
if datetime.fromisoformat(datestr) > datetime.now() - timedelta(
days=SLACK_NOTIFY_PERIOD_DAYS
):
f.write(f"{datestr}\t{jwd_path}\n")

Expand Down Expand Up @@ -478,8 +475,7 @@ def report_matching_malware(self):

def post_slack_alert(self):
if notification_history.contains(self.job.jwd):

Check failure on line 477 in files/walle.py

View workflow job for this annotation

GitHub Actions / pyright

Argument of type "Path" cannot be assigned to parameter "jwd" of type "str" in function "contains"   "Path" is not assignable to "str" (reportArgumentType)
logger.debug(
"Skipping Slack notification - already posted for this JWD")
logger.debug("Skipping Slack notification - already posted for this JWD")
return
msg = f"""
:rotating_light: WALLE: *Malware detected* :rotating_light:
Expand Down

0 comments on commit 200d9e9

Please sign in to comment.