From affbf02fd72e06c8465083abf7292fb21c5f1794 Mon Sep 17 00:00:00 2001 From: loRes <61621244+loRes228@users.noreply.github.com> Date: Thu, 28 Mar 2024 00:07:36 +0200 Subject: [PATCH] 0.2.6 --- README.md | 30 +++++++---- aiogram_broadcaster/__about__.py | 2 +- aiogram_broadcaster/broadcaster.py | 51 ++++--------------- aiogram_broadcaster/contents/base.py | 2 +- aiogram_broadcaster/mailer/chat_engine.py | 26 ++++++---- aiogram_broadcaster/mailer/container.py | 39 ++++++++++++++ .../mailer/{multiple.py => group.py} | 45 ++++++---------- aiogram_broadcaster/mailer/mailer.py | 37 ++++++++------ aiogram_broadcaster/mailer/statistic.py | 6 +-- aiogram_broadcaster/placeholder.py | 4 +- aiogram_broadcaster/storage/base.py | 4 ++ aiogram_broadcaster/storage/file.py | 31 +++++++---- aiogram_broadcaster/storage/redis.py | 3 ++ examples/mre.py | 17 +++++-- examples/setup_master.py | 20 +++----- 15 files changed, 181 insertions(+), 136 deletions(-) create mode 100644 aiogram_broadcaster/mailer/container.py rename aiogram_broadcaster/mailer/{multiple.py => group.py} (51%) diff --git a/README.md b/README.md index 1bae097..30f83a1 100644 --- a/README.md +++ b/README.md @@ -20,11 +20,10 @@ from aiogram_broadcaster import Broadcaster, DefaultMailerProperties from aiogram_broadcaster.contents import MessageSendContent from aiogram_broadcaster.event import EventRouter from aiogram_broadcaster.mailer import Mailer -from aiogram_broadcaster.storage.redis import RedisBCRStorage +from aiogram_broadcaster.storage.file import FileBCRStorage TOKEN = "1234:Abc" # noqa: S105 USER_IDS = {78238238, 78378343, 98765431, 12345678} -OWNER_ID = 61043901 router = Router(name=__name__) event = EventRouter() @@ -33,21 +32,34 @@ event = EventRouter() @router.message() async def on_any_message(message: Message, broadcaster: Broadcaster) -> Any: content = MessageSendContent(message=message) - mailer = await broadcaster.create_mailer(content=content, chats=USER_IDS) + mailer = await broadcaster.create_mailer( + content=content, + chats=USER_IDS, + data={"publisher_id": message.chat.id, "message_id": message.message_id}, + ) mailer.start() await message.answer(text="Run broadcasting...") @event.completed() -async def notify_complete(mailer: Mailer, bot: Bot) -> None: +async def notify_complete( + mailer: Mailer, + bot: Bot, + publisher_id: int, + message_id: int, +) -> None: text = ( f"Broadcasting has been completed!\n" f"Mailer ID: {mailer.id} | Bot ID: {bot.id}\n" - f"Total chats: {mailer.statistic.total_count}\n" - f"Failed chats: {mailer.statistic.failed_count}\n" - f"Success chats: {mailer.statistic.success_count}\n" + f"Total chats: {mailer.statistic.total_chats.total}\n" + f"Failed chats: {mailer.statistic.failed_chats.total}\n" + f"Success chats: {mailer.statistic.success_chats.total}\n" + ) + await bot.send_message( + chat_id=publisher_id, + text=text, + reply_to_message_id=message_id, ) - await bot.send_message(chat_id=OWNER_ID, text=text) def main() -> None: @@ -56,7 +68,7 @@ def main() -> None: dispatcher = Dispatcher() dispatcher.include_router(router) - bcr_storage = RedisBCRStorage.from_url("redis://localhost:6379") + bcr_storage = FileBCRStorage() default = DefaultMailerProperties(destroy_on_complete=True) broadcaster = Broadcaster(bot, storage=bcr_storage, default=default) broadcaster.event.include(event) diff --git a/aiogram_broadcaster/__about__.py b/aiogram_broadcaster/__about__.py index fe404ae..01ef120 100644 --- a/aiogram_broadcaster/__about__.py +++ b/aiogram_broadcaster/__about__.py @@ -1 +1 @@ -__version__ = "0.2.5" +__version__ = "0.2.6" diff --git a/aiogram_broadcaster/broadcaster.py b/aiogram_broadcaster/broadcaster.py index 42e01f0..ffe8dbf 100644 --- a/aiogram_broadcaster/broadcaster.py +++ b/aiogram_broadcaster/broadcaster.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Set, Tuple, Union +from typing import Any, Dict, Iterable, Literal, Optional, Set, Tuple, Union from uuid import uuid4 from aiogram import Bot, Dispatcher @@ -11,14 +11,15 @@ from .logger import logger from .mailer import Mailer, MailerStatus from .mailer.chat_engine import ChatEngine, ChatState -from .mailer.multiple import MultipleMailers +from .mailer.container import MailerContainer +from .mailer.group import MailerGroup from .mailer.settings import MailerSettings from .placeholder import PlaceholderWizard from .storage.base import BaseBCRStorage from .storage.record import StorageRecord -class Broadcaster: +class Broadcaster(MailerContainer): _bots: Dict[int, Bot] storage: Optional[BaseBCRStorage] language_getter: BaseLanguageGetter @@ -27,7 +28,6 @@ class Broadcaster: kwargs: Dict[str, Any] event: EventManager placeholder: PlaceholderWizard - _mailers: Dict[int, Mailer] def __init__( self, @@ -38,6 +38,8 @@ def __init__( context_key: str = "broadcaster", **kwargs: Any, ) -> None: + super().__init__() + self._bots = {bot.id: bot for bot in bots} self.storage = storage self.language_getter = language_getter or DefaultLanguageGetter() @@ -48,45 +50,13 @@ def __init__( self.event = EventManager(name="root") self.placeholder = PlaceholderWizard(name="root") - self._mailers = {} - - def __repr__(self) -> str: - return f"Broadcaster(total_mailers={len(self._mailers)})" - - def __str__(self) -> str: - mailers = ", ".join(map(repr, self)) - return f"Broadcaster[{mailers}]" - - def __contains__(self, item: int) -> bool: - return item in self._mailers - - def __getitem__(self, item: int) -> Mailer: - if mailer := self._mailers.get(item): - return mailer - raise LookupError(f"Mailer with id={item} not exists.") - - def __iter__(self) -> Iterator[Mailer]: - return iter(self._mailers.values()) - - def __len__(self) -> int: - return len(self._mailers) @property def bots(self) -> Tuple[Bot, ...]: return tuple(self._bots.values()) - @property - def mailers(self) -> Dict[int, Mailer]: - return self._mailers - - def get_mailers(self) -> List[Mailer]: - return list(self._mailers.values()) - - def get_mailer(self, mailer_id: int) -> Optional[Mailer]: - return self._mailers.get(mailer_id) - - def as_multiple(self) -> MultipleMailers: - return MultipleMailers(mailers=self._mailers.values()) + def as_group(self) -> MailerGroup: + return MailerGroup(*self._mailers.values()) async def create_mailers( self, @@ -103,7 +73,7 @@ async def create_mailers( exclude_placeholders: Optional[Union[Literal[True], Set[str]]] = None, data: Optional[Dict[str, Any]] = None, **kwargs: Any, - ) -> MultipleMailers: + ) -> MailerGroup: if not bots and not self._bots: raise ValueError("At least one bot must be specified.") if not bots: @@ -126,7 +96,7 @@ async def create_mailers( ) for bot in bots ] - return MultipleMailers(mailers=mailers) + return MailerGroup(*mailers) async def create_mailer( self, @@ -260,4 +230,5 @@ def setup(self, dispatcher: Dispatcher, *, include_data: bool = True) -> None: self.kwargs.update(dispatcher.workflow_data) if self.storage: dispatcher.startup.register(self.restore_mailers) + dispatcher.shutdown.register(self.storage.close) dispatcher.startup.register(self.run_mailers) diff --git a/aiogram_broadcaster/contents/base.py b/aiogram_broadcaster/contents/base.py index 73c069a..e925b3a 100644 --- a/aiogram_broadcaster/contents/base.py +++ b/aiogram_broadcaster/contents/base.py @@ -12,7 +12,7 @@ from pydantic.functional_validators import ModelWrapValidatorHandler -VALIDATOR_KEY = "__validator" +VALIDATOR_KEY = "__V" class BaseContent(BaseModel, ABC): diff --git a/aiogram_broadcaster/mailer/chat_engine.py b/aiogram_broadcaster/mailer/chat_engine.py index c27b384..7a9b285 100644 --- a/aiogram_broadcaster/mailer/chat_engine.py +++ b/aiogram_broadcaster/mailer/chat_engine.py @@ -19,7 +19,7 @@ class ChatEngine(BaseModel): mailer_id: Optional[int] = Field(default=None, exclude=True) storage: Optional[BaseBCRStorage] = Field(default=None, exclude=True) - def model_post_init(self, __context: Optional[Dict[str, Any]]) -> None: + def model_post_init(self, __context: Dict[str, Any]) -> None: if not __context: return self.mailer_id = __context.get("mailer_id") @@ -58,21 +58,29 @@ async def add_chats(self, chats: Iterable[int], state: ChatState) -> Set[int]: if not difference: return difference self.chats[state].update(difference) - if self.storage and self.mailer_id: - async with self.storage.update_record(mailer_id=self.mailer_id) as record: - record.chats = self + await self._preserve() return difference + async def set_chats_state(self, state: ChatState) -> None: + chats = self.get_chats() + self.chats.clear() + self.chats[state] = chats + await self._preserve() + async def set_chat_state(self, chat: int, state: ChatState) -> None: - from_state = self.resolve_chat_state(chat=chat) + from_state = self._resolve_chat_state(chat=chat) self.chats[from_state].discard(chat) self.chats[state].add(chat) - if self.storage and self.mailer_id: - async with self.storage.update_record(mailer_id=self.mailer_id) as record: - record.chats = self + await self._preserve() - def resolve_chat_state(self, chat: int) -> ChatState: + def _resolve_chat_state(self, chat: int) -> ChatState: for state, chats in self.chats.items(): if chat in chats: return state raise LookupError(f"Chat={chats} state is undefined.") + + async def _preserve(self) -> None: + if not self.storage or not self.mailer_id: + return + async with self.storage.update_record(mailer_id=self.mailer_id) as record: + record.chats = self diff --git a/aiogram_broadcaster/mailer/container.py b/aiogram_broadcaster/mailer/container.py new file mode 100644 index 0000000..491f3b2 --- /dev/null +++ b/aiogram_broadcaster/mailer/container.py @@ -0,0 +1,39 @@ +from typing import Dict, Iterator, List, Optional + +from .mailer import Mailer + + +class MailerContainer: + _mailers: Dict[int, Mailer] + + def __init__(self, *mailers: Mailer) -> None: + self._mailers = {mailer.id: mailer for mailer in mailers} + + def __repr__(self) -> str: + return f"{type(self).__name__}(total_mailers={len(self._mailers)})" + + def __str__(self) -> str: + mailers = ", ".join(map(repr, self)) + return f"{type(self).__name__}[{mailers}]" + + def __contains__(self, item: int) -> bool: + return item in self._mailers + + def __getitem__(self, item: int) -> Mailer: + return self._mailers[item] + + def __iter__(self) -> Iterator[Mailer]: + return iter(self._mailers.copy().values()) + + def __len__(self) -> int: + return len(self._mailers) + + @property + def mailers(self) -> Dict[int, Mailer]: + return self._mailers.copy() + + def get_mailer(self, mailer_id: int) -> Optional[Mailer]: + return self._mailers.get(mailer_id) + + def get_mailers(self) -> List[Mailer]: + return list(self._mailers.values()) diff --git a/aiogram_broadcaster/mailer/multiple.py b/aiogram_broadcaster/mailer/group.py similarity index 51% rename from aiogram_broadcaster/mailer/multiple.py rename to aiogram_broadcaster/mailer/group.py index 2317b0c..0ad8849 100644 --- a/aiogram_broadcaster/mailer/multiple.py +++ b/aiogram_broadcaster/mailer/group.py @@ -1,63 +1,50 @@ from asyncio import gather, wait -from typing import Any, Coroutine, Dict, Iterable, Iterator, List, Tuple +from typing import Any, Coroutine, Dict, Iterable, List from aiogram_broadcaster.logger import logger +from .container import MailerContainer from .mailer import Mailer -class MultipleMailers: - mailers: Tuple[Mailer, ...] - - def __init__(self, mailers: Iterable[Mailer]) -> None: - self.mailers = tuple(mailers) - - def __iter__(self) -> Iterator[Mailer]: - return iter(self.mailers) - - def __len__(self) -> int: - return len(self.mailers) - - def __repr__(self) -> str: - return f"MultipleMailers(total_mailers={len(self.mailers)})" - - def __str__(self) -> str: - mailers = ", ".join(map(repr, self.mailers)) - return f"MultipleMailers[{mailers}]" - +class MailerGroup(MailerContainer): def start(self, **kwargs: Any) -> None: - for mailer in self.mailers: + for mailer in self._mailers.values(): try: mailer.start(**kwargs) except RuntimeError: # noqa: PERF203 logger.exception("A start error occurred") async def wait(self) -> None: - futures = [mailer.wait() for mailer in self.mailers] + futures = [mailer.wait() for mailer in self._mailers.values()] await wait(futures) async def run(self, **kwargs: Any) -> Dict[Mailer, Any]: - futures = [mailer.run(**kwargs) for mailer in self.mailers] + futures = [mailer.run(**kwargs) for mailer in self._mailers.values()] return await self._gather_futures(futures=futures) async def stop(self) -> Dict[Mailer, Any]: - futures = [mailer.stop() for mailer in self.mailers] + futures = [mailer.stop() for mailer in self._mailers.values()] return await self._gather_futures(futures=futures) async def destroy(self) -> Dict[Mailer, Any]: - futures = [mailer.destroy() for mailer in self.mailers] + futures = [mailer.destroy() for mailer in self._mailers.values()] return await self._gather_futures(futures=futures) async def add_chats(self, chats: Iterable[int]) -> Dict[Mailer, bool]: - futures = [mailer.add_chats(chats=chats) for mailer in self.mailers] + futures = [mailer.add_chats(chats=chats) for mailer in self._mailers.values()] return await self._gather_futures(futures=futures) - async def send_content(self, chat_id: int) -> Dict[Mailer, Any]: - futures = [mailer.send_content(chat_id=chat_id) for mailer in self.mailers] + async def reset_chats(self) -> None: + futures = [mailer.reset_chats() for mailer in self._mailers.values()] + await self._gather_futures(futures=futures) + + async def send(self, chat_id: int) -> Dict[Mailer, Any]: + futures = [mailer.send(chat_id=chat_id) for mailer in self._mailers.values()] return await self._gather_futures(futures=futures) async def _gather_futures(self, futures: List[Coroutine[Any, Any, Any]]) -> Dict[Mailer, Any]: if not futures: return {} results = await gather(*futures, return_exceptions=True) - return dict(zip(self.mailers, results)) + return dict(zip(self._mailers.values(), results)) diff --git a/aiogram_broadcaster/mailer/mailer.py b/aiogram_broadcaster/mailer/mailer.py index a114a91..ebd23a2 100644 --- a/aiogram_broadcaster/mailer/mailer.py +++ b/aiogram_broadcaster/mailer/mailer.py @@ -129,6 +129,26 @@ async def add_chats(self, chats: Iterable[int]) -> Set[int]: self._status = MailerStatus.STOPPED return has_difference + async def reset_chats(self) -> None: + await self._chat_engine.set_chats_state(state=ChatState.PENDING) + if self._status is MailerStatus.COMPLETED: + self._status = MailerStatus.STOPPED + + async def send(self, chat_id: int) -> Any: + method = await self._content.as_method( + chat_id=chat_id, + language_getter=self._language_getter, + **self._kwargs, + ) + if self._settings.exclude_placeholders is not True: + method = await self._placeholder.render( + model=method, + exclude=self._settings.exclude_placeholders, + chat_id=chat_id, + **self._kwargs, + ) + return await method.as_(bot=self._bot) + def start(self, **kwargs: Any) -> None: self._check_start() self._task.start(self.run(**kwargs)) @@ -227,24 +247,9 @@ async def _broadcast(self) -> bool: return False return True - async def send_content(self, chat_id: int) -> Any: - method = await self._content.as_method( - chat_id=chat_id, - language_getter=self._language_getter, - **self._kwargs, - ) - if self._settings.exclude_placeholders is not True: - method = await self._placeholder.render( - model=method, - exclude=self._settings.exclude_placeholders, - chat_id=chat_id, - **self._kwargs, - ) - return await method.as_(bot=self._bot) - async def _send(self, chat_id: int) -> None: try: - response = await self.send_content(chat_id=chat_id) + response = await self.send(chat_id=chat_id) except TelegramRetryAfter as error: if self._settings.handle_retry_after: await self._process_retry_after(chat_id=chat_id, delay=error.retry_after) diff --git a/aiogram_broadcaster/mailer/statistic.py b/aiogram_broadcaster/mailer/statistic.py index ffb59c2..a41d59a 100644 --- a/aiogram_broadcaster/mailer/statistic.py +++ b/aiogram_broadcaster/mailer/statistic.py @@ -27,9 +27,6 @@ def __init__(self, ids: Set[int], total: int) -> None: "relative_range": self.relative_range, } - def __iter__(self) -> Iterator[int]: - return iter(self.ids) - def __repr__(self) -> str: # fmt: off metrics = ", ".join( @@ -47,6 +44,9 @@ def __str__(self) -> str: ) # fmt: on + def __iter__(self) -> Iterator[int]: + return iter(self.ids) + class MailerStatistic: _chat_engine: ChatEngine diff --git a/aiogram_broadcaster/placeholder.py b/aiogram_broadcaster/placeholder.py index 7f89d93..c444907 100644 --- a/aiogram_broadcaster/placeholder.py +++ b/aiogram_broadcaster/placeholder.py @@ -56,7 +56,7 @@ def chain_keys(self) -> Generator[str, None, None]: yield from placeholder.values @property - def chain_values(self) -> Generator[Tuple[str, Any], None, None]: + def chain_items(self) -> Generator[Tuple[str, Any], None, None]: for placeholder in self.chain_tail: yield from placeholder.values.items() @@ -92,7 +92,7 @@ async def fetch_data( select = set(self.chain_keys) return { key: await value(**kwargs) if callable(value) else value - for key, value in self.chain_values + for key, value in self.chain_items if key in select } diff --git a/aiogram_broadcaster/storage/base.py b/aiogram_broadcaster/storage/base.py index 3b6e978..b7f89f3 100644 --- a/aiogram_broadcaster/storage/base.py +++ b/aiogram_broadcaster/storage/base.py @@ -33,3 +33,7 @@ async def update_record(self, mailer_id: int) -> AsyncGenerator[StorageRecord, N yield record finally: await self.set_record(mailer_id=mailer_id, record=record) + + @abstractmethod + async def close(self) -> None: + pass diff --git a/aiogram_broadcaster/storage/file.py b/aiogram_broadcaster/storage/file.py index e9929e3..8b89b0c 100644 --- a/aiogram_broadcaster/storage/file.py +++ b/aiogram_broadcaster/storage/file.py @@ -1,5 +1,6 @@ +from asyncio import Lock from pathlib import Path -from typing import Any, Dict, Set, Union +from typing import Any, Dict, Optional, Set, Union from aiofiles import open from pydantic import BaseModel, Field @@ -14,14 +15,23 @@ class StorageRecords(BaseModel): class FileBCRStorage(BaseBCRStorage): file: Path + _lock: Optional[Lock] def __init__(self, filename: Union[str, Path] = ".mailers.json") -> None: if not isinstance(filename, Path): filename = Path(filename) self.file = filename + self._lock = None if self.file.exists() and not self.file.is_file(): - raise RuntimeError + raise RuntimeError("The filename is not a file.") + + @property + def lock(self) -> Lock: + """Lazy initialization to correctly catch the event loop.""" + if self._lock is None: + self._lock = Lock() + return self._lock async def get_mailer_ids(self) -> Set[int]: records = await self.get_records() @@ -46,14 +56,17 @@ async def get_records(self, **pydantic_context: Any) -> StorageRecords: records = StorageRecords() await self.set_records(records=records) return records - async with open(file=self.file, mode="r") as file: + async with self.lock, open(file=self.file, mode="r", encoding="utf-8") as file: data = await file.read() - return StorageRecords.model_validate_json( - json_data=data, - context=pydantic_context, - ) + return StorageRecords.model_validate_json( + json_data=data, + context=pydantic_context, + ) async def set_records(self, records: StorageRecords) -> None: - data = records.model_dump_json(exclude_defaults=True) - async with open(file=self.file, mode="w") as file: + async with self.lock, open(file=self.file, mode="w", encoding="utf-8") as file: + data = records.model_dump_json(exclude_defaults=True) await file.write(data) + + async def close(self) -> None: + pass diff --git a/aiogram_broadcaster/storage/redis.py b/aiogram_broadcaster/storage/redis.py index 11c0e0e..b83e90f 100644 --- a/aiogram_broadcaster/storage/redis.py +++ b/aiogram_broadcaster/storage/redis.py @@ -83,3 +83,6 @@ async def set_record(self, mailer_id: int, record: StorageRecord) -> None: async def delete_record(self, mailer_id: int) -> None: key = self.key_builder.build(mailer_id=mailer_id) await self.redis.delete(key) + + async def close(self) -> None: + await self.redis.aclose(close_connection_pool=True) diff --git a/examples/mre.py b/examples/mre.py index 236343a..253c68e 100644 --- a/examples/mre.py +++ b/examples/mre.py @@ -25,22 +25,31 @@ async def on_any_message(message: Message, broadcaster: Broadcaster) -> Any: mailer = await broadcaster.create_mailer( content=content, chats=USER_IDS, - data={"message": message}, + data={"publisher_id": message.chat.id, "message_id": message.message_id}, ) mailer.start() await message.answer(text="Run broadcasting...") @event.completed() -async def notify_complete(mailer: Mailer, message: Message) -> None: +async def notify_complete( + mailer: Mailer, + bot: Bot, + publisher_id: int, + message_id: int, +) -> None: text = ( f"Broadcasting has been completed!\n" - f"Mailer ID: {mailer.id} | Bot ID: {mailer.bot.id}\n" + f"Mailer ID: {mailer.id} | Bot ID: {bot.id}\n" f"Total chats: {mailer.statistic.total_chats.total}\n" f"Failed chats: {mailer.statistic.failed_chats.total}\n" f"Success chats: {mailer.statistic.success_chats.total}\n" ) - await message.reply(text=text) + await bot.send_message( + chat_id=publisher_id, + text=text, + reply_to_message_id=message_id, + ) def main() -> None: diff --git a/examples/setup_master.py b/examples/setup_master.py index 5878d85..105340a 100644 --- a/examples/setup_master.py +++ b/examples/setup_master.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, List, Optional, Protocol +from typing import Any, List, Optional, Protocol from aiogram import Bot, Dispatcher from aiogram.exceptions import TelegramBadRequest @@ -13,9 +13,6 @@ from aiogram_broadcaster.storage.redis import RedisBCRStorage -if TYPE_CHECKING: - from aiogram.types import Message - event = EventRouter(name=__name__) placeholder = Placeholder(name=__name__) @@ -45,20 +42,17 @@ async def append_new_chats(mailer: Mailer, database: Database) -> None: @event.completed() -async def notify_complete(mailer: Mailer, bot: Bot, **kwargs: Any) -> None: - message: Optional[Message] = kwargs.get("message") - publisher_id: Optional[int] = kwargs.get("publisher_id") +async def notify_complete(mailer: Mailer, bot: Bot, publisher_id: int, **kwargs: Any) -> None: text = ( f"Broadcasting has been completed!\n" f"Mailer ID: {mailer.id} | Bot ID: {bot.id}\n" f"{mailer}" ) - if message: - await message.reply(text=text) - return - if publisher_id: - await bot.send_message(chat_id=publisher_id, text=text) - return + await bot.send_message( + chat_id=publisher_id, + text=text, + reply_to_message_id=kwargs.get("message_id"), + ) @event.failed_sent()