From 3f99b2a069c1c20a6ac32cd9be5c5aa1a5d9c944 Mon Sep 17 00:00:00 2001 From: Gerrie Crafford Date: Wed, 6 Nov 2024 16:09:01 +0200 Subject: [PATCH] Add Discord webhook exposed key handling --- canarytokens/channel.py | 31 ----- canarytokens/models.py | 80 +----------- canarytokens/queries.py | 20 +-- canarytokens/utils.py | 8 +- canarytokens/webhook_formatting.py | 167 +++++++++++++++++++++++-- tests/units/test_webhook_formatting.py | 3 + 6 files changed, 175 insertions(+), 134 deletions(-) diff --git a/canarytokens/channel.py b/canarytokens/channel.py index bf5baa4d7..e9343e193 100644 --- a/canarytokens/channel.py +++ b/canarytokens/channel.py @@ -26,16 +26,12 @@ Memo, SlackAttachment, SlackField, - DiscordDetails, - DiscordEmbeds, - DiscordAuthorField, MsTeamsTitleSection, MsTeamsDetailsSection, MsTeamsPotentialAction, TokenAlertDetails, TokenAlertDetailsGoogleChat, TokenAlertDetailsSlack, - TokenAlertDetailsDiscord, TokenAlertDetailsMsTeams, ) @@ -100,33 +96,6 @@ def format_as_googlechat_canaryalert( ) -def format_as_discord_canaryalert( - details: TokenAlertDetails, -) -> TokenAlertDetailsDiscord: - embeds = DiscordEmbeds( - author=DiscordAuthorField( - icon_url="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png", - ), - url=details.manage_url, - timestamp=details.time.strftime("%Y-%m-%dT%H:%M:%S"), - ) - - embeds.add_fields( - fields_info=DiscordDetails( - canarytoken=details.token, - token_reminder=details.memo, - src_data=details.src_data if details.src_data else None, - ).get_discord_data(), - ) - - if details.additional_data: - embeds.add_fields( - fields_info=details.additional_data, - ) - - return TokenAlertDetailsDiscord(embeds=[embeds]) - - def format_as_ms_teams_canaryalert( details: TokenAlertDetails, ) -> TokenAlertDetailsMsTeams: diff --git a/canarytokens/models.py b/canarytokens/models.py index 993267852..113eb9d93 100644 --- a/canarytokens/models.py +++ b/canarytokens/models.py @@ -49,7 +49,12 @@ CANARY_IMAGE_URL, MEMO_MAX_CHARACTERS, ) -from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent +from canarytokens.utils import ( + json_safe_dict, + prettify_snake_case, + dict_to_csv, + get_src_ip_continent, +) CANARYTOKEN_RE = re.compile( ".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*", @@ -401,10 +406,6 @@ def __str__(self) -> str: ] -def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]: - return json.loads(m.json(exclude_none=True, exclude=set(exclude))) - - class TokenRequest(BaseModel): """ TokenRequest holds fields needed to create a Canarytoken. @@ -2234,66 +2235,6 @@ class GoogleChatCardV2(BaseModel): card: GoogleChatCard -class DiscordFieldEntry(BaseModel): - name: str = "" - value: str = "" - inline: bool = False - - -class DiscordDetails(BaseModel): - canarytoken: Canarytoken - token_reminder: Memo - src_data: Optional[dict[str, Any]] - - def get_discord_data(self) -> Dict[str, str]: - data = json_safe_dict(self) - data["Canarytoken"] = data.pop("canarytoken", "") - data["Token Reminder"] = data.pop("token_reminder", "") - if "src_data" in data: - data["Source Data"] = data.pop("src_data", "") - return data - - -class DiscordAuthorField(BaseModel): - name: str = "Canary Alerts" - icon_url: str - - -class DiscordEmbeds(BaseModel): - author: DiscordAuthorField - color: int = 3724415 # Magic colour number. Trust the process - title: str = "Canarytoken Triggered" - url: Optional[HttpUrl] - timestamp: datetime - fields: List[DiscordFieldEntry] = [] - - def add_fields(self, fields_info: Optional[Dict[str, str]] = {}) -> None: - for label, text in fields_info.items(): - if not label or not text: - continue - message_text = ( - json.dumps(text) if isinstance(text, dict) else "{}".format(text) - ) - self.fields.append( - DiscordFieldEntry( - name=label, - value=message_text, - inline=len(max(message_text.split("\n"))) < 40, - ) - ) - - @validator("timestamp", pre=True) - def validate_timestamp(cls, value): - if isinstance(value, str): - return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") - return value - - class Config: - json_encoders = { - datetime: lambda v: v.strftime("%Y-%m-%dT%H:%M:%S"), - } - - class TokenAlertDetailsGoogleChat(BaseModel): cardsV2: List[GoogleChatCardV2] @@ -2374,15 +2315,6 @@ def json_safe_dict(self) -> Dict[str, str]: return json_safe_dict(self) -class TokenAlertDetailsDiscord(BaseModel): - """Details that are sent to Discord webhooks""" - - embeds: List[DiscordEmbeds] - - def json_safe_dict(self) -> Dict[str, str]: - return json_safe_dict(self) - - class UserName(ConstrainedStr): max_lengthint: int = 30 strip_whitespace: bool = True diff --git a/canarytokens/queries.py b/canarytokens/queries.py index 5befac0b4..75991f05b 100644 --- a/canarytokens/queries.py +++ b/canarytokens/queries.py @@ -7,7 +7,7 @@ import re import secrets from ipaddress import IPv4Address -from typing import Dict, List, Literal, Optional, Tuple, Union +from typing import Dict, List, Literal, Optional, Tuple import advocate import requests @@ -859,13 +859,6 @@ def validate_webhook(url, token_type: models.TokenTypes): if len(url) > constants.MAX_WEBHOOK_URL_LENGTH: raise WebhookTooLongError() - payload: Union[ - models.TokenAlertDetails, - models.TokenAlertDetailsSlack, - models.TokenAlertDetailsGoogleChat, - models.TokenAlertDetailsDiscord, - models.TokenAlertDetailsMsTeams, - ] webhook_type = get_webhook_type(url) if webhook_type == WebhookType.SLACK: payload = models.TokenAlertDetailsSlack( @@ -896,17 +889,6 @@ def validate_webhook(url, token_type: models.TokenTypes): payload = models.TokenAlertDetailsGoogleChat( cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)] ) - elif webhook_type == WebhookType.DISCORD: - # construct discord alert card - embeds = models.DiscordEmbeds( - author=models.DiscordAuthorField( - icon_url="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png" - ), - title="Validating new canarytokens webhook", - fields=[], - timestamp=datetime.datetime.now(), - ) - payload = models.TokenAlertDetailsDiscord(embeds=[embeds]) elif webhook_type == WebhookType.MS_TEAMS: section = models.MsTeamsTitleSection( activityTitle="Validating new Canarytokens webhook" diff --git a/canarytokens/utils.py b/canarytokens/utils.py index a7e77963e..938a0ce0b 100644 --- a/canarytokens/utils.py +++ b/canarytokens/utils.py @@ -1,8 +1,14 @@ import subprocess from pathlib import Path -from typing import Any, Literal, Union +from typing import Any, Literal, Union, Tuple, Dict +import json import pycountry_convert +from pydantic import BaseModel + + +def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]: + return json.loads(m.json(exclude_none=True, exclude=set(exclude))) def dict_to_csv(d: dict) -> str: diff --git a/canarytokens/webhook_formatting.py b/canarytokens/webhook_formatting.py index d2dfd5974..6edaeef97 100644 --- a/canarytokens/webhook_formatting.py +++ b/canarytokens/webhook_formatting.py @@ -1,20 +1,22 @@ from __future__ import annotations -from typing import Union +from typing import List, Union, Optional, Dict +import json from enum import Enum import re from functools import partial from datetime import datetime -from pydantic import HttpUrl, parse_obj_as +from pydantic import HttpUrl, parse_obj_as, BaseModel, validator from canarytokens import constants +from canarytokens.utils import json_safe_dict, prettify_snake_case from canarytokens.channel import ( - format_as_discord_canaryalert, format_as_googlechat_canaryalert, format_as_ms_teams_canaryalert, format_as_slack_canaryalert, ) from canarytokens.models import ( + readable_token_type_names, Memo, TokenTypes, TokenAlertDetails, @@ -22,7 +24,33 @@ ) +CANARY_LOGO_ROUND_PUBLIC_URL = parse_obj_as( + HttpUrl, + constants.CANARY_IMAGE_URL, +) WEBHOOK_TEST_URL = parse_obj_as(HttpUrl, "http://example.com/test/url/for/webhook") +TOKEN_EXPOSED_DESCRIPTION = "One of your {readable_type} Canarytokens has been found on the internet. A publicly exposed token will provide very low quality alerts. We recommend that you disable and replace this token on private infrastructure." +MAX_INLINE_LENGTH = 40 # Max length of content to share a line with other content + + +class HexColor(Enum): + WARNING = "#ed6c02" + ERROR = "#d32f2f" + CANARY_GREEN = "#3ad47f" + + @property + def decimal_value(self): + return int(self.value_without_hash, 16) + + @property + def value_without_hash(self): + return self.value[1:] + + +class DecimalColor(Enum): + WARNING = HexColor.WARNING.decimal_value + ERROR = HexColor.ERROR.decimal_value + CANARY_GREEN = HexColor.CANARY_GREEN.decimal_value class WebhookType(Enum): @@ -82,7 +110,7 @@ def _format_alert_details_for_webhook( elif webhook_type == WebhookType.GOOGLE_CHAT: return format_as_googlechat_canaryalert(details) elif webhook_type == WebhookType.DISCORD: - return format_as_discord_canaryalert(details) + return _format_as_discord_canaryalert(details) elif webhook_type == WebhookType.MS_TEAMS: return format_as_ms_teams_canaryalert(details) elif webhook_type == WebhookType.GENERIC: @@ -105,9 +133,7 @@ def _format_exposed_details_for_webhook( f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" ) elif webhook_type == WebhookType.DISCORD: - raise NotImplementedError( - f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" - ) + return _format_as_discord_token_exposed(details) elif webhook_type == WebhookType.MS_TEAMS: raise NotImplementedError( f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" @@ -130,9 +156,14 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy "generate_webhook_test_payload not implemented for GOOGLE_CHAT" ) elif webhook_type == WebhookType.DISCORD: - raise NotImplementedError( - "generate_webhook_test_payload not implemented for DISCORD" + embeds = DiscordEmbeds( + author=DiscordAuthorField(icon_url=CANARY_LOGO_ROUND_PUBLIC_URL), + url=WEBHOOK_TEST_URL, + title="Validating new Canarytokens webhook", + fields=[], + timestamp=datetime.now(), ) + return TokenAlertDetailsDiscord(embeds=[embeds]) elif webhook_type == WebhookType.MS_TEAMS: raise NotImplementedError( "generate_webhook_test_payload not implemented for MS_TEAMS" @@ -159,9 +190,127 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy ) +def _format_as_discord_canaryalert( + details: TokenAlertDetails, +) -> TokenAlertDetailsDiscord: + embeds = DiscordEmbeds( + author=DiscordAuthorField( + icon_url=CANARY_LOGO_ROUND_PUBLIC_URL, + ), + url=details.manage_url, + timestamp=details.time.strftime("%Y-%m-%dT%H:%M:%S"), + color=DecimalColor.ERROR.value, + ) + + embeds.add_fields( + { + "canarytoken": details.token, + "token_reminder": details.memo, + "src_data": details.src_data if details.src_data else None, + } + ) + + if details.additional_data: + embeds.add_fields(details.additional_data) + + embeds.add_fields({"Manage token": details.manage_url}) + + return TokenAlertDetailsDiscord(embeds=[embeds]) + + +def _format_as_discord_token_exposed( + details: TokenExposedDetails, +) -> TokenAlertDetailsDiscord: + embeds = DiscordEmbeds( + author=DiscordAuthorField( + icon_url=CANARY_LOGO_ROUND_PUBLIC_URL, + ), + url=details.manage_url, + timestamp=details.exposed_time.strftime("%Y-%m-%dT%H:%M:%S"), + description=_get_exposed_token_description(details.token_type), + color=DecimalColor.WARNING.value, + ) + + embeds.add_fields( + { + "Key ID": details.key_id, + "Token Reminder": details.memo, + "Key exposed here": details.public_location, + "Manage token": details.manage_url, + } + ) + + return TokenAlertDetailsDiscord(embeds=[embeds]) + + +class DiscordFieldEntry(BaseModel): + name: str = "" + value: str = "" + inline: bool = False + + +class DiscordAuthorField(BaseModel): + name: str = "Canary Alerts" + icon_url: str + + +class DiscordEmbeds(BaseModel): + author: DiscordAuthorField + color: int = DecimalColor.CANARY_GREEN.value + title: str = "Canarytoken Triggered" + description: Optional[str] = None + url: Optional[HttpUrl] + timestamp: datetime + fields: List[DiscordFieldEntry] = [] + + def add_fields(self, fields_info: Dict[str, str]) -> None: + for label, text in fields_info.items(): + if not label or not text: + continue + + message_text = ( + f"```{json.dumps(text)}```" + if isinstance(text, dict) + else "{}".format(text) + ) + self.fields.append( + DiscordFieldEntry( + name=prettify_snake_case(label), + value=message_text, + inline=len(max(message_text.split("\n"))) < MAX_INLINE_LENGTH, + ) + ) + + @validator("timestamp", pre=True) + def validate_timestamp(cls, value): + if isinstance(value, str): + return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") + return value + + class Config: + json_encoders = { + datetime: lambda v: v.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +class TokenAlertDetailsDiscord(BaseModel): + """Details that are sent to Discord webhooks""" + + embeds: List[DiscordEmbeds] + + def json_safe_dict(self) -> Dict[str, str]: + return json_safe_dict(self) + + class TokenAlertDetailGeneric(TokenAlertDetails): ... class TokenExposedDetailGeneric(TokenExposedDetails): ... + + +def _get_exposed_token_description(token_type: TokenTypes) -> str: + return TOKEN_EXPOSED_DESCRIPTION.format( + readable_type=readable_token_type_names[token_type] + ) diff --git a/tests/units/test_webhook_formatting.py b/tests/units/test_webhook_formatting.py index 852079f1f..b044c2a9b 100644 --- a/tests/units/test_webhook_formatting.py +++ b/tests/units/test_webhook_formatting.py @@ -7,6 +7,7 @@ format_details_for_webhook, get_webhook_type, TokenAlertDetailGeneric, + TokenAlertDetailsDiscord, TokenExposedDetailGeneric, ) @@ -42,6 +43,8 @@ def test_get_webhook_type(url: str, expected_type: WebhookType): [ ("alert", WebhookType.GENERIC, TokenAlertDetailGeneric), ("exposed", WebhookType.GENERIC, TokenExposedDetailGeneric), + ("alert", WebhookType.DISCORD, TokenAlertDetailsDiscord), + ("exposed", WebhookType.DISCORD, TokenAlertDetailsDiscord), ], ) def test_format_details_for_webhook_alert_type(