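OpenSearch, an open-source alternative to Elasticsearch, is a powerful search and analytics engine built to handle large datasets with ease. In this blog, we'll demonstrate how to perform basic CRUD (Create, Read, Update, Delete) operations in OpenSearch using Python.
First, we need a local OpenSearch instance. Below is a simple docker-compose.yml file that starts a two-node OpenSearch cluster and OpenSearch Dashboards, with the security plugin disabled for local testing.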
version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net
  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net
  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net
volumes:
  opensearch-test-data1:
  opensearch-test-data2:
networks:
  opensearch-test-net:
Run the following command to start your OpenSearch instance:
docker-compose up
OpenSearch will be accessible at http://localhost:9200.
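The containers can take a little while to accept connections after docker-compose up returns control. As a minimal sketch, assuming the cluster is reachable over plain HTTP at http://localhost:9200 as configured above, the helper below polls the root endpoint until it answers. The function names `fetch_cluster_info` and `wait_for_opensearch` are hypothetical and not part of the original project; only the Python standard library is used.

```python
import json
import time
import typing as t
import urllib.request


def fetch_cluster_info(
    url: str = "http://localhost:9200", timeout: float = 2.0
) -> t.Dict[str, t.Any]:
    """Return the JSON document served at the cluster's root endpoint."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_opensearch(
    fetch: t.Callable[[], t.Dict[str, t.Any]] = fetch_cluster_info,
    attempts: int = 30,
    delay: float = 2.0,
) -> t.Optional[t.Dict[str, t.Any]]:
    """Call `fetch` until it succeeds; return None if every attempt fails."""
    for _ in range(attempts):
        try:
            return fetch()
        except OSError:
            # urllib's URLError and refused/reset connections are all
            # subclasses of OSError, so this covers "not up yet".
            time.sleep(delay)
    return None
```

Calling `wait_for_opensearch()` with no arguments polls the local cluster for up to about a minute. The `fetch` parameter exists only so the retry loop can be exercised without a running cluster.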
Next, create a virtual environment and install the OpenSearch Python client:
python -m venv .venv
source .venv/bin/activate
pip install opensearch-py
We'll also structure our project as follows:
├── interfaces.py
├── main.py
├── searchservice.py
└── docker-compose.yml
In interfaces.py, we define the Resource and Resources classes. These help us handle different resource types in OpenSearch dynamically (in this case, users).
from dataclasses import dataclass, field


@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        # Index names are built from this value, so normalize it to lower case.
        self.name = self.name.lower()


@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))
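To illustrate how these classes behave, here is a small self-contained sketch that repeats the two dataclasses and adds a second resource type. The `orders` field is a hypothetical addition for illustration only and is not part of the original project. The lower-casing matters because OpenSearch index names must be lowercase.

```python
from dataclasses import dataclass, field


@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()


@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))
    # Hypothetical second resource type, added here only as an example.
    orders: Resource = field(default_factory=lambda: Resource("Orders"))


resources = Resources()
print(resources.users.name)   # users
print(resources.orders.name)  # orders
```

Adding a resource type is then a one-line change, and every index derived from it gets a valid lower-case name.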
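In searchservice.py, we define an abstract class, SearchService, that outlines the required operations. The HTTPOpenSearchService class then implements these CRUD methods by interacting with the OpenSearch client.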
# coding: utf-8
import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    """Operations every search backend has to provide."""

    @abc.abstractmethod
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    @abc.abstractmethod
    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        raise NotImplementedError

    @abc.abstractmethod
    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    @abc.abstractmethod
    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    @abc.abstractmethod
    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        # One index per tenant, company and resource kind.
        # UUID(str(...)) validates the IDs and normalizes their format.
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
            # Make the document searchable right away. Convenient for a demo,
            # but forcing a refresh on every write is costly in production.
            refresh=True,
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index=index):
                self.client.indices.delete(index=index)
        except NotFoundError:
            # The index disappeared between the check and the delete.
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        # An empty body creates the index with default settings and mappings.
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        # Several indices can be searched at once as a comma-separated list.
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error("Error deleting document: %s", e)
            return None
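The index naming scheme used by `_gen_index` is easiest to see in isolation. The sketch below reproduces the same logic as a standalone function; the name `gen_index` and its `kind_name` string parameter are simplifications for illustration, and the two UUIDs are the sample values used in main.py.

```python
from uuid import UUID


def gen_index(kind_name: str, tenants_id: str, companies_id: str) -> str:
    """Build a per-tenant, per-company index name for one resource kind."""
    # UUID(...) raises ValueError on malformed input and normalizes the
    # value to lower-case hyphenated form, which keeps the index name valid.
    return (
        f"tenant_{UUID(str(tenants_id))}"
        f"_company_{UUID(str(companies_id))}"
        f"_kind_{kind_name}"
    )


index_name = gen_index(
    "users",
    "f0835e2d-bd68-406c-99a7-ad63a51e9ef9",
    "bf58c749-c90a-41e2-b66f-6d98aae17a6c",
)
print(index_name)
# tenant_f0835e2d-bd68-406c-99a7-ad63a51e9ef9_company_bf58c749-c90a-41e2-b66f-6d98aae17a6c_kind_users
```

Because each tenant and company pair gets its own index, a search is scoped to one tenant's data by the index name itself, before any query filter is applied.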
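In main.py, we demonstrate how to:
- Create an index in OpenSearch.
- Index documents with sample user data.
- Search for documents based on a query.
- Delete a document using its document ID.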
main.py
# coding=utf-8
import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": int(os.getenv("OPENSEARCH_PORT", "9200")),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        # The local cluster runs with the security plugin disabled (plain HTTP).
        use_ssl=False,
        verify_certs=False,
    ),
)

# The IDs are passed as strings; _gen_index validates them with UUID(...).
tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

# Start from a clean index on every run.
search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

# Index each sample user, tagging it with its tenant and company.
for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

# Filter on tenant and company, then match the search string as a prefix.
search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info("Item -> %s", item.get("_source", {}))

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info("Deleted result -> %s", deleted_result)
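The query in main.py is assembled inline, which makes it hard to reuse. As a minimal sketch, the same bool query can be built by a small function, with a companion helper that pulls the stored documents out of a search response. The names `build_search_query` and `extract_sources` are hypothetical, and `sample_response` is a hand-written stand-in that only mimics the `hits.hits[]._source` shape of a real response.

```python
import typing as t


def build_search_query(
    search_str: str,
    tenants_id: str,
    companies_id: str,
    fields: t.Sequence[str] = ("name", "tech"),
) -> t.Dict[str, t.Any]:
    """Exact-match filters on tenant and company plus a prefix match."""
    return {
        "bool": {
            # `filter` clauses restrict results without affecting the score.
            "filter": [
                {"term": {"tenants_id.keyword": tenants_id}},
                {"term": {"companies_id.keyword": companies_id}},
            ],
            # `phrase_prefix` lets "frank" match both "Frank" and "Franklin".
            "must": [
                {
                    "multi_match": {
                        "query": search_str,
                        "type": "phrase_prefix",
                        "fields": list(fields),
                    }
                }
            ],
        }
    }


def extract_sources(response: t.Dict[str, t.Any]) -> t.List[t.Dict[str, t.Any]]:
    """Return the original documents from a search response."""
    return [hit.get("_source", {}) for hit in response.get("hits", {}).get("hits", [])]


# Hand-written stand-in for a real response, for illustration only.
sample_response = {"hits": {"hits": [{"_id": "1", "_source": {"name": "Frank"}}]}}
print(extract_sources(sample_response))  # [{'name': 'Frank'}]
```

The empty `must_not` and `should` lists from the original query are omitted here because empty clauses have no effect on the result.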
Start the containers with docker-compose up if they are not already running, then run the script:
python main.py
It should print information about the records that were found and the record that was deleted.
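The script covers create, read and delete, but not update. One possible way to add it, shown here as a sketch rather than as part of the original project, is the client's `update` call with a `doc` body, which merges the given fields into the stored document. The function `update_document` is hypothetical, and `RecordingClient` is a stand-in used only so the example runs without a cluster; with a real `OpenSearch` client the same function call applies.

```python
import typing as t


def update_document(
    client: t.Any,
    index: str,
    document_id: str,
    changes: t.Dict[str, t.Any],
) -> t.Dict[str, t.Any]:
    """Partially update one document: only the fields in `changes` are touched."""
    return client.update(index=index, id=document_id, body={"doc": changes})


class RecordingClient:
    """Stand-in for the OpenSearch client that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: t.List[t.Dict[str, t.Any]] = []

    def update(self, *, index: str, id: str, body: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        self.calls.append({"index": index, "id": id, "body": body})
        # Simplified response; a real cluster returns more fields.
        return {"_id": id, "result": "updated"}


client = RecordingClient()
update_document(client, "example_index", "42", {"tech": "python,rust"})
print(client.calls[0]["body"])  # {'doc': {'tech': 'python,rust'}}
```

Calling the existing `index` method again with the same `id` is the other option: it replaces the whole document instead of merging fields.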
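In this blog, we demonstrated how to set up OpenSearch locally with Docker and perform basic CRUD operations using Python. OpenSearch provides a powerful and scalable solution for managing and querying large datasets. Although this guide focuses on integrating OpenSearch with dummy data, in real-world applications OpenSearch is often used as a read-optimized store for faster data retrieval. In such cases, it is common to implement an indexing strategy that keeps the data consistent by updating the primary database and OpenSearch together.
This keeps OpenSearch in sync with your primary data source, which improves both the performance and the accuracy of data retrieval.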
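One simple form of such a strategy is a dual write: save to the primary database first, then index the same record for search. The sketch below is an assumption-laden illustration, not the approach of the original project. It uses an in-memory SQLite database as the primary store, a hypothetical `users` table, and a plain callable standing in for the indexing step.

```python
import json
import logging
import sqlite3
import typing as t


def save_user(
    db: sqlite3.Connection,
    index_document: t.Callable[[t.Dict[str, t.Any]], t.Any],
    user: t.Dict[str, t.Any],
) -> bool:
    """Write to the primary store, then index. Return True if both succeeded."""
    # The primary database is the source of truth, so it is written first.
    with db:  # commits on success, rolls back if the statement fails
        db.execute(
            "INSERT OR REPLACE INTO users (id, payload) VALUES (?, ?)",
            (user["id"], json.dumps(user)),
        )
    try:
        index_document(user)
    except Exception:
        # The record is safe in the database; only the search copy is stale.
        logging.exception("Indexing failed for user %s; retry needed", user["id"])
        return False
    return True


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id TEXT PRIMARY KEY, payload TEXT NOT NULL)")

indexed: t.List[t.Dict[str, t.Any]] = []  # stand-in for the search index
in_sync = save_user(db, indexed.append, {"id": "1", "name": "Frank", "tech": "node"})
print(in_sync, len(indexed))  # True 1
```

In the project above, `index_document` could be a small function that calls `search_service.index(...)` with the right kind and IDs. A `False` return marks records that need to be re-indexed later, for example by a background job.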
https://github.com/FranklinThaker/opensearch-integration-example