Skip to content

PikaPlugin

PikaPlugin

PikaPlugin(verifier)

Bases: StateMachinePlugin

Pika (RabbitMQ) interception plugin.

Replaces pika.BlockingConnection with _FakeBlockingConnection at activate() time and restores the original at deactivate() time. Uses reference counting so nested sandboxes work correctly.

States: disconnected -> connected -> channel_open -> closed close is also valid from connected (skipping channel_open).

Source code in src/tripwire/plugins/pika_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    super().__init__(verifier)
    self._connect_sentinel = _StepSentinel(_SOURCE_CONNECT)
    self._channel_sentinel = _StepSentinel(_SOURCE_CHANNEL)
    self._publish_sentinel = _StepSentinel(_SOURCE_PUBLISH)
    self._consume_sentinel = _StepSentinel(_SOURCE_CONSUME)
    self._ack_sentinel = _StepSentinel(_SOURCE_ACK)
    self._nack_sentinel = _StepSentinel(_SOURCE_NACK)
    self._close_sentinel = _StepSentinel(_SOURCE_CLOSE)

install_patches

install_patches()

Install pika.BlockingConnection patch.

Source code in src/tripwire/plugins/pika_plugin.py
def install_patches(self) -> None:
    """Install pika.BlockingConnection patch."""
    if not _PIKA_AVAILABLE:  # pragma: no cover
        return
    PikaPlugin._original_blocking_connection = pika_lib.BlockingConnection
    pika_lib.BlockingConnection = _FakeBlockingConnection

restore_patches

restore_patches()

Restore original pika.BlockingConnection.

Source code in src/tripwire/plugins/pika_plugin.py
def restore_patches(self) -> None:
    """Restore original pika.BlockingConnection."""
    if not _PIKA_AVAILABLE:  # pragma: no cover
        return
    if PikaPlugin._original_blocking_connection is not None:
        pika_lib.BlockingConnection = PikaPlugin._original_blocking_connection
        PikaPlugin._original_blocking_connection = None

matches

matches(interaction, expected)

Field-by-field comparison with dirty-equals support.

Source code in src/tripwire/plugins/pika_plugin.py
def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
    """Field-by-field comparison with dirty-equals support."""
    try:
        for key, expected_val in expected.items():
            actual_val = interaction.details.get(key)
            if expected_val != actual_val:
                return False
        return True
    except Exception:
        return False

assertable_fields

assertable_fields(interaction)

Return assertable fields for each step type.

channel and close are state-transition-only steps with no data fields, so they return frozenset().

Source code in src/tripwire/plugins/pika_plugin.py
def assertable_fields(self, interaction: Interaction) -> frozenset[str]:
    """Return assertable fields for each step type.

    channel and close are state-transition-only steps with no data fields,
    so they return frozenset().
    """
    return frozenset(interaction.details.keys())