Skip to content

SshPlugin

SshPlugin

SshPlugin(verifier)

Bases: StateMachinePlugin

SSH (paramiko) interception plugin.

Replaces paramiko.SSHClient with _FakeSSHClient at activate() time and restores the original at deactivate() time. Uses reference counting so nested sandboxes work correctly.

States: disconnected -> connected -> closed. The exec_command, open_sftp, and sftp_* steps are self-transitions on connected.

Source code in src/tripwire/plugins/ssh_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Initialise the plugin and create one step sentinel per SSH operation.

    Args:
        verifier: Forwarded unchanged to the base-class initialiser.
    """
    super().__init__(verifier)
    # One _StepSentinel per _SOURCE_* constant, stored on the instance.
    # NOTE(review): presumably these are the handles used to refer to each
    # recorded step -- confirm against _StepSentinel's definition.

    # Connection set-up and command execution.
    self._connect_sentinel = _StepSentinel(_SOURCE_CONNECT)
    self._exec_command_sentinel = _StepSentinel(_SOURCE_EXEC_COMMAND)
    # SFTP session and its file operations.
    self._open_sftp_sentinel = _StepSentinel(_SOURCE_OPEN_SFTP)
    self._sftp_get_sentinel = _StepSentinel(_SOURCE_SFTP_GET)
    self._sftp_put_sentinel = _StepSentinel(_SOURCE_SFTP_PUT)
    self._sftp_listdir_sentinel = _StepSentinel(_SOURCE_SFTP_LISTDIR)
    self._sftp_stat_sentinel = _StepSentinel(_SOURCE_SFTP_STAT)
    self._sftp_mkdir_sentinel = _StepSentinel(_SOURCE_SFTP_MKDIR)
    self._sftp_remove_sentinel = _StepSentinel(_SOURCE_SFTP_REMOVE)
    # Connection tear-down.
    self._close_sentinel = _StepSentinel(_SOURCE_CLOSE)

install_patches

install_patches()

Install paramiko.SSHClient patch.

Source code in src/tripwire/plugins/ssh_plugin.py
def install_patches(self) -> None:
    """Install the paramiko.SSHClient patch.

    Saves the current ``paramiko_lib.SSHClient`` on
    ``SshPlugin._original_ssh_client`` and replaces it with
    ``_FakeSSHClient``. Does nothing when paramiko is not available.

    The call is idempotent: if ``paramiko_lib.SSHClient`` is already
    ``_FakeSSHClient``, the saved original is left untouched. Otherwise a
    repeated install would record the fake as the "original" and the real
    class could never be restored.
    """
    if not _PARAMIKO_AVAILABLE:  # pragma: no cover
        return
    current_client = paramiko_lib.SSHClient
    if current_client is not _FakeSSHClient:
        # Only capture a class that is genuinely not our own replacement.
        SshPlugin._original_ssh_client = current_client
    paramiko_lib.SSHClient = _FakeSSHClient

restore_patches

restore_patches()

Restore original paramiko.SSHClient.

Source code in src/tripwire/plugins/ssh_plugin.py
def restore_patches(self) -> None:
    """Put the saved paramiko.SSHClient back, if one was saved.

    Does nothing when paramiko is not available or when
    ``SshPlugin._original_ssh_client`` is ``None``. After restoring, the
    saved reference is cleared.
    """
    if not _PARAMIKO_AVAILABLE:  # pragma: no cover
        return
    saved_client = SshPlugin._original_ssh_client
    if saved_client is None:
        # Nothing was captured, so there is nothing to undo.
        return
    paramiko_lib.SSHClient = saved_client
    SshPlugin._original_ssh_client = None

matches

matches(interaction, expected)

Field-by-field comparison with dirty-equals support.

Source code in src/tripwire/plugins/ssh_plugin.py
def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
    """Report whether every expected field agrees with the interaction.

    Each key in ``expected`` is looked up in ``interaction.details`` with
    ``.get()``, so a missing key compares as ``None``. The expected value is
    the left operand of ``!=``, so its own comparison logic decides the
    outcome; this is what provides the dirty-equals support.

    Args:
        interaction: The recorded interaction whose ``details`` are checked.
        expected: Field name -> expected value (or matcher object).

    Returns:
        ``True`` when no field differs (including when ``expected`` is
        empty); ``False`` on the first difference or if anything raises
        during the comparison.
    """
    try:
        has_mismatch = any(
            wanted != interaction.details.get(field)
            for field, wanted in expected.items()
        )
    except Exception:
        # Best-effort comparison: a failing comparator counts as a non-match.
        return False
    return not has_mismatch

assertable_fields

assertable_fields(interaction)

Return all detail keys as assertable fields.

Steps with no data fields (open_sftp, close) record empty details, so this naturally returns frozenset() for those steps.

Source code in src/tripwire/plugins/ssh_plugin.py
def assertable_fields(self, interaction: Interaction) -> frozenset[str]:
    """Expose every key of ``interaction.details`` as an assertable field.

    An interaction recorded with empty details (the steps that carry no
    data, such as open_sftp and close) therefore yields an empty frozenset.
    """
    detail_keys = interaction.details.keys()
    return frozenset(detail_keys)