mirror of
https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git
synced 2025-08-14 00:25:46 +02:00
142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
|
|
|
|
from aiogram.dispatcher.middlewares.manager import MiddlewareManager
|
|
|
|
from ...exceptions import UnsupportedKeywordArgument
|
|
from ...filters.base import Filter
|
|
from ...types import TelegramObject
|
|
from .bases import UNHANDLED, MiddlewareType, SkipHandler
|
|
from .handler import CallbackType, FilterObject, HandlerObject
|
|
|
|
if TYPE_CHECKING:
|
|
from aiogram.dispatcher.router import Router
|
|
|
|
|
|
class TelegramEventObserver:
|
|
"""
|
|
Event observer for Telegram events
|
|
|
|
Here you can register handler with filter.
|
|
This observer will stop event propagation when first handler is pass.
|
|
"""
|
|
|
|
def __init__(self, router: Router, event_name: str) -> None:
|
|
self.router: Router = router
|
|
self.event_name: str = event_name
|
|
|
|
self.handlers: List[HandlerObject] = []
|
|
|
|
self.middleware = MiddlewareManager()
|
|
self.outer_middleware = MiddlewareManager()
|
|
|
|
# Re-used filters check method from already implemented handler object
|
|
# with dummy callback which never will be used
|
|
self._handler = HandlerObject(callback=lambda: True, filters=[])
|
|
|
|
def filter(self, *filters: CallbackType) -> None:
|
|
"""
|
|
Register filter for all handlers of this event observer
|
|
|
|
:param filters: positional filters
|
|
"""
|
|
if self._handler.filters is None:
|
|
self._handler.filters = []
|
|
self._handler.filters.extend([FilterObject(filter_) for filter_ in filters])
|
|
|
|
def _resolve_middlewares(self) -> List[MiddlewareType[TelegramObject]]:
|
|
middlewares: List[MiddlewareType[TelegramObject]] = []
|
|
for router in reversed(tuple(self.router.chain_head)):
|
|
observer = router.observers.get(self.event_name)
|
|
if observer:
|
|
middlewares.extend(observer.middleware)
|
|
|
|
return middlewares
|
|
|
|
def register(
|
|
self,
|
|
callback: CallbackType,
|
|
*filters: CallbackType,
|
|
flags: Optional[Dict[str, Any]] = None,
|
|
**kwargs: Any,
|
|
) -> CallbackType:
|
|
"""
|
|
Register event handler
|
|
"""
|
|
if kwargs:
|
|
raise UnsupportedKeywordArgument(
|
|
"Passing any additional keyword arguments to the registrar method "
|
|
"is not supported.\n"
|
|
"This error may be caused when you are trying to register filters like in 2.x "
|
|
"version of this framework, if it's true just look at correspoding "
|
|
"documentation pages.\n"
|
|
f"Please remove the {set(kwargs.keys())} arguments from this call.\n"
|
|
)
|
|
|
|
if flags is None:
|
|
flags = {}
|
|
|
|
for item in filters:
|
|
if isinstance(item, Filter):
|
|
item.update_handler_flags(flags=flags)
|
|
|
|
self.handlers.append(
|
|
HandlerObject(
|
|
callback=callback,
|
|
filters=[FilterObject(filter_) for filter_ in filters],
|
|
flags=flags,
|
|
)
|
|
)
|
|
|
|
return callback
|
|
|
|
def wrap_outer_middleware(
|
|
self, callback: Any, event: TelegramObject, data: Dict[str, Any]
|
|
) -> Any:
|
|
wrapped_outer = self.middleware.wrap_middlewares(
|
|
self.outer_middleware,
|
|
callback,
|
|
)
|
|
return wrapped_outer(event, data)
|
|
|
|
def check_root_filters(self, event: TelegramObject, **kwargs: Any) -> Any:
|
|
return self._handler.check(event, **kwargs)
|
|
|
|
async def trigger(self, event: TelegramObject, **kwargs: Any) -> Any:
|
|
"""
|
|
Propagate event to handlers and stops propagation on first match.
|
|
Handler will be called when all its filters are pass.
|
|
"""
|
|
for handler in self.handlers:
|
|
kwargs["handler"] = handler
|
|
result, data = await handler.check(event, **kwargs)
|
|
if result:
|
|
kwargs.update(data)
|
|
try:
|
|
wrapped_inner = self.outer_middleware.wrap_middlewares(
|
|
self._resolve_middlewares(),
|
|
handler.call,
|
|
)
|
|
return await wrapped_inner(event, kwargs)
|
|
except SkipHandler:
|
|
continue
|
|
|
|
return UNHANDLED
|
|
|
|
def __call__(
|
|
self,
|
|
*filters: CallbackType,
|
|
flags: Optional[Dict[str, Any]] = None,
|
|
**kwargs: Any,
|
|
) -> Callable[[CallbackType], CallbackType]:
|
|
"""
|
|
Decorator for registering event handlers
|
|
"""
|
|
|
|
def wrapper(callback: CallbackType) -> CallbackType:
|
|
self.register(callback, *filters, flags=flags, **kwargs)
|
|
return callback
|
|
|
|
return wrapper
|