Bases: BasePlugin
Celery interception plugin.
Patches celery.app.task.Task.delay and Task.apply_async at the class level.
Uses reference counting so nested sandboxes work correctly.
Source code in src/tripwire/plugins/celery_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Create the plugin for *verifier* with an empty mock registry.

    Args:
        verifier: Forwarded unchanged to the base-class initializer.
    """
    super().__init__(verifier)
    # Held by the mock_* registration methods while they touch ``_queues``.
    self._registry_lock: threading.Lock = threading.Lock()
    # Registered mock configs, keyed by "<task_name>:<dispatch_method>";
    # each value is a deque that registrations append to.
    self._queues: dict[str, deque[CeleryMockConfig]] = {}
|
mock_delay
mock_delay(task_name, *, returns, raises=None, required=True)
Register a mock for task.delay() dispatch.
Source code in src/tripwire/plugins/celery_plugin.py
def mock_delay(
    self,
    task_name: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for ``task.delay()`` dispatch of *task_name*.

    A ``CeleryMockConfig`` with ``dispatch_method="delay"`` is built from
    the arguments and appended to the queue stored under the key
    ``"<task_name>:delay"``; the queue is created on first use.

    Args:
        task_name: Name of the task the mock applies to.
        returns: Stored as the config's ``returns`` value.
        raises: Stored as the config's ``raises`` value (``None`` by default).
        required: Stored as the config's ``required`` flag.
    """
    entry = CeleryMockConfig(
        task_name=task_name,
        dispatch_method="delay",
        returns=returns,
        raises=raises,
        required=required,
    )
    registry_key = f"{task_name}:delay"
    # The lock covers both the lazy queue creation and the append.
    with self._registry_lock:
        self._queues.setdefault(registry_key, deque()).append(entry)
|
mock_apply_async
mock_apply_async(task_name, *, returns, raises=None, required=True)
Register a mock for task.apply_async() dispatch.
Source code in src/tripwire/plugins/celery_plugin.py
def mock_apply_async(
    self,
    task_name: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for ``task.apply_async()`` dispatch of *task_name*.

    A ``CeleryMockConfig`` with ``dispatch_method="apply_async"`` is built
    from the arguments and appended to the queue stored under the key
    ``"<task_name>:apply_async"``; the queue is created on first use.

    Args:
        task_name: Name of the task the mock applies to.
        returns: Stored as the config's ``returns`` value.
        raises: Stored as the config's ``raises`` value (``None`` by default).
        required: Stored as the config's ``required`` flag.
    """
    entry = CeleryMockConfig(
        task_name=task_name,
        dispatch_method="apply_async",
        returns=returns,
        raises=raises,
        required=required,
    )
    registry_key = f"{task_name}:apply_async"
    # The lock covers both the lazy queue creation and the append.
    with self._registry_lock:
        self._queues.setdefault(registry_key, deque()).append(entry)
|
install_patches
Install Celery Task.delay and Task.apply_async patches.
Source code in src/tripwire/plugins/celery_plugin.py
def install_patches(self) -> None:
    """Install Celery Task.delay and Task.apply_async patches.

    The current ``Task.delay`` / ``Task.apply_async`` are saved on
    ``CeleryPlugin`` and replaced at the class level with the patched
    versions.

    The call is idempotent: a method that is already patched is left
    alone, so a repeated install never overwrites the saved original with
    the patched function (which would make ``restore_patches`` unable to
    bring the real Celery method back).

    Raises:
        ImportError: If Celery support is not available
            (``_CELERY_AVAILABLE`` is falsy).
    """
    if not _CELERY_AVAILABLE:
        raise ImportError(
            "Install python-tripwire[celery] to use CeleryPlugin: "
            "pip install python-tripwire[celery]"
        )
    from celery.app.task import Task

    # Looking a function up on the class returns the plain function object,
    # so identity against the patched function tells us whether this
    # method has already been swapped.
    if Task.delay is not _patched_delay:
        CeleryPlugin._original_delay = Task.delay
        Task.delay = _patched_delay
    if Task.apply_async is not _patched_apply_async:
        CeleryPlugin._original_apply_async = Task.apply_async
        Task.apply_async = _patched_apply_async
|
restore_patches
Restore original Celery Task methods.
Source code in src/tripwire/plugins/celery_plugin.py
def restore_patches(self) -> None:
    """Restore original Celery Task methods.

    For ``delay`` and then ``apply_async``: if an original was saved on
    ``CeleryPlugin``, it is put back on ``Task`` and the saved slot is
    reset to ``None``. Slots that are already ``None`` are skipped.
    """
    from celery.app.task import Task

    saved_slots = (
        ("delay", "_original_delay"),
        ("apply_async", "_original_apply_async"),
    )
    for method_name, slot_name in saved_slots:
        original = getattr(CeleryPlugin, slot_name)
        if original is None:
            continue
        setattr(Task, method_name, original)
        setattr(CeleryPlugin, slot_name, None)
|
assert_delay
assert_delay(task_name, args, kwargs, options)
Typed helper: assert the next delay interaction.
Source code in src/tripwire/plugins/celery_plugin.py
def assert_delay(
    self,
    task_name: str,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    options: dict[str, Any],
) -> None:
    """Typed helper: assert the next delay interaction.

    Builds a ``_CelerySentinel`` for the source id
    ``"celery:<task_name>:delay"`` and passes it, together with the
    expected fields, to ``assert_interaction`` on the verifier returned by
    ``_get_test_verifier_or_raise()``.

    Args:
        task_name: Name of the task expected to have been dispatched.
        args: Expected positional arguments.
        kwargs: Expected keyword arguments.
        options: Expected dispatch options.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _CelerySentinel(f"celery:{task_name}:delay")
    expected = {
        "task_name": task_name,
        "dispatch_method": "delay",
        "args": args,
        "kwargs": kwargs,
        "options": options,
    }
    active_verifier = _get_test_verifier_or_raise()
    active_verifier.assert_interaction(marker, **expected)
|
assert_apply_async
assert_apply_async(task_name, args, kwargs, options)
Typed helper: assert the next apply_async interaction.
Source code in src/tripwire/plugins/celery_plugin.py
def assert_apply_async(
    self,
    task_name: str,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    options: dict[str, Any],
) -> None:
    """Typed helper: assert the next apply_async interaction.

    Builds a ``_CelerySentinel`` for the source id
    ``"celery:<task_name>:apply_async"`` and passes it, together with the
    expected fields, to ``assert_interaction`` on the verifier returned by
    ``_get_test_verifier_or_raise()``.

    Args:
        task_name: Name of the task expected to have been dispatched.
        args: Expected positional arguments.
        kwargs: Expected keyword arguments.
        options: Expected dispatch options.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _CelerySentinel(f"celery:{task_name}:apply_async")
    expected = {
        "task_name": task_name,
        "dispatch_method": "apply_async",
        "args": args,
        "kwargs": kwargs,
        "options": options,
    }
    active_verifier = _get_test_verifier_or_raise()
    active_verifier.assert_interaction(marker, **expected)
|