Apache Camel に関する一連の記事を紹介できることを嬉しく思います。この最初の投稿では、Apache Camel の複雑さを深く掘り下げるのではなく、その機能を示す実際的な使用例を紹介します。具体的には、Apache Camel を使用して 2 つのデータベース間で単純な抽出、変換、読み込み (ETL) アプリケーションを作成する方法を学習します。
実際の使用例に入る前に、Apache Camel について簡単に紹介しましょう。 Apache Camel は、エンタープライズ統合パターン (EIP) を活用してさまざまなシステムの統合を促進するオープンソースの統合フレームワークです。
今日の世界では、さまざまな種類の多数のシステムが共存しています。レガシー システムもあれば、新しいシステムもあります。これらのシステムは多くの場合、相互に対話して統合する必要がありますが、実装やメッセージ形式が異なるため、これは困難な場合があります。解決策の 1 つは、カスタム コードを作成してこれらの違いを埋めることですが、これにより密結合とメンテナンスの問題が発生する可能性があります。
代わりに、Apache Camel はシステム間の違いを仲介する追加レイヤーを提供し、その結果疎結合が実現し、メンテナンスが容易になります。 Camel は、API (または宣言型 Java ドメイン固有言語) を使用して、EIP に基づいてルーティングおよびメディエーション ルールを構成します。
Apache Camel を理解するには、「エンタープライズ統合パターン」(EIP) を理解することが重要です。 『Enterprise Integration Patterns』という本では、コンポーネントが同じプロセスまたは異なるマシン上で実行できる大規模なコンポーネント ベースのシステムを設計するための一連のパターンについて説明しています。重要な考え方は、システムはメッセージ指向であり、コンポーネントがメッセージを介して通信する必要があるということです。パターンは、これらの通信を実装するためのツールキットを提供します (図 1)。
図 1 – 統合ソリューションの基本要素 (enterpriseintegrationpatterns.com)
エンドポイント: エンドポイントは、メッセージが送受信されるチャネルです。これは、コンポーネントと外部世界の間のインターフェイスとして機能します。
メッセージ: メッセージは、システム間の通信に使用されるデータ構造であり、ヘッダーとボディで構成されます。ヘッダーにはメタデータが含まれ、本文には実際のデータが含まれます。
チャネル: チャネルは 2 つのエンドポイントを接続し、メッセージの送受信を容易にします。
ルーター: ルーターは、あるエンドポイントから別のエンドポイントにメッセージを送り、メッセージ パスを決定します。
トランスレーター: トランスレーターは、メッセージをある形式から別の形式に変換します。
Apache Camel の紹介については、今のところここでやめておこうと思います。次に、Apache Camel を使用して 2 つのデータベース間に単純な ETL アプリケーションを作成する方法を説明します。
重要なコンポーネントの 1 つがデータベースである高負荷のシステムがあると仮定しましょう。ある時点で、ML モデルのトレーニング、エクスポート、グラフの生成など、通常の運用ケース以外でこのデータを処理する必要がある場合、または単にデータの一部が必要な場合があります。もちろん、これにより運用データベースへの負担がさらに大きくなります。そのためには、必要なデータを抽出し、必要な形式に変換して、別のデータベースに保存できるメカニズムが最適です。運用上のものよりも。この戦略により、当社の運営基盤に過度の潜在的な過負荷がかかるという問題を解決します。さらに、このようなメカニズムを使用すると、システムに負荷がかかりすぎない時間帯 (夜間など) にこの操作を実行できます。
解決策は以下の図 (図 2) に示されています。 Apache Camel を使用して、2 つのデータベース間に単純な ETL アプリケーションを作成します。アプリケーションはソース データベースからデータを抽出し、変換して、ターゲット データベースにロードします。ソース データベースからデータを抽出する方法に焦点を当てて、このソリューションを実装するためのさまざまな戦略を導入できます。データを選択する基準は、レコードの更新日に基づいていると思います。この戦略は、変更されたデータも抽出する機会を提供します。
図 2 – Apache Camel を使用した 2 つのデータベース間のデータの同期
ソース データベースとターゲット データベースは次のテーブル構造になります:
CREATE TABLE IF NOT EXISTS user ( id serial PRIMARY KEY, username VARCHAR(50) NOT NULL, password VARCHAR(50) NOT NULL, email VARCHAR(255) NOT NULL, created_at timestamp default now()::timestamp without time zone, last_modified TIMESTAMP DEFAULT now()::timestamp without time zone );
ターゲット データベースでは、ユーザー名を挿入する前に大文字に変換します。
さまざまな Camel コンポーネントに Camel Quarkus 拡張機能を使用します。具体的には、Camel SQL コンポーネントを使用してデータベースと対話します。 SQL コンポーネントは、SQL クエリ、挿入、更新、削除の実行をサポートします。
まず、RouteBuilder を拡張するクラスを作成し、configure メソッドをオーバーライドします。
@ApplicationScoped public class UserRoute extends RouteBuilder { @Override public void configure() throws Exception { // your code here } }
ここでは @ApplicationScoped アノテーションの使用は必須ではありませんが、クラスが CDI Bean であり、CDI コンテナーによって管理される必要があることを示すことを好みます。
上で述べたように、データベースと対話するために Camel SQL コンポーネントを使用します。 Camel SQL コンポーネントをソース データベースとターゲット データベースに接続するように設定する必要があります。 Quarkus Agroal 拡張機能を使用してデータ ソースを設定します。 Agroal 拡張機能は、データ ソースの接続プールを提供します。 application.properties ファイルでデータ ソースを構成します。
# # Source Database Configuration quarkus.datasource.source_db.db-kind=postgresql quarkus.datasource.source_db.jdbc.url=jdbc:postgresql://localhost:5001/demo quarkus.datasource.source_db.username=test quarkus.datasource.source_db.password=password1 # # # Target Database Configuration quarkus.datasource.target_db.db-kind=postgresql quarkus.datasource.target_db.jdbc.url=jdbc:postgresql://localhost:6001/demo quarkus.datasource.target_db.username=test quarkus.datasource.target_db.password=password1 #
これで、ソース データベースとターゲット データベースに接続するように Camel SQL コンポーネントを構成できるようになりました。 SQL コンポーネントを使用して、ソース データベースとターゲット データベースの SQL エンドポイントを作成します。
SQL コンポーネントは次のエンドポイント URI 表記を使用します:
sql:select * from table where id=# order by name[?options]
しかし、操作を自動的に実行するメカニズムが必要です。タイマー コンポーネントを使用して、ETL プロセスを秒ごとにトリガーします。タイマー コンポーネントは、タイマーが起動したときにメッセージ交換を生成するために使用されます。タイマー コンポーネントは、次のエンドポイント URI 表記を使用します:
timer:name[?options]
ルートでは次のような構成を使用します:
from("timer://userSync?delay={{etl.timer.delay}}&period={{etl.timer.period}}")
{{etl.timer.delay}} と {{etl.timer.period}} は、application.properties ファイルで定義する構成値です。
etl.timer.period=10000 etl.timer.delay=1000
ターゲット データベースにデータを挿入する前にデータを変換するには、トランスレータを提供する必要があります。
.process(exchange -> { final Map<String, Object> rows = exchange.getIn().getBody(Map.class); final String userName = (String) rows.get("username"); final String userNameToUpperCase = userName.toUpperCase(); log.info("User name: {} converted to upper case: {}", userName, userNameToUpperCase); rows.put("username", userNameToUpperCase); })
プロセッサ インターフェイスは、メッセージ交換のコンシューマを実装したり、メッセージ トランスレータやその他のユースケースを実装したりするために使用されます。
これで、Apache Camel を使用した 2 つのデータベース間の単純な ETL アプリケーションが完成しました。
アプリケーションを実行すると、ログに次の出力が表示されるはずです。
2024-06-09 13:15:49,257 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) Extracting Max last_modified value from source database 2024-06-09 13:15:49,258 INFO [route1] (Camel (camel-1) thread #1 - timer://userSync) No record found in target database 2024-06-09 13:15:49,258 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) The last_modified from source DB: 2024-06-09 13:15:49,274 INFO [route2] (Camel (camel-1) thread #1 - timer://userSync) Extracting records from source database 2024-06-09 13:15:49,277 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: john_doe converted to upper case: JOHN_DOE 2024-06-09 13:15:49,282 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: jane_smith converted to upper case: JANE_SMITH 2024-06-09 13:15:49,283 INFO [org.iqn.cam.rou.UserRoute] (Camel (camel-1) thread #1 - timer://userSync) User name: alice_miller converted to upper case: ALICE_MILLER
アプリケーションの完全なソース コードは GitHub リポジトリで見つけることができます。
このセットアップでは、Apache Camel を使用して、ソース データベースからデータを抽出し、変換して、ターゲット データベースにロードする単純な ETL アプリケーションを作成しました。このアプローチにより、運用データベースの負荷が軽減され、オフピーク時にデータ抽出を実行できるようになります。
以上がQuarkus を使用した Apache Camel の実践ガイド: ETL アプリケーションの構築の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。