현대 웹 개발에서는 들어오는 WebSocket 메시지, 서버 전송 이벤트(SSE) 또는 Redis Pub/Sub와 같은 서비스의 데이터 스트림 처리 등 이벤트를 처리하는 경우가 많습니다. Node.js는 이벤트 기반 기능을 제공하지만 for wait...of 루프를 사용하여 이벤트를 비동기적으로 반복하는 기본 방법이 부족합니다.
이 게시물에서는 TypeScript와 AsyncGenerator를 사용하여 비동기 이벤트 반복자를 생성하는 간단하면서도 강력한 방법을 안내해 드리겠습니다. 이 접근 방식은 취소 및 정리 로직을 완벽하게 제어하면서 깔끔하고 예측 가능한 방식으로 모든 종류의 이벤트 이미터의 이벤트를 사용할 수 있도록 설계되었습니다.
최근 프로젝트 중 하나에서는 Redis Pub/Sub 채널을 수신하고 서버 전송 이벤트(SSE)를 연결된 클라이언트에 비동기식으로 전달해야 했습니다. 문제는 소비자가 언제든지 이벤트 스트림을 취소할 수 있도록 하면서 시스템에 부담을 주지 않고 들어오는 이벤트를 처리하는 것이었습니다.
해결책은? 이벤트 이미터(예: Redis Pub/Sub)를 비동기 반복 가능 항목으로 변환하는 이벤트 반복기입니다. 이를 통해 이벤트를 통제된 방식으로 처리하고 필요한 경우 취소를 원활하게 처리할 수 있습니다.
구현에 대해 자세히 살펴보겠습니다.
export type Context<T> = { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise<void>; export type Subscriber<T> = ( context: Context<T>, ) => void | CleanupFn | Promise<CleanupFn | void>; export async function* createEventIterator<T>( subscriber: Subscriber<T>, ): AsyncGenerator<T> { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise<void>((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
이 함수는 모든 이벤트 이미터 또는 게시/구독 시스템에 연결할 수 있는 구독자 기능을 허용합니다. 구독자는 두 가지 필수 방법을 제공합니다:
이 함수는 AsyncGenerator
컨텍스트 개체:
컨텍스트
이벤트 대기열:
이벤트: T[] 배열은 발생한 이벤트를 저장하는 버퍼 역할을 합니다. 생성기는 이러한 이벤트를 하나씩 처리합니다. 대기열에 이벤트가 없으면 다음 이벤트가 방출될 때까지 기다립니다.
논리 방출:
내보내기 기능은 대기열에 새 이벤트를 추가하고 보류 중인 약속을 해결합니다(즉, 생성기가 새 이벤트를 기다리는 경우).
취소:
취소 함수가 호출되면 플래그(cancelled = true)를 설정하여 루프가 종료되어야 한다는 신호를 보냅니다. 대기열에 남아 있는 모든 이벤트는 생성기가 완료되기 전에 계속 처리됩니다.
정리:
취소 후 생성기는 구독 취소 기능(제공된 경우)을 호출하여 필요한 정리를 수행합니다. 이는 Redis와 같은 외부 시스템의 구독을 취소하거나 리소스를 정리할 때 특히 중요합니다.
이 이벤트 반복자를 사용하여 Redis Pub/Sub를 수신하고 수신 메시지를 비동기적으로 반복하는 방법을 살펴보겠습니다.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator<string>(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
이 예에서는 createEventIterator를 사용하여 Redis Pub/Sub 채널을 구독하고 메시지를 비동기식으로 반복합니다. 새 메시지가 도착할 때마다 생성기로 내보내져 실시간으로 처리할 수 있습니다. 특정 메시지(예: "STOP")가 수신되면 루프를 중단하고 Redis 구독을 취소합니다.
Node.js의 EventEmitter와 함께 createEventIterator를 사용하는 방법은 다음과 같습니다.
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator<string>(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
이 예에서는:
비동기 제어: AsyncGenerator를 활용하면 이벤트를 비동기식으로 처리하고 원하는 속도로 처리하며 필요할 때 처리를 일시 중지할 수 있습니다.
취소: 언제든지 이벤트 스트림을 취소할 수 있으므로 이 접근 방식이 유연해지며, 특히 연결을 정상적으로 닫아야 할 수 있는 실제 시나리오에서 더욱 그렇습니다.
일반 용도: 이 반복자는 모든 이벤트 이미터 또는 Pub/Sub 시스템에 사용할 수 있으므로 다양한 애플리케이션에 다용도로 사용할 수 있습니다.
Event-driven architectures are a cornerstone of many modern web applications, but they can become tricky to manage when we need to control the flow of events asynchronously. With the power of AsyncGenerator in TypeScript, you can build elegant solutions like this event iterator, making your event-handling code cleaner and easier to maintain.
I hope this post helps you get started with async iterators for your own event emitters. If you have any questions or thoughts, feel free to share them in the comments!
The above is the detailed content of Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators. For more information, please follow other related articles on the PHP Chinese website!