OpenSearch, eine Open-Source-Alternative zu Elasticsearch, ist eine leistungsstarke Such- und Analysemaschine, die für die Verarbeitung großer Datensätze mit Leichtigkeit. In diesem Blog werden wir demonstrieren, wie man grundlegende CRUD-Operationen (Erstellen, Lesen, Aktualisieren, Löschen) in OpenSearch mit Python ausführt.
lokale OpenSearch-Instanz. Unten finden Sie eine einfache docker-compose.yml-Datei, die OpenSearch und OpenSearch Dashboards startet.
version: '3' services: opensearch-test-node-1: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-1 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-1 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data1:/usr/share/opensearch/data ports: - 9200:9200 - 9600:9600 networks: - opensearch-test-net opensearch-test-node-2: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-2 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-2 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data2:/usr/share/opensearch/data networks: - opensearch-test-net opensearch-test-dashboards: image: opensearchproject/opensearch-dashboards:2.13.0 container_name: opensearch-test-dashboards ports: - 5601:5601 expose: - "5601" environment: - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]' - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true" networks: - opensearch-test-net volumes: opensearch-test-data1: opensearch-test-data2: networks: opensearch-test-net:
Führen Sie den folgenden Befehl aus, um Ihre OpenSearch-Instanz aufzurufen:docker-compose up
OpenSearch wird unter http://localhost:9200.
zugänglich sein
python -m venv .venv source .venv/bin/activate pip install opensearch-py
Wir werden unser Projekt außerdem wie folgt strukturieren:
├── interfaces.py ├── main.py ├── searchservice.py ├── docker-compose.yml
In der Datei interfaces.py definieren wir unsere Ressourcen- und Ressourcenklassen. Diese helfen uns, verschiedene Ressourcentypen in OpenSearch (in diesem Fall Benutzer) dynamisch zu verarbeiten.
from dataclasses import dataclass, field @dataclass class Resource: name: str def __post_init__(self) -> None: self.name = self.name.lower() @dataclass class Resources: users: Resource = field(default_factory=lambda: Resource("Users"))
In searchservice.py definieren wir eine abstrakte Klasse SearchService, um die erforderlichen Vorgänge zu skizzieren. Die HTTPOpenSearchService-Klasse implementiert dann diese CRUD-Methoden und interagiert mit dem OpenSearch-Client.
# coding: utf-8 import abc import logging import typing as t from dataclasses import dataclass from uuid import UUID from interfaces import Resource, Resources from opensearchpy import NotFoundError, OpenSearch resources = Resources() class SearchService(abc.ABC): def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: raise NotImplementedError def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: raise NotImplementedError def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: raise NotImplementedError def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError @dataclass(frozen=True) class HTTPOpenSearchService(SearchService): client: OpenSearch def _gen_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> str: return ( f"tenant_{str(UUID(str(tenants_id)))}" f"_company_{str(UUID(str(companies_id)))}" f"_kind_{kind.name}" ) def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: self.client.index( index=self._gen_index(kind, tenants_id, companies_id), body=data, id=data.get("id"), ) return data def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: try: index = self._gen_index(kind, tenants_id, companies_id) if self.client.indices.exists(index): self.client.indices.delete(index) except NotFoundError: pass def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: body: t.Dict[str, t.Any] = {} self.client.indices.create( index=self._gen_index(kind, tenants_id, companies_id), body=body, ) def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: return self.client.search( index=",".join( [self._gen_index(kind, tenants_id, companies_id) for kind in kinds] ), body={"query": query}, ) def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: try: response = self.client.delete( index=self._gen_index(kind, tenants_id, companies_id), id=document_id, ) return response except Exception as e: logging.error(f"Error deleting document: {e}") return None
In main.py zeigen wir, wie man:
Erstellen Sie einen
main.py- Index in OpenSearch.
- Indexdokumente mit Beispielbenutzerdaten.
- Suchennach Dokumenten basierend auf einer Abfrage.
- Löschenein Dokument anhand seiner ID.
# coding=utf-8 import logging import os import typing as t from uuid import uuid4 import searchservice from interfaces import Resources from opensearchpy import OpenSearch resources = Resources() logging.basicConfig(level=logging.INFO) search_service = searchservice.HTTPOpenSearchService( client=OpenSearch( hosts=[ { "host": os.getenv("OPENSEARCH_HOST", "localhost"), "port": os.getenv("OPENSEARCH_PORT", "9200"), } ], http_auth=( os.getenv("OPENSEARCH_USERNAME", ""), os.getenv("OPENSEARCH_PASSWORD", ""), ), use_ssl=False, verify_certs=False, ), ) tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9" companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c" search_str: str = "frank" document_id_to_delete: str = str(uuid4()) fake_data: t.List[t.Dict[str, t.Any]] = [ {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"}, {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"}, {"id": str(uuid4()), "name": "Parry", "tech": "Golang"}, {"id": str(uuid4()), "name": "Steve", "tech": "iOS"}, {"id": str(uuid4()), "name": "Frank", "tech": "node"}, ] search_service.delete_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id ) search_service.create_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, ) for item in fake_data: search_service.index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, data=dict(tenants_id=tenants_id, companies_id=companies_id, **item), ) search_query: t.Dict[str, t.Any] = { "bool": { "must": [], "must_not": [], "should": [], "filter": [ {"term": {"tenants_id.keyword": tenants_id}}, {"term": {"companies_id.keyword": companies_id}}, ], } } search_query["bool"]["must"].append( { "multi_match": { "query": search_str, "type": "phrase_prefix", "fields": ["name", "tech"], } } ) search_results = search_service.search( kinds=[resources.users], tenants_id=tenants_id, companies_id=companies_id, query=search_query, ) final_result = search_results.get("hits", {}).get("hits", []) for item in final_result: logging.info(["Item -> ", item.get("_source", {})]) deleted_result = search_service.delete_document( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, document_id=document_id_to_delete, ) logging.info(["Deleted result -> ", deleted_result])
Docker-Kompositionpython main.py
demonstriert, wie man OpenSearch lokal mit Docker einrichtet und grundlegende CRUD-Operationen mit Python. OpenSearch bietet eine leistungsstarke und skalierbare Lösung für die Verwaltung und Abfrage großer Datensätze. Während sich dieser Leitfaden auf die Integration von OpenSearch mit Dummy-Daten konzentriert, wird OpenSearch in realen Anwendungen häufig als leseoptimierter Speicher für Schnelleres verwendet Datenabruf. In solchen Fällen ist es üblich, verschiedene Indexierungsstrategien zu implementieren, um die Datenkonsistenz sicherzustellen, indem sowohl die Primärdatenbank als auch OpenSearch gleichzeitig aktualisiert werden. Dadurch wird sichergestellt, dass
OpenSearchmit Ihrer Primärdatenquelle synchron bleibt und sowohl Leistung als auch Genauigkeitoptimiert wird > beim Datenabruf.
Referenzen
Das obige ist der detaillierte Inhalt vonCRUD-Operationen mit OpenSearch in Python beherrschen: Ein praktischer Leitfaden. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!