Bases: BasePlugin
Cryptography interception plugin.
Patches Fernet.encrypt, Fernet.decrypt, and rsa.generate_private_key.
Uses reference counting so nested sandboxes work correctly.
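SECURITY: Actual plaintext, keys, and signatures are NOT stored in
interaction details. Only metadata (lengths, algorithm names, key sizes)
is recorded. Note that decrypt interactions are asserted against the Fernet token (ciphertext, not plaintext) and the TTL passed to assert_decrypt.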
Source code in src/tripwire/plugins/crypto_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Initialise the plugin for ``verifier`` with an empty mock registry.

    Args:
        verifier: Forwarded unchanged to the base-class initialiser.
    """
    super().__init__(verifier)
    # Held by the mock_* registration methods while they modify the queues.
    self._registry_lock: threading.Lock = threading.Lock()
    # Operation name -> deque of mock configurations, in registration order.
    self._queues: dict[str, deque[CryptoMockConfig]] = {}
|
mock_encrypt
mock_encrypt(*, returns, raises=None, required=True)
Register a mock for Fernet.encrypt().
Source code in src/tripwire/plugins/crypto_plugin.py
def mock_encrypt(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for Fernet.encrypt().

    Builds a ``CryptoMockConfig`` for the ``"fernet_encrypt"`` operation and
    appends it to that operation's queue, creating the queue on first use.

    Args:
        returns: Stored as the configuration's ``returns`` field.
        raises: Stored as the configuration's ``raises`` field; ``None`` by
            default.
        required: Stored as the configuration's ``required`` field.
    """
    operation = "fernet_encrypt"
    entry = CryptoMockConfig(
        operation=operation,
        returns=returns,
        raises=raises,
        required=required,
    )
    # The lock makes "create queue if missing, then append" one atomic step.
    with self._registry_lock:
        self._queues.setdefault(operation, deque()).append(entry)
|
mock_decrypt
mock_decrypt(*, returns, raises=None, required=True)
Register a mock for Fernet.decrypt().
Source code in src/tripwire/plugins/crypto_plugin.py
def mock_decrypt(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for Fernet.decrypt().

    Builds a ``CryptoMockConfig`` for the ``"fernet_decrypt"`` operation and
    appends it to that operation's queue, creating the queue on first use.

    Args:
        returns: Stored as the configuration's ``returns`` field.
        raises: Stored as the configuration's ``raises`` field; ``None`` by
            default.
        required: Stored as the configuration's ``required`` field.
    """
    operation = "fernet_decrypt"
    entry = CryptoMockConfig(
        operation=operation,
        returns=returns,
        raises=raises,
        required=required,
    )
    # The lock makes "create queue if missing, then append" one atomic step.
    with self._registry_lock:
        self._queues.setdefault(operation, deque()).append(entry)
|
mock_generate_key
mock_generate_key(*, returns, raises=None, required=True)
Register a mock for rsa.generate_private_key().
Source code in src/tripwire/plugins/crypto_plugin.py
def mock_generate_key(
    self,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Register a mock for rsa.generate_private_key().

    Builds a ``CryptoMockConfig`` for the ``"generate_key"`` operation and
    appends it to that operation's queue, creating the queue on first use.

    Args:
        returns: Stored as the configuration's ``returns`` field.
        raises: Stored as the configuration's ``raises`` field; ``None`` by
            default.
        required: Stored as the configuration's ``required`` field.
    """
    operation = "generate_key"
    entry = CryptoMockConfig(
        operation=operation,
        returns=returns,
        raises=raises,
        required=required,
    )
    # The lock makes "create queue if missing, then append" one atomic step.
    with self._registry_lock:
        self._queues.setdefault(operation, deque()).append(entry)
|
install_patches
Install cryptography Fernet and RSA patches.
Source code in src/tripwire/plugins/crypto_plugin.py
def install_patches(self) -> None:
    """Install cryptography Fernet and RSA patches.

    Saves the current ``Fernet.encrypt``, ``Fernet.decrypt`` and
    ``rsa.generate_private_key`` callables on ``CryptoPlugin`` and then
    replaces them with the module's patched versions.

    Raises:
        ImportError: If the optional ``cryptography`` dependency is not
            available.
    """
    if not _CRYPTOGRAPHY_AVAILABLE:
        message = (
            "Install python-tripwire[crypto] to use CryptoPlugin: "
            "pip install python-tripwire[crypto]"
        )
        raise ImportError(message)

    # Capture all three originals before overwriting any of them; they are
    # stored on the class (not the instance) so restore_patches can find them.
    plugin_cls = CryptoPlugin
    plugin_cls._original_encrypt = _Fernet.encrypt
    plugin_cls._original_decrypt = _Fernet.decrypt
    plugin_cls._original_generate_private_key = _rsa_mod.generate_private_key

    # Swap in the interceptors.
    setattr(_Fernet, "encrypt", _patched_fernet_encrypt)
    setattr(_Fernet, "decrypt", _patched_fernet_decrypt)
    _rsa_mod.generate_private_key = _patched_generate_private_key
|
restore_patches
Restore original cryptography functions.
Source code in src/tripwire/plugins/crypto_plugin.py
def restore_patches(self) -> None:
    """Restore original cryptography functions.

    For each patched target, if an original callable was saved on
    ``CryptoPlugin``, it is put back and the saved slot is reset to ``None``.
    Targets with no saved original are left untouched.
    """
    # (object to restore onto, attribute name, CryptoPlugin slot holding the original)
    targets = (
        (_Fernet, "encrypt", "_original_encrypt"),
        (_Fernet, "decrypt", "_original_decrypt"),
        (_rsa_mod, "generate_private_key", "_original_generate_private_key"),
    )
    for owner, attribute, slot in targets:
        saved = getattr(CryptoPlugin, slot)
        if saved is None:
            continue
        setattr(owner, attribute, saved)
        # Clear the slot so a repeated restore is a no-op for this target.
        setattr(CryptoPlugin, slot, None)
|
assert_encrypt
assert_encrypt(*, plaintext_length, **extra)
Assert the next Fernet.encrypt() interaction.
Source code in src/tripwire/plugins/crypto_plugin.py
def assert_encrypt(self, *, plaintext_length: int, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next Fernet.encrypt() interaction.

    Args:
        plaintext_length: Expected ``plaintext_length`` field of the
            interaction.
        **extra: Additional expected fields, forwarded unchanged to the
            verifier's ``assert_interaction``.
    """
    # Imported at call time rather than module level (see the PLC0415
    # suppression); presumably to avoid an import cycle -- TODO confirm.
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    expected = {"plaintext_length": plaintext_length, **extra}
    marker = _CryptoSentinel("fernet_encrypt")
    active_verifier = _get_test_verifier_or_raise()
    active_verifier.assert_interaction(marker, **expected)
|
assert_decrypt
assert_decrypt(*, token, ttl=None, **extra)
Assert the next Fernet.decrypt() interaction.
Source code in src/tripwire/plugins/crypto_plugin.py
def assert_decrypt(self, *, token: bytes | str, ttl: int | None = None, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next Fernet.decrypt() interaction.

    Args:
        token: Expected ``token`` field of the interaction.
        ttl: Expected ``ttl`` field of the interaction; ``None`` by default.
        **extra: Additional expected fields, forwarded unchanged to the
            verifier's ``assert_interaction``.
    """
    # Imported at call time rather than module level (see the PLC0415
    # suppression); presumably to avoid an import cycle -- TODO confirm.
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    expected = {"token": token, "ttl": ttl, **extra}
    marker = _CryptoSentinel("fernet_decrypt")
    active_verifier = _get_test_verifier_or_raise()
    active_verifier.assert_interaction(marker, **expected)
|
assert_generate_key
assert_generate_key(*, algorithm, key_size, **extra)
Assert the next rsa.generate_private_key() interaction.
Source code in src/tripwire/plugins/crypto_plugin.py
def assert_generate_key(self, *, algorithm: str, key_size: int, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next rsa.generate_private_key() interaction.

    Args:
        algorithm: Expected ``algorithm`` field of the interaction.
        key_size: Expected ``key_size`` field of the interaction.
        **extra: Additional expected fields, forwarded unchanged to the
            verifier's ``assert_interaction``.
    """
    # Imported at call time rather than module level (see the PLC0415
    # suppression); presumably to avoid an import cycle -- TODO confirm.
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    expected = {"algorithm": algorithm, "key_size": key_size, **extra}
    marker = _CryptoSentinel("generate_key")
    active_verifier = _get_test_verifier_or_raise()
    active_verifier.assert_interaction(marker, **expected)
|