flume+kafka+storm を使ってリアルタイムログ分析システムを構築する_PHP チュートリアル
flume+kafka+storm を使用してリアルタイムログ分析システムを構築します
この記事では、flume と kafka の組み合わせについてのみ説明します。kafka と storm の組み合わせについては、他のブログを参照してください1。そして使用してください
flume インストールパッケージをダウンロードします http://www .apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
$ tar -xzvf apache を解凍します-flume-1.5.2-bin.tar.gz - C /opt/flume
flume 設定ファイルは conf ファイル ディレクトリに配置され、実行ファイルは bin ファイル ディレクトリに配置されます。
1) flume を設定します
conf ディレクトリに入り、flume-conf.properties.template をコピーし、必要な名前を付けます
$ cp flume-conf.properties.template flume.conf
flume.conf の内容を変更します。ファイルシンクはチャネル内のデータを受信します。チャネルはメモリチャネルを使用し、ソースは実行ソースを使用します。設定ファイルは次のとおりです:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>agent.sources = seqGenSrc</li><li>agent.channels = memoryChannel</li><li>agent.sinks = loggerSink</li><li></li><li># For each one of the sources, the type is defined</li><li>agent.sources.seqGenSrc.type = exec</li><li>agent.sources.seqGenSrc.command = tail -F /data/mongodata/mongo.log</li><li>#agent.sources.seqGenSrc.bind = 172.168.49.130</li><li></li><li># The channel can be defined as follows.</li><li>agent.sources.seqGenSrc.channels = memoryChannel</li><li></li><li># Each sink's type must be defined</li><li>agent.sinks.loggerSink.type = file_roll</li><li>agent.sinks.loggerSink.sink.directory = /data/flume</li><li></li><li>#Specify the channel the sink should use</li><li>agent.sinks.loggerSink.channel = memoryChannel</li><li></li><li># Each channel's type is defined.</li><li>agent.channels.memoryChannel.type = memory</li><li></li><li># Other config values specific to each type of channel(sink or source)</li><li># can be defined as well</li><li># In this case, it specifies the capacity of the memory channel</li><li>agent.channels.memoryChannel.capacity = 1000</li><li>agent.channels.memory4log.transactionCapacity = 100</li></ol>
bin ディレクトリに切り替えてコマンドを実行します:
$ ./ flume-ng エージェント --conf ../conf -f ../conf/flume.conf --n エージェント -Dflume.root.logger=INFO,console
生成されたログ ファイルは /data/flume ディレクトリで確認できます。 。
2. kafka と組み合わせる
flume1.5.2 には kafka シンクがないため、自分で kafka シンクを開発する必要があります
flume 1.6 の kafka シンクを参照できますが、使用される Kafka のバージョンに注意してください。 kafka API には互換性がありません
ここではコア コード、process() コンテンツのみが提供されます。
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>Sink.Status status = Status.READY;<br /> </li><li><br /></li><li>Channel ch = getChannel();<br /></li><li>Transaction transaction = null;<br /></li><li>Event event = null;<br /></li><li>String eventTopic = null;<br /></li><li>String eventKey = null;<br /></li><li><br /></li><li>try {<br /></li><li>transaction = ch.getTransaction();<br /></li><li>transaction.begin();<br /></li><li>messageList.clear();<br /></li><li><br /></li><li>if (type.equals("sync")) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event != null) {<br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> }<br /></li><li> <br /></li><li> ProducerData<String, Message> data = new ProducerData<String, Message><br /></li><li> (eventTopic, new Message(tempBody));<br /></li><li> <br /></li><li> long startTime = System.nanoTime();<br /></li><li> logger.debug(eventTopic+"++++"+eventBody);<br /></li><li> producer.send(data);<br /></li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>} else {<br /></li><li>long processedEvents = 0;<br /></li><li>for (; processedEvents < batchSize; processedEvents += 1) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event == null) {<br /></li><li> break;<br /></li><li> }<br /></li><li><br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> logger.debug("event #{}", processedEvents);<br /></li><li> }<br /></li><li><br /></li><li> // create a message and add to buffer<br /></li><li> ProducerData<String, String> data = new ProducerData<String, String><br /></li><li> (eventTopic, eventBody);<br /></li><li> messageList.add(data);<br /></li><li>}<br /></li><li><br /></li><li>// publish batch and commit.<br /></li><li> if (processedEvents > 0) {<br /></li><li> long startTime = System.nanoTime(); </li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>}<br /></li><li><br /></li><li>transaction.commit();<br /></li><li>} catch (Exception ex) {<br /></li><li>String errorMsg = "Failed to publish events";<br /></li><li>logger.error("Failed to publish events", ex);<br /></li><li>status = Status.BACKOFF;<br /></li><li>if (transaction != null) {<br /></li><li>try {<br /></li><li>transaction.rollback(); </li><li>} catch (Exception e) {<br /></li><li>logger.error("Transaction rollback failed", e);<br /></li><li>throw Throwables.propagate(e);<br /></li><li>}<br /></li><li>}<br /></li><li>throw new EventDeliveryException(errorMsg, ex);<br /></li><li>} finally {<br /></li><li>if (transaction != null) {<br /></li><li>transaction.close();<br /></li><li>}<br /></li><li>}<br /></li><li><br /></li><li>return status; </li></ol>
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink<br /> </li><li>producer.sinks.r.brokerList = bigdata-node00:9092<br /></li><li>producer.sinks.r.requiredAcks = 1<br /></li><li>producer.sinks.r.batchSize = 100<br /></li><li>#producer.sinks.r.kafka.producer.type=async<br /></li><li>#producer.sinks.r.kafka.customer.encoding=UTF-8<br /></li><li>producer.sinks.r.topic = testFlume1</li></ol>
以下のパラメータはすべて、kafka の一連のパラメータです。最も重要なことは、brokerList とトピックパラメータです
ここで、flume を再起動すると、kafka の対応するトピックの下に対応するログを表示できます

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









ここ数日、Ice Universeは、サムスンの次期主力スマートフォンであると広く信じられているGalaxy S25 Ultraの詳細を着実に明らかにしている。とりわけ、リーカーはサムスンがカメラのアップグレードを1つだけ計画していると主張した

OnLeaks は、X (旧 Twitter) のフォロワーから 4,000 ドル以上を集めようとして失敗した数日後、Android Headlines と提携して Galaxy S25 Ultra のファーストルックを提供しました。コンテキストとして、h の下に埋め込まれたレンダリング イメージ

TCLは、2つの新しいスマートフォンの発表に加えて、NXTPAPER 14と呼ばれる新しいAndroidタブレットも発表しました。その巨大な画面サイズはセールスポイントの1つです。 NXTPAPER 14 は、TCL の代表的なブランドであるマット LCD パネルのバージョン 3.0 を搭載しています。

Vivo Y300 Pro は完全に公開されたばかりで、大容量バッテリーを備えた最もスリムなミッドレンジ Android スマートフォンの 1 つです。正確に言うと、このスマートフォンの厚さはわずか 7.69 mm ですが、6,500 mAh のバッテリーを搭載しています。これは最近発売されたものと同じ容量です

サムスンは、ファンエディション(FE)スマートフォンシリーズをいつアップデートするかについて、まだ何のヒントも提供していない。現時点では、Galaxy S23 FE は 2023 年 10 月初めに発表された同社の最新版のままです。

ここ数日、Ice Universeは、サムスンの次期主力スマートフォンであると広く信じられているGalaxy S25 Ultraの詳細を着実に明らかにしている。とりわけ、リーカーはサムスンがカメラのアップグレードを1つだけ計画していると主張した

Redmi Note 14 Pro Plusは、昨年のRedmi Note 13 Pro Plus(Amazonで現在375ドル)の直接の後継者として正式に発表されました。予想通り、Redmi Note 14 Pro Plusは、Redmi Note 14およびRedmi Note 14 Proと並んでRedmi Note 14シリーズをリードします。李

OnePlus の姉妹ブランドである iQOO の製品サイクルは 2023 年から 4 年で、ほぼ終わりに近づいている可能性があります。それにもかかわらず、ブランドはまだZ9シリーズの開発を終えていないと宣言しました。その最終、そしておそらく最高エンドとなる Turbo+ バリアントが、予測どおりに発表されました。 T
