OpenSearch, une alternative open source à Elasticsearch, est un puissant moteur de recherche et d'analyse conçu pour gérer des grands ensembles de données en toute simplicité. Dans ce blog, nous allons montrer comment effectuer des opérations CRUD de base (Créer, Lire, Mettre à jour, Supprimer) dans OpenSearch à l'aide de Python.
Pour commencer, nous avons besoin d'une instance OpenSearch locale. Vous trouverez ci-dessous un simple fichier docker-compose.yml qui fait tourner OpenSearch et OpenSearch Dashboards.
version: '3' services: opensearch-test-node-1: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-1 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-1 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data1:/usr/share/opensearch/data ports: - 9200:9200 - 9600:9600 networks: - opensearch-test-net opensearch-test-node-2: image: opensearchproject/opensearch:2.13.0 container_name: opensearch-test-node-2 environment: - cluster.name=opensearch-test-cluster - node.name=opensearch-test-node-2 - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2 - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2 - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - "DISABLE_INSTALL_DEMO_CONFIG=true" - "DISABLE_SECURITY_PLUGIN=true" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 volumes: - opensearch-test-data2:/usr/share/opensearch/data networks: - opensearch-test-net opensearch-test-dashboards: image: opensearchproject/opensearch-dashboards:2.13.0 container_name: opensearch-test-dashboards ports: - 5601:5601 expose: - "5601" environment: - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]' - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true" networks: - opensearch-test-net volumes: opensearch-test-data1: opensearch-test-data2: networks: opensearch-test-net:
Exécutez la commande suivante pour afficher votre instance OpenSearch :
docker-composer
OpenSearch sera accessible sur http://localhost:9200.
python -m venv .venv source .venv/bin/activate pip install opensearch-py
Nous structurerons également notre projet comme suit :
├── interfaces.py ├── main.py ├── searchservice.py ├── docker-compose.yml
Dans le fichier interfaces.py, nous définissons nos classes Resource et Resources. Ceux-ci nous aideront à gérer dynamiquement différents types de ressources dans OpenSearch (dans ce cas, les utilisateurs).
from dataclasses import dataclass, field @dataclass class Resource: name: str def __post_init__(self) -> None: self.name = self.name.lower() @dataclass class Resources: users: Resource = field(default_factory=lambda: Resource("Users"))
Dans searchservice.py, nous définissons une classe abstraite SearchService pour décrire les opérations requises. La classe HTTPOpenSearchService implémente ensuite ces méthodes CRUD, en interagissant avec le client OpenSearch.
# coding: utf-8 import abc import logging import typing as t from dataclasses import dataclass from uuid import UUID from interfaces import Resource, Resources from opensearchpy import NotFoundError, OpenSearch resources = Resources() class SearchService(abc.ABC): def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: raise NotImplementedError def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: raise NotImplementedError def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: raise NotImplementedError def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> None: raise NotImplementedError @dataclass(frozen=True) class HTTPOpenSearchService(SearchService): client: OpenSearch def _gen_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> str: return ( f"tenant_{str(UUID(str(tenants_id)))}" f"_company_{str(UUID(str(companies_id)))}" f"_kind_{kind.name}" ) def index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, data: t.Dict[str, t.Any], ) -> t.Dict[str, t.Any]: self.client.index( index=self._gen_index(kind, tenants_id, companies_id), body=data, id=data.get("id"), ) return data def delete_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: try: index = self._gen_index(kind, tenants_id, companies_id) if self.client.indices.exists(index): self.client.indices.delete(index) except NotFoundError: pass def create_index( self, kind: Resource, tenants_id: UUID, companies_id: UUID, ) -> None: body: t.Dict[str, t.Any] = {} self.client.indices.create( index=self._gen_index(kind, tenants_id, companies_id), body=body, ) def search( self, kinds: t.List[Resource], tenants_id: UUID, companies_id: UUID, query: t.Dict[str, t.Any], ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]: return self.client.search( index=",".join( [self._gen_index(kind, tenants_id, companies_id) for kind in kinds] ), body={"query": query}, ) def delete_document( self, kind: Resource, tenants_id: UUID, companies_id: UUID, document_id: str, ) -> t.Optional[t.Dict[str, t.Any]]: try: response = self.client.delete( index=self._gen_index(kind, tenants_id, companies_id), id=document_id, ) return response except Exception as e: logging.error(f"Error deleting document: {e}") return None
Dans main.py, nous montrons comment :
- Créez un index dans OpenSearch.
- Indexer les documents avec des exemples de données utilisateur.
- Rechercher des documents en fonction d'une requête.
- Supprimer un document en utilisant son identifiant.
main.py
# coding=utf-8 import logging import os import typing as t from uuid import uuid4 import searchservice from interfaces import Resources from opensearchpy import OpenSearch resources = Resources() logging.basicConfig(level=logging.INFO) search_service = searchservice.HTTPOpenSearchService( client=OpenSearch( hosts=[ { "host": os.getenv("OPENSEARCH_HOST", "localhost"), "port": os.getenv("OPENSEARCH_PORT", "9200"), } ], http_auth=( os.getenv("OPENSEARCH_USERNAME", ""), os.getenv("OPENSEARCH_PASSWORD", ""), ), use_ssl=False, verify_certs=False, ), ) tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9" companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c" search_str: str = "frank" document_id_to_delete: str = str(uuid4()) fake_data: t.List[t.Dict[str, t.Any]] = [ {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"}, {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"}, {"id": str(uuid4()), "name": "Parry", "tech": "Golang"}, {"id": str(uuid4()), "name": "Steve", "tech": "iOS"}, {"id": str(uuid4()), "name": "Frank", "tech": "node"}, ] search_service.delete_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id ) search_service.create_index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, ) for item in fake_data: search_service.index( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, data=dict(tenants_id=tenants_id, companies_id=companies_id, **item), ) search_query: t.Dict[str, t.Any] = { "bool": { "must": [], "must_not": [], "should": [], "filter": [ {"term": {"tenants_id.keyword": tenants_id}}, {"term": {"companies_id.keyword": companies_id}}, ], } } search_query["bool"]["must"].append( { "multi_match": { "query": search_str, "type": "phrase_prefix", "fields": ["name", "tech"], } } ) search_results = search_service.search( kinds=[resources.users], tenants_id=tenants_id, companies_id=companies_id, query=search_query, ) final_result = search_results.get("hits", {}).get("hits", []) for item in final_result: logging.info(["Item -> ", item.get("_source", {})]) deleted_result = search_service.delete_document( kind=resources.users, tenants_id=tenants_id, companies_id=companies_id, document_id=document_id_to_delete, ) logging.info(["Deleted result -> ", deleted_result])
docker composer
python main.py
Il devrait imprimer les informations sur les enregistrements trouvés et supprimés.
Dans ce blog, nous avons démontré comment configurer OpenSearch localement à l'aide de Docker et effectuer des opérations CRUD de base avec Python. OpenSearch fournit une solution puissante et évolutive pour gérer et interroger de grands ensembles de données. Bien que ce guide se concentre sur l'intégration d'OpenSearch avec des données factices, dans les applications du monde réel, OpenSearch est souvent utilisé comme un magasin optimisé pour la lecture pour une lecture plus rapide. récupération de données. Dans de tels cas, il est courant de mettre en œuvre différentes stratégies d'indexation pour garantir la cohérence des données en mettant à jour simultanément la base de données principale et OpenSearch.
Cela garantit que OpenSearch reste synchronisé avec votre données primaires source, optimisant à la fois les performances et la précision en récupération de données.
https://github.com/FranklinThaker/opensearch-integration-example
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!