Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close #13 #16

Merged
merged 1 commit into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/erfiume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

from .apis import Stazione, Valore, enrich_data, fetch_latest_time, fetch_stations_data
from .logging import logger
from .storage import DynamoClient
from .storage import AsyncDynamoDB
from .tgbot import bot

__all__ = [
"AsyncDynamoDB",
"Stazione",
"Valore",
"DynamoClient",
"enrich_data",
"fetch_latest_time",
"fetch_stations_data",
Expand Down
74 changes: 37 additions & 37 deletions app/erfiume/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,74 +5,76 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Self

import aioboto3
from boto3.dynamodb.conditions import Key
import aioboto3.resources
from botocore.exceptions import ClientError

from .apis import Stazione
from .logging import logger

if TYPE_CHECKING:
from types_aiobotocore_dynamodb import DynamoDBServiceResource
from types import TracebackType


class DynamoClient:
class AsyncDynamoDB:
"""
Asynchronous DynamoDB client that can be used for various operations on DynamoDB tables.
This class is designed to be instantiated and used in other asynchronous methods.
"""

def __init__(self, client: DynamoDBServiceResource):
"""
Wrap the class in async context.
"""
self.client = client

@classmethod
async def create(cls) -> DynamoClient:
"""
Factory method to initialize the DynamoDB client.
This method is asynchronous and sets up the connection based on environment.
"""
def __init__(self, table_name: str) -> None:
environment = os.getenv("ENVIRONMENT", "staging")
session = aioboto3.Session()
self.endpoint_url = (
"http://localhost:4566" if environment != "production" else None
)
self.table_name = table_name

async def __aenter__(self) -> Self:
"""Set up the client and table."""
self.session = aioboto3.Session()
self.dynamodb = await self.session.resource(
service_name="dynamodb",
endpoint_url=self.endpoint_url,
).__aenter__()
self.table = await self.dynamodb.Table(self.table_name)
return self

async with session.resource(
"dynamodb",
endpoint_url=(
"http://localhost:4566" if environment != "production" else None
),
) as client:
return cls(client)
async def __aexit__(
self,
exc_type: type[Exception] | None, # noqa: PYI036
exc_val: Exception | None, # noqa: PYI036
exc_tb: TracebackType | None,
) -> None:
"""Close the client on exit."""
await self.dynamodb.__aexit__(exc_type, exc_val, exc_tb)

async def check_and_update_stazioni(self, station: Stazione) -> None:
"""
Check if the station data in DynamoDB is outdated compared to the given station object.
If outdated or non-existent, update it with the new data.
"""
try:
table = await self.client.Table("Stazioni")
response = await table.query(
KeyConditionExpression=Key("nomestaz").eq(station.nomestaz),
response = await self.table.get_item(
Key={"nomestaz": station.nomestaz},
)

# Get the latest timestamp from the DynamoDB response
latest_timestamp = (
int(response["Items"][0].get("timestamp")) # type: ignore[arg-type]
if response["Count"] > 0
int(response["Item"].get("timestamp")) # type: ignore[arg-type]
if response["Item"]
else 0
)

# If the provided station has newer data or the record doesn't exist, update DynamoDB
if station.timestamp > latest_timestamp or response["Count"] == 0:
if station.timestamp > latest_timestamp or not response["Item"]:
logger.info(
"Updating data for station %s (%s)",
station.nomestaz,
station.idstazione,
)
await table.put_item(Item=station.to_dict())
await self.table.put_item(Item=station.to_dict())
except ClientError as e:
logger.exception(
"Error while checking or updating station %s: %s", station.nomestaz, e
Expand All @@ -88,14 +90,12 @@ async def get_matching_station(self, station_name: str) -> Stazione | None:
Returns the station data as a dictionary, or None if not found.
"""
try:
table = await self.client.Table("Stazioni")
response = await table.query(
Limit=1,
KeyConditionExpression=Key("nomestaz").eq(station_name),
stazione = await self.table.get_item(
Key={"nomestaz": station_name},
)

if response["Count"] > 0:
return Stazione(**response["Items"][0]) # type: ignore[arg-type]
if stazione["Item"]:
return Stazione(**stazione["Item"]) # type: ignore[arg-type]
logger.info("Station %s not found in DynamoDB.", station_name)
except ClientError as e:
logger.exception("Error while retrieving station %s: %s", station_name, e)
Expand Down
48 changes: 21 additions & 27 deletions app/erfiume/tgbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
from __future__ import annotations

import json
import os
from datetime import datetime
from inspect import cleandoc
from typing import TYPE_CHECKING, Any

import boto3
from telegram import Update
from telegram.ext import (
Application,
Expand All @@ -24,21 +22,17 @@
if TYPE_CHECKING:
from .apis import Stazione

from aws_lambda_powertools.utilities import parameters

from .logging import logger
from .storage import DynamoClient
from .storage import AsyncDynamoDB


async def fetch_bot_token() -> str:
"""
Fetch the Telegram Bot token from AWS SM
"""
environment = os.getenv("ENVIRONMENT", "staging")
return boto3.client(
service_name="secretsmanager",
endpoint_url=("http://localhost:4566" if environment != "production" else None),
).get_secret_value(
SecretId="telegram-bot-token",
)["SecretString"]
return parameters.get_secret("telegram-bot-token")


async def start(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
Expand Down Expand Up @@ -84,15 +78,15 @@ def create_station_message(station: Stazione) -> str:

async def cesena(update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /cesena is issued."""
db_client = await DynamoClient.create()
stazione = await db_client.get_matching_station("Cesena")
if stazione:
if update.message:
await update.message.reply_html(create_station_message(stazione))
elif update.message:
await update.message.reply_html(
"Nessun stazione trovata!",
)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station("Cesena")
if stazione:
if update.message:
await update.message.reply_html(create_station_message(stazione))
elif update.message:
await update.message.reply_html(
"Nessun stazione trovata!",
)


async def handle_private_message(
Expand All @@ -109,14 +103,14 @@ async def handle_private_message(
)
if update.message and update.effective_chat and update.message.text:
logger.info("Received private message: %s", update.message.text)
db_client = await DynamoClient.create()
stazione = await db_client.get_matching_station(update.message.text)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(update.message.text)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)


async def bot(event: dict[str, Any]) -> None:
Expand Down
9 changes: 5 additions & 4 deletions app/erfiume_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aws_lambda_powertools.utilities.typing import LambdaContext

from erfiume import (
DynamoClient,
AsyncDynamoDB,
enrich_data,
fetch_latest_time,
fetch_stations_data,
Expand All @@ -25,14 +25,15 @@ async def update() -> None:
"""
Run main.
"""
db_client = await DynamoClient.create()
async with httpx.AsyncClient() as http_client:
async with httpx.AsyncClient() as http_client, AsyncDynamoDB(
table_name="Stazioni"
) as dynamo:
try:
latest_time = await fetch_latest_time(http_client)
stations = await fetch_stations_data(http_client, latest_time)
await enrich_data(http_client, stations)
for stazione in stations:
await db_client.check_and_update_stazioni(stazione)
await dynamo.check_and_update_stazioni(stazione)
except httpx.HTTPStatusError as e:
logger.exception("HTTP error occurred: %d", e.response.status_code)
except httpx.ConnectTimeout:
Expand Down
2 changes: 1 addition & 1 deletion app/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ output-format = "github"

[tool.ruff.lint]
select = ["ALL"]
ignore = ["D2", "D4", "ANN", "COM812", "ISC001"]
ignore = ["D2", "D4", "ANN", "COM812", "ISC001", "D105", "D107"]
fixable = ["ALL"]
unfixable = []

Expand Down
9 changes: 5 additions & 4 deletions app/standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import httpx

from erfiume import (
DynamoClient,
AsyncDynamoDB,
bot,
enrich_data,
fetch_latest_time,
Expand All @@ -22,15 +22,16 @@ async def update() -> None:
"""
Run main.
"""
db_client = await DynamoClient.create()
async with httpx.AsyncClient() as http_client:
async with httpx.AsyncClient() as http_client, AsyncDynamoDB(
table_name="Stazioni"
) as dynamo:
while True:
try:
latest_time = await fetch_latest_time(http_client)
stations = await fetch_stations_data(http_client, latest_time)
await enrich_data(http_client, stations)
for stazione in stations:
await db_client.check_and_update_stazioni(stazione)
await dynamo.check_and_update_stazioni(stazione)
except httpx.HTTPStatusError as e:
logger.exception("HTTP error occurred: %d", e.response.status_code)
except httpx.ConnectTimeout:
Expand Down
7 changes: 2 additions & 5 deletions pulumi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@
name="Stazioni",
billing_mode="PAY_PER_REQUEST",
hash_key="nomestaz",
range_key="ordinamento",
attributes=[
dynamodb.TableAttributeArgs(
name="nomestaz",
type="S",
),
dynamodb.TableAttributeArgs(
name="ordinamento",
type="N",
),
],
)

Expand Down Expand Up @@ -72,6 +67,7 @@
"Actions": [
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [stazioni_table.arn],
}
Expand Down Expand Up @@ -110,6 +106,7 @@
"Effect": "Allow",
"Actions": [
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [
f"arn:aws:dynamodb:eu-west-1:{get_caller_identity().account_id}:table/Stazioni"
Expand Down