이벤트 기반 애플리케이션을 위한 마이크로서비스 아키텍처를 설계할 때 Apache Kafka와 Node.js를 통합하면 실시간 데이터 처리 기능을 크게 향상할 수 있습니다. . 이 기사에서는 Kafka Node.js 통합을 활용하여 스트리밍 데이터를 효율적으로 처리하는 강력하고 확장 가능한 마이크로서비스를 구축하는 방법을 살펴보겠습니다.
마이크로서비스 아키텍처에서는 서비스가 서로 효율적으로 통신해야 합니다. Apache Kafka는 마이크로서비스 간 실시간 데이터 교환을 가능하게 하는 분산 이벤트 스트리밍 플랫폼 역할을 합니다. 서비스를 분리하여 대용량 데이터를 처리하면서 독립적으로 운영할 수 있습니다.
Apache Kafka와 Node.js를 마이크로서비스 환경에 통합하려면 Kafka를 메시지 브로커로 설정하고 이를 Node.js 서비스와 연결해야 합니다. 단계별 가이드는 다음과 같습니다.
먼저 Apache Kafka 및 Node.js가 시스템에 설치되어 있는지 확인하세요. 다음 문서에 따라 Kafka 및 Node.js를 설치할 수 있습니다.
Node.js를 Kafka와 연결하려면 인기 있는 Node.js용 Kafka 클라이언트인 kafkajs 라이브러리를 사용할 수 있습니다.
npm install kafkajs
마이크로서비스 아키텍처에서 Kafka 생산자는 Kafka 주제에 메시지를 보내는 일을 담당합니다. 다음은 Node.js에서 Kafka 생산자를 생성하는 방법에 대한 간단한 예입니다.
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
Kafka 소비자는 Kafka 주제의 메시지를 읽는 데 사용됩니다. 소비자를 생성하는 방법은 다음과 같습니다.
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
마이크로서비스 아키텍처에서 Kafka와 Node.js의 통합을 설명하려면 다음 사례 연구를 고려하세요.
두 가지 마이크로서비스가 있습니다.
주문 서비스에서 구매나 거래가 발생할 때마다 제품 서비스에 재고가 업데이트됩니다. Kafka는 메시지 브로커 역할을 하여 이러한 통신을 용이하게 합니다.
주문 서비스는 구매 주문을 처리하고 제품 서비스에 메시지를 보내 재고를 업데이트하는 역할을 담당합니다. Kafka 프로듀서로서 주문 서비스를 구현하는 방법은 다음과 같습니다.
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });
제품 서비스는 product-updates Kafka 주제의 메시지를 사용하고 이에 따라 제품 재고를 업데이트합니다. 구현은 다음과 같습니다.
// productService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka consumer configuration const kafka = new Kafka({ clientId: 'product-service', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'product-group' }); // Initialize Express app const app = express(); app.use(express.json()); const updateStock = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'product-updates', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const orderEvent = JSON.parse(message.value.toString()); console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`); // Simulate stock update console.log(`Updating stock for product: ${orderEvent.productId}`); // logic to update stock }, }); }; // Start the Product Service to listen for messages updateStock().catch(console.error); // Start the server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.log(`Product Service API running on port ${PORT}`); });
수신 메시지를 수신해야 하므로 제품 서비스를 먼저 시작하세요.
node productService.js
제품 서비스는 포트 3001(또는 지정된 경우 다른 포트)에서 수신 대기를 시작합니다.
다음 명령으로 주문 서비스를 시작하세요.
node orderService.js
주문 서비스는 포트 3000(또는 지정된 경우 다른 포트)에서 사용할 수 있습니다.
주문 서비스 API에 POST 요청을 보내 주문할 수 있습니다.
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'
When an order is placed, the Order Service will send a Kafka message, and the Product Service will consume that message to update the stock:
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
Integrating Apache Kafka and Node.js in your microservices architecture allows you to build highly scalable and resilient event-driven applications.
By following best practices and leveraging Kafka’s powerful features, you can efficiently process real-time data and create a robust communication layer between your microservices.
위 내용은 마이크로서비스 아키텍처에서 Kafka 및 Node.js 구현의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!