Bases: StateMachinePlugin
SSH (paramiko) interception plugin.
Replaces paramiko.SSHClient with _FakeSSHClient at activate() time and
restores the original at deactivate() time. Uses reference counting so
nested sandboxes work correctly.
States: disconnected -> connected -> closed
exec_command, open_sftp, and sftp_* are self-transitions on connected.
Source code in src/tripwire/plugins/ssh_plugin.py
| def __init__(self, verifier: StrictVerifier) -> None:
super().__init__(verifier)
self._connect_sentinel = _StepSentinel(_SOURCE_CONNECT)
self._exec_command_sentinel = _StepSentinel(_SOURCE_EXEC_COMMAND)
self._open_sftp_sentinel = _StepSentinel(_SOURCE_OPEN_SFTP)
self._sftp_get_sentinel = _StepSentinel(_SOURCE_SFTP_GET)
self._sftp_put_sentinel = _StepSentinel(_SOURCE_SFTP_PUT)
self._sftp_listdir_sentinel = _StepSentinel(_SOURCE_SFTP_LISTDIR)
self._sftp_stat_sentinel = _StepSentinel(_SOURCE_SFTP_STAT)
self._sftp_mkdir_sentinel = _StepSentinel(_SOURCE_SFTP_MKDIR)
self._sftp_remove_sentinel = _StepSentinel(_SOURCE_SFTP_REMOVE)
self._close_sentinel = _StepSentinel(_SOURCE_CLOSE)
|
install_patches
Install paramiko.SSHClient patch.
Source code in src/tripwire/plugins/ssh_plugin.py
| def install_patches(self) -> None:
"""Install paramiko.SSHClient patch."""
if not _PARAMIKO_AVAILABLE: # pragma: no cover
return
SshPlugin._original_ssh_client = paramiko_lib.SSHClient
paramiko_lib.SSHClient = _FakeSSHClient
|
restore_patches
Restore original paramiko.SSHClient.
Source code in src/tripwire/plugins/ssh_plugin.py
| def restore_patches(self) -> None:
"""Restore original paramiko.SSHClient."""
if not _PARAMIKO_AVAILABLE: # pragma: no cover
return
if SshPlugin._original_ssh_client is not None:
paramiko_lib.SSHClient = SshPlugin._original_ssh_client
SshPlugin._original_ssh_client = None
|
matches
matches(interaction, expected)
Field-by-field comparison with dirty-equals support.
Source code in src/tripwire/plugins/ssh_plugin.py
| def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
"""Field-by-field comparison with dirty-equals support."""
try:
for key, expected_val in expected.items():
actual_val = interaction.details.get(key)
if expected_val != actual_val:
return False
return True
except Exception:
return False
|
assertable_fields
assertable_fields(interaction)
Return all detail keys as assertable fields.
Steps with no data fields (open_sftp, close) record empty details,
so this naturally returns frozenset() for those steps.
Source code in src/tripwire/plugins/ssh_plugin.py
| def assertable_fields(self, interaction: Interaction) -> frozenset[str]:
"""Return all detail keys as assertable fields.
Steps with no data fields (open_sftp, close) record empty details,
so this naturally returns frozenset() for those steps.
"""
return frozenset(interaction.details.keys())
|