Skip to content

Commit

Permalink
0.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
loRes228 committed Apr 3, 2024
1 parent ed6e148 commit c8e366e
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 162 deletions.
2 changes: 1 addition & 1 deletion aiogram_broadcaster/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.9"
__version__ = "0.3.0"
17 changes: 7 additions & 10 deletions aiogram_broadcaster/broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class Broadcaster(MailerContainer):
_bots: Dict[int, Bot]
bots: Tuple[Bot, ...]
storage: Optional[BaseBCRStorage]
default: DefaultMailerProperties
context_key: str
Expand All @@ -38,7 +38,7 @@ def __init__(
) -> None:
super().__init__()

self._bots = {bot.id: bot for bot in bots}
self.bots = bots
self.storage = storage
self.default = default or DefaultMailerProperties()
self.context_key = context_key
Expand All @@ -48,10 +48,6 @@ def __init__(
self.event = EventManager(name="root")
self.placeholder = PlaceholderWizard(name="root")

@property
def bots(self) -> Tuple[Bot, ...]:
return tuple(self._bots.values())

def as_group(self) -> MailerGroup:
return MailerGroup(*self._mailers.values())

Expand All @@ -71,7 +67,7 @@ async def create_mailers(
data: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> MailerGroup:
if not bots and not self._bots:
if not bots and not self.bots:
raise ValueError("At least one bot must be specified.")
if not bots:
bots = self.bots
Expand Down Expand Up @@ -123,7 +119,7 @@ async def create_mailer(

if not chats:
raise ValueError("At least one chat id must be provided.")
if not bot and not self._bots:
if not bot and not self.bots:
raise ValueError("At least one bot must be specified.")

chats = set(chats)
Expand Down Expand Up @@ -188,13 +184,14 @@ async def create_mailer(
async def restore_mailers(self) -> None:
if not self.storage:
raise RuntimeError("Storage not found.")
bots = {bot.id: bot for bot in self.bots}
for mailer_id in await self.storage.get_mailer_ids():
try:
record = await self.storage.get_record(mailer_id=mailer_id)
except Exception: # noqa: BLE001
logger.exception("Failed to restore mailer id=%d.")
continue
if record.bot not in self._bots:
if record.bot not in bots:
logger.error(
"Failed to restore mailer id=%d, bot with id=%d not defined.",
mailer_id,
Expand All @@ -210,7 +207,7 @@ async def restore_mailers(self) -> None:
placeholder=self.placeholder,
storage=self.storage,
mailer_container=self._mailers,
bot=self._bots[record.bot],
bot=bots[record.bot],
contextual_data={**self.contextual_data, **record.data},
)
self._mailers[mailer_id] = mailer
Expand Down
12 changes: 3 additions & 9 deletions aiogram_broadcaster/contents/adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, cast
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional

from aiogram.dispatcher.event.handler import CallableObject
from aiogram.methods import TelegramMethod
Expand Down Expand Up @@ -27,16 +27,10 @@ def model_post_init(self, __context: Any) -> None:
self._callback = CallableObject(callback=self.__call__)

async def as_method(self, **kwargs: Any) -> TelegramMethod[Any]:
ident = await self._callback.call(**kwargs)
content = self.resolve_content(ident=ident)
key = await self._callback.call(**kwargs)
content = self.__pydantic_extra__.get(key, self.default)
return await content.as_method(**kwargs)

def resolve_content(self, ident: Optional[str]) -> BaseContent:
if ident is None or not self.model_extra:
return self.default
content = self.model_extra.get(ident, self.default)
return cast(BaseContent, content)

if TYPE_CHECKING:

def __init__(
Expand Down
33 changes: 15 additions & 18 deletions aiogram_broadcaster/contents/base.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Awaitable, Callable, ClassVar, Dict, Type, TypeVar

from aiogram.methods.base import TelegramMethod
from pydantic import (
BaseModel,
ConfigDict,
SerializerFunctionWrapHandler,
model_serializer,
model_validator,
)
from pydantic.functional_validators import ModelWrapValidatorHandler
from aiogram.methods import TelegramMethod
from pydantic import BaseModel, ConfigDict, model_serializer, model_validator
from pydantic_core.core_schema import SerializerFunctionWrapHandler, ValidatorFunctionWrapHandler


VALIDATOR_KEY = "__V"
Expand All @@ -30,8 +24,8 @@ class BaseContent(BaseModel, ABC):
__validators__: ClassVar[Dict[str, Type["BaseContent"]]] = {}

def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
cls.__validators__[cls.__name__] = cls
super().__init_subclass__(**kwargs)

if TYPE_CHECKING:
as_method: ClassVar[Callable[..., Awaitable[TelegramMethod[Any]]]]
Expand All @@ -45,14 +39,17 @@ async def as_method(self, **kwargs: Any) -> TelegramMethod[Any]:
@classmethod
def _validate(
cls,
__value: Any,
__handler: ModelWrapValidatorHandler["BaseContent"],
) -> "BaseContent":
if not isinstance(__value, dict):
return __handler(__value)
if validator := __value.pop(VALIDATOR_KEY, None):
return cls.__validators__[validator].model_validate(__value)
return __handler(__value)
value: Any,
handler: ValidatorFunctionWrapHandler,
) -> Any:
if not isinstance(value, dict):
return handler(value)
validator: str = value.pop(VALIDATOR_KEY, None)
if not validator:
return handler(value)
if validator not in cls.__validators__:
raise RuntimeError(f"Content '{validator}' was not found.")
return cls.__validators__[validator].model_validate(value)

@model_serializer(mode="wrap", return_type=Any)
def _serialize(self, handler: SerializerFunctionWrapHandler) -> Any:
Expand Down
8 changes: 3 additions & 5 deletions aiogram_broadcaster/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def trigger(self, **kwargs: Any) -> None:
merged_kwargs.update(result)


class EventRouter(ChainObject):
class EventRouter(ChainObject, sub_name="event"):
started: EventObserver
stopped: EventObserver
completed: EventObserver
Expand All @@ -52,7 +52,7 @@ class EventRouter(ChainObject):
observers: Dict[str, EventObserver]

def __init__(self, name: Optional[str] = None) -> None:
super().__init__(entity=EventRouter, sub_name="event", name=name)
super().__init__(name=name)

self.started = EventObserver()
self.stopped = EventObserver()
Expand All @@ -68,9 +68,7 @@ def __init__(self, name: Optional[str] = None) -> None:
}


class EventManager(EventRouter):
__chain_root__ = True

class EventManager(EventRouter, root=True):
async def emit_started(self, **kwargs: Any) -> None:
for event in self.chain_tail:
await event.started.trigger(**kwargs)
Expand Down
134 changes: 58 additions & 76 deletions aiogram_broadcaster/mailer/mailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,106 +116,88 @@ def content(self) -> ContentType:
def bot(self) -> Bot:
return self._bot

async def send(self, chat_id: int) -> Any:
method = await self._content.as_method(chat_id=chat_id, **self._contextual_data)
if self._settings.exclude_placeholders is not True:
method = await self._placeholder.render(
model=method,
exclude=self._settings.exclude_placeholders,
chat_id=chat_id,
**self._contextual_data,
)
return await method.as_(bot=self.bot)

async def add_chats(self, chats: Iterable[int]) -> Set[int]:
new_chats = await self._chat_engine.add_chats(
chats=chats,
state=ChatState.PENDING,
)
if self._status is MailerStatus.DESTROYED:
raise RuntimeError(f"Mailer id={self._id} cant be added the chats.")
new_chats = await self._chat_engine.add_chats(chats=chats, state=ChatState.PENDING)
if new_chats:
logger.info(
"Mailer id=%d new %d chats added.",
self._id,
len(new_chats),
)
if self._status is MailerStatus.COMPLETED:
self._status = MailerStatus.STOPPED
logger.info("Mailer id=%d new %d chats added.", self._id, len(new_chats))
return new_chats

async def reset_chats(self) -> None:
if self._status is MailerStatus.DESTROYED:
raise RuntimeError(f"Mailer id={self._id} cant be reset.")
await self._chat_engine.set_chats_state(state=ChatState.PENDING)
if self._status is MailerStatus.COMPLETED:
self._status = MailerStatus.STOPPED
logger.info("Mailer id=%d has been reset.")
logger.info("Mailer id=%d has been reset.", self._id)

async def send(self, chat_id: int) -> Any:
method = await self._content.as_method(
chat_id=chat_id,
**self._contextual_data,
)
if self._settings.exclude_placeholders is not True:
method = await self._placeholder.render(
model=method,
exclude=self._settings.exclude_placeholders,
chat_id=chat_id,
**self._contextual_data,
)
return await method.as_(bot=self._bot)

def start(self) -> None:
if self._status is not MailerStatus.STOPPED:
raise RuntimeError(f"Mailer id={self.id} cant be started.")
self._task.start(self.run())

async def wait(self) -> None:
if not self._task.started or self._task.waited:
raise RuntimeError(f"Mailer id={self.id} cant be waited.")
await self._task.wait()
async def destroy(self) -> None:
if self._status is MailerStatus.DESTROYED:
raise RuntimeError(f"Mailer id={self._id} cant be destroyed.")
if self._status is MailerStatus.STARTED:
await self.stop()
if self._settings.preserved:
del self._mailer_container[self._id]
if self._storage:
await self._storage.delete_record(mailer_id=self._id)
self._stop_event.set()
self._status = MailerStatus.DESTROYED
logger.info("Mailer id=%d destroyed.", self._id)

async def stop(self) -> None:
if self._status is not MailerStatus.STARTED:
raise RuntimeError(f"Mailer id={self.id} cant be stopped.")
await self._emit_stop()
raise RuntimeError(f"Mailer id={self._id} cant be stopped.")
logger.info("Mailer id=%d stopped.", self._id)
if not self._settings.disable_events:
await self._event.emit_stopped(**self._contextual_data)
self._stop_event.set()
self._status = MailerStatus.STOPPED

async def run(self) -> None:
if self._status is not MailerStatus.STOPPED:
raise RuntimeError(f"Mailer id={self.id} cant be started.")
await self._emit_start()
raise RuntimeError(f"Mailer id={self._id} cant be started.")
if not self._settings.disable_events:
await self._event.emit_started(**self._contextual_data)
self._stop_event.clear()
self._status = MailerStatus.STARTED
logger.info("Mailer id=%d started.", self._id)
try:
completed = await self._broadcast()
except:
await self.stop()
raise
else:
if completed:
await self._emit_complete()

async def destroy(self) -> None:
if not self._settings.preserved:
raise RuntimeError(f"Mailer id={self.id} cant be destroyed.")
await self._emit_destroy()

async def _emit_start(self) -> None:
logger.info("Mailer id=%d started.", self._id)
if not self._settings.disable_events:
await self._event.emit_started(**self._contextual_data)
self._stop_event.clear()
self._status = MailerStatus.STARTED

async def _emit_stop(self) -> None:
logger.info("Mailer id=%d stopped.", self._id)
if not self._settings.disable_events:
await self._event.emit_stopped(**self._contextual_data)
self._stop_event.set()
self._status = MailerStatus.STOPPED

async def _emit_complete(self) -> None:
logger.info("Mailer id=%d completed.", self._id)
if not self._settings.disable_events:
await self._event.emit_completed(**self._contextual_data)
if self._settings.destroy_on_complete:
await self._emit_destroy()
else:
if completed:
if not self._settings.disable_events:
await self._event.emit_completed(**self._contextual_data)
self._stop_event.set()
self._status = MailerStatus.COMPLETED
self._stop_event.set()
if self._settings.destroy_on_complete:
await self.destroy()
logger.info("Mailer id=%d completed.", self._id)

async def _emit_destroy(self) -> None:
if self._status is MailerStatus.STARTED:
await self._emit_stop()
if self._storage:
await self._storage.delete_record(mailer_id=self._id)
self._mailer_container.pop(self._id, None)
self._status = MailerStatus.DESTROYED
logger.info("Mailer id=%d destroyed.", self._id)
def start(self) -> None:
if self._status is not MailerStatus.STOPPED:
raise RuntimeError(f"Mailer id={self._id} cant be started.")
self._task.start(self.run())

async def wait(self) -> None:
if not self._task.started or self._task.waited:
raise RuntimeError(f"Mailer id={self._id} cant be waited.")
await self._task.wait()

async def _broadcast(self) -> bool:
async for chat in self._chat_engine.iterate_chats(state=ChatState.PENDING):
Expand Down
Loading

0 comments on commit c8e366e

Please sign in to comment.