Skip to content

AsyncSubprocessPlugin

AsyncSubprocessPlugin

AsyncSubprocessPlugin(verifier)

Bases: StateMachinePlugin

Async subprocess interception plugin.

Replaces asyncio.create_subprocess_exec and asyncio.create_subprocess_shell with fake implementations at activate() time and restores the originals at deactivate() time. Uses reference counting so nested sandboxes work correctly.

States: created -> running -> terminated

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def __init__(self, verifier: "StrictVerifier") -> None:
    """Initialise the plugin and create its per-step sentinels.

    Args:
        verifier: Forwarded unchanged to the base-class initialiser.
    """
    super().__init__(verifier)
    # One sentinel per step source: spawn, communicate and wait.
    self._spawn_sentinel = _StepSentinel(_SOURCE_SPAWN)
    self._communicate_sentinel = _StepSentinel(_SOURCE_COMMUNICATE)
    self._wait_sentinel = _StepSentinel(_SOURCE_WAIT)

install_patches

install_patches()

Install asyncio.create_subprocess_exec/shell patches.

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def install_patches(self) -> None:
    """Install asyncio.create_subprocess_exec/shell patches.

    Both asyncio entry points are replaced with coroutine fakes. Each fake
    builds a ``SubprocessFirewallRequest`` and asks
    ``_find_async_subprocess_plugin`` for the plugin handling the call:

    * if that lookup raises ``GuardPassThrough``, the call is forwarded to
      the original asyncio function and the real process is returned;
    * otherwise an ``_AsyncFakeProcess`` is bound to the plugin, a
      ``"spawn"`` step is executed, and the fake process is returned.

    The fakes are also published in the module-level
    ``_tripwire_create_subprocess_exec`` / ``_tripwire_create_subprocess_shell``
    globals.
    """
    global _tripwire_create_subprocess_exec, _tripwire_create_subprocess_shell

    # Save whatever is installed right now so restore_patches() can put it
    # back. Only capture when nothing is saved yet: if this method runs twice
    # without a restore in between, the current attribute is already our fake
    # and saving it would make restore_patches() reinstall the fake.
    if getattr(AsyncSubprocessPlugin, "_original_exec", None) is None:
        AsyncSubprocessPlugin._original_exec = asyncio.create_subprocess_exec
    if getattr(AsyncSubprocessPlugin, "_original_shell", None) is None:
        AsyncSubprocessPlugin._original_shell = asyncio.create_subprocess_shell

    def _spawn_fake_process(
        plugin: Any,  # noqa: ANN401
        call_args: tuple[Any, ...],
        call_kwargs: dict[str, Any],
        command: Any,  # noqa: ANN401
    ) -> _AsyncFakeProcess:
        """Bind a new fake process to *plugin* and execute its "spawn" step."""
        proc = _AsyncFakeProcess()
        proc._plugin = plugin
        plugin._bind_connection(proc)
        stdin = call_kwargs.get("stdin")
        plugin._execute_step(
            plugin._lookup_session(proc), "spawn", call_args, call_kwargs,
            _SOURCE_SPAWN,
            details={
                "command": command,
                # Only literal bytes are recorded; anything else becomes None.
                "stdin": stdin if isinstance(stdin, bytes) else None,
            },
        )
        return proc

    async def _fake_create_subprocess_exec(
        program: str,
        *args: Any,  # noqa: ANN401
        **kwargs: Any,  # noqa: ANN401
    ) -> _AsyncFakeProcess | asyncio.subprocess.Process:
        command = [program, *[str(a) for a in args]]
        try:
            fw_request = SubprocessFirewallRequest(
                command=" ".join(command), binary=program,
            )
            plugin = _find_async_subprocess_plugin(firewall_request=fw_request)
        except GuardPassThrough:
            # The guard asked for pass-through: defer to the original
            # asyncio.create_subprocess_exec and hand back the real
            # asyncio.subprocess.Process it returns (not a fake).
            proc_real: asyncio.subprocess.Process = await _ORIGINAL_CREATE_SUBPROCESS_EXEC(
                program, *args, **kwargs,
            )
            return proc_real
        return _spawn_fake_process(plugin, (program, *args), kwargs, command)

    async def _fake_create_subprocess_shell(
        cmd: str,
        **kwargs: Any,  # noqa: ANN401
    ) -> _AsyncFakeProcess | asyncio.subprocess.Process:
        try:
            # A whitespace-only command is truthy but splits to an empty
            # list, so test the token list rather than the raw string before
            # indexing.
            tokens = cmd.split() if cmd else []
            binary = tokens[0] if tokens else ""
            fw_request = SubprocessFirewallRequest(command=cmd, binary=binary)
            plugin = _find_async_subprocess_plugin(firewall_request=fw_request)
        except GuardPassThrough:
            # The guard asked for pass-through: defer to the original
            # asyncio.create_subprocess_shell and hand back the real
            # asyncio.subprocess.Process it returns (not a fake).
            proc_real: asyncio.subprocess.Process = await _ORIGINAL_CREATE_SUBPROCESS_SHELL(
                cmd, **kwargs,
            )
            return proc_real
        return _spawn_fake_process(plugin, (cmd,), kwargs, cmd)

    _tripwire_create_subprocess_exec = _fake_create_subprocess_exec
    _tripwire_create_subprocess_shell = _fake_create_subprocess_shell

    setattr(asyncio, "create_subprocess_exec", _fake_create_subprocess_exec)
    setattr(asyncio, "create_subprocess_shell", _fake_create_subprocess_shell)

restore_patches

restore_patches()

Restore original asyncio.create_subprocess_exec/shell.

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def restore_patches(self) -> None:
    """Restore original asyncio.create_subprocess_exec/shell.

    For each patched entry point, the callable saved on the class is written
    back onto ``asyncio`` and the saved slot is cleared; a slot that is
    already ``None`` is left alone. The module-level references to the fakes
    are then reset to ``None``.
    """
    global _tripwire_create_subprocess_exec, _tripwire_create_subprocess_shell

    # (attribute on asyncio, class slot holding the saved callable)
    saved_slots = (
        ("create_subprocess_exec", "_original_exec"),
        ("create_subprocess_shell", "_original_shell"),
    )
    for asyncio_attr, slot_name in saved_slots:
        saved = getattr(AsyncSubprocessPlugin, slot_name)
        if saved is None:
            continue
        setattr(asyncio, asyncio_attr, saved)
        setattr(AsyncSubprocessPlugin, slot_name, None)

    _tripwire_create_subprocess_exec = _tripwire_create_subprocess_shell = None

check_conflicts

check_conflicts()

Verify asyncio.create_subprocess_exec/shell have not been patched by a third party.

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def check_conflicts(self) -> None:
    """Verify asyncio.create_subprocess_exec/shell have not been patched by a third party.

    An entry point is considered clean when the attribute currently on
    ``asyncio`` is either the module-level original or the tripwire fake.

    Raises:
        ConflictError: If either entry point is some other object. The
            ``patcher`` argument is a best-effort guess derived from the
            object's ``__module__`` and ``__qualname__``.
    """
    candidates = (
        (
            "asyncio.create_subprocess_exec",
            asyncio.create_subprocess_exec,
            _ORIGINAL_CREATE_SUBPROCESS_EXEC,
            _tripwire_create_subprocess_exec,
        ),
        (
            "asyncio.create_subprocess_shell",
            asyncio.create_subprocess_shell,
            _ORIGINAL_CREATE_SUBPROCESS_SHELL,
            _tripwire_create_subprocess_shell,
        ),
    )
    for target_name, installed, pristine, ours in candidates:
        if installed is pristine or installed is ours:
            continue

        # Something else owns the attribute: guess who from its metadata.
        module_name = getattr(installed, "__module__", None) or ""
        qualified_name = getattr(installed, "__qualname__", None) or ""
        if "unittest.mock" in module_name or "MagicMock" in qualified_name:
            culprit = "unittest.mock"
        elif "pytest_mock" in module_name:
            culprit = "pytest-mock"
        else:
            culprit = "an unknown library"
        raise ConflictError(target=target_name, patcher=culprit)

matches

matches(interaction, expected)

Compare each expected field against the interaction's recorded details; dirty-equals matchers are supported as expected values.

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
    """Field-by-field comparison with dirty-equals support.

    Every key in *expected* is looked up in ``interaction.details`` (a
    missing key reads as ``None``) and compared with ``expected != actual``,
    keeping the expected value on the left-hand side of the comparison.

    Returns:
        ``True`` when no expected field differs, ``False`` when at least one
        differs or when any exception is raised while comparing.
    """
    try:
        return not any(
            wanted != interaction.details.get(field)
            for field, wanted in expected.items()
        )
    except Exception:
        # Best-effort: a comparison that blows up counts as a mismatch.
        return False

assertable_fields

assertable_fields(interaction)

Return the assertable field names for the interaction's step type: none for wait steps, and every key in the interaction's details otherwise.

Source code in src/tripwire/plugins/async_subprocess_plugin.py
def assertable_fields(self, interaction: Interaction) -> frozenset[str]:
    """Return assertable fields for each step type.

    Interactions whose ``source_id`` equals the wait source expose no
    assertable fields; every other interaction exposes all keys of its
    ``details`` mapping.
    """
    is_wait_step = interaction.source_id == _SOURCE_WAIT
    return frozenset() if is_wait_step else frozenset(interaction.details.keys())