在現代 Web 開發中,我們經常處理事件,無論是處理傳入的 WebSocket 訊息、伺服器發送的事件 (SSE) 還是來自 Redis Pub/Sub 等服務的資料流。雖然 Node.js 提供了事件驅動的功能,但它缺乏一種開箱即用的方法來使用 for wait...of 迴圈非同步迭代事件。
在這篇文章中,我將引導您了解一種簡單且強大的方法,使用 TypeScript 和 AsyncGenerator 建立非同步事件迭代器。這種方法旨在允許您以乾淨且可預測的方式使用來自任何類型事件發射器的事件,並完全控制取消和清理邏輯。
在我最近的一個專案中,我需要監聽 Redis Pub/Sub 通道並將伺服器發送的事件 (SSE) 非同步分派給連接的客戶端。面臨的挑戰是在不壓垮系統的情況下處理傳入事件,同時允許消費者隨時取消事件流。
解決方案?一個事件迭代器,可將任何事件發射器(例如 Redis Pub/Sub)轉換為非同步迭代器。這使我們能夠以受控的方式處理事件,並在必要時優雅地處理取消。
讓我們深入了解實作。
export type Context<T> = { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise<void>; export type Subscriber<T> = ( context: Context<T>, ) => void | CleanupFn | Promise<CleanupFn | void>; export async function* createEventIterator<T>( subscriber: Subscriber<T>, ): AsyncGenerator<T> { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise<void>((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
此函數接受訂閱者函數,您可以將其掛接到任何事件發射器或發布/訂閱系統中。訂閱者提供了兩種基本方法:
函數傳回一個 AsyncGenerator
上下文物件:
上下文
事件佇列:
events: T[] 陣列用作儲存發出的事件的緩衝區。生成器將一一處理這些事件。如果佇列中沒有事件,它將等待下一個事件發出。
發出邏輯:
發出函數將新事件新增至佇列並解決任何待處理的承諾(即,如果生成器正在等待新事件)。
取消:
如果呼叫 cancel 函數,它會設定一個標誌 (cancelled = true) 來表示循環應該退出。在生成器完成之前,佇列中的任何剩餘事件仍將被處理。
清理:
取消後,生成器將呼叫取消訂閱函數(如果提供)來執行任何必要的清理。這對於取消訂閱 Redis 等外部系統或清理資源尤其重要。
讓我們看看如何使用此事件迭代器來監聽 Redis Pub/Sub 並非同步迭代傳入的訊息。
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator<string>(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
在此範例中,我們使用 createEventIterator 訂閱 Redis Pub/Sub 通道並非同步迭代訊息。每次有新訊息到達時,它都會被發送到生成器中,我們可以在那裡即時處理它。如果收到特定訊息(例如“STOP”),我們會中斷循環並取消訂閱 Redis。
以下是如何將 createEventIterator 與 Node.js 的 EventEmitter 結合使用:
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator<string>(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
在此範例中:
非同步控制:透過利用 AsyncGenerator,我們可以非同步處理事件,按照自己的節奏處理它們,並在需要時暫停處理。
取消:隨時取消事件流的能力使這種方法變得靈活,特別是在可能需要正常關閉連接的現實場景中。
通用:此迭代器可用於任何事件發射器或 Pub/Sub 系統,使其適用於不同的應用程式。
이벤트 중심 아키텍처는 많은 최신 웹 애플리케이션의 초석이지만 이벤트 흐름을 비동기적으로 제어해야 할 때 관리하기 까다로울 수 있습니다. TypeScript의 AsyncGenerator 기능을 사용하면 이 이벤트 반복기와 같은 우아한 솔루션을 구축하여 이벤트 처리 코드를 더욱 깔끔하고 유지 관리하기 쉽게 만들 수 있습니다.
이 게시물이 여러분의 이벤트 이미터에 대한 비동기 반복자를 시작하는 데 도움이 되기를 바랍니다. 질문이나 의견이 있으시면 댓글로 자유롭게 공유해주세요!
위 내용은 비동기 생성기를 사용하여 TypeScript에서 이벤트 이미터를 비동기식으로 반복의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!