1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00
Files
MoonTestUse1 e81df4c87e Initial commit
2024-12-23 19:27:44 +06:00

142 lines
4.6 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from aiogram.dispatcher.middlewares.manager import MiddlewareManager
from ...exceptions import UnsupportedKeywordArgument
from ...filters.base import Filter
from ...types import TelegramObject
from .bases import UNHANDLED, MiddlewareType, SkipHandler
from .handler import CallbackType, FilterObject, HandlerObject
if TYPE_CHECKING:
from aiogram.dispatcher.router import Router
class TelegramEventObserver:
    """
    Event observer for Telegram events.

    Handlers are registered here together with their filters.
    When an event is triggered, handlers are tried in registration order and
    propagation stops at the first handler whose filters all pass (unless that
    handler raises ``SkipHandler``, in which case the next one is tried).
    """

    def __init__(self, router: Router, event_name: str) -> None:
        """
        :param router: router that owns this observer
        :param event_name: key under which this observer is stored in
            ``router.observers`` (used to find sibling observers in the chain)
        """
        self.router: Router = router
        self.event_name: str = event_name

        # Handlers in registration order; ``trigger`` walks this list front to back.
        self.handlers: List[HandlerObject] = []

        self.middleware = MiddlewareManager()
        self.outer_middleware = MiddlewareManager()

        # Re-use the filter-check machinery of HandlerObject for observer-wide
        # ("root") filters. The callback is a dummy that is never invoked.
        self._handler = HandlerObject(callback=lambda: True, filters=[])

    def filter(self, *filters: CallbackType) -> None:
        """
        Register filters that apply to all handlers of this event observer.

        :param filters: positional filters
        """
        if self._handler.filters is None:
            self._handler.filters = []
        self._handler.filters.extend(FilterObject(filter_) for filter_ in filters)

    def _resolve_middlewares(self) -> List[MiddlewareType[TelegramObject]]:
        """
        Collect the inner middlewares of every router in this router's chain.

        ``router.chain_head`` is walked in reverse, and for each router that has
        an observer for ``self.event_name`` its ``middleware`` entries are
        appended in order.

        :return: flat list of middlewares, last router of ``chain_head`` first
        """
        middlewares: List[MiddlewareType[TelegramObject]] = []
        for router in reversed(tuple(self.router.chain_head)):
            observer = router.observers.get(self.event_name)
            if observer:
                middlewares.extend(observer.middleware)
        return middlewares

    def register(
        self,
        callback: CallbackType,
        *filters: CallbackType,
        flags: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> CallbackType:
        """
        Register an event handler.

        :param callback: handler callback
        :param filters: positional filters that must all pass for the handler to run
        :param flags: optional handler flags; the dict is copied, the caller's
            object is never modified
        :param kwargs: not supported, accepted only to produce a helpful error
        :return: the same ``callback``, unchanged
        :raises UnsupportedKeywordArgument: if any extra keyword argument is passed
        """
        if kwargs:
            raise UnsupportedKeywordArgument(
                "Passing any additional keyword arguments to the registrar method "
                "is not supported.\n"
                "This error may be caused when you are trying to register filters like in 2.x "
                "version of this framework, if it's true just look at corresponding "
                "documentation pages.\n"
                f"Please remove the {set(kwargs.keys())} arguments from this call.\n"
            )

        # Work on a private copy: filters write their own flags into this dict
        # below, and a caller may legitimately reuse one dict (or one decorator)
        # for several registrations. Without the copy, flags contributed for one
        # handler would leak into every later handler sharing that dict.
        flags = {} if flags is None else dict(flags)

        for item in filters:
            if isinstance(item, Filter):
                # Return value is unused: the filter contributes by mutating ``flags``.
                item.update_handler_flags(flags=flags)

        self.handlers.append(
            HandlerObject(
                callback=callback,
                filters=[FilterObject(filter_) for filter_ in filters],
                flags=flags,
            )
        )
        return callback

    def wrap_outer_middleware(
        self, callback: Any, event: TelegramObject, data: Dict[str, Any]
    ) -> Any:
        """
        Wrap ``callback`` into this observer's outer middlewares and call it.

        :param callback: callable to be executed inside the outer middleware chain
        :param event: event passed to the wrapped callable
        :param data: context data passed to the wrapped callable
        :return: whatever the wrapped callable returns (not awaited here)
        """
        # NOTE(review): ``wrap_middlewares`` is reached through ``self.middleware``
        # but is given ``self.outer_middleware`` as the chain to apply, so it
        # presumably does not depend on the manager it is accessed on - confirm
        # against MiddlewareManager.
        wrapped_outer = self.middleware.wrap_middlewares(
            self.outer_middleware,
            callback,
        )
        return wrapped_outer(event, data)

    def check_root_filters(self, event: TelegramObject, **kwargs: Any) -> Any:
        """
        Run the observer-wide filters (registered via :meth:`filter`) for ``event``.

        :return: the un-awaited result of ``HandlerObject.check``; ``trigger``
            awaits the same method on handlers and unpacks ``(result, data)``
        """
        return self._handler.check(event, **kwargs)

    async def trigger(self, event: TelegramObject, **kwargs: Any) -> Any:
        """
        Propagate the event to handlers and stop at the first match.

        A handler is called when all of its filters pass. If the handler (or a
        middleware around it) raises ``SkipHandler``, the next handler is tried.

        :param event: event to dispatch
        :param kwargs: context data; extended with ``handler`` and with the data
            produced by the matching handler's filters
        :return: result of the first handler that ran, or ``UNHANDLED``
        """
        for handler in self.handlers:
            # Expose the candidate handler to filters and middlewares.
            kwargs["handler"] = handler
            result, data = await handler.check(event, **kwargs)
            if not result:
                continue

            kwargs.update(data)
            try:
                # Inner middlewares come from the whole router chain, not just
                # from this observer.
                wrapped_inner = self.outer_middleware.wrap_middlewares(
                    self._resolve_middlewares(),
                    handler.call,
                )
                return await wrapped_inner(event, kwargs)
            except SkipHandler:
                continue

        return UNHANDLED

    def __call__(
        self,
        *filters: CallbackType,
        flags: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Callable[[CallbackType], CallbackType]:
        """
        Decorator for registering event handlers.

        All arguments are forwarded to :meth:`register` when the decorator is
        applied to a callback.

        :return: decorator that registers the callback and returns it unchanged
        """

        def wrapper(callback: CallbackType) -> CallbackType:
            self.register(callback, *filters, flags=flags, **kwargs)
            return callback

        return wrapper