LangGraph's interrupt feature lets a human step in partway through an agent's execution.
The examples, however, all skip the actual human interaction. So how do you really get confirmation from the user? I see three main options.
The first option is to use the LangGraph API server. You run the server in Docker with the LangGraph CLI, then use the LangGraph SDK to run the graph, change its state, and resume it.
The drawback is that you have to use LangGraph's tooling exactly the way it is provided. There are a lot of settings, and it looked like it could be hard to integrate with my own code.
The second option is to implement only the necessary parts of the LangGraph API server on my own custom server. For example, when a graph runs, the server must save which client started it along with the graph checkpoint. After the user confirms, the server must load the graph again, change the state according to the user's response, and resume it.
That leaves a lot of details to think through.
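To make the bookkeeping for this option more concrete, here is a minimal sketch of the part that remembers who started a run and resumes it after confirmation. Everything in it is an assumption for illustration: `ResumableGraph` is a hypothetical interface standing in for a compiled graph with a checkpointer, `RunStore` is a hypothetical name, and the HTTP routes in the comments are made up. It keeps runs in memory and does not modify the graph state before resuming, which a real server would likely need to do.

```typescript
// Hypothetical stand-in for a compiled graph with a checkpointer.
interface ResumableGraph {
  // Runs until the next interrupt or the end; `null` input resumes from the checkpoint.
  run(
    input: Record<string, unknown> | null,
    threadId: string
  ): Promise<"interrupted" | "done">;
}

type RunStatus = "awaiting_confirmation" | "done" | "cancelled";

// Hypothetical in-memory store; a real server would persist this.
class RunStore {
  private readonly graph: ResumableGraph;
  private runs = new Map<string, { clientId: string; status: RunStatus }>();

  constructor(graph: ResumableGraph) {
    this.graph = graph;
  }

  // e.g. called from a hypothetical POST /runs handler
  async start(
    threadId: string,
    clientId: string,
    input: Record<string, unknown>
  ): Promise<RunStatus> {
    const result = await this.graph.run(input, threadId);
    const status: RunStatus =
      result === "interrupted" ? "awaiting_confirmation" : "done";
    // Remember which client owns the run so only that client can confirm it.
    this.runs.set(threadId, { clientId, status });
    return status;
  }

  // e.g. called from a hypothetical POST /runs/:threadId/confirm handler
  async confirm(
    threadId: string,
    clientId: string,
    approved: boolean
  ): Promise<RunStatus> {
    const run = this.runs.get(threadId);
    if (!run || run.clientId !== clientId || run.status !== "awaiting_confirmation") {
      throw new Error("No run awaiting confirmation for this client");
    }
    if (!approved) {
      run.status = "cancelled";
      return run.status;
    }
    // Resume from the saved checkpoint by passing `null` as the input.
    const result = await this.graph.run(null, threadId);
    run.status = result === "interrupted" ? "awaiting_confirmation" : "done";
    return run.status;
  }
}
```

Even this stripped-down version needs ownership checks and status tracking, which is part of why this option felt heavier than the socket approach.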
The third option is a socket connection. When the agent starts, a socket is opened and the user interacts through it. All it takes is adding a step to the existing example code that asks for the user's confirmation over the socket.
The trade-off is that streaming output token by token, like a typing effect, may be harder to implement.
I wanted to add as little complexity as possible, so I went with the socket connection.
The server uses NestJS and the client uses Next.js.
First, create a gateway for the WebSocket connection. The connection is made at agent/start, and the agent runs as soon as the client connects.
import { Logger } from "@nestjs/common";
import {
  ConnectedSocket,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from "@nestjs/websockets";
import { Server, Socket } from "socket.io";
// AgentFactory and AgentName are project-specific; import them from your own modules.

@WebSocketGateway({
  namespace: "/",
  transports: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  protected readonly logger = new Logger(this.constructor.name);

  constructor(private readonly agentFactory: AgentFactory) {}

  // Maps a client id to the resolver of its pending confirmation promise
  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Get actionData from the query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // Without actionData there is nothing to run
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    // Resolve any pending confirmation as 'false' so the agent run
    // does not keep waiting for the timeout
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(false);
    }
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(initialInput, graphStateConfig)) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. Passing `null` as the
      // input resumes the graph from its saved checkpoint.
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}
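One fragile spot in the gateway is that it calls JSON.parse directly on the actionData query parameter, so a malformed value would throw inside handleConnection. A defensive parser might look like the sketch below. The `parseActionData` helper and the agent names in the usage example are hypothetical, not part of the original project.

```typescript
// Hypothetical helper: validates the `actionData` query parameter before use.
// Returns null instead of throwing when the value is missing or malformed.
function parseActionData<T extends string>(
  raw: unknown,
  allowedAgents: readonly T[]
): { agent: T } | null {
  // A query parameter can be missing or repeated (an array), so check the type first.
  if (typeof raw !== "string") return null;

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }

  if (typeof parsed !== "object" || parsed === null) return null;

  // Accept only agent names that the server actually knows about.
  const agent = (parsed as { agent?: unknown }).agent;
  const match = allowedAgents.find((name) => name === agent);
  return match === undefined ? null : { agent: match };
}

// Usage with made-up agent names:
const knownAgents = ["simple", "research"] as const;
const parsedAction = parseActionData('{"agent":"simple"}', knownAgents);
console.log(parsedAction);
```

In the gateway, the result could replace the inline JSON.parse call, with a null result taking the existing "No action data provided" branch.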
The key idea is simple. When a socket connects, an agent is created and executed immediately. When the graph is interrupted, the server sends a confirmation request to the client and waits. Once the confirmation resolves, the graph either continues or is cancelled.
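The waiting step boils down to storing a promise's resolver under the client id and calling it later. The sketch below isolates that pattern without NestJS or socket.io so it can be read on its own. `ConfirmationRegistry` is a hypothetical helper, and the `send` callback stands in for whatever actually emits the request to the client. Unlike the gateway, it also clears the timer once a request is settled, which is one possible refinement rather than something the original code does.

```typescript
type Resolver = (confirmed: boolean) => void;

// Hypothetical helper: tracks one pending confirmation per client.
class ConfirmationRegistry {
  private pending = new Map<
    string,
    { resolve: Resolver; timer: ReturnType<typeof setTimeout> }
  >();

  // Registers a pending confirmation and sends the request.
  // The returned promise resolves to the user's answer, or false on timeout.
  request(clientId: string, send: () => void, timeoutMs: number): Promise<boolean> {
    // Settle any earlier request for the same client so it cannot hang forever.
    this.settle(clientId, false);
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => this.settle(clientId, false), timeoutMs);
      this.pending.set(clientId, { resolve, timer });
      send();
    });
  }

  // Resolves the pending promise and cleans up.
  // Returns false when nothing was pending for this client.
  settle(clientId: string, confirmed: boolean): boolean {
    const entry = this.pending.get(clientId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(clientId);
    entry.resolve(confirmed);
    return true;
  }
}
```

With a helper like this, the response handler would call `settle(client.id, data.confirmed)` and the disconnect handler would call `settle(client.id, false)`.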
The agent used by the gateway is the example from the LangGraph documentation that runs steps 1, 2, and 3 in sequence, shown below.
import {
  Annotation,
  END,
  MemorySaver,
  START,
  StateGraph,
} from "@langchain/langgraph";

// This snippet is the body of the function that builds the agent.
const GraphState = Annotation.Root({
  input: Annotation<string>,
});

const step1 = (state: typeof GraphState.State) => {
  console.log("---Step 1---");
  return state;
};

const step2 = (state: typeof GraphState.State) => {
  console.log("---Step 2---");
  return state;
};

const step3 = (state: typeof GraphState.State) => {
  console.log("---Step 3---");
  return state;
};

const builder = new StateGraph(GraphState)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3")
  .addEdge("step3", END);

// Set up memory
const graphStateMemory = new MemorySaver();

// Pause before step3 so the user can confirm
const graph = builder.compile({
  checkpointer: graphStateMemory,
  interruptBefore: ["step3"],
});

return graph;
On the client, a hook manages starting the agent and tracking its state.
import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
    return new Promise((resolve, reject) => {
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};
The hook establishes the connection and updates the confirmationRequest state when a confirmation request arrives. The UI component only needs to watch confirmationRequest and show a dialog to the user when it is set.