Skip to content

CeleryPlugin

CeleryPlugin

CeleryPlugin(verifier)

Bases: BasePlugin

Celery interception plugin.

Patches celery.app.task.Task.delay and Task.apply_async at the class level. Uses reference counting so nested sandboxes work correctly.

Source code in src/tripwire/plugins/celery_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Initialise the plugin with an empty mock registry.

    Args:
        verifier: Verifier forwarded unchanged to the base-class initialiser.
    """
    super().__init__(verifier)
    # Lock taken by the mock_* registration methods while they mutate
    # ``_queues``.
    self._registry_lock: threading.Lock = threading.Lock()
    # Maps "<task_name>:<dispatch_method>" to the deque of mock configs
    # registered under that key.
    self._queues: dict[str, deque[CeleryMockConfig]] = {}

mock_delay

mock_delay(task_name, *, returns, raises=None, required=True)

Register a mock for task.delay() dispatch.

Source code in src/tripwire/plugins/celery_plugin.py
def mock_delay(
    self,
    task_name: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for a ``task.delay()`` dispatch of *task_name*.

    A ``CeleryMockConfig`` is built from the arguments and appended to the
    deque stored under the key ``"<task_name>:delay"``, creating that deque
    on first use. The registry is mutated while holding the registry lock.

    Args:
        task_name: Task name used both on the config and in the queue key.
        returns: Stored on the config as ``returns``.
        raises: Stored on the config as ``raises``; defaults to ``None``.
        required: Stored on the config as ``required``; defaults to ``True``.
    """
    dispatch_method = "delay"
    mock = CeleryMockConfig(
        task_name=task_name,
        dispatch_method=dispatch_method,
        returns=returns,
        raises=raises,
        required=required,
    )
    with self._registry_lock:
        # setdefault creates the per-key deque the first time a key is seen.
        self._queues.setdefault(f"{task_name}:{dispatch_method}", deque()).append(mock)

mock_apply_async

mock_apply_async(task_name, *, returns, raises=None, required=True)

Register a mock for task.apply_async() dispatch.

Source code in src/tripwire/plugins/celery_plugin.py
def mock_apply_async(
    self,
    task_name: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for a ``task.apply_async()`` dispatch of *task_name*.

    A ``CeleryMockConfig`` is built from the arguments and appended to the
    deque stored under the key ``"<task_name>:apply_async"``, creating that
    deque on first use. The registry is mutated while holding the registry
    lock.

    Args:
        task_name: Task name used both on the config and in the queue key.
        returns: Stored on the config as ``returns``.
        raises: Stored on the config as ``raises``; defaults to ``None``.
        required: Stored on the config as ``required``; defaults to ``True``.
    """
    dispatch_method = "apply_async"
    mock = CeleryMockConfig(
        task_name=task_name,
        dispatch_method=dispatch_method,
        returns=returns,
        raises=raises,
        required=required,
    )
    with self._registry_lock:
        # setdefault creates the per-key deque the first time a key is seen.
        self._queues.setdefault(f"{task_name}:{dispatch_method}", deque()).append(mock)

install_patches

install_patches()

Install Celery Task.delay and Task.apply_async patches.

Source code in src/tripwire/plugins/celery_plugin.py
def install_patches(self) -> None:
    """Install Celery Task.delay and Task.apply_async patches.

    Saves the current ``Task.delay`` / ``Task.apply_async`` on the
    ``CeleryPlugin`` class and replaces them with the patched versions.
    Calling this again before ``restore_patches`` keeps the originals that
    were saved first instead of overwriting them.

    Raises:
        ImportError: If the optional Celery dependency is not available.
    """
    if not _CELERY_AVAILABLE:
        raise ImportError(
            "Install python-tripwire[celery] to use CeleryPlugin: "
            "pip install python-tripwire[celery]"
        )
    from celery.app.task import Task

    # Capture an original only when none is currently saved. Without this
    # guard a second install would record the already-patched function as
    # the "original", and restore_patches would then put the patch back
    # instead of the real Celery method.
    if getattr(CeleryPlugin, "_original_delay", None) is None:
        CeleryPlugin._original_delay = Task.delay
    if getattr(CeleryPlugin, "_original_apply_async", None) is None:
        CeleryPlugin._original_apply_async = Task.apply_async
    Task.delay = _patched_delay
    Task.apply_async = _patched_apply_async

restore_patches

restore_patches()

Restore original Celery Task methods.

Source code in src/tripwire/plugins/celery_plugin.py
def restore_patches(self) -> None:
    """Put back the Celery Task methods saved on ``CeleryPlugin``.

    For each of ``delay`` and ``apply_async``: if a saved original is
    present it is reassigned onto ``Task`` and the saved slot is reset to
    ``None``; if nothing is saved, that method is left untouched.
    """
    from celery.app.task import Task

    # (attribute on Task, slot on CeleryPlugin holding the saved original)
    pairs = (
        ("delay", "_original_delay"),
        ("apply_async", "_original_apply_async"),
    )
    for method_name, slot_name in pairs:
        saved = getattr(CeleryPlugin, slot_name)
        if saved is None:
            continue
        setattr(Task, method_name, saved)
        setattr(CeleryPlugin, slot_name, None)

assert_delay

assert_delay(task_name, args, kwargs, options)

Typed helper: assert the next delay interaction.

Source code in src/tripwire/plugins/celery_plugin.py
def assert_delay(
    self,
    task_name: str,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    options: dict[str, Any],
) -> None:
    """Typed helper: assert the next ``delay`` interaction for *task_name*.

    Builds a sentinel for the source id ``"celery:<task_name>:delay"`` and
    delegates to ``assert_interaction`` on the verifier returned by
    ``_get_test_verifier_or_raise()``.

    Args:
        task_name: Task name expected on the interaction.
        args: Expected positional arguments of the dispatch.
        kwargs: Expected keyword arguments of the dispatch.
        options: Expected dispatch options.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    dispatch_method = "delay"
    marker = _CelerySentinel(f"celery:{task_name}:{dispatch_method}")
    # Expected fields, forwarded as keyword arguments in this order.
    expected = {
        "task_name": task_name,
        "dispatch_method": dispatch_method,
        "args": args,
        "kwargs": kwargs,
        "options": options,
    }
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)

assert_apply_async

assert_apply_async(task_name, args, kwargs, options)

Typed helper: assert the next apply_async interaction.

Source code in src/tripwire/plugins/celery_plugin.py
def assert_apply_async(
    self,
    task_name: str,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
    options: dict[str, Any],
) -> None:
    """Typed helper: assert the next ``apply_async`` interaction for *task_name*.

    Builds a sentinel for the source id ``"celery:<task_name>:apply_async"``
    and delegates to ``assert_interaction`` on the verifier returned by
    ``_get_test_verifier_or_raise()``.

    Args:
        task_name: Task name expected on the interaction.
        args: Expected positional arguments of the dispatch.
        kwargs: Expected keyword arguments of the dispatch.
        options: Expected dispatch options.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    dispatch_method = "apply_async"
    marker = _CelerySentinel(f"celery:{task_name}:{dispatch_method}")
    # Expected fields, forwarded as keyword arguments in this order.
    expected = {
        "task_name": task_name,
        "dispatch_method": dispatch_method,
        "args": args,
        "kwargs": kwargs,
        "options": options,
    }
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)