目錄
場景介紹
前提條件
首頁 運維 Apache 聊聊怎麼解析Apache Avro資料(範例講解)

聊聊怎麼解析Apache Avro資料(範例講解)

Feb 22, 2022 am 10:47 AM
apache

怎麼解析Apache Avro資料?這篇文章跟大家介紹一下序列化產生Avro資料、反序列化解析Avro資料、使用FlinkSQL解析Avro資料的方法,希望對大家有幫助!

聊聊怎麼解析Apache Avro資料(範例講解)

隨著網路高速的發展,雲端運算、大數據、人工智慧AI、物聯網等尖端技術已然成為當今時代主流的高新技術,電商網站、臉部辨識、無人駕駛、智慧家居、智慧城市等等,不僅方面方便了人們的衣食住行,背後更是時時刻刻有大量的數據在經過各種各樣的系統平台的採集、清晰、分析,而確保資料的低時延、高吞吐、安全性就顯得尤為重要,Apache Avro本身透過Schema的方式序列化後進行二進位傳輸,一方面保證了資料的高速傳輸,另一方面保證了資料安全性,avro目前在各行業的應用越來越廣泛,如何對avro數據進行處理解析應用就格外重要,本文將示範如果序列化產生avro數據,並使用FlinkSQL進行解析。

本文是avro解析的demo,目前FlinkSQL僅適用於簡單的avro資料解析,複雜巢狀avro資料暫時不支援。

場景介紹

本文主要介紹以下三個重點內容:

  • #如何序列化產生Avro資料

  • #如何反序列化解析Avro資料

  • 如何使用FlinkSQL解析Avro資料

前提條件

  • 了解avro是什麼,可參考apache avro官網快速入門指南

  • #了解avro應用程式場景

##操作步驟

#1、新avro maven工程項目,設定pom依賴

聊聊怎麼解析Apache Avro資料(範例講解)

pom檔案內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
登入後複製

注意:以上pom檔案設定了自動生成類別的路徑,即

pr##o# ject.bas# #edir/src/main/avrr

o

/

聊聊怎麼解析Apache Avro資料(範例講解)

{project.basedir}/src/main/java/,這樣配置之後,在執行mvn命令的時候,這個插件就會自動將此目錄下的avsc schema生成類文件,並放到後者這個目錄下。如果沒有產生avro目錄,手動建立一下即可。 ######2、定義schema######使用JSON為Avro定義schema。 schema由基本型別(null,boolean, int, long, float, double, bytes 和string)和複雜型別(record, enum, array, map, union, 和fixed)組成。例如,以下定義一個user的schema,在main目錄下建立一個avro目錄,然後在avro目錄下新檔案user.avsc :###
{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
登入後複製
############3、編譯schema# ##

点击maven projects项目的compile进行编译,会自动在创建namespace路径和User类代码

聊聊怎麼解析Apache Avro資料(範例講解)

4、序列化

创建TestUser类,用于序列化生成数据

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite col or null

// Alternate constructor
User user2 = new User("Ben", 7, "red");

// Construct via builder
User user3 = User.newBuilder()
        .setName("Charlie")
        .setFavoriteColor("blue")
        .setFavoriteNumber(null)
        .build();

// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
登入後複製

执行序列化程序后,会在项目的同级目录下生成avro数据

聊聊怎麼解析Apache Avro資料(範例講解)

user_generic.avro内容如下:

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
登入後複製

至此avro数据已经生成。

5、反序列化

通过反序列化代码解析avro数据

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}
登入後複製

执行反序列化代码解析user_generic.avro

聊聊怎麼解析Apache Avro資料(範例講解)

avro数据解析成功。

6、将user_generic.avro上传至hdfs路径

hdfs dfs -mkdir -p /tmp/lztest/

hdfs dfs -put user_generic.avro /tmp/lztest/
登入後複製

聊聊怎麼解析Apache Avro資料(範例講解)

7、配置flinkserver

  • 准备avro jar包

将flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar放入flinkserver lib,将下面的命令在所有flinkserver节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib

chmod 500 flink-sql-avro*.jar

chown omm:wheel flink-sql-avro*.jar
登入後複製

聊聊怎麼解析Apache Avro資料(範例講解)

  • 同时重启FlinkServer实例,重启完成后查看avro包是否被上传

    hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

聊聊怎麼解析Apache Avro資料(範例講解)

8、编写FlinkSQL

CREATE TABLE testHdfs(
  name String,
  favorite_number int,
  favorite_color String
) WITH(
  &#39;connector&#39; = &#39;filesystem&#39;,
  &#39;path&#39; = &#39;hdfs:///tmp/lztest/user_generic.avro&#39;,
  &#39;format&#39; = &#39;avro&#39;
);CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  &#39;connector&#39; = &#39;kafka&#39;,
  &#39;topic&#39; = &#39;testavro&#39;,
  &#39;properties.bootstrap.servers&#39; = &#39;96.10.2.1:21005&#39;,
  &#39;properties.group.id&#39; = &#39;testGroup&#39;,
  &#39;scan.startup.mode&#39; = &#39;latest-offset&#39;,
  &#39;format&#39; = &#39;avro&#39;
);
insert into
  KafkaTable
select
  *
from
  testHdfs;
登入後複製

聊聊怎麼解析Apache Avro資料(範例講解)

保存提交任务

9、查看对应topic中是否有数据

聊聊怎麼解析Apache Avro資料(範例講解)

FlinkSQL解析avro数据成功。

【推荐:Apache使用教程

以上是聊聊怎麼解析Apache Avro資料(範例講解)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

apache中cgi目錄怎麼設置 apache中cgi目錄怎麼設置 Apr 13, 2025 pm 01:18 PM

要在 Apache 中設置 CGI 目錄,需要執行以下步驟:創建 CGI 目錄,如 "cgi-bin",並授予 Apache 寫入權限。在 Apache 配置文件中添加 "ScriptAlias" 指令塊,將 CGI 目錄映射到 "/cgi-bin" URL。重啟 Apache。

apache80端口被佔用怎麼辦 apache80端口被佔用怎麼辦 Apr 13, 2025 pm 01:24 PM

當 Apache 80 端口被佔用時,解決方法如下:找出佔用該端口的進程並關閉它。檢查防火牆設置以確保 Apache 未被阻止。如果以上方法無效,請重新配置 Apache 使用不同的端口。重啟 Apache 服務。

apache怎麼連接數據庫 apache怎麼連接數據庫 Apr 13, 2025 pm 01:03 PM

Apache 連接數據庫需要以下步驟:安裝數據庫驅動程序。配置 web.xml 文件以創建連接池。創建 JDBC 數據源,指定連接設置。從 Java 代碼中使用 JDBC API 訪問數據庫,包括獲取連接、創建語句、綁定參數、執行查詢或更新以及處理結果。

apache怎麼啟動 apache怎麼啟動 Apr 13, 2025 pm 01:06 PM

啟動 Apache 的步驟如下:安裝 Apache(命令:sudo apt-get install apache2 或從官網下載)啟動 Apache(Linux:sudo systemctl start apache2;Windows:右鍵“Apache2.4”服務並選擇“啟動”)檢查是否已啟動(Linux:sudo systemctl status apache2;Windows:查看服務管理器中“Apache2.4”服務的狀態)啟用開機自動啟動(可選,Linux:sudo systemctl

怎麼查看自己的apache版本 怎麼查看自己的apache版本 Apr 13, 2025 pm 01:15 PM

有 3 種方法可在 Apache 服務器上查看版本:通過命令行(apachectl -v 或 apache2ctl -v)、檢查服務器狀態頁(http://&lt;服務器IP或域名&gt;/server-status)或查看 Apache 配置文件(ServerVersion: Apache/&lt;版本號&gt;)。

apache怎麼刪除多於的服務器名 apache怎麼刪除多於的服務器名 Apr 13, 2025 pm 01:09 PM

要從 Apache 中刪除多餘的 ServerName 指令,可以採取以下步驟:識別並刪除多餘的 ServerName 指令。重新啟動 Apache 使更改生效。檢查配置文件驗證更改。測試服務器確保問題已解決。

怎麼查看apache版本 怎麼查看apache版本 Apr 13, 2025 pm 01:00 PM

如何查看 Apache 版本?啟動 Apache 服務器:使用 sudo service apache2 start 啟動服務器。查看版本號:使用以下方法之一查看版本:命令行:運行 apache2 -v 命令。服務器狀態頁面:在 Web 瀏覽器中訪問 Apache 服務器的默認端口(通常為 80),版本信息顯示在頁面底部。

apache怎麼配置zend apache怎麼配置zend Apr 13, 2025 pm 12:57 PM

如何在 Apache 中配置 Zend?在 Apache Web 服務器中配置 Zend Framework 的步驟如下:安裝 Zend Framework 並解壓到 Web 服務器目錄中。創建 .htaccess 文件。創建 Zend 應用程序目錄並添加 index.php 文件。配置 Zend 應用程序(application.ini)。重新啟動 Apache Web 服務器。

See all articles