Skip to content

ElasticsearchPlugin

ElasticsearchPlugin

ElasticsearchPlugin(verifier)

Bases: BasePlugin

Elasticsearch interception plugin.

Patches elasticsearch.Elasticsearch methods at the class level. Uses reference counting so nested sandboxes work correctly.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def __init__(self, verifier: StrictVerifier) -> None:
    """Initialise the plugin with an empty mock registry.

    Args:
        verifier: Passed straight through to the base-class initialiser.
    """
    super().__init__(verifier)
    # Held by mock_operation while it modifies ``_queues``.
    self._registry_lock: threading.Lock = threading.Lock()
    # Operation name -> queue of mock configs, in registration order.
    self._queues: dict[str, deque[ElasticsearchMockConfig]] = {}

mock_operation

mock_operation(operation, *, returns, raises=None, required=True)

Register a mock for a single Elasticsearch operation invocation.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def mock_operation(
    self,
    operation: str,
    *,
    returns: Any,  # noqa: ANN401
    raises: BaseException | None = None,
    required: bool = True,
) -> None:
    """Queue a mock configuration for one invocation of ``operation``.

    The arguments are bundled into an ``ElasticsearchMockConfig`` and
    appended to the queue kept for that operation name; the queue is
    created the first time an operation is registered.

    Args:
        operation: Name of the operation; used as the queue key.
        returns: Stored unchanged on the config as ``returns``.
        raises: Stored unchanged on the config as ``raises``.
        required: Stored unchanged on the config as ``required``.
    """
    entry = ElasticsearchMockConfig(
        operation=operation,
        returns=returns,
        raises=raises,
        required=required,
    )
    # The registry lock is held for the whole lookup-or-create-then-append.
    with self._registry_lock:
        self._queues.setdefault(operation, deque()).append(entry)

install_patches

install_patches()

Install Elasticsearch method patches.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def install_patches(self) -> None:
    """Install Elasticsearch method patches.

    Wraps ``Elasticsearch.__init__`` so that every constructed client has
    its ``(host, port)`` recorded in ``_es_conn_meta``, then replaces each
    method named in ``_INTERCEPTED_METHODS`` with an interceptor. The
    originals are stored on ``ElasticsearchPlugin`` (``_original_init`` and
    ``_originals``). Anything that is already patched is left untouched, so
    a stored original is never overwritten by an interceptor.

    Raises:
        ImportError: If ``_ELASTICSEARCH_AVAILABLE`` is false.
    """
    if not _ELASTICSEARCH_AVAILABLE:
        raise ImportError(
            "Install python-tripwire[elasticsearch] to use ElasticsearchPlugin: "
            "pip install python-tripwire[elasticsearch]"
        )
    es_cls = es_lib.Elasticsearch

    # Patch __init__ to capture connection metadata
    if ElasticsearchPlugin._original_init is None:
        ElasticsearchPlugin._original_init = es_cls.__init__

        def _split_host_port(target: str, default_port: Any = 9200) -> tuple[str, Any]:  # noqa: ANN401
            """Split one host string into ``(host, port)``.

            ``default_port`` is returned when ``target`` carries no usable port.
            """
            if "://" in target:
                from urllib.parse import urlparse  # noqa: PLC0415

                # URLs must be parsed as a whole. Splitting on the last ":"
                # first would turn "http://localhost" into host "http" and
                # would drop the port of "http://es:9201/path".
                parsed = urlparse(target)
                try:
                    url_port = parsed.port
                except ValueError:
                    # Non-numeric or out-of-range port: fall back to the default.
                    url_port = None
                return parsed.hostname or "localhost", url_port or default_port
            if ":" in target and not target.startswith("["):
                name, _, port_text = target.rpartition(":")
                try:
                    return name, int(port_text.rstrip("/"))
                except ValueError:
                    return name, default_port
            # No port separator, or a bracketed literal that is kept verbatim.
            return target.rstrip("/"), default_port

        def _patched_init(self_: object, *args: Any, **kwargs: Any) -> None:  # noqa: ANN401
            assert ElasticsearchPlugin._original_init is not None
            ElasticsearchPlugin._original_init(self_, *args, **kwargs)
            # Elasticsearch client accepts hosts as str, list of str, or list of dicts
            hosts = args[0] if args else kwargs.get("hosts", kwargs.get("host", "localhost"))
            # Only the first configured node is recorded.
            if isinstance(hosts, (list, tuple)) and hosts:
                first = hosts[0]
            elif isinstance(hosts, str):
                first = hosts
            else:
                first = None

            host, port = "localhost", 9200
            if isinstance(first, str):
                host, port = _split_host_port(first)
            elif isinstance(first, dict):
                host = first.get("host", "localhost")
                port = first.get("port", 9200)
                # A dict's "host" value may itself be a URL; a port embedded
                # in that URL takes precedence over the dict's "port".
                if "://" in str(host):
                    host, port = _split_host_port(str(host), port)
            _es_conn_meta[self_] = (normalize_host(str(host)), int(port))

        es_cls.__init__ = _patched_init  # type: ignore[assignment,method-assign]

    for method_name in _INTERCEPTED_METHODS:
        # Mirror the __init__ guard: if this method is already patched, the
        # class attribute is an interceptor and must not be saved as the
        # "original", or restore_patches would put the interceptor back.
        if method_name in ElasticsearchPlugin._originals:
            continue
        ElasticsearchPlugin._originals[method_name] = getattr(es_cls, method_name)
        setattr(es_cls, method_name, _make_interceptor(method_name))

restore_patches

restore_patches()

Restore original Elasticsearch methods.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def restore_patches(self) -> None:
    """Put the saved Elasticsearch methods and constructor back.

    Every entry in ``ElasticsearchPlugin._originals`` is reassigned onto the
    client class and the mapping is emptied. If an original ``__init__`` was
    saved, it is reinstated and the saved reference is reset to ``None``.
    """
    client_cls = es_lib.Elasticsearch
    saved_methods = ElasticsearchPlugin._originals
    for name in saved_methods:
        setattr(client_cls, name, saved_methods[name])
    saved_methods.clear()

    saved_init = ElasticsearchPlugin._original_init
    if saved_init is None:
        return
    client_cls.__init__ = saved_init  # type: ignore[method-assign]
    ElasticsearchPlugin._original_init = None

assert_index

assert_index(*, index, document, id=None, **extra)

Assert the next index interaction.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def assert_index(  # noqa: A002
    self, *, index: str, document: Any, id: str | None = None,  # noqa: ANN401
    **extra: Any,  # noqa: ANN401
) -> None:
    """Assert the next interaction is an ``index`` call with these fields.

    ``index`` and ``document`` are always part of the expectation; ``id`` is
    included only when it is not ``None``. Any ``extra`` keyword arguments
    are added as further expected fields. The expectation is handed to the
    current test verifier's ``assert_interaction``.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _ElasticsearchSentinel("index")
    optional: dict[str, Any] = {} if id is None else {"id": id}
    expected: dict[str, Any] = {
        "index": index,
        "document": document,
        **optional,
        **extra,
    }
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)

assert_search

assert_search(*, index=None, query=None, size=None, from_=None, **extra)

Assert the next search interaction.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def assert_search(
    self, *, index: str | None = None, query: Any = None,  # noqa: ANN401
    size: int | None = None, from_: int | None = None,
    **extra: Any,  # noqa: ANN401
) -> None:
    """Assert the next interaction is a ``search`` call with these fields.

    Each of ``index``, ``query``, ``size`` and ``from_`` is part of the
    expectation only when it is not ``None``. Any ``extra`` keyword
    arguments are added as further expected fields. The expectation is
    handed to the current test verifier's ``assert_interaction``.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _ElasticsearchSentinel("search")
    named: dict[str, Any] = {
        "index": index,
        "query": query,
        "size": size,
        "from_": from_,
    }
    # Drop the named fields the caller left unset, keeping their order.
    expected: dict[str, Any] = {
        field: value for field, value in named.items() if value is not None
    }
    expected.update(extra)
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)

assert_get

assert_get(*, index, id, **extra)

Assert the next get interaction.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def assert_get(self, *, index: str, id: str, **extra: Any) -> None:  # noqa: A002, ANN401
    """Assert the next interaction is a ``get`` call with these fields.

    ``index`` and ``id`` are always expected; any ``extra`` keyword
    arguments are added as further expected fields. The expectation is
    handed to the current test verifier's ``assert_interaction``.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _ElasticsearchSentinel("get")
    expected: dict[str, Any] = {"index": index, "id": id, **extra}
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)

assert_delete

assert_delete(*, index, id, **extra)

Assert the next delete interaction.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def assert_delete(self, *, index: str, id: str, **extra: Any) -> None:  # noqa: A002, ANN401
    """Assert the next interaction is a ``delete`` call with these fields.

    ``index`` and ``id`` are always expected; any ``extra`` keyword
    arguments are added as further expected fields. The expectation is
    handed to the current test verifier's ``assert_interaction``.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _ElasticsearchSentinel("delete")
    expected: dict[str, Any] = {"index": index, "id": id, **extra}
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)

assert_bulk

assert_bulk(*, operations, **extra)

Assert the next bulk interaction.

Source code in src/tripwire/plugins/elasticsearch_plugin.py
def assert_bulk(self, *, operations: Any, **extra: Any) -> None:  # noqa: ANN401
    """Assert the next interaction is a ``bulk`` call with these fields.

    ``operations`` is always expected; any ``extra`` keyword arguments are
    added as further expected fields. The expectation is handed to the
    current test verifier's ``assert_interaction``.
    """
    from tripwire._context import _get_test_verifier_or_raise  # noqa: PLC0415

    marker = _ElasticsearchSentinel("bulk")
    expected: dict[str, Any] = {"operations": operations, **extra}
    _get_test_verifier_or_raise().assert_interaction(marker, **expected)