Dans le développement Web moderne, nous traitons souvent d'événements, qu'il s'agisse de la gestion des messages WebSocket entrants, des événements envoyés par le serveur (SSE) ou des flux de données provenant de services comme Redis Pub/Sub. Bien que Node.js fournisse des fonctionnalités basées sur les événements, il lui manque un moyen prêt à l'emploi pour parcourir les événements de manière asynchrone à l'aide des boucles for wait...of.
Dans cet article, je vais vous expliquer un moyen simple mais puissant de créer un itérateur d'événement asynchrone à l'aide de TypeScript et AsyncGenerator. Cette approche est conçue pour vous permettre de consommer des événements provenant de tout type d'émetteur d'événements de manière propre et prévisible, avec un contrôle total sur la logique d'annulation et de nettoyage.
Dans l'un de mes projets récents, j'avais besoin d'écouter les canaux Redis Pub/Sub et de distribuer des événements envoyés par le serveur (SSE) de manière asynchrone aux clients connectés. Le défi consistait à gérer les événements entrants sans surcharger le système tout en permettant au consommateur d'annuler le flux d'événements à tout moment.
La solution ? Un itérateur d'événement qui convertit n'importe quel émetteur d'événement (tel que Redis Pub/Sub) en un itérable asynchrone. Cela nous permet de traiter les événements de manière contrôlée et de gérer avec élégance les annulations si nécessaire.
Plongeons-nous dans la mise en œuvre.
export type Context<T> = { emit: (value: T) => void; cancel: () => void; }; export type CleanupFn = () => void | Promise<void>; export type Subscriber<T> = ( context: Context<T>, ) => void | CleanupFn | Promise<CleanupFn | void>; export async function* createEventIterator<T>( subscriber: Subscriber<T>, ): AsyncGenerator<T> { const events: T[] = []; let cancelled = false; // Create a promise that resolves whenever a new event is added to the events array let resolveNext: (() => void) | null = null; const emit = (event: T) => { events.push(event); // If we are awaiting for a new event, resolve the promise if (resolveNext) { resolveNext(); resolveNext = null; } }; const cancel = () => { cancelled = true; }; const unsubscribe = await subscriber({ emit, cancel }); try { while (!cancelled) { // If there are events in the queue, yield the next event if (events.length > 0) { yield events.shift()!; } else { // Wait for the next event await new Promise<void>((resolve) => { resolveNext = resolve; }); } } // Process any remaining events that were emitted before cancellation. while (events.length > 0) { yield events.shift()!; } } finally { await unsubscribe?.(); } }
Cette fonction accepte une fonction d'abonné que vous pouvez connecter à n'importe quel émetteur d'événements ou système pub/sous. L'abonné propose deux méthodes essentielles :
La fonction renvoie un AsyncGenerator
Objet contextuel :
Le contexte
File d'attente des événements :
Le tableau events: T[] sert de tampon pour stocker les événements émis. Le générateur traitera ces événements un par un. S'il n'y a aucun événement dans la file d'attente, il attendra que le prochain événement soit émis.
Émettre la logique :
La fonction d'émission ajoute de nouveaux événements à la file d'attente et résout toute promesse en attente (c'est-à-dire si le générateur attend de nouveaux événements).
Annulation :
Si la fonction d'annulation est appelée, elle définit un indicateur (annulé = vrai) pour signaler que la boucle doit se terminer. Tous les événements restants dans la file d'attente seront toujours traités avant la fin du générateur.
Nettoyage :
Après l'annulation, le générateur invoquera la fonction de désabonnement (si fournie) pour effectuer tout nettoyage nécessaire. Ceci est particulièrement important pour se désinscrire de systèmes externes comme Redis ou pour nettoyer les ressources.
Voyons comment nous pouvons utiliser cet itérateur d'événement pour écouter Redis Pub/Sub et parcourir de manière asynchrone les messages entrants.
import Redis from 'ioredis'; function redisEventIterator(channel: string) { const client = new Redis(); return createEventIterator<string>(({ emit, cancel }) => { const messageHandler = (channel: string, message: string) => { emit(message); }; // Subscribe to the channel client.subscribe(channel); client.on('message', messageHandler); // Cleanup function to unsubscribe and disconnect return async () => { client.off('message', messageHandler); await client.unsubscribe(channel); await client.quit(); }; }); } // Usage (async () => { for await (const message of redisEventIterator('my-channel')) { console.log('New message:', message); // You can cancel the event stream if needed if (message === 'STOP') { break; } } })();
Dans cet exemple, nous utilisons createEventIterator pour nous abonner à un canal Redis Pub/Sub et parcourir les messages de manière asynchrone. Chaque fois qu'un nouveau message arrive, il est émis dans le générateur, où nous pouvons le traiter en temps réel. Si un message spécifique (par exemple, "STOP") est reçu, nous rompons la boucle et nous désabonnons de Redis.
Voici comment utiliser createEventIterator avec EventEmitter de Node.js :
import { EventEmitter } from 'events'; function eventEmitterIterator(emitter: EventEmitter, eventName: string) { return createEventIterator<string>(({ emit, cancel }) => { const eventHandler = (data: string) => emit(data); emitter.on(eventName, eventHandler); // Cleanup function to remove the listener return () => { emitter.off(eventName, eventHandler); }; }); } // Usage (async () => { const emitter = new EventEmitter(); // Simulate event emissions setTimeout(() => emitter.emit('data', 'First event'), 1000); setTimeout(() => emitter.emit('data', 'Second event'), 2000); setTimeout(() => emitter.emit('data', 'STOP'), 3000); for await (const event of eventEmitterIterator(emitter, 'data')) { console.log('Received event:', event); if (event === 'STOP') { break; } } })();
Dans cet exemple :
Contrôle asynchrone : en tirant parti d'AsyncGenerator, nous pouvons gérer les événements de manière asynchrone, les traiter à notre propre rythme et suspendre le traitement si nécessaire.
Annulation : la possibilité d'annuler le flux d'événements à tout moment rend cette approche flexible, en particulier dans les scénarios réels où les connexions peuvent devoir être fermées correctement.
Utilisation générale : cet itérateur peut être utilisé pour n'importe quel émetteur d'événements ou système Pub/Sub, ce qui le rend polyvalent pour différentes applications.
イベント駆動型アーキテクチャは、多くの最新の Web アプリケーションの基礎ですが、イベント フローを非同期に制御する必要がある場合、管理が難しくなる可能性があります。 TypeScript の AsyncGenerator の機能を利用すると、このイベント イテレータのような洗練されたソリューションを構築でき、イベント処理コードがよりクリーンになり、保守が容易になります。
この投稿が、独自のイベント エミッターの非同期イテレーターの使用を開始するのに役立つことを願っています。ご質問やご意見がございましたら、お気軽にコメント欄で共有してください!
以上が非同期ジェネレーターを使用した TypeScript でのイベント エミッターの非同期反復の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。