非同期ジェネレーターを使用した TypeScript でのイベント エミッターの非同期反復

Barbara Streisand
リリース: 2024-09-30 06:26:39
オリジナル
979 人が閲覧しました

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Introduction

Dans le développement Web moderne, nous traitons souvent d'événements, qu'il s'agisse de la gestion des messages WebSocket entrants, des événements envoyés par le serveur (SSE) ou des flux de données provenant de services comme Redis Pub/Sub. Bien que Node.js fournisse des fonctionnalités basées sur les événements, il lui manque un moyen prêt à l'emploi pour parcourir les événements de manière asynchrone à l'aide des boucles for wait...of.

Dans cet article, je vais vous expliquer un moyen simple mais puissant de créer un itérateur d'événement asynchrone à l'aide de TypeScript et AsyncGenerator. Cette approche est conçue pour vous permettre de consommer des événements provenant de tout type d'émetteur d'événements de manière propre et prévisible, avec un contrôle total sur la logique d'annulation et de nettoyage.

Le cas d'utilisation : Redis Pub/Sub

Dans l'un de mes projets récents, j'avais besoin d'écouter les canaux Redis Pub/Sub et de distribuer des événements envoyés par le serveur (SSE) de manière asynchrone aux clients connectés. Le défi consistait à gérer les événements entrants sans surcharger le système tout en permettant au consommateur d'annuler le flux d'événements à tout moment.

La solution ? Un itérateur d'événement qui convertit n'importe quel émetteur d'événement (tel que Redis Pub/Sub) en un itérable asynchrone. Cela nous permet de traiter les événements de manière contrôlée et de gérer avec élégance les annulations si nécessaire.

Plongeons-nous dans la mise en œuvre.

Le code

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Create a promise that resolves whenever a new event is added to the events array
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const cancel = () => {
        cancelled = true;
    };

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}
ログイン後にコピー

Comment ça marche

Cette fonction accepte une fonction d'abonné que vous pouvez connecter à n'importe quel émetteur d'événements ou système pub/sous. L'abonné propose deux méthodes essentielles :

  1. emit : permet à l'abonné de pousser de nouveaux événements dans l'itérateur.
  2. annuler : Fournit un moyen de signaler que l'itération doit s'arrêter.

La fonction renvoie un AsyncGenerator, vous permettant de parcourir les événements à l'aide d'une boucle for wait...of.

Décrypter le code

  1. Objet contextuel :
    Le contexte type fournit une interface pour émettre de nouveaux événements ou annuler l'abonnement. L'abonné utilise ce contexte pour contrôler le flux des événements.

  2. File d'attente des événements :
    Le tableau events: T[] sert de tampon pour stocker les événements émis. Le générateur traitera ces événements un par un. S'il n'y a aucun événement dans la file d'attente, il attendra que le prochain événement soit émis.

  3. Émettre la logique :
    La fonction d'émission ajoute de nouveaux événements à la file d'attente et résout toute promesse en attente (c'est-à-dire si le générateur attend de nouveaux événements).

  4. Annulation :
    Si la fonction d'annulation est appelée, elle définit un indicateur (annulé = vrai) pour signaler que la boucle doit se terminer. Tous les événements restants dans la file d'attente seront toujours traités avant la fin du générateur.

  5. Nettoyage :
    Après l'annulation, le générateur invoquera la fonction de désabonnement (si fournie) pour effectuer tout nettoyage nécessaire. Ceci est particulièrement important pour se désinscrire de systèmes externes comme Redis ou pour nettoyer les ressources.

Exemple : écoute de Redis Pub/Sub

Voyons comment nous pouvons utiliser cet itérateur d'événement pour écouter Redis Pub/Sub et parcourir de manière asynchrone les messages entrants.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

    return createEventIterator<string>(({ emit, cancel }) => {
        const messageHandler = (channel: string, message: string) => {
            emit(message);
        };

        // Subscribe to the channel
        client.subscribe(channel);
        client.on('message', messageHandler);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();
ログイン後にコピー

Dans cet exemple, nous utilisons createEventIterator pour nous abonner à un canal Redis Pub/Sub et parcourir les messages de manière asynchrone. Chaque fois qu'un nouveau message arrive, il est émis dans le générateur, où nous pouvons le traiter en temps réel. Si un message spécifique (par exemple, "STOP") est reçu, nous rompons la boucle et nous désabonnons de Redis.

Exemple : utilisation d'EventEmitter

Voici comment utiliser createEventIterator avec EventEmitter de Node.js :

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();
ログイン後にコピー

Dans cet exemple :

  • Nous utilisons EventEmitter pour émettre des événements, qui sont capturés par createEventIterator.
  • L'itérateur écoute l'événement 'data' et le traite de manière asynchrone.
  • Semblable à l'exemple Redis, nous pouvons arrêter l'itération lorsqu'un événement spécifique (« STOP ») est reçu.

Avantages de cette approche

  • Contrôle asynchrone : en tirant parti d'AsyncGenerator, nous pouvons gérer les événements de manière asynchrone, les traiter à notre propre rythme et suspendre le traitement si nécessaire.

  • Annulation : la possibilité d'annuler le flux d'événements à tout moment rend cette approche flexible, en particulier dans les scénarios réels où les connexions peuvent devoir être fermées correctement.

  • Utilisation générale : cet itérateur peut être utilisé pour n'importe quel émetteur d'événements ou système Pub/Sub, ce qui le rend polyvalent pour différentes applications.

結論

イベント駆動型アーキテクチャは、多くの最新の Web アプリケーションの基礎ですが、イベント フローを非同期に制御する必要がある場合、管理が難しくなる可能性があります。 TypeScript の AsyncGenerator の機能を利用すると、このイベント イテレータのような洗練されたソリューションを構築でき、イベント処理コードがよりクリーンになり、保守が容易になります。

この投稿が、独自のイベント エミッターの非同期イテレーターの使用を開始するのに役立つことを願っています。ご質問やご意見がございましたら、お気軽にコメント欄で共有してください!

以上が非同期ジェネレーターを使用した TypeScript でのイベント エミッターの非同期反復の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート