Skip to content

SmtpPlugin

SmtpPlugin

SmtpPlugin(verifier)

Bases: StateMachinePlugin

SMTP interception plugin.

Replaces smtplib.SMTP with _FakeSMTP at activate() time and restores the original at deactivate() time. Uses reference counting so nested sandboxes work correctly.

States: disconnected -> connected -> greeted -> (authenticated|sending) -> closed. Note: starttls is an optional self-loop on 'greeted'; login transitions greeted -> authenticated.

Source code in src/tripwire/plugins/smtp_plugin.py
def __init__(self, verifier: "StrictVerifier") -> None:
    """Initialise the base plugin, then build one step sentinel per SMTP step.

    Args:
        verifier: Passed straight through to the base-class initialiser.
    """
    super().__init__(verifier)

    # Short local alias; every sentinel below is built the same way from
    # the source constant named after its SMTP step.
    make = _StepSentinel

    # Connection and greeting steps.
    self._connect_sentinel = make(_SOURCE_CONNECT)
    self._ehlo_sentinel = make(_SOURCE_EHLO)
    self._helo_sentinel = make(_SOURCE_HELO)

    # TLS upgrade and authentication steps.
    self._starttls_sentinel = make(_SOURCE_STARTTLS)
    self._login_sentinel = make(_SOURCE_LOGIN)

    # Message delivery steps.
    self._sendmail_sentinel = make(_SOURCE_SENDMAIL)
    self._send_message_sentinel = make(_SOURCE_SEND_MESSAGE)

    # Session teardown step.
    self._quit_sentinel = make(_SOURCE_QUIT)

install_patches

install_patches()

Install smtplib.SMTP patch.

Source code in src/tripwire/plugins/smtp_plugin.py
def install_patches(self) -> None:
    """Install smtplib.SMTP patch.

    Saves the class currently bound to ``smtplib.SMTP`` on
    ``SmtpPlugin._original_smtp`` and rebinds ``smtplib.SMTP`` to
    ``_FakeSMTP``.

    The call is a no-op when ``smtplib.SMTP`` is already ``_FakeSMTP``.
    Without that guard a repeated install would record the fake as the
    "original", and a later restore would put the fake back instead of
    the real class.
    """
    # getattr mirrors the setattr below, which rebinds the module attribute
    # dynamically rather than by plain assignment.
    if getattr(smtplib, "SMTP") is _FakeSMTP:
        return
    SmtpPlugin._original_smtp = smtplib.SMTP
    setattr(smtplib, "SMTP", _FakeSMTP)

restore_patches

restore_patches()

Restore original smtplib.SMTP.

Source code in src/tripwire/plugins/smtp_plugin.py
def restore_patches(self) -> None:
    """Rebind ``smtplib.SMTP`` to the saved original, if one is saved.

    Does nothing when ``SmtpPlugin._original_smtp`` is ``None``. After a
    restore the saved reference is cleared back to ``None``.
    """
    saved = SmtpPlugin._original_smtp
    if saved is None:
        # Nothing was saved, so there is nothing to put back.
        return
    setattr(smtplib, "SMTP", saved)
    SmtpPlugin._original_smtp = None

matches

matches(interaction, expected)

Field-by-field comparison with dirty-equals support.

Source code in src/tripwire/plugins/smtp_plugin.py
def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
    """Check every expected field against the interaction's details.

    Args:
        interaction: Object whose ``details`` mapping holds the recorded
            values.
        expected: Field name -> expected value. Fields absent from
            ``details`` are compared against ``None``.

    Returns:
        ``True`` when no expected field differs from the recorded value
        (trivially ``True`` for an empty ``expected``); ``False`` on any
        mismatch or if anything raises during the comparison.
    """
    try:
        # The expected value stays on the left of ``!=`` so that a matcher
        # object supplied by the caller drives the comparison.
        has_mismatch = any(
            wanted != interaction.details.get(field)
            for field, wanted in expected.items()
        )
    except Exception:
        # Best effort: a comparison that blows up counts as "no match".
        return False
    return not has_mismatch

assertable_fields

assertable_fields(interaction)

Return assertable fields for each step type.

Source code in src/tripwire/plugins/smtp_plugin.py
def assertable_fields(self, interaction: Interaction) -> frozenset[str]:
    """Report which detail fields of ``interaction`` are assertable.

    Returns:
        An empty frozenset when ``interaction.source_id`` is the STARTTLS
        or QUIT source; otherwise a frozenset of every key in
        ``interaction.details``.
    """
    carries_no_fields = interaction.source_id in {_SOURCE_STARTTLS, _SOURCE_QUIT}
    return frozenset() if carries_no_fields else frozenset(interaction.details.keys())