首頁 > web前端 > js教程 > 使用非同步生成器在 TypeScript 中非同步迭代事件發射器

使用非同步生成器在 TypeScript 中非同步迭代事件發射器

Barbara Streisand
發布: 2024-09-30 06:26:39
原創
1006 人瀏覽過

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

介绍

在现代 Web 开发中,我们经常处理事件,无论是处理传入的 WebSocket 消息、服务器发送的事件 (SSE) 还是来自 Redis Pub/Sub 等服务的数据流。虽然 Node.js 提供了事件驱动的功能,但它缺乏一种开箱即用的方法来使用 for wait...of 循环异步迭代事件。

在这篇文章中,我将引导您了解一种简单而强大的方法,使用 TypeScript 和 AsyncGenerator 创建异步事件迭代器。这种方法旨在允许您以干净且可预测的方式使用来自任何类型事件发射器的事件,并完全控制取消和清理逻辑。

用例:Redis Pub/Sub

在我最近的一个项目中,我需要监听 Redis Pub/Sub 通道并将服务器发送的事件 (SSE) 异步分派给连接的客户端。面临的挑战是在不压垮系统的情况下处理传入事件,同时允许消费者随时取消事件流。

解决方案?一个事件迭代器,可将任何事件发射器(例如 Redis Pub/Sub)转换为异步迭代器。这使我们能够以受控的方式处理事件,并在必要时优雅地处理取消。

让我们深入了解实施。

守则

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}
登入後複製

它是如何运作的

此函数接受订阅者函数,您可以将其挂接到任何事件发射器或发布/订阅系统中。订阅者提供了两个基本方法:

  1. emit:允许订阅者将新事件推送到迭代器中。
  2. 取消:提供一种方式来指示迭代应该停止。

该函数返回一个 AsyncGenerator,允许您使用 for wait...of 循环迭代事件。

分解代码

  1. 上下文对象:
    上下文 type 提供了一个接口来发出新事件或取消订阅。订阅者使用此上下文来控制事件流。

  2. 事件队列:
    events: T[] 数组用作存储发出的事件的缓冲区。生成器将一一处理这些事件。如果队列中没有事件,它将等待下一个事件发出。

  3. 发出逻辑:
    发出函数将新事件添加到队列并解决任何待处理的承诺(即,如果生成器正在等待新事件)。

  4. 取消
    如果调用 cancel 函数,它会设置一个标志 (cancelled = true) 来表示循环应该退出。在生成器完成之前,队列中的任何剩余事件仍将被处理。

  5. 清理
    取消后,生成器将调用取消订阅函数(如果提供)来执行任何必要的清理。这对于取消订阅 Redis 等外部系统或清理资源尤其重要。

示例:收听 Redis Pub/Sub

让我们看看如何使用此事件迭代器来监听 Redis Pub/Sub 并异步迭代传入的消息。

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();
登入後複製

在此示例中,我们使用 createEventIterator 订阅 Redis Pub/Sub 通道并异步迭代消息。每次有新消息到达时,它都会被发送到生成器中,我们可以在那里实时处理它。如果收到特定消息(例如“STOP”),我们会中断循环并取消订阅 Redis。

示例:使用 EventEmitter

以下是如何将 createEventIterator 与 Node.js 的 EventEmitter 结合使用:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();
登入後複製

在此示例中:

  • 我们使用EventEmitter来发出事件,这些事件由createEventIterator捕获。
  • 迭代器监听“data”事件并异步处理它。
  • 与 Redis 示例类似,我们可以在收到特定事件(“STOP”)时停止迭代。

这种方法的好处

  • 异步控制:通过利用 AsyncGenerator,我们可以异步处理事件,按照自己的节奏处理它们,并在需要时暂停处理。

  • 取消:随时取消事件流的能力使这种方法变得灵活,特别是在可能需要正常关闭连接的现实场景中。

  • 通用:此迭代器可用于任何事件发射器或 Pub/Sub 系统,使其适用于不同的应用程序。

結論

事件驅動架構是許多現代 Web 應用程式的基石,但當我們需要非同步控制事件流時,它們的管理可能會變得棘手。透過 TypeScript 中 AsyncGenerator 的強大功能,您可以建立像此事件迭代器這樣的優雅解決方案,讓您的事件處理程式碼更乾淨、更易於維護。

我希望這篇文章可以幫助您開始為自己的事件發射器使用非同步迭代器。如果您有任何問題或想法,請隨時在評論中分享!

以上是使用非同步生成器在 TypeScript 中非同步迭代事件發射器的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板