Skip to content

CryptoPlugin

CryptoPlugin

CryptoPlugin(verifier)

Bases: BasePlugin

Cryptography interception plugin.

Patches Fernet.encrypt, Fernet.decrypt, and rsa.generate_private_key. Uses reference counting so nested sandboxes work correctly.

SECURITY: Actual plaintext, keys, and signatures are NOT stored in interaction details. Only metadata (lengths, algorithm names, key sizes) is recorded.

Source code in src/tripwire/plugins/crypto_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    super().__init__(verifier)
    self._queues: dict[str, deque[CryptoMockConfig]] = {}
    self._registry_lock: threading.Lock = threading.Lock()

mock_encrypt

mock_encrypt(*, returns, raises=None, required=True)

Register a mock for Fernet.encrypt().

Source code in src/tripwire/plugins/crypto_plugin.py
def mock_encrypt(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for Fernet.encrypt()."""
    config = CryptoMockConfig(
        operation="fernet_encrypt", returns=returns, raises=raises, required=required
    )
    with self._registry_lock:
        if "fernet_encrypt" not in self._queues:
            self._queues["fernet_encrypt"] = deque()
        self._queues["fernet_encrypt"].append(config)

mock_decrypt

mock_decrypt(*, returns, raises=None, required=True)

Register a mock for Fernet.decrypt().

Source code in src/tripwire/plugins/crypto_plugin.py
def mock_decrypt(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for Fernet.decrypt()."""
    config = CryptoMockConfig(
        operation="fernet_decrypt", returns=returns, raises=raises, required=required
    )
    with self._registry_lock:
        if "fernet_decrypt" not in self._queues:
            self._queues["fernet_decrypt"] = deque()
        self._queues["fernet_decrypt"].append(config)

mock_generate_key

mock_generate_key(*, returns, raises=None, required=True)

Register a mock for rsa.generate_private_key().

Source code in src/tripwire/plugins/crypto_plugin.py
def mock_generate_key(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for rsa.generate_private_key()."""
    config = CryptoMockConfig(
        operation="generate_key", returns=returns, raises=raises, required=required
    )
    with self._registry_lock:
        if "generate_key" not in self._queues:
            self._queues["generate_key"] = deque()
        self._queues["generate_key"].append(config)

install_patches

install_patches()

Install cryptography Fernet and RSA patches.

Source code in src/tripwire/plugins/crypto_plugin.py
def install_patches(self) -> None:
    """Install cryptography Fernet and RSA patches."""
    if not _CRYPTOGRAPHY_AVAILABLE:
        raise ImportError(
            "Install python-tripwire[crypto] to use CryptoPlugin: "
            "pip install python-tripwire[crypto]"
        )
    CryptoPlugin._original_encrypt = _Fernet.encrypt
    CryptoPlugin._original_decrypt = _Fernet.decrypt
    CryptoPlugin._original_generate_private_key = _rsa_mod.generate_private_key
    setattr(_Fernet, "encrypt", _patched_fernet_encrypt)
    setattr(_Fernet, "decrypt", _patched_fernet_decrypt)
    _rsa_mod.generate_private_key = _patched_generate_private_key

restore_patches

restore_patches()

Restore original cryptography functions.

Source code in src/tripwire/plugins/crypto_plugin.py
def restore_patches(self) -> None:
    """Restore original cryptography functions."""
    if CryptoPlugin._original_encrypt is not None:
        setattr(_Fernet, "encrypt", CryptoPlugin._original_encrypt)
        CryptoPlugin._original_encrypt = None
    if CryptoPlugin._original_decrypt is not None:
        setattr(_Fernet, "decrypt", CryptoPlugin._original_decrypt)
        CryptoPlugin._original_decrypt = None
    if CryptoPlugin._original_generate_private_key is not None:
        _rsa_mod.generate_private_key = CryptoPlugin._original_generate_private_key
        CryptoPlugin._original_generate_private_key = None

assert_encrypt

assert_encrypt(*, plaintext_length, **extra)

Assert the next Fernet.encrypt() interaction.

Source code in src/tripwire/plugins/crypto_plugin.py
def assert_encrypt(self, *, plaintext_length: int, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next Fernet.encrypt() interaction."""
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    sentinel = _CryptoSentinel("fernet_encrypt")
    _get_test_verifier_or_raise().assert_interaction(
        sentinel, plaintext_length=plaintext_length, **extra
    )

assert_decrypt

assert_decrypt(*, token, ttl=None, **extra)

Assert the next Fernet.decrypt() interaction.

Source code in src/tripwire/plugins/crypto_plugin.py
def assert_decrypt(self, *, token: bytes | str, ttl: int | None = None, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next Fernet.decrypt() interaction."""
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    sentinel = _CryptoSentinel("fernet_decrypt")
    _get_test_verifier_or_raise().assert_interaction(
        sentinel, token=token, ttl=ttl, **extra
    )

assert_generate_key

assert_generate_key(*, algorithm, key_size, **extra)

Assert the next rsa.generate_private_key() interaction.

Source code in src/tripwire/plugins/crypto_plugin.py
def assert_generate_key(self, *, algorithm: str, key_size: int, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next rsa.generate_private_key() interaction."""
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    sentinel = _CryptoSentinel("generate_key")
    _get_test_verifier_or_raise().assert_interaction(
        sentinel, algorithm=algorithm, key_size=key_size, **extra
    )