비동기 생성기를 사용하여 TypeScript에서 이벤트 이미터를 비동기식으로 반복

Barbara Streisand
풀어 주다: 2024-09-30 06:26:39
원래의
979명이 탐색했습니다.

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

介紹

在現代 Web 開發中,我們經常處理事件,無論是處理傳入的 WebSocket 訊息、伺服器發送的事件 (SSE) 還是來自 Redis Pub/Sub 等服務的資料流。雖然 Node.js 提供了事件驅動的功能,但它缺乏一種開箱即用的方法來使用 for wait...of 迴圈非同步迭代事件。

在這篇文章中,我將引導您了解一種簡單且強大的方法,使用 TypeScript 和 AsyncGenerator 建立非同步事件迭代器。這種方法旨在允許您以乾淨且可預測的方式使用來自任何類型事件發射器的事件,並完全控制取消和清理邏輯。

用例:Redis Pub/Sub

在我最近的一個專案中,我需要監聽 Redis Pub/Sub 通道並將伺服器發送的事件 (SSE) 非同步分派給連接的客戶端。面臨的挑戰是在不壓垮系統的情況下處理傳入事件,同時允許消費者隨時取消事件流。

解決方案?一個事件迭代器,可將任何事件發射器(例如 Redis Pub/Sub)轉換為非同步迭代器。這使我們能夠以受控的方式處理事件,並在必要時優雅地處理取消。

讓我們深入了解實作。

守則

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}
로그인 후 복사

它是如何運作的

此函數接受訂閱者函數,您可以將其掛接到任何事件發射器或發布/訂閱系統中。訂閱者提供了兩種基本方法:

  1. emit:允許訂閱者將新事件推送到迭代器中。
  2. 取消:提供一種方式來指示迭代應該停止。

函數傳回一個 AsyncGenerator,讓您可以使用 for wait...of 迴圈迭代事件。

分解代碼

  1. 上下文物件:
    上下文 type 提供了一個介面來發出新事件或取消訂閱。訂閱者使用此上下文來控制事件流。

  2. 事件佇列:
    events: T[] 陣列用作儲存發出的事件的緩衝區。生成器將一一處理這些事件。如果佇列中沒有事件,它將等待下一個事件發出。

  3. 發出邏輯:
    發出函數將新事件新增至佇列並解決任何待處理的承諾(即,如果生成器正在等待新事件)。

  4. 取消
    如果呼叫 cancel 函數,它會設定一個標誌 (cancelled = true) 來表示循環應該退出。在生成器完成之前,佇列中的任何剩餘事件仍將被處理。

  5. 清理
    取消後,生成器將呼叫取消訂閱函數(如果提供)來執行任何必要的清理。這對於取消訂閱 Redis 等外部系統或清理資源尤其重要。

範例:收聽 Redis Pub/Sub

讓我們看看如何使用此事件迭代器來監聽 Redis Pub/Sub 並非同步迭代傳入的訊息。

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();
로그인 후 복사

在此範例中,我們使用 createEventIterator 訂閱 Redis Pub/Sub 通道並非同步迭代訊息。每次有新訊息到達時,它都會被發送到生成器中,我們可以在那裡即時處理它。如果收到特定訊息(例如“STOP”),我們會中斷循環並取消訂閱 Redis。

範例:使用 EventEmitter

以下是如何將 createEventIterator 與 Node.js 的 EventEmitter 結合使用:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();
로그인 후 복사

在此範例中:

  • 我們使用EventEmitter來發出事件,這些事件由createEventIterator捕獲。
  • 迭代器監聽「data」事件並非同步處理它。
  • 與 Redis 範例類似,我們可以在收到特定事件(「STOP」)時停止迭代。

這種方法的好處

  • 非同步控制:透過利用 AsyncGenerator,我們可以非同步處理事件,按照自己的節奏處理它們,並在需要時暫停處理。

  • 取消:隨時取消事件流的能力使這種方法變得靈活,特別是在可能需要正常關閉連接的現實場景中。

  • 通用:此迭代器可用於任何事件發射器或 Pub/Sub 系統,使其適用於不同的應用程式。

결론

이벤트 중심 아키텍처는 많은 최신 웹 애플리케이션의 초석이지만 이벤트 흐름을 비동기적으로 제어해야 할 때 관리하기 까다로울 수 있습니다. TypeScript의 AsyncGenerator 기능을 사용하면 이 이벤트 반복기와 같은 우아한 솔루션을 구축하여 이벤트 처리 코드를 더욱 깔끔하고 유지 관리하기 쉽게 만들 수 있습니다.

이 게시물이 여러분의 이벤트 이미터에 대한 비동기 반복자를 시작하는 데 도움이 되기를 바랍니다. 질문이나 의견이 있으시면 댓글로 자유롭게 공유해주세요!

위 내용은 비동기 생성기를 사용하여 TypeScript에서 이벤트 이미터를 비동기식으로 반복의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
저자별 최신 기사
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿