diff --git a/canarytokens/channel.py b/canarytokens/channel.py
index bf5baa4d7..756387d96 100644
--- a/canarytokens/channel.py
+++ b/canarytokens/channel.py
@@ -29,14 +29,10 @@
DiscordDetails,
DiscordEmbeds,
DiscordAuthorField,
- MsTeamsTitleSection,
- MsTeamsDetailsSection,
- MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsSlack,
TokenAlertDetailsDiscord,
- TokenAlertDetailsMsTeams,
)
log = Logger()
@@ -127,28 +123,6 @@ def format_as_discord_canaryalert(
return TokenAlertDetailsDiscord(embeds=[embeds])
-def format_as_ms_teams_canaryalert(
- details: TokenAlertDetails,
-) -> TokenAlertDetailsMsTeams:
- sections = [
- MsTeamsTitleSection(activityTitle="Canarytoken triggered"),
- MsTeamsDetailsSection(
- canarytoken=details.token,
- token_reminder=details.memo,
- src_data=details.src_data if details.src_data else None,
- additional_data=details.additional_data,
- ),
- ]
-
- return TokenAlertDetailsMsTeams(
- summary="Canarytoken triggered",
- sections=sections,
- potentialAction=[
- MsTeamsPotentialAction(name="Manage", target=[details.manage_url])
- ],
- )
-
-
class Channel(object):
CHANNEL = "Base"
diff --git a/canarytokens/models.py b/canarytokens/models.py
index 993267852..922642431 100644
--- a/canarytokens/models.py
+++ b/canarytokens/models.py
@@ -46,10 +46,9 @@
from canarytokens.constants import (
CANARYTOKEN_ALPHABET,
CANARYTOKEN_LENGTH,
- CANARY_IMAGE_URL,
MEMO_MAX_CHARACTERS,
)
-from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent
+from canarytokens.utils import json_safe_dict, get_src_ip_continent
CANARYTOKEN_RE = re.compile(
".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*",
@@ -401,10 +400,6 @@ def __str__(self) -> str:
]
-def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
- return json.loads(m.json(exclude_none=True, exclude=set(exclude)))
-
-
class TokenRequest(BaseModel):
"""
TokenRequest holds fields needed to create a Canarytoken.
@@ -2310,70 +2305,6 @@ def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)
-class MsTeamsDetailsSection(BaseModel):
- canarytoken: Canarytoken
- token_reminder: Memo
- src_data: Optional[dict[str, Any]] = None
- additional_data: Optional[dict[str, Any]] = None
-
- def dict(self, *args, **kwargs):
- data = json_safe_dict(self)
- data["Canarytoken"] = data.pop("canarytoken", "")
- data["Token Reminder"] = data.pop("token_reminder", "")
- if "src_data" in data:
- data["Source Data"] = data.pop("src_data", "")
-
- if data["additional_data"]:
- add_data = data.pop("additional_data", {})
- data.update(add_data)
-
- facts = []
- for k, v in data.items():
- if not v:
- continue
-
- if isinstance(v, dict):
- v = dict_to_csv(v)
- else:
- v = str(v)
-
- facts.append({"name": prettify_snake_case(k), "value": v})
-
- return {"facts": facts}
-
-
-class MsTeamsTitleSection(BaseModel):
- activityTitle: str
- activityImage = CANARY_IMAGE_URL
-
-
-class MsTeamsPotentialAction(BaseModel):
- name: str
- target: List[AnyHttpUrl]
- type: str = "ViewAction"
- context: str = "http://schema.org"
-
- def dict(self, *args, **kwargs):
- d = super().dict(*args, **kwargs)
-
- d["@type"] = d.pop("type")
- d["@context"] = d.pop("context")
-
- return d
-
-
-class TokenAlertDetailsMsTeams(BaseModel):
- """Details that are sent to MS Teams webhooks."""
-
- summary: str
- themeColor = "ff0000"
- sections: Optional[List[Union[MsTeamsTitleSection, MsTeamsDetailsSection]]] = None
- potentialAction: Optional[List[MsTeamsPotentialAction]] = None
-
- def json_safe_dict(self) -> Dict[str, str]:
- return json_safe_dict(self)
-
-
class TokenAlertDetailsDiscord(BaseModel):
"""Details that are sent to Discord webhooks"""
diff --git a/canarytokens/queries.py b/canarytokens/queries.py
index 5befac0b4..910f32944 100644
--- a/canarytokens/queries.py
+++ b/canarytokens/queries.py
@@ -7,7 +7,7 @@
import re
import secrets
from ipaddress import IPv4Address
-from typing import Dict, List, Literal, Optional, Tuple, Union
+from typing import Dict, List, Literal, Optional, Tuple
import advocate
import requests
@@ -859,13 +859,6 @@ def validate_webhook(url, token_type: models.TokenTypes):
if len(url) > constants.MAX_WEBHOOK_URL_LENGTH:
raise WebhookTooLongError()
- payload: Union[
- models.TokenAlertDetails,
- models.TokenAlertDetailsSlack,
- models.TokenAlertDetailsGoogleChat,
- models.TokenAlertDetailsDiscord,
- models.TokenAlertDetailsMsTeams,
- ]
webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.SLACK:
payload = models.TokenAlertDetailsSlack(
@@ -907,13 +900,6 @@ def validate_webhook(url, token_type: models.TokenTypes):
timestamp=datetime.datetime.now(),
)
payload = models.TokenAlertDetailsDiscord(embeds=[embeds])
- elif webhook_type == WebhookType.MS_TEAMS:
- section = models.MsTeamsTitleSection(
- activityTitle="Validating new Canarytokens webhook"
- )
- payload = models.TokenAlertDetailsMsTeams(
- summary="Validating new Canarytokens webhook", sections=[section]
- )
else:
payload = generate_webhook_test_payload(webhook_type, token_type)
diff --git a/canarytokens/utils.py b/canarytokens/utils.py
index a7e77963e..938a0ce0b 100644
--- a/canarytokens/utils.py
+++ b/canarytokens/utils.py
@@ -1,8 +1,14 @@
import subprocess
from pathlib import Path
-from typing import Any, Literal, Union
+from typing import Any, Literal, Union, Tuple, Dict
+import json
import pycountry_convert
+from pydantic import BaseModel
+
+
+def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
+ return json.loads(m.json(exclude_none=True, exclude=set(exclude)))
def dict_to_csv(d: dict) -> str:
diff --git a/canarytokens/webhook_formatting.py b/canarytokens/webhook_formatting.py
index d2dfd5974..fa4e224de 100644
--- a/canarytokens/webhook_formatting.py
+++ b/canarytokens/webhook_formatting.py
@@ -1,20 +1,22 @@
from __future__ import annotations
-from typing import Union
+from typing import List, Union, Optional, Dict
from enum import Enum
import re
from functools import partial
from datetime import datetime
-from pydantic import HttpUrl, parse_obj_as
+from pydantic import HttpUrl, parse_obj_as, BaseModel
from canarytokens import constants
from canarytokens.channel import (
format_as_discord_canaryalert,
format_as_googlechat_canaryalert,
- format_as_ms_teams_canaryalert,
format_as_slack_canaryalert,
)
+
+from canarytokens.utils import json_safe_dict, prettify_snake_case, dict_to_csv
from canarytokens.models import (
+ readable_token_type_names,
Memo,
TokenTypes,
TokenAlertDetails,
@@ -22,7 +24,33 @@
)
+CANARY_LOGO_ROUND_PUBLIC_URL = parse_obj_as(
+ HttpUrl,
+ constants.CANARY_IMAGE_URL,
+)
WEBHOOK_TEST_URL = parse_obj_as(HttpUrl, "http://example.com/test/url/for/webhook")
+TOKEN_EXPOSED_DESCRIPTION = "One of your {readable_type} Canarytokens has been found on the internet. A publicly exposed token will provide very low quality alerts. We recommend that you disable and replace this token on private infrastructure."
+MAX_INLINE_LENGTH = 40 # Max length of content to share a line with other content
+
+
+class HexColor(Enum):
+ WARNING = "#ed6c02"
+ ERROR = "#d32f2f"
+ CANARY_GREEN = "#3ad47f"
+
+ @property
+ def decimal_value(self):
+ return int(self.value_without_hash, 16)
+
+ @property
+ def value_without_hash(self):
+ return self.value[1:]
+
+
+class DecimalColor(Enum):
+ WARNING = HexColor.WARNING.decimal_value
+ ERROR = HexColor.ERROR.decimal_value
+ CANARY_GREEN = HexColor.CANARY_GREEN.decimal_value
class WebhookType(Enum):
@@ -84,7 +112,7 @@ def _format_alert_details_for_webhook(
elif webhook_type == WebhookType.DISCORD:
return format_as_discord_canaryalert(details)
elif webhook_type == WebhookType.MS_TEAMS:
- return format_as_ms_teams_canaryalert(details)
+ return _format_as_ms_teams_canaryalert(details)
elif webhook_type == WebhookType.GENERIC:
return TokenAlertDetailGeneric(**details.dict())
else:
@@ -109,9 +137,7 @@ def _format_exposed_details_for_webhook(
f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}"
)
elif webhook_type == WebhookType.MS_TEAMS:
- raise NotImplementedError(
- f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}"
- )
+ return _format_as_ms_teams_token_exposed(details)
elif webhook_type == WebhookType.GENERIC:
return TokenExposedDetailGeneric(**details.dict())
else:
@@ -134,8 +160,11 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy
"generate_webhook_test_payload not implemented for DISCORD"
)
elif webhook_type == WebhookType.MS_TEAMS:
- raise NotImplementedError(
- "generate_webhook_test_payload not implemented for MS_TEAMS"
+ section = MsTeamsTitleSection(
+ activityTitle="Validating new Canarytokens webhook"
+ )
+ return TokenAlertDetailsMsTeams(
+ summary="Validating new Canarytokens webhook", sections=[section]
)
elif webhook_type == WebhookType.GENERIC:
return TokenAlertDetails(
@@ -159,9 +188,129 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy
)
+def _format_as_ms_teams_canaryalert(
+ details: TokenAlertDetails,
+) -> TokenAlertDetailsMsTeams:
+ facts = [
+ MsTeamsFact(name="Canarytoken", value=details.token),
+ MsTeamsFact(name="Token Reminder", value=details.memo),
+ ]
+
+ if details.src_data:
+ facts.extend(_data_to_ms_teams_facts(details.src_data))
+ if details.additional_data:
+ facts.extend(_data_to_ms_teams_facts(details.additional_data))
+
+ sections = [
+ MsTeamsTitleSection(activityTitle="Canarytoken Triggered"),
+ MsTeamsDetailsSection(facts=facts),
+ ]
+
+ return TokenAlertDetailsMsTeams(
+ summary="Canarytoken Triggered",
+ themeColor=HexColor.ERROR.value_without_hash,
+ sections=sections,
+ potentialAction=[
+ MsTeamsPotentialAction(name="Manage token", target=[details.manage_url])
+ ],
+ )
+
+
+def _format_as_ms_teams_token_exposed(
+ details: TokenExposedDetails,
+) -> TokenAlertDetailsMsTeams:
+ facts = [
+ MsTeamsFact(name="Key ID", value=details.key_id),
+ MsTeamsFact(name="Token Reminder", value=details.memo),
+ MsTeamsFact(
+ name="Key exposed at",
+ value=details.exposed_time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
+ ),
+ MsTeamsFact(name="Key exposed here", value=details.public_location),
+ ]
+
+ sections = [
+ MsTeamsTitleSection(activityTitle="Canarytoken Exposed"),
+ MsTeamsDetailsSection(
+ facts=facts, text=_get_exposed_token_description(details.token_type)
+ ),
+ ]
+
+ return TokenAlertDetailsMsTeams(
+ summary="Canarytoken Exposed",
+ themeColor=HexColor.WARNING.value_without_hash,
+ sections=sections,
+ potentialAction=[
+ MsTeamsPotentialAction(name="Manage token", target=[details.manage_url])
+ ],
+ )
+
+
+def _data_to_ms_teams_facts(data: dict[str, Union[str, dict]]) -> list[MsTeamsFact]:
+ facts: list[MsTeamsFact] = []
+
+ for label, value in data.items():
+ if not label or not value:
+ continue
+
+ message_text = dict_to_csv(value) if isinstance(value, dict) else value
+ facts.append(MsTeamsFact(name=prettify_snake_case(label), value=message_text))
+
+ return facts
+
+
+class MsTeamsFact(BaseModel):
+ name: str
+ value: str
+
+
+class MsTeamsDetailsSection(BaseModel):
+ facts: list[MsTeamsFact]
+ text: Optional[str] = None
+
+
+class MsTeamsTitleSection(BaseModel):
+ activityTitle: str
+ activityImage = CANARY_LOGO_ROUND_PUBLIC_URL
+
+
+class MsTeamsPotentialAction(BaseModel):
+ name: str
+ target: List[HttpUrl]
+ type: str = "ViewAction"
+ context: str = "http://schema.org"
+
+ def dict(self, *args, **kwargs):
+ d = super().dict(*args, **kwargs)
+
+ d["@type"] = d.pop("type")
+ d["@context"] = d.pop("context")
+
+ return d
+
+
+class TokenAlertDetailsMsTeams(BaseModel):
+ """Details that are sent to MS Teams webhooks."""
+
+ summary: str
+ themeColor: str = HexColor.CANARY_GREEN.value
+ sections: Optional[List[Union[MsTeamsTitleSection, MsTeamsDetailsSection]]] = None
+ potentialAction: Optional[List[MsTeamsPotentialAction]] = None
+ text: Optional[str] = None
+
+ def json_safe_dict(self) -> Dict[str, str]:
+ return json_safe_dict(self)
+
+
class TokenAlertDetailGeneric(TokenAlertDetails):
...
class TokenExposedDetailGeneric(TokenExposedDetails):
...
+
+
+def _get_exposed_token_description(token_type: TokenTypes) -> str:
+ return TOKEN_EXPOSED_DESCRIPTION.format(
+ readable_type=readable_token_type_names[token_type]
+ )
diff --git a/tests/units/test_channel_output_webhook.py b/tests/units/test_channel_output_webhook.py
index 853515ee3..ea4e40096 100644
--- a/tests/units/test_channel_output_webhook.py
+++ b/tests/units/test_channel_output_webhook.py
@@ -6,20 +6,23 @@
from canarytokens.canarydrop import Canarydrop
from canarytokens.channel import (
format_as_googlechat_canaryalert,
- format_as_ms_teams_canaryalert,
)
from canarytokens.channel_dns import ChannelDNS
from canarytokens.channel_output_webhook import WebhookOutputChannel
from canarytokens.models import (
TokenAlertDetailsGoogleChat,
- TokenAlertDetailsMsTeams,
TokenTypes,
)
from canarytokens.settings import FrontendSettings, SwitchboardSettings
from canarytokens.switchboard import Switchboard
from canarytokens.tokens import Canarytoken
from canarytokens.constants import CANARY_IMAGE_URL
-from canarytokens.webhook_formatting import format_details_for_webhook, get_webhook_type
+from canarytokens.webhook_formatting import (
+ TokenAlertDetailsMsTeams,
+ format_details_for_webhook,
+ get_webhook_type,
+ WebhookType,
+)
def test_broken_webhook(
@@ -274,23 +277,23 @@ def test_ms_teams_webhook_format(
protocol=input_channel.switchboard_scheme,
host=settings.PUBLIC_DOMAIN,
)
- webhook_payload = format_as_ms_teams_canaryalert(details=details)
+ webhook_payload = format_details_for_webhook(WebhookType.MS_TEAMS, details)
payload = webhook_payload.json_safe_dict()
- assert payload["summary"] == "Canarytoken triggered"
- assert payload["themeColor"] == "ff0000"
+ assert payload["summary"] == "Canarytoken Triggered"
+ assert payload["themeColor"] == "d32f2f"
assert payload["potentialAction"] == [
{
"@context": "http://schema.org",
"@type": "ViewAction",
- "name": "Manage",
+ "name": "Manage token",
"target": [details.manage_url],
}
]
assert len(payload["sections"]) == 2
assert payload["sections"][0] == {
- "activityTitle": "Canarytoken triggered",
+ "activityTitle": "Canarytoken Triggered",
"activityImage": CANARY_IMAGE_URL,
}
diff --git a/tests/units/test_webhook_formatting.py b/tests/units/test_webhook_formatting.py
index 852079f1f..3fe207350 100644
--- a/tests/units/test_webhook_formatting.py
+++ b/tests/units/test_webhook_formatting.py
@@ -7,6 +7,7 @@
format_details_for_webhook,
get_webhook_type,
TokenAlertDetailGeneric,
+ TokenAlertDetailsMsTeams,
TokenExposedDetailGeneric,
)
@@ -42,6 +43,8 @@ def test_get_webhook_type(url: str, expected_type: WebhookType):
[
("alert", WebhookType.GENERIC, TokenAlertDetailGeneric),
("exposed", WebhookType.GENERIC, TokenExposedDetailGeneric),
+ ("alert", WebhookType.MS_TEAMS, TokenAlertDetailsMsTeams),
+ ("exposed", WebhookType.MS_TEAMS, TokenAlertDetailsMsTeams),
],
)
def test_format_details_for_webhook_alert_type(