Skip to content

MongoPlugin

MongoPlugin

MongoPlugin(verifier)

Bases: BasePlugin

MongoDB interception plugin.

Patches pymongo.collection.Collection methods at the class level, and wraps pymongo.MongoClient.__init__ so that each new client's host and port are recorded. Uses reference counting so nested sandboxes work correctly.

Each operation name has its own FIFO deque of MongoMockConfig objects.

Source code in src/tripwire/plugins/mongo_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Create a plugin with an empty mock registry.

    Args:
        verifier: Forwarded unchanged to the base-class initializer.
    """
    super().__init__(verifier)
    # Taken by mock_operation and get_unused_mocks around queue access.
    self._registry_lock: threading.Lock = threading.Lock()
    # Operation name -> FIFO deque of mocks registered for that operation.
    self._queues: dict[str, deque[MongoMockConfig]] = {}

mock_operation

mock_operation(operation, *, returns, raises=None, required=True)

Register a mock for a single MongoDB operation invocation.

Args:

- operation: The operation name (e.g., "find_one", "insert_one").
- returns: Value to return when this mock is consumed (required even when raises is set, as it serves as the fallback value type).
- raises: If provided, this exception is raised instead of returning.
- required: If False, the mock is not reported as unused at teardown.

Source code in src/tripwire/plugins/mongo_plugin.py
def mock_operation(
    self,
    operation: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Append one mock to the FIFO queue kept for ``operation``.

    Each call registers a mock for a single invocation of the operation.

    Args:
        operation: The operation name (e.g., "find_one", "insert_one").
        returns: Value to return when this mock is consumed (required even
            when raises is set, as it serves as the fallback value type).
        raises: If provided, this exception is raised instead of returning.
        required: If False, the mock is not reported as unused at teardown.
    """
    entry = MongoMockConfig(
        operation=operation,
        returns=returns,
        raises=raises,
        required=required,
    )
    with self._registry_lock:
        # setdefault creates the per-operation queue on first registration.
        self._queues.setdefault(operation, deque()).append(entry)

install_patches

install_patches()

Install pymongo Collection method patches.

Source code in src/tripwire/plugins/mongo_plugin.py
def install_patches(self) -> None:
    """Install the pymongo client and Collection method patches.

    Wraps ``pymongo.MongoClient.__init__`` so that, after the real
    initializer has run, the client's ``(normalized host, port)`` pair is
    stored in ``_mongo_conn_meta``. Then replaces every method named in
    ``MongoPlugin._INTERCEPTED_OPERATIONS`` on
    ``pymongo.collection.Collection`` with the result of
    ``_make_patched_method``. The originals are kept on ``MongoPlugin`` so
    ``restore_patches`` can put them back.

    Raises:
        ImportError: If pymongo is not available.
    """
    if not _PYMONGO_AVAILABLE:
        raise ImportError(
            "Install python-tripwire[mongo] to use MongoPlugin: "
            "pip install python-tripwire[mongo]"
        )

    # Patch MongoClient.__init__ to capture connection metadata
    if MongoPlugin._original_client_init is None:
        MongoPlugin._original_client_init = pymongo.MongoClient.__init__

        def _patched_client_init(self_: object, *args: Any, **kwargs: Any) -> None:  # noqa: ANN401
            assert MongoPlugin._original_client_init is not None
            MongoPlugin._original_client_init(self_, *args, **kwargs)
            host_arg = args[0] if args else kwargs.get("host", "localhost")
            port_arg = kwargs.get("port") or (args[1] if len(args) > 1 else 27017)
            host = "localhost"
            # A positional port of None means "use the default"; int(None)
            # would raise after the real client was already constructed.
            port = 27017 if port_arg is None else int(port_arg)
            if isinstance(host_arg, list):
                # pymongo accepts a list of host strings; use the first.
                host_arg = host_arg[0] if host_arg else "localhost"
            if isinstance(host_arg, str):
                if host_arg.startswith(("mongodb://", "mongodb+srv://")):
                    from urllib.parse import urlparse  # noqa: PLC0415
                    try:
                        netloc = urlparse(host_arg).netloc
                        # Strip credentials, then keep only the first entry of
                        # a seed list such as "h1:27017,h2:27018". urlparse
                        # treats the whole list as one host:port, which makes
                        # .port raise ValueError and .hostname read "h1,h2".
                        first_seed = netloc.rpartition("@")[2].split(",", 1)[0]
                        seed = urlparse(f"//{first_seed}")
                        host = seed.hostname or "localhost"
                        if seed.port:
                            port = seed.port
                    except ValueError:
                        # The real initializer already accepted this URI, so
                        # keep whatever host/port was gathered so far instead
                        # of failing client construction here.
                        pass
                else:
                    host = host_arg
            _mongo_conn_meta[self_] = (normalize_host(host), port)

        pymongo.MongoClient.__init__ = _patched_client_init  # type: ignore[assignment,method-assign]

    # Same guard as for MongoClient.__init__ above: only capture originals
    # when no patch set is recorded. A repeated call would otherwise save the
    # already-patched methods as "originals" and restore_patches could never
    # bring the real ones back.
    if not MongoPlugin._original_methods:
        MongoPlugin._original_methods = {}
        for op in MongoPlugin._INTERCEPTED_OPERATIONS:
            MongoPlugin._original_methods[op] = getattr(
                pymongo.collection.Collection, op
            )
            setattr(
                pymongo.collection.Collection,
                op,
                _make_patched_method(op),
            )

restore_patches

restore_patches()

Restore original pymongo Collection methods.

Source code in src/tripwire/plugins/mongo_plugin.py
def restore_patches(self) -> None:
    """Undo the patches recorded on ``MongoPlugin``.

    Puts back every saved ``pymongo.collection.Collection`` method and the
    saved ``pymongo.MongoClient.__init__``, then clears the saved references.
    Each half is skipped when nothing is recorded for it.
    """
    saved_methods = MongoPlugin._original_methods
    if saved_methods is not None:
        for name in saved_methods:
            setattr(pymongo.collection.Collection, name, saved_methods[name])
        MongoPlugin._original_methods = None

    saved_init = MongoPlugin._original_client_init
    if saved_init is not None:
        pymongo.MongoClient.__init__ = saved_init  # type: ignore[method-assign]
        MongoPlugin._original_client_init = None

matches

matches(interaction, expected)

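Field-by-field comparison with dirty-equals support: every key in expected is compared against the interaction's details (a key missing from details is treated as None). Returns False on the first mismatch, or if a comparison raises.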

Source code in src/tripwire/plugins/mongo_plugin.py
def matches(self, interaction: Interaction, expected: dict[str, Any]) -> bool:
    """Check every expected field against the interaction's details.

    The expected value stays on the left-hand side of ``!=`` so matcher
    objects (e.g. dirty-equals) drive the comparison. A key absent from
    ``interaction.details`` is compared as ``None``.

    Args:
        interaction: Recorded interaction whose ``details`` mapping is read.
        expected: Field name -> expected value (or matcher).

    Returns:
        True when no field differs; False on any mismatch or when the
        comparison raises.
    """
    try:
        return all(
            not (wanted != interaction.details.get(field))
            for field, wanted in expected.items()
        )
    except Exception:
        return False

get_unused_mocks

get_unused_mocks()

Return all MongoMockConfig with required=True still in any queue.

Source code in src/tripwire/plugins/mongo_plugin.py
def get_unused_mocks(self) -> list[MongoMockConfig]:
    """Collect the required mocks that are still queued.

    Returns:
        Every config with ``required`` set that remains in any operation
        queue, in queue iteration order. The scan runs under the registry
        lock.
    """
    with self._registry_lock:
        return [
            pending
            for pending_queue in self._queues.values()
            for pending in pending_queue
            if pending.required
        ]

assert_find

assert_find(database, collection, filter, projection=None)

Typed helper: assert the next find interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_find(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
    projection: Any = None,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``find`` interaction.

    Delegates to ``self._assert_operation`` with operation name ``"find"``
    and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
        projection: Expected projection; defaults to None.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
        "projection": projection,
    }
    self._assert_operation("find", **expected_fields)

assert_find_one

assert_find_one(database, collection, filter, projection=None)

Typed helper: assert the next find_one interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_find_one(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
    projection: Any = None,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``find_one`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"find_one"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
        projection: Expected projection; defaults to None.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
        "projection": projection,
    }
    self._assert_operation("find_one", **expected_fields)

assert_insert_one

assert_insert_one(database, collection, document)

Typed helper: assert the next insert_one interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_insert_one(
    self,
    database: str,
    collection: str,
    document: Any,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``insert_one`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"insert_one"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        document: Expected inserted document.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "document": document,
    }
    self._assert_operation("insert_one", **expected_fields)

assert_insert_many

assert_insert_many(database, collection, documents)

Typed helper: assert the next insert_many interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_insert_many(
    self,
    database: str,
    collection: str,
    documents: Any,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``insert_many`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"insert_many"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        documents: Expected inserted documents.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "documents": documents,
    }
    self._assert_operation("insert_many", **expected_fields)

assert_update_one

assert_update_one(database, collection, filter, update)

Typed helper: assert the next update_one interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_update_one(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
    update: Any,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``update_one`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"update_one"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
        update: Expected update specification.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
        "update": update,
    }
    self._assert_operation("update_one", **expected_fields)

assert_update_many

assert_update_many(database, collection, filter, update)

Typed helper: assert the next update_many interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_update_many(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
    update: Any,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``update_many`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"update_many"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
        update: Expected update specification.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
        "update": update,
    }
    self._assert_operation("update_many", **expected_fields)

assert_delete_one

assert_delete_one(database, collection, filter)

Typed helper: assert the next delete_one interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_delete_one(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
) -> None:
    """Typed helper: assert the next ``delete_one`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"delete_one"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
    }
    self._assert_operation("delete_one", **expected_fields)

assert_delete_many

assert_delete_many(database, collection, filter)

Typed helper: assert the next delete_many interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_delete_many(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
) -> None:
    """Typed helper: assert the next ``delete_many`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"delete_many"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
    }
    self._assert_operation("delete_many", **expected_fields)

assert_aggregate

assert_aggregate(database, collection, pipeline)

Typed helper: assert the next aggregate interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_aggregate(
    self,
    database: str,
    collection: str,
    pipeline: Any,  # noqa: ANN401
) -> None:
    """Typed helper: assert the next ``aggregate`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"aggregate"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        pipeline: Expected aggregation pipeline.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "pipeline": pipeline,
    }
    self._assert_operation("aggregate", **expected_fields)

assert_count_documents

assert_count_documents(database, collection, filter)

Typed helper: assert the next count_documents interaction.

Source code in src/tripwire/plugins/mongo_plugin.py
def assert_count_documents(
    self,
    database: str,
    collection: str,
    filter: Any,  # noqa: A002, ANN401
) -> None:
    """Typed helper: assert the next ``count_documents`` interaction.

    Delegates to ``self._assert_operation`` with operation name
    ``"count_documents"`` and the arguments below passed as keyword fields.

    Args:
        database: Expected database name.
        collection: Expected collection name.
        filter: Expected query filter.
    """
    expected_fields = {
        "database": database,
        "collection": collection,
        "filter": filter,
    }
    self._assert_operation("count_documents", **expected_fields)