1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Все подряд

This commit is contained in:
MoonTestUse1
2024-12-31 02:37:57 +06:00
parent 8e53bb6cb2
commit d5780b2eab
3258 changed files with 1087440 additions and 268 deletions

View File

@@ -0,0 +1,37 @@
try:
from ._version import version as __version__
except ImportError:
# broken installation, we don't even try
# unknown only works because we do poor mans version compare
__version__ = "unknown"
__all__ = [
"__version__",
"PluginManager",
"PluginValidationError",
"HookCaller",
"HookCallError",
"HookspecOpts",
"HookimplOpts",
"HookImpl",
"HookRelay",
"HookspecMarker",
"HookimplMarker",
"Result",
"PluggyWarning",
"PluggyTeardownRaisedWarning",
]
from ._hooks import HookCaller
from ._hooks import HookImpl
from ._hooks import HookimplMarker
from ._hooks import HookimplOpts
from ._hooks import HookRelay
from ._hooks import HookspecMarker
from ._hooks import HookspecOpts
from ._manager import PluginManager
from ._manager import PluginValidationError
from ._result import HookCallError
from ._result import Result
from ._warnings import PluggyTeardownRaisedWarning
from ._warnings import PluggyWarning

View File

@@ -0,0 +1,182 @@
"""
Call loop machinery
"""
from __future__ import annotations
from typing import cast
from typing import Generator
from typing import Mapping
from typing import NoReturn
from typing import Sequence
from typing import Tuple
from typing import Union
import warnings
from ._hooks import HookImpl
from ._result import HookCallError
from ._result import Result
from ._warnings import PluggyTeardownRaisedWarning
# Need to distinguish between old- and new-style hook wrappers.
# Wrapping with a tuple is the fastest type-safe way I found to do it.
Teardown = Union[
Tuple[Generator[None, Result[object], None], HookImpl],
Generator[None, object, object],
]
def _raise_wrapfail(
wrap_controller: (
Generator[None, Result[object], None] | Generator[None, object, object]
),
msg: str,
) -> NoReturn:
co = wrap_controller.gi_code
raise RuntimeError(
"wrap_controller at %r %s:%d %s"
% (co.co_name, co.co_filename, co.co_firstlineno, msg)
)
def _warn_teardown_exception(
hook_name: str, hook_impl: HookImpl, e: BaseException
) -> None:
msg = "A plugin raised an exception during an old-style hookwrapper teardown.\n"
msg += f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
msg += f"{type(e).__name__}: {e}\n"
msg += "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning" # noqa: E501
warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5)
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen, hook_impl))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
raise exception.with_traceback(exception.__traceback__)
else:
return result
# Slow path - need to support old-style wrappers.
else:
if firstresult: # first result hooks return a single value
outcome: Result[object | list[object]] = Result(
results[0] if results else None, exception
)
else:
outcome = Result(results, exception)
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
if isinstance(teardown, tuple):
try:
teardown[0].send(outcome)
except StopIteration:
pass
except BaseException as e:
_warn_teardown_exception(hook_name, teardown[1], e)
raise
else:
_raise_wrapfail(teardown[0], "has second yield")
else:
try:
if outcome._exception is not None:
teardown.throw(outcome._exception)
else:
teardown.send(outcome._result)
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close()
except StopIteration as si:
outcome.force_result(si.value)
continue
except BaseException as e:
outcome.force_exception(e)
continue
_raise_wrapfail(teardown, "has second yield")
return outcome.get_result()

View File

@@ -0,0 +1,715 @@
"""
Internal hook annotation, representation and calling machinery.
"""
from __future__ import annotations
import inspect
import sys
from types import ModuleType
from typing import AbstractSet
from typing import Any
from typing import Callable
from typing import Final
from typing import final
from typing import Generator
from typing import List
from typing import Mapping
from typing import Optional
from typing import overload
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypedDict
from typing import TypeVar
from typing import Union
import warnings
from ._result import Result
_T = TypeVar("_T")
_F = TypeVar("_F", bound=Callable[..., object])
_Namespace = Union[ModuleType, type]
_Plugin = object
_HookExec = Callable[
[str, Sequence["HookImpl"], Mapping[str, object], bool],
Union[object, List[object]],
]
_HookImplFunction = Callable[..., Union[_T, Generator[None, Result[_T], None]]]
class HookspecOpts(TypedDict):
"""Options for a hook specification."""
#: Whether the hook is :ref:`first result only <firstresult>`.
firstresult: bool
#: Whether the hook is :ref:`historic <historic>`.
historic: bool
#: Whether the hook :ref:`warns when implemented <warn_on_impl>`.
warn_on_impl: Warning | None
#: Whether the hook warns when :ref:`certain arguments are requested
#: <warn_on_impl>`.
#:
#: .. versionadded:: 1.5
warn_on_impl_args: Mapping[str, Warning] | None
class HookimplOpts(TypedDict):
"""Options for a hook implementation."""
#: Whether the hook implementation is a :ref:`wrapper <hookwrapper>`.
wrapper: bool
#: Whether the hook implementation is an :ref:`old-style wrapper
#: <old_style_hookwrappers>`.
hookwrapper: bool
#: Whether validation against a hook specification is :ref:`optional
#: <optionalhook>`.
optionalhook: bool
#: Whether to try to order this hook implementation :ref:`first
#: <callorder>`.
tryfirst: bool
#: Whether to try to order this hook implementation :ref:`last
#: <callorder>`.
trylast: bool
#: The name of the hook specification to match, see :ref:`specname`.
specname: str | None
@final
class HookspecMarker:
"""Decorator for marking functions as hook specifications.
Instantiate it with a project_name to get a decorator.
Calling :meth:`PluginManager.add_hookspecs` later will discover all marked
functions if the :class:`PluginManager` uses the same project name.
"""
__slots__ = ("project_name",)
def __init__(self, project_name: str) -> None:
self.project_name: Final = project_name
@overload
def __call__(
self,
function: _F,
firstresult: bool = False,
historic: bool = False,
warn_on_impl: Warning | None = None,
warn_on_impl_args: Mapping[str, Warning] | None = None,
) -> _F: ...
@overload # noqa: F811
def __call__( # noqa: F811
self,
function: None = ...,
firstresult: bool = ...,
historic: bool = ...,
warn_on_impl: Warning | None = ...,
warn_on_impl_args: Mapping[str, Warning] | None = ...,
) -> Callable[[_F], _F]: ...
def __call__( # noqa: F811
self,
function: _F | None = None,
firstresult: bool = False,
historic: bool = False,
warn_on_impl: Warning | None = None,
warn_on_impl_args: Mapping[str, Warning] | None = None,
) -> _F | Callable[[_F], _F]:
"""If passed a function, directly sets attributes on the function
which will make it discoverable to :meth:`PluginManager.add_hookspecs`.
If passed no function, returns a decorator which can be applied to a
function later using the attributes supplied.
:param firstresult:
If ``True``, the 1:N hook call (N being the number of registered
hook implementation functions) will stop at I<=N when the I'th
function returns a non-``None`` result. See :ref:`firstresult`.
:param historic:
If ``True``, every call to the hook will be memorized and replayed
on plugins registered after the call was made. See :ref:`historic`.
:param warn_on_impl:
If given, every implementation of this hook will trigger the given
warning. See :ref:`warn_on_impl`.
:param warn_on_impl_args:
If given, every implementation of this hook which requests one of
the arguments in the dict will trigger the corresponding warning.
See :ref:`warn_on_impl`.
.. versionadded:: 1.5
"""
def setattr_hookspec_opts(func: _F) -> _F:
if historic and firstresult:
raise ValueError("cannot have a historic firstresult hook")
opts: HookspecOpts = {
"firstresult": firstresult,
"historic": historic,
"warn_on_impl": warn_on_impl,
"warn_on_impl_args": warn_on_impl_args,
}
setattr(func, self.project_name + "_spec", opts)
return func
if function is not None:
return setattr_hookspec_opts(function)
else:
return setattr_hookspec_opts
@final
class HookimplMarker:
"""Decorator for marking functions as hook implementations.
Instantiate it with a ``project_name`` to get a decorator.
Calling :meth:`PluginManager.register` later will discover all marked
functions if the :class:`PluginManager` uses the same project name.
"""
__slots__ = ("project_name",)
def __init__(self, project_name: str) -> None:
self.project_name: Final = project_name
@overload
def __call__(
self,
function: _F,
hookwrapper: bool = ...,
optionalhook: bool = ...,
tryfirst: bool = ...,
trylast: bool = ...,
specname: str | None = ...,
wrapper: bool = ...,
) -> _F: ...
@overload # noqa: F811
def __call__( # noqa: F811
self,
function: None = ...,
hookwrapper: bool = ...,
optionalhook: bool = ...,
tryfirst: bool = ...,
trylast: bool = ...,
specname: str | None = ...,
wrapper: bool = ...,
) -> Callable[[_F], _F]: ...
def __call__( # noqa: F811
self,
function: _F | None = None,
hookwrapper: bool = False,
optionalhook: bool = False,
tryfirst: bool = False,
trylast: bool = False,
specname: str | None = None,
wrapper: bool = False,
) -> _F | Callable[[_F], _F]:
"""If passed a function, directly sets attributes on the function
which will make it discoverable to :meth:`PluginManager.register`.
If passed no function, returns a decorator which can be applied to a
function later using the attributes supplied.
:param optionalhook:
If ``True``, a missing matching hook specification will not result
in an error (by default it is an error if no matching spec is
found). See :ref:`optionalhook`.
:param tryfirst:
If ``True``, this hook implementation will run as early as possible
in the chain of N hook implementations for a specification. See
:ref:`callorder`.
:param trylast:
If ``True``, this hook implementation will run as late as possible
in the chain of N hook implementations for a specification. See
:ref:`callorder`.
:param wrapper:
If ``True`` ("new-style hook wrapper"), the hook implementation
needs to execute exactly one ``yield``. The code before the
``yield`` is run early before any non-hook-wrapper function is run.
The code after the ``yield`` is run after all non-hook-wrapper
functions have run. The ``yield`` receives the result value of the
inner calls, or raises the exception of inner calls (including
earlier hook wrapper calls). The return value of the function
becomes the return value of the hook, and a raised exception becomes
the exception of the hook. See :ref:`hookwrapper`.
:param hookwrapper:
If ``True`` ("old-style hook wrapper"), the hook implementation
needs to execute exactly one ``yield``. The code before the
``yield`` is run early before any non-hook-wrapper function is run.
The code after the ``yield`` is run after all non-hook-wrapper
function have run The ``yield`` receives a :class:`Result` object
representing the exception or result outcome of the inner calls
(including earlier hook wrapper calls). This option is mutually
exclusive with ``wrapper``. See :ref:`old_style_hookwrapper`.
:param specname:
If provided, the given name will be used instead of the function
name when matching this hook implementation to a hook specification
during registration. See :ref:`specname`.
.. versionadded:: 1.2.0
The ``wrapper`` parameter.
"""
def setattr_hookimpl_opts(func: _F) -> _F:
opts: HookimplOpts = {
"wrapper": wrapper,
"hookwrapper": hookwrapper,
"optionalhook": optionalhook,
"tryfirst": tryfirst,
"trylast": trylast,
"specname": specname,
}
setattr(func, self.project_name + "_impl", opts)
return func
if function is None:
return setattr_hookimpl_opts
else:
return setattr_hookimpl_opts(function)
def normalize_hookimpl_opts(opts: HookimplOpts) -> None:
opts.setdefault("tryfirst", False)
opts.setdefault("trylast", False)
opts.setdefault("wrapper", False)
opts.setdefault("hookwrapper", False)
opts.setdefault("optionalhook", False)
opts.setdefault("specname", None)
_PYPY = hasattr(sys, "pypy_version_info")
def varnames(func: object) -> tuple[tuple[str, ...], tuple[str, ...]]:
"""Return tuple of positional and keywrord argument names for a function,
method, class or callable.
In case of a class, its ``__init__`` method is considered.
For methods the ``self`` parameter is not included.
"""
if inspect.isclass(func):
try:
func = func.__init__
except AttributeError:
return (), ()
elif not inspect.isroutine(func): # callable object?
try:
func = getattr(func, "__call__", func)
except Exception:
return (), ()
try:
# func MUST be a function or method here or we won't parse any args.
sig = inspect.signature(
func.__func__ if inspect.ismethod(func) else func # type:ignore[arg-type]
)
except TypeError:
return (), ()
_valid_param_kinds = (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
)
_valid_params = {
name: param
for name, param in sig.parameters.items()
if param.kind in _valid_param_kinds
}
args = tuple(_valid_params)
defaults = (
tuple(
param.default
for param in _valid_params.values()
if param.default is not param.empty
)
or None
)
if defaults:
index = -len(defaults)
args, kwargs = args[:index], tuple(args[index:])
else:
kwargs = ()
# strip any implicit instance arg
# pypy3 uses "obj" instead of "self" for default dunder methods
if not _PYPY:
implicit_names: tuple[str, ...] = ("self",)
else:
implicit_names = ("self", "obj")
if args:
qualname: str = getattr(func, "__qualname__", "")
if inspect.ismethod(func) or ("." in qualname and args[0] in implicit_names):
args = args[1:]
return args, kwargs
@final
class HookRelay:
"""Hook holder object for performing 1:N hook calls where N is the number
of registered plugins."""
__slots__ = ("__dict__",)
def __init__(self) -> None:
""":meta private:"""
if TYPE_CHECKING:
def __getattr__(self, name: str) -> HookCaller: ...
# Historical name (pluggy<=1.2), kept for backward compatibility.
_HookRelay = HookRelay
_CallHistory = List[Tuple[Mapping[str, object], Optional[Callable[[Any], None]]]]
class HookCaller:
"""A caller of all registered implementations of a hook specification."""
__slots__ = (
"name",
"spec",
"_hookexec",
"_hookimpls",
"_call_history",
)
def __init__(
self,
name: str,
hook_execute: _HookExec,
specmodule_or_class: _Namespace | None = None,
spec_opts: HookspecOpts | None = None,
) -> None:
""":meta private:"""
#: Name of the hook getting called.
self.name: Final = name
self._hookexec: Final = hook_execute
# The hookimpls list. The caller iterates it *in reverse*. Format:
# 1. trylast nonwrappers
# 2. nonwrappers
# 3. tryfirst nonwrappers
# 4. trylast wrappers
# 5. wrappers
# 6. tryfirst wrappers
self._hookimpls: Final[list[HookImpl]] = []
self._call_history: _CallHistory | None = None
# TODO: Document, or make private.
self.spec: HookSpec | None = None
if specmodule_or_class is not None:
assert spec_opts is not None
self.set_specification(specmodule_or_class, spec_opts)
# TODO: Document, or make private.
def has_spec(self) -> bool:
return self.spec is not None
# TODO: Document, or make private.
def set_specification(
self,
specmodule_or_class: _Namespace,
spec_opts: HookspecOpts,
) -> None:
if self.spec is not None:
raise ValueError(
f"Hook {self.spec.name!r} is already registered "
f"within namespace {self.spec.namespace}"
)
self.spec = HookSpec(specmodule_or_class, self.name, spec_opts)
if spec_opts.get("historic"):
self._call_history = []
def is_historic(self) -> bool:
"""Whether this caller is :ref:`historic <historic>`."""
return self._call_history is not None
def _remove_plugin(self, plugin: _Plugin) -> None:
for i, method in enumerate(self._hookimpls):
if method.plugin == plugin:
del self._hookimpls[i]
return
raise ValueError(f"plugin {plugin!r} not found")
def get_hookimpls(self) -> list[HookImpl]:
"""Get all registered hook implementations for this hook."""
return self._hookimpls.copy()
def _add_hookimpl(self, hookimpl: HookImpl) -> None:
"""Add an implementation to the callback chain."""
for i, method in enumerate(self._hookimpls):
if method.hookwrapper or method.wrapper:
splitpoint = i
break
else:
splitpoint = len(self._hookimpls)
if hookimpl.hookwrapper or hookimpl.wrapper:
start, end = splitpoint, len(self._hookimpls)
else:
start, end = 0, splitpoint
if hookimpl.trylast:
self._hookimpls.insert(start, hookimpl)
elif hookimpl.tryfirst:
self._hookimpls.insert(end, hookimpl)
else:
# find last non-tryfirst method
i = end - 1
while i >= start and self._hookimpls[i].tryfirst:
i -= 1
self._hookimpls.insert(i + 1, hookimpl)
def __repr__(self) -> str:
return f"<HookCaller {self.name!r}>"
def _verify_all_args_are_provided(self, kwargs: Mapping[str, object]) -> None:
# This is written to avoid expensive operations when not needed.
if self.spec:
for argname in self.spec.argnames:
if argname not in kwargs:
notincall = ", ".join(
repr(argname)
for argname in self.spec.argnames
# Avoid self.spec.argnames - kwargs.keys() - doesn't preserve order.
if argname not in kwargs.keys()
)
warnings.warn(
"Argument(s) {} which are declared in the hookspec "
"cannot be found in this hook call".format(notincall),
stacklevel=2,
)
break
def __call__(self, **kwargs: object) -> Any:
"""Call the hook.
Only accepts keyword arguments, which should match the hook
specification.
Returns the result(s) of calling all registered plugins, see
:ref:`calling`.
"""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
# Copy because plugins may register other plugins during iteration (#438).
return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
def call_historic(
self,
result_callback: Callable[[Any], None] | None = None,
kwargs: Mapping[str, object] | None = None,
) -> None:
"""Call the hook with given ``kwargs`` for all registered plugins and
for all plugins which will be registered afterwards, see
:ref:`historic`.
:param result_callback:
If provided, will be called for each non-``None`` result obtained
from a hook implementation.
"""
assert self._call_history is not None
kwargs = kwargs or {}
self._verify_all_args_are_provided(kwargs)
self._call_history.append((kwargs, result_callback))
# Historizing hooks don't return results.
# Remember firstresult isn't compatible with historic.
# Copy because plugins may register other plugins during iteration (#438).
res = self._hookexec(self.name, self._hookimpls.copy(), kwargs, False)
if result_callback is None:
return
if isinstance(res, list):
for x in res:
result_callback(x)
def call_extra(
self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object]
) -> Any:
"""Call the hook with some additional temporarily participating
methods using the specified ``kwargs`` as call parameters, see
:ref:`call_extra`."""
assert (
not self.is_historic()
), "Cannot directly call a historic hook - use call_historic instead."
self._verify_all_args_are_provided(kwargs)
opts: HookimplOpts = {
"wrapper": False,
"hookwrapper": False,
"optionalhook": False,
"trylast": False,
"tryfirst": False,
"specname": None,
}
hookimpls = self._hookimpls.copy()
for method in methods:
hookimpl = HookImpl(None, "<temp>", method, opts)
# Find last non-tryfirst nonwrapper method.
i = len(hookimpls) - 1
while i >= 0 and (
# Skip wrappers.
(hookimpls[i].hookwrapper or hookimpls[i].wrapper)
# Skip tryfirst nonwrappers.
or hookimpls[i].tryfirst
):
i -= 1
hookimpls.insert(i + 1, hookimpl)
firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
return self._hookexec(self.name, hookimpls, kwargs, firstresult)
def _maybe_apply_history(self, method: HookImpl) -> None:
"""Apply call history to a new hookimpl if it is marked as historic."""
if self.is_historic():
assert self._call_history is not None
for kwargs, result_callback in self._call_history:
res = self._hookexec(self.name, [method], kwargs, False)
if res and result_callback is not None:
# XXX: remember firstresult isn't compat with historic
assert isinstance(res, list)
result_callback(res[0])
# Historical name (pluggy<=1.2), kept for backward compatibility.
_HookCaller = HookCaller
class _SubsetHookCaller(HookCaller):
"""A proxy to another HookCaller which manages calls to all registered
plugins except the ones from remove_plugins."""
# This class is unusual: in inhertits from `HookCaller` so all of
# the *code* runs in the class, but it delegates all underlying *data*
# to the original HookCaller.
# `subset_hook_caller` used to be implemented by creating a full-fledged
# HookCaller, copying all hookimpls from the original. This had problems
# with memory leaks (#346) and historic calls (#347), which make a proxy
# approach better.
# An alternative implementation is to use a `_getattr__`/`__getattribute__`
# proxy, however that adds more overhead and is more tricky to implement.
__slots__ = (
"_orig",
"_remove_plugins",
)
def __init__(self, orig: HookCaller, remove_plugins: AbstractSet[_Plugin]) -> None:
self._orig = orig
self._remove_plugins = remove_plugins
self.name = orig.name # type: ignore[misc]
self._hookexec = orig._hookexec # type: ignore[misc]
@property # type: ignore[misc]
def _hookimpls(self) -> list[HookImpl]:
return [
impl
for impl in self._orig._hookimpls
if impl.plugin not in self._remove_plugins
]
@property
def spec(self) -> HookSpec | None: # type: ignore[override]
return self._orig.spec
@property
def _call_history(self) -> _CallHistory | None: # type: ignore[override]
return self._orig._call_history
def __repr__(self) -> str:
return f"<_SubsetHookCaller {self.name!r}>"
@final
class HookImpl:
"""A hook implementation in a :class:`HookCaller`."""
__slots__ = (
"function",
"argnames",
"kwargnames",
"plugin",
"opts",
"plugin_name",
"wrapper",
"hookwrapper",
"optionalhook",
"tryfirst",
"trylast",
)
def __init__(
self,
plugin: _Plugin,
plugin_name: str,
function: _HookImplFunction[object],
hook_impl_opts: HookimplOpts,
) -> None:
""":meta private:"""
#: The hook implementation function.
self.function: Final = function
argnames, kwargnames = varnames(self.function)
#: The positional parameter names of ``function```.
self.argnames: Final = argnames
#: The keyword parameter names of ``function```.
self.kwargnames: Final = kwargnames
#: The plugin which defined this hook implementation.
self.plugin: Final = plugin
#: The :class:`HookimplOpts` used to configure this hook implementation.
self.opts: Final = hook_impl_opts
#: The name of the plugin which defined this hook implementation.
self.plugin_name: Final = plugin_name
#: Whether the hook implementation is a :ref:`wrapper <hookwrapper>`.
self.wrapper: Final = hook_impl_opts["wrapper"]
#: Whether the hook implementation is an :ref:`old-style wrapper
#: <old_style_hookwrappers>`.
self.hookwrapper: Final = hook_impl_opts["hookwrapper"]
#: Whether validation against a hook specification is :ref:`optional
#: <optionalhook>`.
self.optionalhook: Final = hook_impl_opts["optionalhook"]
#: Whether to try to order this hook implementation :ref:`first
#: <callorder>`.
self.tryfirst: Final = hook_impl_opts["tryfirst"]
#: Whether to try to order this hook implementation :ref:`last
#: <callorder>`.
self.trylast: Final = hook_impl_opts["trylast"]
def __repr__(self) -> str:
return f"<HookImpl plugin_name={self.plugin_name!r}, plugin={self.plugin!r}>"
@final
class HookSpec:
__slots__ = (
"namespace",
"function",
"name",
"argnames",
"kwargnames",
"opts",
"warn_on_impl",
"warn_on_impl_args",
)
def __init__(self, namespace: _Namespace, name: str, opts: HookspecOpts) -> None:
self.namespace = namespace
self.function: Callable[..., object] = getattr(namespace, name)
self.name = name
self.argnames, self.kwargnames = varnames(self.function)
self.opts = opts
self.warn_on_impl = opts.get("warn_on_impl")
self.warn_on_impl_args = opts.get("warn_on_impl_args")

View File

@@ -0,0 +1,528 @@
from __future__ import annotations
import inspect
import types
from typing import Any
from typing import Callable
from typing import cast
from typing import Final
from typing import Iterable
from typing import Mapping
from typing import Sequence
from typing import TYPE_CHECKING
import warnings
from . import _tracing
from ._callers import _multicall
from ._hooks import _HookImplFunction
from ._hooks import _Namespace
from ._hooks import _Plugin
from ._hooks import _SubsetHookCaller
from ._hooks import HookCaller
from ._hooks import HookImpl
from ._hooks import HookimplOpts
from ._hooks import HookRelay
from ._hooks import HookspecOpts
from ._hooks import normalize_hookimpl_opts
from ._result import Result
if TYPE_CHECKING:
# importtlib.metadata import is slow, defer it.
import importlib.metadata
_BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
_AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None]
def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None:
func = cast(types.FunctionType, function)
warnings.warn_explicit(
warning,
type(warning),
lineno=func.__code__.co_firstlineno,
filename=func.__code__.co_filename,
)
class PluginValidationError(Exception):
"""Plugin failed validation.
:param plugin: The plugin which failed validation.
:param message: Error message.
"""
def __init__(self, plugin: _Plugin, message: str) -> None:
super().__init__(message)
#: The plugin which failed validation.
self.plugin = plugin
class DistFacade:
"""Emulate a pkg_resources Distribution"""
def __init__(self, dist: importlib.metadata.Distribution) -> None:
self._dist = dist
@property
def project_name(self) -> str:
name: str = self.metadata["name"]
return name
def __getattr__(self, attr: str, default=None):
return getattr(self._dist, attr, default)
def __dir__(self) -> list[str]:
return sorted(dir(self._dist) + ["_dist", "project_name"])
class PluginManager:
"""Core class which manages registration of plugin objects and 1:N hook
calling.
You can register new hooks by calling :meth:`add_hookspecs(module_or_class)
<PluginManager.add_hookspecs>`.
You can register plugin objects (which contain hook implementations) by
calling :meth:`register(plugin) <PluginManager.register>`.
For debugging purposes you can call :meth:`PluginManager.enable_tracing`
which will subsequently send debug information to the trace helper.
:param project_name:
The short project name. Prefer snake case. Make sure it's unique!
"""
def __init__(self, project_name: str) -> None:
#: The project name.
self.project_name: Final = project_name
self._name2plugin: Final[dict[str, _Plugin]] = {}
self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = []
#: The "hook relay", used to call a hook on all registered plugins.
#: See :ref:`calling`.
self.hook: Final = HookRelay()
#: The tracing entry point. See :ref:`tracing`.
self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get(
"pluginmanage"
)
self._inner_hookexec = _multicall
def _hookexec(
self,
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
def register(self, plugin: _Plugin, name: str | None = None) -> str | None:
"""Register a plugin and return its name.
:param name:
The name under which to register the plugin. If not specified, a
name is generated using :func:`get_canonical_name`.
:returns:
The plugin name. If the name is blocked from registering, returns
``None``.
If the plugin is already registered, raises a :exc:`ValueError`.
"""
plugin_name = name or self.get_canonical_name(plugin)
if plugin_name in self._name2plugin:
if self._name2plugin.get(plugin_name, -1) is None:
return None # blocked plugin, return None to indicate no registration
raise ValueError(
"Plugin name already registered: %s=%s\n%s"
% (plugin_name, plugin, self._name2plugin)
)
if plugin in self._name2plugin.values():
raise ValueError(
"Plugin already registered under a different name: %s=%s\n%s"
% (plugin_name, plugin, self._name2plugin)
)
# XXX if an error happens we should make sure no state has been
# changed at point of return
self._name2plugin[plugin_name] = plugin
# register matching hook implementations of the plugin
for name in dir(plugin):
hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
if hookimpl_opts is not None:
normalize_hookimpl_opts(hookimpl_opts)
method: _HookImplFunction[object] = getattr(plugin, name)
hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
name = hookimpl_opts.get("specname") or name
hook: HookCaller | None = getattr(self.hook, name, None)
if hook is None:
hook = HookCaller(name, self._hookexec)
setattr(self.hook, name, hook)
elif hook.has_spec():
self._verify_hook(hook, hookimpl)
hook._maybe_apply_history(hookimpl)
hook._add_hookimpl(hookimpl)
return plugin_name
def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None:
"""Try to obtain a hook implementation from an item with the given name
in the given plugin which is being searched for hook impls.
:returns:
The parsed hookimpl options, or None to skip the given item.
This method can be overridden by ``PluginManager`` subclasses to
customize how hook implementation are picked up. By default, returns the
options for items decorated with :class:`HookimplMarker`.
"""
method: object = getattr(plugin, name)
if not inspect.isroutine(method):
return None
try:
res: HookimplOpts | None = getattr(
method, self.project_name + "_impl", None
)
except Exception:
res = {} # type: ignore[assignment]
if res is not None and not isinstance(res, dict):
# false positive
res = None # type:ignore[unreachable]
return res
def unregister(
self, plugin: _Plugin | None = None, name: str | None = None
) -> Any | None:
"""Unregister a plugin and all of its hook implementations.
The plugin can be specified either by the plugin object or the plugin
name. If both are specified, they must agree.
Returns the unregistered plugin, or ``None`` if not found.
"""
if name is None:
assert plugin is not None, "one of name or plugin needs to be specified"
name = self.get_name(plugin)
assert name is not None, "plugin is not registered"
if plugin is None:
plugin = self.get_plugin(name)
if plugin is None:
return None
hookcallers = self.get_hookcallers(plugin)
if hookcallers:
for hookcaller in hookcallers:
hookcaller._remove_plugin(plugin)
# if self._name2plugin[name] == None registration was blocked: ignore
if self._name2plugin.get(name):
assert name is not None
del self._name2plugin[name]
return plugin
def set_blocked(self, name: str) -> None:
"""Block registrations of the given name, unregister if already registered."""
self.unregister(name=name)
self._name2plugin[name] = None
def is_blocked(self, name: str) -> bool:
"""Return whether the given plugin name is blocked."""
return name in self._name2plugin and self._name2plugin[name] is None
def unblock(self, name: str) -> bool:
"""Unblocks a name.
Returns whether the name was actually blocked.
"""
if self._name2plugin.get(name, -1) is None:
del self._name2plugin[name]
return True
return False
def add_hookspecs(self, module_or_class: _Namespace) -> None:
"""Add new hook specifications defined in the given ``module_or_class``.
Functions are recognized as hook specifications if they have been
decorated with a matching :class:`HookspecMarker`.
"""
names = []
for name in dir(module_or_class):
spec_opts = self.parse_hookspec_opts(module_or_class, name)
if spec_opts is not None:
hc: HookCaller | None = getattr(self.hook, name, None)
if hc is None:
hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
setattr(self.hook, name, hc)
else:
# Plugins registered this hook without knowing the spec.
hc.set_specification(module_or_class, spec_opts)
for hookfunction in hc.get_hookimpls():
self._verify_hook(hc, hookfunction)
names.append(name)
if not names:
raise ValueError(
f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
)
def parse_hookspec_opts(
self, module_or_class: _Namespace, name: str
) -> HookspecOpts | None:
"""Try to obtain a hook specification from an item with the given name
in the given module or class which is being searched for hook specs.
:returns:
The parsed hookspec options for defining a hook, or None to skip the
given item.
This method can be overridden by ``PluginManager`` subclasses to
customize how hook specifications are picked up. By default, returns the
options for items decorated with :class:`HookspecMarker`.
"""
method = getattr(module_or_class, name)
opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None)
return opts
def get_plugins(self) -> set[Any]:
"""Return a set of all registered plugin objects."""
return {x for x in self._name2plugin.values() if x is not None}
def is_registered(self, plugin: _Plugin) -> bool:
"""Return whether the plugin is already registered."""
return any(plugin == val for val in self._name2plugin.values())
def get_canonical_name(self, plugin: _Plugin) -> str:
"""Return a canonical name for a plugin object.
Note that a plugin may be registered under a different name
specified by the caller of :meth:`register(plugin, name) <register>`.
To obtain the name of a registered plugin use :meth:`get_name(plugin)
<get_name>` instead.
"""
name: str | None = getattr(plugin, "__name__", None)
return name or str(id(plugin))
def get_plugin(self, name: str) -> Any | None:
"""Return the plugin registered under the given name, if any."""
return self._name2plugin.get(name)
def has_plugin(self, name: str) -> bool:
"""Return whether a plugin with the given name is registered."""
return self.get_plugin(name) is not None
def get_name(self, plugin: _Plugin) -> str | None:
"""Return the name the plugin is registered under, or ``None`` if
is isn't."""
for name, val in self._name2plugin.items():
if plugin == val:
return name
return None
def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None:
if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper):
raise PluginValidationError(
hookimpl.plugin,
"Plugin %r\nhook %r\nhistoric incompatible with yield/wrapper/hookwrapper"
% (hookimpl.plugin_name, hook.name),
)
assert hook.spec is not None
if hook.spec.warn_on_impl:
_warn_for_function(hook.spec.warn_on_impl, hookimpl.function)
# positional arg checking
notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
if notinspec:
raise PluginValidationError(
hookimpl.plugin,
"Plugin %r for hook %r\nhookimpl definition: %s\n"
"Argument(s) %s are declared in the hookimpl but "
"can not be found in the hookspec"
% (
hookimpl.plugin_name,
hook.name,
_formatdef(hookimpl.function),
notinspec,
),
)
if hook.spec.warn_on_impl_args:
for hookimpl_argname in hookimpl.argnames:
argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname)
if argname_warning is not None:
_warn_for_function(argname_warning, hookimpl.function)
if (
hookimpl.wrapper or hookimpl.hookwrapper
) and not inspect.isgeneratorfunction(hookimpl.function):
raise PluginValidationError(
hookimpl.plugin,
"Plugin %r for hook %r\nhookimpl definition: %s\n"
"Declared as wrapper=True or hookwrapper=True "
"but function is not a generator function"
% (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
)
if hookimpl.wrapper and hookimpl.hookwrapper:
raise PluginValidationError(
hookimpl.plugin,
"Plugin %r for hook %r\nhookimpl definition: %s\n"
"The wrapper=True and hookwrapper=True options are mutually exclusive"
% (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
)
def check_pending(self) -> None:
"""Verify that all hooks which have not been verified against a
hook specification are optional, otherwise raise
:exc:`PluginValidationError`."""
for name in self.hook.__dict__:
if name[0] != "_":
hook: HookCaller = getattr(self.hook, name)
if not hook.has_spec():
for hookimpl in hook.get_hookimpls():
if not hookimpl.optionalhook:
raise PluginValidationError(
hookimpl.plugin,
"unknown hook %r in plugin %r"
% (name, hookimpl.plugin),
)
def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int:
"""Load modules from querying the specified setuptools ``group``.
:param group:
Entry point group to load plugins.
:param name:
If given, loads only plugins with the given ``name``.
:return:
The number of plugins loaded by this call.
"""
import importlib.metadata
count = 0
for dist in list(importlib.metadata.distributions()):
for ep in dist.entry_points:
if (
ep.group != group
or (name is not None and ep.name != name)
# already registered
or self.get_plugin(ep.name)
or self.is_blocked(ep.name)
):
continue
plugin = ep.load()
self.register(plugin, name=ep.name)
self._plugin_distinfo.append((plugin, DistFacade(dist)))
count += 1
return count
def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]:
"""Return a list of (plugin, distinfo) pairs for all
setuptools-registered plugins."""
return list(self._plugin_distinfo)
def list_name_plugin(self) -> list[tuple[str, _Plugin]]:
"""Return a list of (name, plugin) pairs for all registered plugins."""
return list(self._name2plugin.items())
def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None:
"""Get all hook callers for the specified plugin.
:returns:
The hook callers, or ``None`` if ``plugin`` is not registered in
this plugin manager.
"""
if self.get_name(plugin) is None:
return None
hookcallers = []
for hookcaller in self.hook.__dict__.values():
for hookimpl in hookcaller.get_hookimpls():
if hookimpl.plugin is plugin:
hookcallers.append(hookcaller)
return hookcallers
def add_hookcall_monitoring(
self, before: _BeforeTrace, after: _AfterTrace
) -> Callable[[], None]:
"""Add before/after tracing functions for all hooks.
Returns an undo function which, when called, removes the added tracers.
``before(hook_name, hook_impls, kwargs)`` will be called ahead
of all hook calls and receive a hookcaller instance, a list
of HookImpl instances and the keyword arguments for the hook call.
``after(outcome, hook_name, hook_impls, kwargs)`` receives the
same arguments as ``before`` but also a :class:`~pluggy.Result` object
which represents the result of the overall hook call.
"""
oldcall = self._inner_hookexec
def traced_hookexec(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
before(hook_name, hook_impls, caller_kwargs)
outcome = Result.from_call(
lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
)
after(outcome, hook_name, hook_impls, caller_kwargs)
return outcome.get_result()
self._inner_hookexec = traced_hookexec
def undo() -> None:
self._inner_hookexec = oldcall
return undo
def enable_tracing(self) -> Callable[[], None]:
"""Enable tracing of hook calls.
Returns an undo function which, when called, removes the added tracing.
"""
hooktrace = self.trace.root.get("hook")
def before(
hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
) -> None:
hooktrace.root.indent += 1
hooktrace(hook_name, kwargs)
def after(
outcome: Result[object],
hook_name: str,
methods: Sequence[HookImpl],
kwargs: Mapping[str, object],
) -> None:
if outcome.exception is None:
hooktrace("finish", hook_name, "-->", outcome.get_result())
hooktrace.root.indent -= 1
return self.add_hookcall_monitoring(before, after)
def subset_hook_caller(
self, name: str, remove_plugins: Iterable[_Plugin]
) -> HookCaller:
"""Return a proxy :class:`~pluggy.HookCaller` instance for the named
method which manages calls to all registered plugins except the ones
from remove_plugins."""
orig: HookCaller = getattr(self.hook, name)
plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)}
if plugins_to_remove:
return _SubsetHookCaller(orig, plugins_to_remove)
return orig
def _formatdef(func: Callable[..., object]) -> str:
return f"{func.__name__}{inspect.signature(func)}"

View File

@@ -0,0 +1,104 @@
"""
Hook wrapper "result" utilities.
"""
from __future__ import annotations
from types import TracebackType
from typing import Callable
from typing import cast
from typing import final
from typing import Generic
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
_ExcInfo = Tuple[Type[BaseException], BaseException, Optional[TracebackType]]
ResultType = TypeVar("ResultType")
class HookCallError(Exception):
"""Hook was called incorrectly."""
@final
class Result(Generic[ResultType]):
"""An object used to inspect and set the result in a :ref:`hook wrapper
<hookwrappers>`."""
__slots__ = ("_result", "_exception")
def __init__(
self,
result: ResultType | None,
exception: BaseException | None,
) -> None:
""":meta private:"""
self._result = result
self._exception = exception
@property
def excinfo(self) -> _ExcInfo | None:
""":meta private:"""
exc = self._exception
if exc is None:
return None
else:
return (type(exc), exc, exc.__traceback__)
@property
def exception(self) -> BaseException | None:
""":meta private:"""
return self._exception
@classmethod
def from_call(cls, func: Callable[[], ResultType]) -> Result[ResultType]:
""":meta private:"""
__tracebackhide__ = True
result = exception = None
try:
result = func()
except BaseException as exc:
exception = exc
return cls(result, exception)
def force_result(self, result: ResultType) -> None:
"""Force the result(s) to ``result``.
If the hook was marked as a ``firstresult`` a single value should
be set, otherwise set a (modified) list of results. Any exceptions
found during invocation will be deleted.
This overrides any previous result or exception.
"""
self._result = result
self._exception = None
def force_exception(self, exception: BaseException) -> None:
"""Force the result to fail with ``exception``.
This overrides any previous result or exception.
.. versionadded:: 1.1.0
"""
self._result = None
self._exception = exception
def get_result(self) -> ResultType:
"""Get the result(s) for this hook call.
If the hook was marked as a ``firstresult`` only a single value
will be returned, otherwise a list of results.
"""
__tracebackhide__ = True
exc = self._exception
if exc is None:
return cast(ResultType, self._result)
else:
raise exc.with_traceback(exc.__traceback__)
# Historical name (pluggy<=1.2), kept for backward compatibility.
_Result = Result

View File

@@ -0,0 +1,73 @@
"""
Tracing utils
"""
from __future__ import annotations
from typing import Any
from typing import Callable
from typing import Sequence
from typing import Tuple
_Writer = Callable[[str], object]
_Processor = Callable[[Tuple[str, ...], Tuple[Any, ...]], object]
class TagTracer:
def __init__(self) -> None:
self._tags2proc: dict[tuple[str, ...], _Processor] = {}
self._writer: _Writer | None = None
self.indent = 0
def get(self, name: str) -> TagTracerSub:
return TagTracerSub(self, (name,))
def _format_message(self, tags: Sequence[str], args: Sequence[object]) -> str:
if isinstance(args[-1], dict):
extra = args[-1]
args = args[:-1]
else:
extra = {}
content = " ".join(map(str, args))
indent = " " * self.indent
lines = ["{}{} [{}]\n".format(indent, content, ":".join(tags))]
for name, value in extra.items():
lines.append(f"{indent} {name}: {value}\n")
return "".join(lines)
def _processmessage(self, tags: tuple[str, ...], args: tuple[object, ...]) -> None:
if self._writer is not None and args:
self._writer(self._format_message(tags, args))
try:
processor = self._tags2proc[tags]
except KeyError:
pass
else:
processor(tags, args)
def setwriter(self, writer: _Writer | None) -> None:
self._writer = writer
def setprocessor(self, tags: str | tuple[str, ...], processor: _Processor) -> None:
if isinstance(tags, str):
tags = tuple(tags.split(":"))
else:
assert isinstance(tags, tuple)
self._tags2proc[tags] = processor
class TagTracerSub:
def __init__(self, root: TagTracer, tags: tuple[str, ...]) -> None:
self.root = root
self.tags = tags
def __call__(self, *args: object) -> None:
self.root._processmessage(self.tags, args)
def get(self, name: str) -> TagTracerSub:
return self.__class__(self.root, self.tags + (name,))

View File

@@ -0,0 +1,16 @@
# file generated by setuptools_scm
# don't change, don't track in version control
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Tuple, Union
VERSION_TUPLE = Tuple[Union[int, str], ...]
else:
VERSION_TUPLE = object
version: str
__version__: str
__version_tuple__: VERSION_TUPLE
version_tuple: VERSION_TUPLE
__version__ = version = '1.5.0'
__version_tuple__ = version_tuple = (1, 5, 0)

View File

@@ -0,0 +1,27 @@
from typing import final
class PluggyWarning(UserWarning):
"""Base class for all warnings emitted by pluggy."""
__module__ = "pluggy"
@final
class PluggyTeardownRaisedWarning(PluggyWarning):
"""A plugin raised an exception during an :ref:`old-style hookwrapper
<old_style_hookwrappers>` teardown.
Such exceptions are not handled by pluggy, and may cause subsequent
teardowns to be executed at unexpected times, or be skipped entirely.
This is an issue in the plugin implementation.
If the exception is unintended, fix the underlying cause.
If the exception is intended, switch to :ref:`new-style hook wrappers
<hookwrappers>`, or use :func:`result.force_exception()
<pluggy.Result.force_exception>` to set the exception instead of raising.
"""
__module__ = "pluggy"

View File