Bagaimana untuk menghuraikan data Apache Avro? Artikel ini akan memperkenalkan anda kepada kaedah bersiri untuk menjana data Avro, menyahsiri untuk menghuraikan data Avro dan menggunakan FlinkSQL untuk menghuraikan data Avro saya harap ia akan membantu anda.
Dengan perkembangan pesat Internet, teknologi termaju seperti pengkomputeran awan, data besar, kecerdasan buatan AI dan Internet Perkara telah menjadi arus perdana teknologi tinggi teknologi pada era hari ini, seperti laman web e-dagang , pengecaman muka, pemanduan tanpa pemandu, rumah pintar, bandar pintar, dan lain-lain, bukan sahaja memudahkan keperluan harian orang ramai, makanan, perumahan dan pengangkutan, tetapi di sebalik tabir, sentiasa ada yang besar. jumlah data yang dikumpul, dikosongkan dan dianalisis oleh pelbagai platform sistem, dan adalah penting untuk memastikan kependaman rendah, daya pemprosesan tinggi, dan keselamatan data Apache Avro sendiri disiri melalui Skema untuk penghantaran binari memastikan penghantaran data berkelajuan tinggi, dan sebaliknya, ia memastikan keselamatan data , avro kini digunakan secara lebih meluas dalam pelbagai industri Cara memproses dan menghuraikan data avro amat penting data avro melalui siri dan gunakan FlinkSQL untuk analisis.
Artikel ini ialah demo penghuraian avro Pada masa ini, FlinkSQL hanya sesuai untuk penghuraian data avro yang kompleks tidak disokong buat masa ini.
Artikel ini terutamanya memperkenalkan tiga kandungan utama berikut:
Cara mensiri dan menjana data Avro
Cara menyahsiri dan menghuraikan data Avro
Cara menggunakan FlinkSQL untuk menghuraikan data Avro
Untuk memahami apa itu avro, anda boleh merujuk kepada panduan permulaan pantas laman web rasmi apache avro
Fahami senario aplikasi avro
1 Cipta projek avro maven baharu dan konfigurasikan kebergantungan pom
Kandungan fail pom adalah seperti berikut:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.huawei.bigdata</groupId> <artifactId>avrodemo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.1</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
Nota: Fail pom di atas dikonfigurasikan Menjana laluan kelas secara automatik, iaitu {project.basedir}/src/main /java/. Selepas konfigurasi ini, apabila melaksanakan perintah mvn, Pemalam ini akan menjana fail kelas secara automatik daripada skema avsc dalam direktori ini dan meletakkannya dalam direktori yang terakhir. Jika direktori avro tidak dijana, buat sahaja secara manual.
2. Tentukan skema
Gunakan JSON untuk mentakrifkan skema untuk Avro. Skema terdiri daripada jenis asas (null, boolean, int, long, float, double, bait dan rentetan) dan jenis kompleks (rekod, enum, tatasusunan, peta, kesatuan dan tetap). Sebagai contoh, yang berikut mentakrifkan skema pengguna, mencipta direktori avro dalam direktori utama dan kemudian mencipta pengguna fail baharu.avsc dalam direktori avro:
{"namespace": "lancoo.ecbdc.pre", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
3. Susun skema
点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码
4、序列化
创建TestUser类,用于序列化生成数据
User user1 = new User(); user1.setName("Alyssa"); user1.setFavoriteNumber(256); // Leave favorite col or null // Alternate constructor User user2 = new User("Ben", 7, "red"); // Construct via builder User user3 = User.newBuilder() .setName("Charlie") .setFavoriteColor("blue") .setFavoriteNumber(null) .build(); // Serialize user1, user2 and user3 to disk DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(), new File("user_generic.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close();
执行序列化程序后,会在项目的同级目录下生成avro数据
user_generic.avro内容如下:
Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
至此avro数据已经生成。
5、反序列化
通过反序列化代码解析avro数据
// Deserialize Users from disk DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class); DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader); User user = null; while (dataFileReader.hasNext()) { // Reuse user object by passing it to next(). This saves us from // allocating and garbage collecting many objects for files with // many items. user = dataFileReader.next(user); System.out.println(user); }
执行反序列化代码解析user_generic.avro
avro数据解析成功。
6、将user_generic.avro上传至hdfs路径
hdfs dfs -mkdir -p /tmp/lztest/ hdfs dfs -put user_generic.avro /tmp/lztest/
7、配置flinkserver
将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行
cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib chmod 500 flink-sql-avro*.jar chown omm:wheel flink-sql-avro*.jar
同时重启FlinkServer实例,重启完成后查看avro包是否被上传
hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib
8、编写FlinkSQL
CREATE TABLE testHdfs( name String, favorite_number int, favorite_color String ) WITH( 'connector' = 'filesystem', 'path' = 'hdfs:///tmp/lztest/user_generic.avro', 'format' = 'avro' );CREATE TABLE KafkaTable ( name String, favorite_number int, favorite_color String ) WITH ( 'connector' = 'kafka', 'topic' = 'testavro', 'properties.bootstrap.servers' = '96.10.2.1:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'avro' ); insert into KafkaTable select * from testHdfs;
保存提交任务
9、查看对应topic中是否有数据
FlinkSQL解析avro数据成功。
【推荐:Apache使用教程】
Atas ialah kandungan terperinci Mari kita bincangkan tentang cara menghuraikan data Apache Avro (penjelasan dengan contoh). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!