イベント駆動型アプリケーション用のマイクロサービス アーキテクチャを設計する場合、Apache Kafka と Node.js を統合すると、リアルタイム データ処理機能が大幅に強化されます。 。この記事では、Kafka Node.js 統合を活用して、ストリーミング データを効率的に処理する堅牢でスケーラブルなマイクロサービスを構築する方法を検討します。
マイクロサービス アーキテクチャでは、サービスは相互に効率的に通信する必要があります。 Apache Kafka は、マイクロサービス間のリアルタイムのデータ交換を可能にする分散イベント ストリーミング プラットフォームとして機能します。サービスを分離し、大量のデータを処理しながら独立して動作できるようにします。
マイクロサービス環境で Apache Kafka と Node.js を統合するには、Kafka をメッセージ ブローカーとして設定し、Node.js サービスに接続する必要があります。ステップバイステップのガイドは次のとおりです:
まず、Apache Kafka と Node.js がシステムにインストールされていることを確認します。次の記事に従って Kafka と Node.js をインストールできます:
Node.js を Kafka に接続するには、Node.js 用の人気のある Kafka クライアントである kafkajs ライブラリを使用できます。
npm install kafkajs
マイクロサービス アーキテクチャでは、Kafka プロデューサーは Kafka トピックにメッセージを送信する責任があります。以下は、Node.js で Kafka プロデューサーを作成する方法の簡単な例です:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
Kafka コンシューマーは、Kafka トピックからメッセージを読み取るために使用されます。コンシューマを作成する方法は次のとおりです:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
マイクロサービス アーキテクチャにおける Kafka と Node.js の統合を説明するために、次のケース スタディを検討してください。
2 つのマイクロサービスがあります:
注文サービスで購入または取引が発生すると、製品サービスの在庫が更新されます。 Kafka は、メッセージ ブローカーとして機能することでこの通信を促進します。
注文サービスは、注文書を処理し、在庫を更新するために製品サービスにメッセージを送信する責任があります。 Kafka プロデューサーとして Order Service を実装する方法は次のとおりです。
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });
Product Service は、product-updates Kafka トピックからのメッセージを消費し、それに応じて製品在庫を更新します。実装は次のとおりです:
// productService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka consumer configuration const kafka = new Kafka({ clientId: 'product-service', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'product-group' }); // Initialize Express app const app = express(); app.use(express.json()); const updateStock = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'product-updates', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const orderEvent = JSON.parse(message.value.toString()); console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`); // Simulate stock update console.log(`Updating stock for product: ${orderEvent.productId}`); // logic to update stock }, }); }; // Start the Product Service to listen for messages updateStock().catch(console.error); // Start the server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.log(`Product Service API running on port ${PORT}`); });
受信メッセージをリッスンする必要があるため、最初に 製品サービス を開始します。
node productService.js
製品サービスは、ポート 3001 (または指定されている場合は別のポート) でのリッスンを開始します。
次のコマンドで 注文サービス を開始します:
node orderService.js
オーダー サービスは、ポート 3000 (または指定されている場合は別のポート) で利用可能になります。
POST リクエストを Order Service API:
に送信することで注文できます。
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'
When an order is placed, the Order Service will send a Kafka message, and the Product Service will consume that message to update the stock:
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
Integrating Apache Kafka and Node.js in your microservices architecture allows you to build highly scalable and resilient event-driven applications.
By following best practices and leveraging Kafka’s powerful features, you can efficiently process real-time data and create a robust communication layer between your microservices.
以上がマイクロサービス アーキテクチャで Kafka と Node.js を実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。