Rumah > pembangunan bahagian belakang > Tutorial Python > Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran

Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran

Barbara Streisand
Lepaskan: 2024-11-24 03:37:09
asal
998 orang telah melayarinya

LangGraph State Machines: Managing Complex Agent Task Flows in Production

Apakah itu LangGraph?

LangGraph ialah rangka kerja orkestrasi aliran kerja yang direka khusus untuk aplikasi LLM. Prinsip terasnya ialah:

  • Memecahkan tugas yang rumit kepada keadaan dan peralihan
  • Mengurus logik peralihan keadaan
  • Mengendalikan pelbagai pengecualian semasa pelaksanaan tugas

Fikirkan membeli-belah: Semak imbas → Tambah ke Troli → Daftar Keluar → Pembayaran. LangGraph membantu kami mengurus aliran kerja sedemikian dengan cekap.

Konsep Teras

1. Negeri

Negeri adalah seperti pusat pemeriksaan dalam pelaksanaan tugas anda:

from typing import TypedDict, List

class ShoppingState(TypedDict):
    # Current state
    current_step: str
    # Cart items
    cart_items: List[str]
    # Total amount
    total_amount: float
    # User input
    user_input: str

class ShoppingGraph(StateGraph):
    def __init__(self):
        super().__init__()

        # Define states
        self.add_node("browse", self.browse_products)
        self.add_node("add_to_cart", self.add_to_cart)
        self.add_node("checkout", self.checkout)
        self.add_node("payment", self.payment)
Salin selepas log masuk

2. Peralihan Negeri

Peralihan negeri menentukan "peta jalan" aliran tugas anda:

class ShoppingController:
    def define_transitions(self):
        # Add transition rules
        self.graph.add_edge("browse", "add_to_cart")
        self.graph.add_edge("add_to_cart", "browse")
        self.graph.add_edge("add_to_cart", "checkout")
        self.graph.add_edge("checkout", "payment")

    def should_move_to_cart(self, state: ShoppingState) -> bool:
        """Determine if we should transition to cart state"""
        return "add to cart" in state["user_input"].lower()
Salin selepas log masuk

3. Kegigihan Negeri

Untuk memastikan kebolehpercayaan sistem, kami perlu mengekalkan maklumat keadaan:

class StateManager:
    def __init__(self):
        self.redis_client = redis.Redis()

    def save_state(self, session_id: str, state: dict):
        """Save state to Redis"""
        self.redis_client.set(
            f"shopping_state:{session_id}",
            json.dumps(state),
            ex=3600  # 1 hour expiration
        )

    def load_state(self, session_id: str) -> dict:
        """Load state from Redis"""
        state_data = self.redis_client.get(f"shopping_state:{session_id}")
        return json.loads(state_data) if state_data else None
Salin selepas log masuk

4. Mekanisme Pemulihan Ralat

Sebarang langkah boleh gagal, dan kita perlu menangani situasi ini dengan baik:

class ErrorHandler:
    def __init__(self):
        self.max_retries = 3

    async def with_retry(self, func, state: dict):
        """Function execution with retry mechanism"""
        retries = 0
        while retries < self.max_retries:
            try:
                return await func(state)
            except Exception as e:
                retries += 1
                if retries == self.max_retries:
                    return self.handle_final_error(e, state)
                await self.handle_retry(e, state, retries)

    def handle_final_error(self, error, state: dict):
        """Handle final error"""
        # Save error state
        state["error"] = str(error)
        # Rollback to last stable state
        return self.rollback_to_last_stable_state(state)
Salin selepas log masuk

Contoh Dunia Nyata: Sistem Perkhidmatan Pelanggan Pintar

Mari kita lihat contoh praktikal - sistem perkhidmatan pelanggan yang bijak:

from langgraph.graph import StateGraph, State

class CustomerServiceState(TypedDict):
    conversation_history: List[str]
    current_intent: str
    user_info: dict
    resolved: bool

class CustomerServiceGraph(StateGraph):
    def __init__(self):
        super().__init__()

        # Initialize states
        self.add_node("greeting", self.greet_customer)
        self.add_node("understand_intent", self.analyze_intent)
        self.add_node("handle_query", self.process_query)
        self.add_node("confirm_resolution", self.check_resolution)

    async def greet_customer(self, state: State):
        """Greet customer"""
        response = await self.llm.generate(
            prompt=f"""
            Conversation history: {state['conversation_history']}
            Task: Generate appropriate greeting
            Requirements:
            1. Maintain professional friendliness
            2. Acknowledge returning customers
            3. Ask how to help
            """
        )
        state['conversation_history'].append(f"Assistant: {response}")
        return state

    async def analyze_intent(self, state: State):
        """Understand user intent"""
        response = await self.llm.generate(
            prompt=f"""
            Conversation history: {state['conversation_history']}
            Task: Analyze user intent
            Output format:
            {{
                "intent": "refund/inquiry/complaint/other",
                "confidence": 0.95,
                "details": "specific description"
            }}
            """
        )
        state['current_intent'] = json.loads(response)
        return state
Salin selepas log masuk

Penggunaan

# Initialize system
graph = CustomerServiceGraph()
state_manager = StateManager()
error_handler = ErrorHandler()

async def handle_customer_query(user_id: str, message: str):
    # Load or create state
    state = state_manager.load_state(user_id) or {
        "conversation_history": [],
        "current_intent": None,
        "user_info": {},
        "resolved": False
    }

    # Add user message
    state["conversation_history"].append(f"User: {message}")

    # Execute state machine flow
    try:
        result = await graph.run(state)
        # Save state
        state_manager.save_state(user_id, result)
        return result["conversation_history"][-1]
    except Exception as e:
        return await error_handler.with_retry(
            graph.run,
            state
        )
Salin selepas log masuk

Amalan Terbaik

  1. Prinsip Reka Bentuk Negeri

    • Pastikan keadaan mudah dan jelas
    • Simpan maklumat yang diperlukan sahaja
    • Pertimbangkan keperluan bersiri
  2. Pengoptimuman Logik Peralihan

    • Gunakan peralihan bersyarat
    • Elakkan gelung tak terhingga
    • Tetapkan had langkah maksimum
  3. Strategi Pengendalian Ralat

    • Melaksanakan degradasi anggun
    • Log maklumat terperinci
    • Sediakan mekanisme rollback
  4. Pengoptimuman Prestasi

    • Gunakan operasi tak segerak
    • Laksanakan caching keadaan
    • Kawal saiz keadaan

Perangkap dan Penyelesaian Biasa

  1. Letupan Negeri

    • Masalah: Terlalu banyak negeri menyukarkan penyelenggaraan
    • Penyelesaian: Gabungkan keadaan yang serupa, gunakan gabungan keadaan dan bukannya mencipta yang baharu
  2. Situasi Kebuntuan

    • Masalah: Peralihan keadaan bulat yang menyebabkan tugas digantung
    • Penyelesaian: Tambahkan mekanisme tamat masa dan syarat keluar paksa
  3. Ketekalan Negeri

    • Masalah: Keadaan tidak konsisten dalam persekitaran teragih
    • Penyelesaian: Gunakan kunci yang diedarkan dan mekanisme transaksi

Ringkasan

Mesin keadaan LangGraph menyediakan penyelesaian yang berkuasa untuk menguruskan aliran tugas Ejen AI yang kompleks:

  • Kosongkan pengurusan aliran tugas
  • Kegigihan keadaan yang boleh dipercayai
  • Pengendalian ralat menyeluruh
  • Kebolehlanjutan fleksibel

Atas ialah kandungan terperinci Mesin Negeri LangGraph: Mengurus Aliran Tugas Ejen Kompleks dalam Pengeluaran. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Artikel terbaru oleh pengarang
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan