Lors de la conception d'une architecture de microservices pour des applications basées sur des événements, l'intégration d'Apache Kafka et Node.js peut améliorer considérablement les capacités de traitement des données en temps réel . Dans cet article, nous découvrirons comment tirer parti de l'intégration de Kafka Node.js pour créer des microservices robustes et évolutifs qui gèrent efficacement les données en streaming.
Dans une architecture de microservices, les services doivent communiquer efficacement entre eux. Apache Kafka sert de plate-forme de streaming d'événements distribués qui permet l'échange de données en temps réel entre microservices. Il découple les services, leur permettant de fonctionner de manière indépendante tout en traitant de gros volumes de données.
Pour intégrer Apache Kafka et Node.js dans un environnement de microservices, vous devrez configurer Kafka en tant qu'intermédiaire de messages et le connecter à vos services Node.js. Voici un guide étape par étape :
Tout d'abord, assurez-vous que Apache Kafka et Node.js sont installés sur votre système. Vous pouvez installer Kafka & Node.js en suivant les articles suivants :
Pour connecter Node.js à Kafka, vous pouvez utiliser la bibliothèque kafkajs, un client Kafka populaire pour Node.js.
npm install kafkajs
Dans une architecture microservices, un producteur Kafka est chargé d'envoyer des messages à un sujet Kafka. Vous trouverez ci-dessous un exemple simple de la façon de créer un producteur Kafka dans Node.js :
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
Un consommateur Kafka est utilisé pour lire les messages d'un sujet Kafka. Voici comment créer un consommateur :
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
Pour illustrer l'intégration de Kafka et Node.js dans une architecture de microservices, considérons l'étude de cas suivante :
Nous avons deux microservices :
Chaque fois qu'un achat ou une transaction a lieu dans le Service de commande, le stock sera mis à jour dans le Service produit. Kafka facilite cette communication en agissant comme un intermédiaire de messages.
Le Service des commandes est chargé de traiter les bons de commande et d'envoyer des messages au Service Produits pour mettre à jour le stock. Voici comment mettre en œuvre le Service de commande en tant que producteur Kafka :
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });
Le Service Produit consomme les messages du sujet Kafka de mises à jour des produits et met à jour le stock de produits en conséquence. Voici la mise en œuvre :
// productService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka consumer configuration const kafka = new Kafka({ clientId: 'product-service', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'product-group' }); // Initialize Express app const app = express(); app.use(express.json()); const updateStock = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'product-updates', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const orderEvent = JSON.parse(message.value.toString()); console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`); // Simulate stock update console.log(`Updating stock for product: ${orderEvent.productId}`); // logic to update stock }, }); }; // Start the Product Service to listen for messages updateStock().catch(console.error); // Start the server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.log(`Product Service API running on port ${PORT}`); });
Démarrez d'abord le Service produit, car il doit écouter les messages entrants :
node productService.js
Le Service produit commencera à écouter sur le port 3001 (ou un autre port si spécifié).
Démarrez le Service de commande avec cette commande :
node orderService.js
Le Service de commande sera disponible sur le port 3000 (ou un autre port si spécifié).
Vous pouvez passer une commande en envoyant une requête POST à l'API du Order Service :
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'
When an order is placed, the Order Service will send a Kafka message, and the Product Service will consume that message to update the stock:
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
Integrating Apache Kafka and Node.js in your microservices architecture allows you to build highly scalable and resilient event-driven applications.
By following best practices and leveraging Kafka’s powerful features, you can efficiently process real-time data and create a robust communication layer between your microservices.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!