Pada masa ini, kita hidup dalam dunia di mana bait peta data dijana setiap saat. Oleh itu, menganalisis dan memproses data ini dalam masa nyata menjadi lebih penting bagi syarikat yang ingin menjana cerapan perniagaan dengan lebih tepat apabila data dan lebih banyak data dihasilkan.
Hari ini, kami akan membangunkan analisis data masa nyata berdasarkan data trafik udara rekaan menggunakan Spark Structured Streaming dan Apache Kafka. Jika anda tidak tahu apakah teknologi ini, saya cadangkan membaca artikel saya yang saya tulis untuk memperkenalkannya dengan lebih terperinci, serta konsep lain yang akan dibincangkan sepanjang artikel ini. Jadi, jangan lupa untuk menyemaknya?.
Anda boleh menyemak projek lengkap di GitHub saya.
Nah, bayangkan anda, seorang jurutera data, bekerja di syarikat penerbangan bernama SkyX, di mana data tentang trafik udara dijana setiap saat.
Anda diminta untuk membangunkan papan pemuka yang memaparkan data masa nyata daripada penerbangan ini, seperti penarafan bandar yang paling banyak dikunjungi di luar negara; bandar-bandar di mana kebanyakan orang pergi; dan pesawat yang mengangkut paling ramai orang di seluruh dunia.
Ini ialah data yang dijana dengan setiap penerbangan:
Berikut ialah seni bina asas projek kami:
Tutorial ini menganggap anda sudah memasang PySpark pada mesin anda. Jika anda belum berbuat demikian, lihat langkah-langkah dalam dokumentasi itu sendiri.
Bagi Apache Kafka, kami akan menggunakannya melalui kontena melalui Docker??.
Dan akhirnya, kami akan menggunakan Python melalui persekitaran maya.
Tanpa berlengah lagi, buat folder bernama skyx dan tambahkan fail docker-compose.yml di dalamnya.
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Sekarang, tambah kandungan berikut di dalam fail docker-compose:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Selesai! Kami kini boleh memuat naik pelayan Kafka kami. Untuk melakukan ini, taip arahan berikut dalam terminal:
$ docker compose up -d $ docker compose ps
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Nota: Tutorial ini menggunakan versi 2.0 Docker Compose. Inilah sebabnya mengapa tiada "-" antara docker dan karang ☺.
Kini, kita perlu mencipta topik dalam Kafka yang akan menyimpan data yang dihantar dalam masa nyata oleh pengeluar. Untuk melakukan ini, mari akses Kafka di dalam bekas:
$ docker compose exec kafka bash
Dan akhirnya mencipta topik, dipanggil lalu lintas udara.
$ kafka-topik --buat --topik lalu lintas udara --bootstrap-server localhost:29092
Mencipta topik lalu lintas udara.
Untuk membangunkan pengeluar kami, iaitu, aplikasi yang akan bertanggungjawab menghantar data trafik udara masa nyata ke topik Kafka, kami perlu menggunakan perpustakaan kafka-python. kafka-python ialah perpustakaan yang dibangunkan komuniti yang membolehkan kami membangunkan pengeluar dan pengguna yang berintegrasi dengan Apache Kafka.
Mula-mula, mari buat fail bernama requirements.txt dan tambahkan kebergantungan berikut di dalamnya:
kafka-python
Kedua, kami akan mencipta persekitaran maya dan memasang kebergantungan dalam fail requirements.txt:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Selesai! Kini persekitaran kita sudah bersedia untuk pembangunan?.
Sekarang mari buat penerbit kami. Seperti yang dinyatakan, pengeluar akan bertanggungjawab untuk menghantar data trafik udara ke topik Kafka yang baru dibuat.
Seperti yang juga dikatakan dalam seni bina, SkyX hanya terbang antara lima bandar di seluruh dunia, dan hanya mempunyai lima pesawat yang tersedia?. Perlu dinyatakan bahawa setiap pesawat membawa antara 50 dan 100 orang.
Perhatikan bahawa data dijana secara rawak dan dihantar ke topik dalam format json dalam selang masa antara 1 dan 6 saat?.
Jom! Buat subdirektori dipanggil src dan subdirektori lain dipanggil kafka. Di dalam direktori kafka, buat fail bernama airtraffic_producer.py dan tambahkan kod berikut di dalamnya:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Selesai! Kami membangunkan pengeluar kami. Jalankan dan biarkan ia berjalan seketika.
$ python airtraffic_producer.py
Sekarang mari bangunkan pengguna kita. Ini akan menjadi aplikasi yang sangat mudah. Ia hanya akan memaparkan data yang tiba dalam topik kafka dalam masa nyata dalam terminal.
Masih di dalam direktori kafka, buat fail bernama airtraffic_consumer.py dan tambahkan kod berikut di dalamnya:
$ docker compose up -d $ docker compose ps
Lihat, saya memberitahu anda bahawa ia sangat mudah. Jalankan dan tonton data yang akan dipaparkan dalam masa nyata semasa pengeluar menghantar data ke topik.
$ python airtraffic_consumer.py
Sekarang kita mulakan dengan analisis data kita. Pada ketika ini, kami akan membangunkan papan pemuka, aplikasi, yang akan memaparkan dalam masa nyata kedudukan bandar yang menerima paling ramai pelancong. Dengan kata lain, kami akan mengumpulkan data mengikut lajur ke dan membuat jumlah berdasarkan lajur penumpang. Sangat mudah!
Untuk melakukan ini, dalam direktori src, cipta subdirektori yang dipanggil papan pemuka dan buat fail bernama tourists_analysis.py. Kemudian tambahkan kod berikut di dalamnya:
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Dan kini kami boleh melaksanakan fail kami melalui penyerahan percikan. Tetapi bertenang! Apabila kami menyepadukan PySpark dengan Kafka, kami mesti menjalankan penyerahan percikan secara berbeza. Anda perlu memaklumkan pakej Apache Kafka dan versi semasa Apache Spark melalui parameter --packages.
Jika ini kali pertama anda menyepadukan Apache Spark dengan Apache Kafka, penyerahan percikan mungkin mengambil sedikit masa untuk dijalankan. Ini kerana ia perlu memuat turun pakej yang diperlukan.
Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Analisis ini hampir sama dengan yang sebelumnya. Walau bagaimanapun, daripada menganalisis dalam masa nyata bandar yang menerima paling ramai pelancong, kami akan menganalisis bandar yang paling ramai orang pergi. Untuk melakukan ini, buat fail bernama leavers_analysis.py dan tambahkan kod berikut di dalamnya:
$ docker compose up -d $ docker compose ps
Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py
NAME COMMAND SERVICE STATUS PORTS skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Analisis ini jauh lebih mudah daripada yang sebelumnya. Mari analisa dalam masa nyata pesawat yang mengangkut penumpang terbanyak antara bandar di seluruh dunia. Buat fail bernama aircrafts_analysis.py dan tambahkan kod berikut di dalamnya:
$ python -m venv venv $ venv\scripts\activate $ pip install -r requirements.txt
Pastikan pengeluar masih berjalan supaya kami dapat melihat analisis data dalam masa nyata. Di dalam direktori papan pemuka, jalankan arahan berikut:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py
$ mkdir skyx $ cd skyx $ touch docker-compose.yml
Dan kita selesai di sini, kawan-kawan! Dalam artikel ini, kami membangunkan analisis data masa nyata berdasarkan data trafik udara rekaan menggunakan Spark Structured Streaming dan Apache Kafka.
Untuk melakukan ini, kami membangunkan pengeluar yang menghantar data ini dalam masa nyata ke topik Kafka, dan kemudian kami membangunkan 3 papan pemuka untuk menganalisis data ini dalam masa nyata.
Saya harap anda menyukainya. Jumpa lagi lain kali?.
Atas ialah kandungan terperinci Analisis data trafik udara masa nyata dengan Spark Structured Streaming dan Apache Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!