Rumah > pembangunan bahagian belakang > Tutorial Python > Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran

Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran

Barbara Streisand
Lepaskan: 2024-11-24 03:37:09
asal
1089 orang telah melayarinya

LangGraph State Machines: Managing Complex Agent Task Flows in Production

Apakah itu LangGraph?

LangGraph ialah rangka kerja orkestrasi aliran kerja yang direka khusus untuk aplikasi LLM. Prinsip terasnya ialah:

  • Memecahkan tugas yang rumit kepada keadaan dan peralihan
  • Mengurus logik peralihan keadaan
  • Mengendalikan pelbagai pengecualian semasa pelaksanaan tugas

Fikirkan membeli-belah: Semak imbas → Tambah ke Troli → Daftar Keluar → Pembayaran. LangGraph membantu kami mengurus aliran kerja sedemikian dengan cekap.

Konsep Teras

1. Negeri

Negeri adalah seperti pusat pemeriksaan dalam pelaksanaan tugas anda:

from typing import TypedDict, List

class ShoppingState(TypedDict):
    # Current state
    current_step: str
    # Cart items
    cart_items: List[str]
    # Total amount
    total_amount: float
    # User input
    user_input: str

class ShoppingGraph(StateGraph):
    def __init__(self):
        super().__init__()

        # Define states
        self.add_node("browse", self.browse_products)
        self.add_node("add_to_cart", self.add_to_cart)
        self.add_node("checkout", self.checkout)
        self.add_node("payment", self.payment)
Salin selepas log masuk

2. Peralihan Negeri

Peralihan negeri menentukan "peta jalan" aliran tugas anda:

class ShoppingController:
    def define_transitions(self):
        # Add transition rules
        self.graph.add_edge("browse", "add_to_cart")
        self.graph.add_edge("add_to_cart", "browse")
        self.graph.add_edge("add_to_cart", "checkout")
        self.graph.add_edge("checkout", "payment")

    def should_move_to_cart(self, state: ShoppingState) -> bool:
        """Determine if we should transition to cart state"""
        return "add to cart" in state["user_input"].lower()
Salin selepas log masuk

3. Kegigihan Negeri

Untuk memastikan kebolehpercayaan sistem, kami perlu mengekalkan maklumat keadaan:

class StateManager:
    def __init__(self):
        self.redis_client = redis.Redis()

    def save_state(self, session_id: str, state: dict):
        """Save state to Redis"""
        self.redis_client.set(
            f"shopping_state:{session_id}",
            json.dumps(state),
            ex=3600  # 1 hour expiration
        )

    def load_state(self, session_id: str) -> dict:
        """Load state from Redis"""
        state_data = self.redis_client.get(f"shopping_state:{session_id}")
        return json.loads(state_data) if state_data else None
Salin selepas log masuk

4. Mekanisme Pemulihan Ralat

Sebarang langkah boleh gagal, dan kita perlu menangani situasi ini dengan baik:

class ErrorHandler:
    def __init__(self):
        self.max_retries = 3

    async def with_retry(self, func, state: dict):
        """Function execution with retry mechanism"""
        retries = 0
        while retries < self.max_retries:
            try:
                return await func(state)
            except Exception as e:
                retries += 1
                if retries == self.max_retries:
                    return self.handle_final_error(e, state)
                await self.handle_retry(e, state, retries)

    def handle_final_error(self, error, state: dict):
        """Handle final error"""
        # Save error state
        state["error"] = str(error)
        # Rollback to last stable state
        return self.rollback_to_last_stable_state(state)
Salin selepas log masuk

Contoh Dunia Nyata: Sistem Perkhidmatan Pelanggan Pintar

Mari kita lihat contoh praktikal - sistem perkhidmatan pelanggan yang bijak:

from langgraph.graph import StateGraph, State

class CustomerServiceState(TypedDict):
    conversation_history: List[str]
    current_intent: str
    user_info: dict
    resolved: bool

class CustomerServiceGraph(StateGraph):
    def __init__(self):
        super().__init__()

        # Initialize states
        self.add_node("greeting", self.greet_customer)
        self.add_node("understand_intent", self.analyze_intent)
        self.add_node("handle_query", self.process_query)
        self.add_node("confirm_resolution", self.check_resolution)

    async def greet_customer(self, state: State):
        """Greet customer"""
        response = await self.llm.generate(
            prompt=f"""
            Conversation history: {state['conversation_history']}
            Task: Generate appropriate greeting
            Requirements:
            1. Maintain professional friendliness
            2. Acknowledge returning customers
            3. Ask how to help
            """
        )
        state['conversation_history'].append(f"Assistant: {response}")
        return state

    async def analyze_intent(self, state: State):
        """Understand user intent"""
        response = await self.llm.generate(
            prompt=f"""
            Conversation history: {state['conversation_history']}
            Task: Analyze user intent
            Output format:
            {{
                "intent": "refund/inquiry/complaint/other",
                "confidence": 0.95,
                "details": "specific description"
            }}
            """
        )
        state['current_intent'] = json.loads(response)
        return state
Salin selepas log masuk

Penggunaan

# Initialize system
graph = CustomerServiceGraph()
state_manager = StateManager()
error_handler = ErrorHandler()

async def handle_customer_query(user_id: str, message: str):
    # Load or create state
    state = state_manager.load_state(user_id) or {
        "conversation_history": [],
        "current_intent": None,
        "user_info": {},
        "resolved": False
    }

    # Add user message
    state["conversation_history"].append(f"User: {message}")

    # Execute state machine flow
    try:
        result = await graph.run(state)
        # Save state
        state_manager.save_state(user_id, result)
        return result["conversation_history"][-1]
    except Exception as e:
        return await error_handler.with_retry(
            graph.run,
            state
        )
Salin selepas log masuk

Amalan Terbaik

  1. Prinsip Reka Bentuk Negeri

    • Pastikan keadaan mudah dan jelas
    • Simpan maklumat yang diperlukan sahaja
    • Pertimbangkan keperluan bersiri
  2. Pengoptimuman Logik Peralihan

    • Gunakan peralihan bersyarat
    • Elakkan gelung tak terhingga
    • Tetapkan had langkah maksimum
  3. Strategi Pengendalian Ralat

    • Melaksanakan degradasi anggun
    • Log maklumat terperinci
    • Sediakan mekanisme rollback
  4. Pengoptimuman Prestasi

    • Gunakan operasi tak segerak
    • Laksanakan caching keadaan
    • Kawal saiz keadaan

Perangkap dan Penyelesaian Biasa

  1. Letupan Negeri

    • Masalah: Terlalu banyak negeri menyukarkan penyelenggaraan
    • Penyelesaian: Gabungkan keadaan yang serupa, gunakan gabungan keadaan dan bukannya mencipta yang baharu
  2. Situasi Kebuntuan

    • Masalah: Peralihan keadaan bulat yang menyebabkan tugas digantung
    • Penyelesaian: Tambahkan mekanisme tamat masa dan syarat keluar paksa
  3. Ketekalan Negeri

    • Masalah: Keadaan tidak konsisten dalam persekitaran teragih
    • Penyelesaian: Gunakan kunci yang diedarkan dan mekanisme transaksi

Ringkasan

Mesin keadaan LangGraph menyediakan penyelesaian yang berkuasa untuk menguruskan aliran tugas Ejen AI yang kompleks:

  • Kosongkan pengurusan aliran tugas
  • Kegigihan keadaan yang boleh dipercayai
  • Pengendalian ralat menyeluruh
  • Kebolehlanjutan fleksibel

Atas ialah kandungan terperinci Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Artikel terbaru oleh pengarang
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan