Apabila mereka bentuk seni bina perkhidmatan mikro untuk aplikasi dipacu peristiwa, menyepadukan Apache Kafka dan Node.js boleh meningkatkan keupayaan pemprosesan data masa nyata dengan ketara . Dalam artikel ini, kami akan meneroka cara memanfaatkan integrasi Kafka Node.js untuk membina perkhidmatan mikro yang teguh dan boleh skala yang mengendalikan data penstriman dengan cekap.
Dalam seni bina perkhidmatan mikro, perkhidmatan perlu berkomunikasi antara satu sama lain dengan cekap. Apache Kafka berfungsi sebagai platform penstriman acara teragih yang membolehkan pertukaran data masa nyata antara perkhidmatan mikro. Ia memisahkan perkhidmatan, membolehkan mereka beroperasi secara bebas sambil memproses jumlah data yang besar.
Untuk menyepadukan Apache Kafka dan Node.js dalam persekitaran perkhidmatan mikro, anda perlu menyediakan Kafka sebagai broker mesej dan menyambungkannya dengan perkhidmatan Node.js anda. Berikut ialah panduan langkah demi langkah:
Pertama, pastikan Apache Kafka dan Node.js dipasang pada sistem anda. Anda boleh memasang Kafka & Node.js dengan mengikuti artikel berikut:
Untuk menyambung Node.js dengan Kafka, anda boleh menggunakan perpustakaan kafkajs, pelanggan Kafka yang popular untuk Node.js.
npm install kafkajs
Dalam seni bina perkhidmatan mikro, pengeluar Kafka bertanggungjawab menghantar mesej kepada topik Kafka. Di bawah ialah contoh mudah cara mencipta pengeluar Kafka dalam Node.js:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] }); const producer = kafka.producer(); const sendMessage = async () => { await producer.connect(); await producer.send({ topic: 'my-topic', messages: [ { value: 'Hello Kafka' }, ], }); await producer.disconnect(); }; sendMessage().catch(console.error);
Pengguna Kafka digunakan untuk membaca mesej daripada topik Kafka. Begini cara anda boleh mencipta pengguna:
const { Kafka } = require('kafkajs'); const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['localhost:9092'] }); const consumer = kafka.consumer({ groupId: 'my-group' }); const runConsumer = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'my-topic', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ partition, offset: message.offset, value: message.value.toString(), }); }, }); }; runConsumer().catch(console.error);
Untuk menggambarkan penyepaduan Kafka dan Node.js dalam seni bina perkhidmatan mikro, pertimbangkan kajian kes berikut:
Kami mempunyai dua perkhidmatan mikro:
Setiap kali pembelian atau transaksi berlaku dalam Perkhidmatan Pesanan, ia akan mengemas kini stok dalam Perkhidmatan Produk. Kafka memudahkan komunikasi ini dengan bertindak sebagai broker mesej.
Perkhidmatan Pesanan bertanggungjawab untuk mengendalikan pesanan pembelian dan menghantar mesej kepada Perkhidmatan Produk untuk mengemas kini stok. Begini cara anda boleh melaksanakan Perkhidmatan Pesanan sebagai pengeluar Kafka:
// orderService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka producer configuration const kafka = new Kafka({ clientId: 'order-service', brokers: ['localhost:9092'], }); const producer = kafka.producer(); // Initialize Express app const app = express(); app.use(express.json()); const placeOrder = async (orderId, productId, quantity) => { await producer.connect(); const orderEvent = { orderId, productId, quantity, eventType: 'ORDER_PLACED', timestamp: Date.now(), }; await producer.send({ topic: 'product-updates', messages: [{ value: JSON.stringify(orderEvent) }], }); await producer.disconnect(); console.log(`Order placed: ${orderId} for product: ${productId}`); }; // API endpoint to place an order app.post('/order', async (req, res) => { const { orderId, productId, quantity } = req.body; if (!orderId || !productId || !quantity) { return res.status(400).json({ error: 'Missing orderId, productId, or quantity' }); } try { await placeOrder(orderId, productId, quantity); res.status(200).json({ message: `Order ${orderId} placed successfully.` }); } catch (error) { console.error('Error placing order:', error); res.status(500).json({ error: 'Failed to place order' }); } }); // Start the server const PORT = process.env.PORT || 3000; app.listen(PORT, () => { console.log(`Order Service API running on port ${PORT}`); });
Perkhidmatan Produk menggunakan mesej daripada topik Kafka kemas kini produk dan mengemas kini stok produk dengan sewajarnya. Inilah pelaksanaannya:
// productService.js const express = require('express'); const { Kafka } = require('kafkajs'); // Kafka consumer configuration const kafka = new Kafka({ clientId: 'product-service', brokers: ['localhost:9092'], }); const consumer = kafka.consumer({ groupId: 'product-group' }); // Initialize Express app const app = express(); app.use(express.json()); const updateStock = async () => { await consumer.connect(); await consumer.subscribe({ topic: 'product-updates', fromBeginning: true }); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const orderEvent = JSON.parse(message.value.toString()); console.log(`Received order: ${orderEvent.orderId}, Product: ${orderEvent.productId}, Quantity: ${orderEvent.quantity}`); // Simulate stock update console.log(`Updating stock for product: ${orderEvent.productId}`); // logic to update stock }, }); }; // Start the Product Service to listen for messages updateStock().catch(console.error); // Start the server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.log(`Product Service API running on port ${PORT}`); });
Mulakan Perkhidmatan Produk dahulu, kerana ia perlu mendengar mesej masuk:
node productService.js
Perkhidmatan Produk akan mula mendengar pada port 3001 (atau port lain jika dinyatakan).
Mulakan Perkhidmatan Pesanan dengan arahan ini:
node orderService.js
Perkhidmatan Pesanan akan tersedia pada port 3000 (atau port lain jika dinyatakan).
Anda boleh membuat pesanan dengan menghantar permintaan POST ke API Perkhidmatan Pesanan:
curl -X POST http://localhost:3000/order \ -H "Content-Type: application/json" \ -d '{ "orderId": "order-789", "productId": "product-123", "quantity": 5 }'
When an order is placed, the Order Service will send a Kafka message, and the Product Service will consume that message to update the stock:
Received order: order-789, Product: product-123, Quantity: 5 Updating stock for product: product-123
Integrating Apache Kafka and Node.js in your microservices architecture allows you to build highly scalable and resilient event-driven applications.
By following best practices and leveraging Kafka’s powerful features, you can efficiently process real-time data and create a robust communication layer between your microservices.
Atas ialah kandungan terperinci Laksanakan Kafka dan Node.js dalam Seni Bina Perkhidmatan Mikro. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!