Skip to content

Commit

Permalink
Add Discord webhook exposed key handling
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcthinkst committed Nov 8, 2024
1 parent 2763cf4 commit 3f99b2a
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 134 deletions.
31 changes: 0 additions & 31 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@
Memo,
SlackAttachment,
SlackField,
DiscordDetails,
DiscordEmbeds,
DiscordAuthorField,
MsTeamsTitleSection,
MsTeamsDetailsSection,
MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsSlack,
TokenAlertDetailsDiscord,
TokenAlertDetailsMsTeams,
)

Expand Down Expand Up @@ -100,33 +96,6 @@ def format_as_googlechat_canaryalert(
)


def format_as_discord_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsDiscord:
embeds = DiscordEmbeds(
author=DiscordAuthorField(
icon_url="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png",
),
url=details.manage_url,
timestamp=details.time.strftime("%Y-%m-%dT%H:%M:%S"),
)

embeds.add_fields(
fields_info=DiscordDetails(
canarytoken=details.token,
token_reminder=details.memo,
src_data=details.src_data if details.src_data else None,
).get_discord_data(),
)

if details.additional_data:
embeds.add_fields(
fields_info=details.additional_data,
)

return TokenAlertDetailsDiscord(embeds=[embeds])


def format_as_ms_teams_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsMsTeams:
Expand Down
80 changes: 6 additions & 74 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@
CANARY_IMAGE_URL,
MEMO_MAX_CHARACTERS,
)
from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent
from canarytokens.utils import (
json_safe_dict,
prettify_snake_case,
dict_to_csv,
get_src_ip_continent,
)

CANARYTOKEN_RE = re.compile(
".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*",
Expand Down Expand Up @@ -401,10 +406,6 @@ def __str__(self) -> str:
]


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


class TokenRequest(BaseModel):
"""
TokenRequest holds fields needed to create a Canarytoken.
Expand Down Expand Up @@ -2234,66 +2235,6 @@ class GoogleChatCardV2(BaseModel):
card: GoogleChatCard


class DiscordFieldEntry(BaseModel):
name: str = ""
value: str = ""
inline: bool = False


class DiscordDetails(BaseModel):
canarytoken: Canarytoken
token_reminder: Memo
src_data: Optional[dict[str, Any]]

def get_discord_data(self) -> Dict[str, str]:
data = json_safe_dict(self)
data["Canarytoken"] = data.pop("canarytoken", "")
data["Token Reminder"] = data.pop("token_reminder", "")
if "src_data" in data:
data["Source Data"] = data.pop("src_data", "")
return data


class DiscordAuthorField(BaseModel):
name: str = "Canary Alerts"
icon_url: str


class DiscordEmbeds(BaseModel):
author: DiscordAuthorField
color: int = 3724415 # Magic colour number. Trust the process
title: str = "Canarytoken Triggered"
url: Optional[HttpUrl]
timestamp: datetime
fields: List[DiscordFieldEntry] = []

def add_fields(self, fields_info: Optional[Dict[str, str]] = {}) -> None:
for label, text in fields_info.items():
if not label or not text:
continue
message_text = (
json.dumps(text) if isinstance(text, dict) else "{}".format(text)
)
self.fields.append(
DiscordFieldEntry(
name=label,
value=message_text,
inline=len(max(message_text.split("\n"))) < 40,
)
)

@validator("timestamp", pre=True)
def validate_timestamp(cls, value):
if isinstance(value, str):
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")
return value

class Config:
json_encoders = {
datetime: lambda v: v.strftime("%Y-%m-%dT%H:%M:%S"),
}


class TokenAlertDetailsGoogleChat(BaseModel):
cardsV2: List[GoogleChatCardV2]

Expand Down Expand Up @@ -2374,15 +2315,6 @@ def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class TokenAlertDetailsDiscord(BaseModel):
"""Details that are sent to Discord webhooks"""

embeds: List[DiscordEmbeds]

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class UserName(ConstrainedStr):
max_lengthint: int = 30
strip_whitespace: bool = True
Expand Down
20 changes: 1 addition & 19 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import secrets
from ipaddress import IPv4Address
from typing import Dict, List, Literal, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple

import advocate
import requests
Expand Down Expand Up @@ -859,13 +859,6 @@ def validate_webhook(url, token_type: models.TokenTypes):
if len(url) > constants.MAX_WEBHOOK_URL_LENGTH:
raise WebhookTooLongError()

payload: Union[
models.TokenAlertDetails,
models.TokenAlertDetailsSlack,
models.TokenAlertDetailsGoogleChat,
models.TokenAlertDetailsDiscord,
models.TokenAlertDetailsMsTeams,
]
webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.SLACK:
payload = models.TokenAlertDetailsSlack(
Expand Down Expand Up @@ -896,17 +889,6 @@ def validate_webhook(url, token_type: models.TokenTypes):
payload = models.TokenAlertDetailsGoogleChat(
cardsV2=[models.GoogleChatCardV2(cardId="unique-card-id", card=card)]
)
elif webhook_type == WebhookType.DISCORD:
# construct discord alert card
embeds = models.DiscordEmbeds(
author=models.DiscordAuthorField(
icon_url="https://s3-eu-west-1.amazonaws.com/email-images.canary.tools/canary-logo-round.png"
),
title="Validating new canarytokens webhook",
fields=[],
timestamp=datetime.datetime.now(),
)
payload = models.TokenAlertDetailsDiscord(embeds=[embeds])
elif webhook_type == WebhookType.MS_TEAMS:
section = models.MsTeamsTitleSection(
activityTitle="<b>Validating new Canarytokens webhook</b>"
Expand Down
8 changes: 7 additions & 1 deletion canarytokens/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import subprocess
from pathlib import Path
from typing import Any, Literal, Union
from typing import Any, Literal, Union, Tuple, Dict
import json

import pycountry_convert
from pydantic import BaseModel


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


def dict_to_csv(d: dict) -> str:
Expand Down
Loading

0 comments on commit 3f99b2a

Please sign in to comment.