Home > Web Front-end > JS Tutorial > Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Barbara Streisand
Release: 2024-09-30 06:26:39
Original
990 people have browsed it

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

소개

현대 웹 개발에서는 들어오는 WebSocket 메시지, 서버 전송 이벤트(SSE) 또는 Redis Pub/Sub와 같은 서비스의 데이터 스트림 처리 등 이벤트를 처리하는 경우가 많습니다. Node.js는 이벤트 기반 기능을 제공하지만 for wait...of 루프를 사용하여 이벤트를 비동기적으로 반복하는 기본 방법이 부족합니다.

이 게시물에서는 TypeScript와 AsyncGenerator를 사용하여 비동기 이벤트 반복자를 생성하는 간단하면서도 강력한 방법을 안내해 드리겠습니다. 이 접근 방식은 취소 및 정리 로직을 완벽하게 제어하면서 깔끔하고 예측 가능한 방식으로 모든 종류의 이벤트 이미터의 이벤트를 사용할 수 있도록 설계되었습니다.

사용 사례: Redis Pub/Sub

최근 프로젝트 중 하나에서는 Redis Pub/Sub 채널을 수신하고 서버 전송 이벤트(SSE)를 연결된 클라이언트에 비동기식으로 전달해야 했습니다. 문제는 소비자가 언제든지 이벤트 스트림을 취소할 수 있도록 하면서 시스템에 부담을 주지 않고 들어오는 이벤트를 처리하는 것이었습니다.

해결책은? 이벤트 이미터(예: Redis Pub/Sub)를 비동기 반복 가능 항목으로 변환하는 이벤트 반복기입니다. 이를 통해 이벤트를 통제된 방식으로 처리하고 필요한 경우 취소를 원활하게 처리할 수 있습니다.

구현에 대해 자세히 살펴보겠습니다.

코드

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}
Copy after login

작동 방식

이 함수는 모든 이벤트 이미터 또는 게시/구독 시스템에 연결할 수 있는 구독자 기능을 허용합니다. 구독자는 두 가지 필수 방법을 제공합니다:

  1. emit: 구독자가 새 이벤트를 반복자에 푸시할 수 있습니다.
  2. 취소: 반복을 중지해야 한다는 신호를 보내는 방법을 제공합니다.

이 함수는 AsyncGenerator를 반환하므로 for wait...of 루프를 사용하여 이벤트를 반복할 수 있습니다.

코드 분석

  1. 컨텍스트 개체:
    컨텍스트 type은 새 이벤트를 내보내거나 구독을 취소하는 인터페이스를 제공합니다. 구독자는 이 컨텍스트를 사용하여 이벤트 흐름을 제어합니다.

  2. 이벤트 대기열:
    이벤트: T[] 배열은 발생한 이벤트를 저장하는 버퍼 역할을 합니다. 생성기는 이러한 이벤트를 하나씩 처리합니다. 대기열에 이벤트가 없으면 다음 이벤트가 방출될 때까지 기다립니다.

  3. 논리 방출:
    내보내기 기능은 대기열에 새 이벤트를 추가하고 보류 중인 약속을 해결합니다(즉, 생성기가 새 이벤트를 기다리는 경우).

  4. 취소:
    취소 함수가 호출되면 플래그(cancelled = true)를 설정하여 루프가 종료되어야 한다는 신호를 보냅니다. 대기열에 남아 있는 모든 이벤트는 생성기가 완료되기 전에 계속 처리됩니다.

  5. 정리:
    취소 후 생성기는 구독 취소 기능(제공된 경우)을 호출하여 필요한 정리를 수행합니다. 이는 Redis와 같은 외부 시스템의 구독을 취소하거나 리소스를 정리할 때 특히 중요합니다.

예: Redis Pub/Sub 듣기

이 이벤트 반복자를 사용하여 Redis Pub/Sub를 수신하고 수신 메시지를 비동기적으로 반복하는 방법을 살펴보겠습니다.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();
Copy after login

이 예에서는 createEventIterator를 사용하여 Redis Pub/Sub 채널을 구독하고 메시지를 비동기식으로 반복합니다. 새 메시지가 도착할 때마다 생성기로 내보내져 실시간으로 처리할 수 있습니다. 특정 메시지(예: "STOP")가 수신되면 루프를 중단하고 Redis 구독을 취소합니다.

예: EventEmitter 사용

Node.js의 EventEmitter와 함께 createEventIterator를 사용하는 방법은 다음과 같습니다.

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();
Copy after login

이 예에서는:

  • EventEmitter를 사용하여 createEventIterator에 의해 캡처되는 이벤트를 내보냅니다.
  • 반복자는 '데이터' 이벤트를 수신하고 이를 비동기적으로 처리합니다.
  • Redis 예시와 유사하게 특정 이벤트('STOP')가 수신되면 반복을 중지할 수 있습니다.

이 접근 방식의 이점

  • 비동기 제어: AsyncGenerator를 활용하면 이벤트를 비동기식으로 처리하고 원하는 속도로 처리하며 필요할 때 처리를 일시 중지할 수 있습니다.

  • 취소: 언제든지 이벤트 스트림을 취소할 수 있으므로 이 접근 방식이 유연해지며, 특히 연결을 정상적으로 닫아야 할 수 있는 실제 시나리오에서 더욱 그렇습니다.

  • 일반 용도: 이 반복자는 모든 이벤트 이미터 또는 Pub/Sub 시스템에 사용할 수 있으므로 다양한 애플리케이션에 다용도로 사용할 수 있습니다.

Conclusion

Event-driven architectures are a cornerstone of many modern web applications, but they can become tricky to manage when we need to control the flow of events asynchronously. With the power of AsyncGenerator in TypeScript, you can build elegant solutions like this event iterator, making your event-handling code cleaner and easier to maintain.

I hope this post helps you get started with async iterators for your own event emitters. If you have any questions or thoughts, feel free to share them in the comments!

The above is the detailed content of Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators. For more information, please follow other related articles on the PHP Chinese website!

source:dev.to
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Articles by Author
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template