Memandangkan penggunaan Generative AI (GenAI) meningkat merentasi industri, organisasi semakin memanfaatkan teknik Retrieval-Augmented Generation (RAG) untuk memperkukuh model AI mereka dengan masa nyata, kaya konteks data. Menguruskan aliran maklumat yang kompleks dalam aplikasi sedemikian menimbulkan cabaran yang ketara, terutamanya apabila berurusan dengan data yang dijana secara berterusan pada skala. KubeMQ, broker mesej yang mantap, muncul sebagai penyelesaian untuk menyelaraskan penghalaan berbilang proses RAG, memastikan pengendalian data yang cekap dalam aplikasi GenAI.
Untuk meningkatkan lagi kecekapan dan kebolehskalaan aliran kerja RAG, menyepadukan pangkalan data berprestasi tinggi seperti FalkorDB adalah penting. FalkorDB menyediakan penyelesaian storan yang boleh dipercayai dan berskala untuk pangkalan pengetahuan dinamik yang bergantung kepada sistem RAG, memastikan perolehan data pantas dan penyepaduan yang lancar dengan sistem pemesejan seperti KubeMQ.
RAG ialah paradigma yang meningkatkan model AI generatif dengan menyepadukan mekanisme perolehan semula, membenarkan model mengakses pangkalan pengetahuan luaran semasa inferens. Pendekatan ini meningkatkan ketepatan, perkaitan dan ketepatan masa respons yang dijana dengan ketara dengan meletakkannya pada maklumat terkini dan berkaitan yang tersedia.
Dalam aliran kerja GenAI biasa yang menggunakan RAG, prosesnya melibatkan berbilang langkah:
Pemprosesan pertanyaan: Mentafsir input pengguna untuk memahami maksud dan konteks
Pendapatan semula: Mengambil dokumen atau data yang berkaitan daripada pangkalan pengetahuan dinamik, seperti FalkorDB, yang memastikan akses pantas dan cekap kepada maklumat terkini dan berkaitan.
Penjanaan: Menghasilkan respons menggunakan kedua-dua input dan data yang diambil
Penyampaian respons: Memberikan hasil akhir yang diperkaya kembali kepada pengguna
Menskalakan langkah ini, terutamanya dalam persekitaran di mana data dijana dan dikemas kini secara berterusan, memerlukan mekanisme yang cekap dan boleh dipercayai untuk aliran data antara pelbagai komponen saluran paip RAG.
Dalam senario seperti rangkaian IoT, platform media sosial atau sistem analitik masa nyata, data baharu dihasilkan tanpa henti dan model AI mesti menyesuaikan diri dengan pantas untuk memasukkan maklumat ini. Seni bina tindak balas permintaan tradisional boleh menjadi kesesakan dalam keadaan pemprosesan tinggi, yang membawa kepada isu kependaman dan prestasi yang merosot.
KubeMQ menguruskan senario pemesejan berkemampuan tinggi dengan menyediakan infrastruktur berskala dan teguh untuk penghalaan data yang cekap antara perkhidmatan. Dengan menyepadukan KubeMQ ke dalam saluran paip RAG, setiap titik data baharu diterbitkan ke baris gilir atau strim mesej, memastikan bahawa komponen pengambilan mempunyai akses segera kepada maklumat terkini tanpa membebankan sistem. Keupayaan pengendalian data masa nyata ini adalah penting untuk mengekalkan perkaitan dan ketepatan output GenAI.
KubeMQ menawarkan pelbagai corak pemesejan — termasuk baris gilir, strim, publish-subscribe (pub/sub) dan Panggilan Prosedur Jauh (RPC) — menjadikannya penghala yang serba boleh dan berkuasa dalam saluran paip RAG. Ciri kependaman rendah dan prestasi tinggi memastikan penghantaran mesej segera, yang penting untuk aplikasi GenAI masa nyata di mana kelewatan boleh memberi kesan ketara kepada pengalaman pengguna dan keberkesanan sistem.
Selain itu, keupayaan KubeMQ untuk mengendalikan logik penghalaan yang kompleks membolehkan strategi pengedaran data yang canggih. Ini memastikan komponen sistem AI yang berbeza menerima dengan tepat data yang mereka perlukan, apabila mereka memerlukannya, tanpa pertindihan atau penangguhan yang tidak perlu.
Walaupun KubeMQ menghalakan mesej antara perkhidmatan dengan cekap, FalkorDB melengkapkan ini dengan menyediakan penyelesaian pangkalan data graf berskala dan berprestasi tinggi untuk menyimpan dan mendapatkan semula sejumlah besar data yang diperlukan oleh proses RAG. Penyepaduan ini memastikan bahawa semasa data baharu mengalir melalui KubeMQ, ia disimpan dengan lancar dalam FalkorDB, menjadikannya tersedia untuk operasi mendapatkan semula tanpa memperkenalkan kependaman atau kesesakan.
Apabila aplikasi GenAI berkembang dalam pangkalan pengguna dan volum data, kebolehskalaan menjadi kebimbangan utama. KubeMQ boleh berskala, menyokong penskalaan mendatar untuk menampung peningkatan beban dengan lancar. Ia memastikan bahawa apabila bilangan proses RAG meningkat atau apabila penjanaan data dipercepatkan, infrastruktur pemesejan kekal teguh dan responsif.
Selain itu, KubeMQ menyediakan ketekalan mesej dan toleransi kesalahan. Sekiranya berlaku kegagalan sistem atau gangguan rangkaian, KubeMQ memastikan bahawa mesej tidak hilang dan sistem boleh pulih dengan anggun. Kebolehpercayaan ini penting dalam mengekalkan integriti aplikasi AI yang pengguna bergantung pada maklumat yang tepat pada masanya dan tepat.
Melaksanakan perkhidmatan penghalaan tersuai untuk pengendalian data dalam saluran paip RAG boleh menjadi intensif sumber dan kompleks. Ia selalunya memerlukan usaha pembangunan yang ketara untuk membina, menyelenggara dan menskalakan perkhidmatan ini, mengalihkan fokus daripada pembangunan aplikasi AI teras.
Dengan mengguna pakai KubeMQ, organisasi menghapuskan keperluan untuk mencipta penyelesaian penghalaan yang dipesan lebih dahulu. KubeMQ menyediakan fungsi luar kotak yang menangani keperluan penghalaan proses RAG, termasuk corak penghalaan yang kompleks, penapisan mesej dan pengendalian keutamaan. Ini bukan sahaja mengurangkan overhed pembangunan dan penyelenggaraan tetapi juga mempercepatkan masa ke pasaran untuk penyelesaian GenAI.
KubeMQ menawarkan berbilang antara muka untuk berinteraksi dengan keupayaan broker mesejnya:
REST API: Mendayakan penyepaduan bahasa-agnostik, membenarkan perkhidmatan yang ditulis dalam mana-mana bahasa pengaturcaraan menghantar dan menerima mesej melalui HTTP
SDK: Menyediakan perpustakaan pelanggan untuk pelbagai bahasa pengaturcaraan (seperti Python, Java, Go dan .NET), memudahkan corak komunikasi yang lebih cekap dan prestasi yang lebih baik melalui penyepaduan asli
Fleksibiliti ini membolehkan pembangun memilih kaedah yang paling sesuai untuk kes penggunaan khusus mereka, memudahkan seni bina dan mempercepatkan kitaran pembangunan. Satu titik sentuh untuk penghalaan data memperkemas komunikasi antara komponen saluran paip RAG yang berbeza, meningkatkan keselarasan sistem keseluruhan.
Contoh kod mempamerkan cara membina sistem mendapatkan maklumat filem dengan menyepadukan KubeMQ ke dalam saluran paip RAG. Ia menyediakan pelayan yang mengambil URL filem daripada Rotten Tomatoes untuk membina graf pengetahuan menggunakan GPT-4. Pengguna boleh berinteraksi dengan sistem ini melalui pelanggan sembang, menghantar pertanyaan berkaitan filem dan menerima respons yang dijana AI. Kes penggunaan ini menunjukkan cara mengendalikan pengingesan data berterusan dan pemprosesan pertanyaan masa nyata dalam aplikasi praktikal, menggunakan KubeMQ untuk pengendalian mesej yang cekap dan komunikasi antara perkhidmatan dalam konteks filem.
Perkhidmatan pengingesan data: Menangkap dan menerbitkan data baharu ke strim KubeMQ apabila ia tersedia
Perkhidmatan mendapatkan semula: Langgan strim KubeMQ untuk menerima kemas kini dan menyegarkan pangkalan pengetahuan
Perkhidmatan penjanaan: Mendengar permintaan pertanyaan, berinteraksi dengan model AI dan menjana respons
Perkhidmatan respons: Menghantar semula respons yang dijana kepada pengguna melalui saluran yang sesuai
Pastikan bahawa KubeMQ beroperasi, yang boleh dicapai dengan mengaturnya menggunakan Docker:
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Arahan ini memulakan KubeMQ dengan port yang diperlukan didedahkan untuk komunikasi REST dan gRPC.
Kod ini (repo GitHub) melaksanakan pelayan RAG yang memproses pertanyaan sembang dan mengurus sumber pengetahuan menggunakan KubeMQ untuk pengendalian mesej.
docker run -d --rm \ -p 8080:8080 \ -p 50000:50000 \ -p 9090:9090 \ -e KUBEMQ_TOKEN="your token"
Pelayan menjalankan dua utas utama: satu yang melanggan pertanyaan sembang melalui saluran yang dipanggil "rag-chat-query" dan memprosesnya menggunakan graf pengetahuan dengan GPT-4 dan satu lagi yang secara berterusan menarik diri daripada baris gilir yang dipanggil "rag -sources-queue" untuk menambah sumber baharu pada graf pengetahuan. Graf pengetahuan dimulakan dengan ontologi tersuai yang dimuatkan daripada fail JSON dan menggunakan model GPT-4 OpenAI untuk pemprosesan. Pelayan melaksanakan pengendalian penutupan yang anggun dan pengurusan ralat, memastikan semua urutan ditamatkan dengan betul apabila pelayan dihentikan.
# server.py import json import threading from typing import List from dotenv import load_dotenv load_dotenv() import time from kubemq.common import CancellationToken from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription from kubemq.queues import Client as QueuesClient from graphrag_sdk.models.openai import OpenAiGenerativeModel from graphrag_sdk.model_config import KnowledgeGraphModelConfig from graphrag_sdk import KnowledgeGraph, Ontology from graphrag_sdk.source import URL class RAGServer: def __init__(self): self.cq_client = CQClient(address="localhost:50000") self.queues_client = QueuesClient(address="localhost:50000") model = OpenAiGenerativeModel(model_name="gpt-4o") with open("ontology.json", "r") as f: ontology = json.load(f) ontology = Ontology.from_json(ontology) self.kg = KnowledgeGraph( name="movies", model_config=KnowledgeGraphModelConfig.with_model(model), ontology=ontology) self.chat = self.kg.chat_session() self.shutdown_event = threading.Event() self.threads: List[threading.Thread] = [] def handle_chat(self, request: QueryMessageReceived): try: message = request.body.decode('utf-8') print(f"Received chat message: {message}") result= self.chat.send_message(message) answer = result.get("response","No answer") print(f"Chat response: {answer}") response = QueryResponseMessage( query_received=request, is_executed=True, body=answer.encode('utf-8') ) self.cq_client.send_response_message(response) except Exception as e: print(f"Error processing chat message: {str(e)}") self.cq_client.send_response_message(QueryResponseMessage( query_received=request, is_executed=False, error=str(e) )) def pull_from_queue(self): while not self.shutdown_event.is_set(): try: result = self.queues_client.pull("rag-sources-queue", 10, 1) if result.is_error: print(f"Error pulling message from queue: {result.error}") continue sources = [] for message in result.messages: source = message.body.decode('utf-8') print(f"Received source: {source}, adding to knowledge graph") sources.append(URL(message.body.decode('utf-8'))) if sources: self.kg.process_sources(sources) except Exception as e: if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error processing sources: {str(e)}") def subscribe_to_chat_queries(self): def on_error(err: str): if not self.shutdown_event.is_set(): # Only log if not shutting down print(f"Error: {err}") cancellation_token = CancellationToken() try: self.cq_client.subscribe_to_queries( subscription=QueriesSubscription( channel="rag-chat-query", on_receive_query_callback=self.handle_chat, on_error_callback=on_error, ), cancel=cancellation_token ) # Wait for shutdown signal while not self.shutdown_event.is_set(): time.sleep(0.1) # Cancel subscription when shutdown is requested cancellation_token.cancel() except Exception as e: if not self.shutdown_event.is_set(): print(f"Error in subscription thread: {str(e)}") def run(self): chat_thread = threading.Thread(target=self.subscribe_to_chat_queries) queue_thread = threading.Thread(target=self.pull_from_queue) self.threads.extend([chat_thread, queue_thread]) for thread in self.threads: thread.daemon = True # Make threads daemon so they exit when main thread exits thread.start() print("RAG server started") try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down gracefully...") self.shutdown() self.cq_client.close() self.queues_client.close() def shutdown(self): print("Initiating shutdown sequence...") self.shutdown_event.set() # Signal all threads to stop for thread in self.threads: thread.join(timeout=5.0) # Wait up to 5 seconds for each thread if thread.is_alive(): print(f"Warning: Thread {thread.name} did not shutdown cleanly") print("Shutdown complete") if __name__ == "__main__": rag_server = RAGServer() rag_server.run()
Kod ini melaksanakan klien mudah yang menghantar URL filem ke pelayan RAG melalui sistem baris gilir KubeMQ. Khususnya, ia mencipta kelas SourceClient yang bersambung ke KubeMQ dan menghantar mesej ke saluran "rag-sources-queue", yang merupakan baris gilir yang sama yang dipantau oleh pelayan RAG. Apabila dijalankan sebagai program utama, ia menghantar senarai URL filem Rotten Tomatoes (termasuk filem Matrix, John Wick dan Speed) untuk diproses dan ditambahkan pada graf pengetahuan oleh pelayan RAG.
# sources_client.py from kubemq.queues import * class SourceClient: def __init__(self, address="localhost:50000"): self.client = Client(address=address) def send_source(self, message: str) : send_result = self.client.send_queues_message( QueueMessage( channel="rag-sources-queue", body=message.encode("utf-8"), ) ) if send_result.is_error: print(f"message send error, error:{send_result.error}") if __name__ == "__main__": client = SourceClient() urls = ["https://www.rottentomatoes.com/m/side_by_side_2012", "https://www.rottentomatoes.com/m/matrix", "https://www.rottentomatoes.com/m/matrix_revolutions", "https://www.rottentomatoes.com/m/matrix_reloaded", "https://www.rottentomatoes.com/m/speed_1994", "https://www.rottentomatoes.com/m/john_wick_chapter_4"] for url in urls: client.send_source(url) print("done")
Kod ini melaksanakan pelanggan sembang yang berkomunikasi dengan pelayan RAG melalui sistem pertanyaan KubeMQ. Kelas ChatClient menghantar mesej ke saluran "rag-chat-query" dan menunggu balasan, dengan tamat masa 30 saat untuk setiap pertanyaan. Apabila dijalankan sebagai program utama, ia menunjukkan kefungsian pelanggan dengan menghantar dua soalan berkaitan tentang pengarah The Matrix dan sambungan mereka kepada Keanu Reeves, mencetak setiap respons semasa menerimanya.
Semua contoh kod boleh didapati dalam garpu saya bagi repositori GitHub asal.
Mengintegrasikan KubeMQ ke dalam saluran paip RAG untuk aplikasi GenAI menyediakan mekanisme berskala, boleh dipercayai dan cekap untuk mengendalikan aliran data berterusan dan komunikasi antara proses yang kompleks. Dengan berfungsi sebagai penghala bersatu dengan corak pemesejan serba boleh, KubeMQ memudahkan keseluruhan seni bina, mengurangkan keperluan untuk penyelesaian penghalaan tersuai dan mempercepatkan kitaran pembangunan.
Selain itu, menggabungkan FalkorDB meningkatkan pengurusan data dengan menawarkan pangkalan pengetahuan berprestasi tinggi yang disepadukan dengan lancar dengan KubeMQ. Gabungan ini memastikan pengambilan dan penyimpanan data yang dioptimumkan, menyokong keperluan dinamik proses RAG.
Keupayaan untuk mengendalikan senario pemprosesan tinggi, digabungkan dengan ciri seperti kegigihan dan toleransi kesalahan, memastikan aplikasi GenAI kekal responsif dan boleh dipercayai, walaupun di bawah beban berat atau dalam menghadapi gangguan sistem.
Dengan memanfaatkan KubeMQ dan FalkorDB, organisasi boleh menumpukan pada meningkatkan model AI mereka dan menyampaikan cerapan dan perkhidmatan yang berharga, yakin bahawa infrastruktur penghalaan data mereka teguh dan mampu memenuhi permintaan aliran kerja AI moden.
Atas ialah kandungan terperinci Mempertingkatkan Aplikasi GenAI Dengan KubeMQ: Menskalakan dengan Cekap Penskalaan Retrieval-Augmented Generation (RAG). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!