目次
使用flume+kafka+storm构建实时日志分析系统
ホームページ php教程 php手册 使用flume+kafka+storm构建实时日志分析系统

使用flume+kafka+storm构建实时日志分析系统

Jun 13, 2016 am 08:44 AM
android

使用flume+kafka+storm构建实时日志分析系统

本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客
1. flume安装使用
下载flume安装包http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -C /opt/flume
flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。
1)配置flume
进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字
$ cp flume-conf.properties.template flume.conf
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>agent.sources = seqGenSrc</li><li>agent.channels = memoryChannel</li><li>agent.sinks = loggerSink</li><li></li><li># For each one of the sources, the type is defined</li><li>agent.sources.seqGenSrc.type = exec</li><li>agent.sources.seqGenSrc.command = tail -F /data/mongodata/mongo.log</li><li>#agent.sources.seqGenSrc.bind = 172.168.49.130</li><li></li><li># The channel can be defined as follows.</li><li>agent.sources.seqGenSrc.channels = memoryChannel</li><li></li><li># Each sink's type must be defined</li><li>agent.sinks.loggerSink.type = file_roll</li><li>agent.sinks.loggerSink.sink.directory = /data/flume</li><li></li><li>#Specify the channel the sink should use</li><li>agent.sinks.loggerSink.channel = memoryChannel</li><li></li><li># Each channel's type is defined.</li><li>agent.channels.memoryChannel.type = memory</li><li></li><li># Other config values specific to each type of channel(sink or source)</li><li># can be defined as well</li><li># In this case, it specifies the capacity of the memory channel</li><li>agent.channels.memoryChannel.capacity = 1000</li><li>agent.channels.memory4log.transactionCapacity = 100</li></ol>
ログイン後にコピー
2)运行flume agent
切换到bin目录下,运行一下命令:
$ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -Dflume.root.logger=INFO,console
在/data/flume目录下可以看到生成的日志文件。

2. 结合kafka
由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink
可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的
这里只提供核心代码,process()内容。

<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>Sink.Status status = Status.READY;<br /> </li><li><br /></li><li>Channel ch = getChannel();<br /></li><li>Transaction transaction = null;<br /></li><li>Event event = null;<br /></li><li>String eventTopic = null;<br /></li><li>String eventKey = null;<br /></li><li><br /></li><li>try {<br /></li><li>transaction = ch.getTransaction();<br /></li><li>transaction.begin();<br /></li><li>messageList.clear();<br /></li><li><br /></li><li>if (type.equals("sync")) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event != null) {<br /></li><li>        byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li>          eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li>        eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> }<br /></li><li> <br /></li><li>        ProducerData<String, Message> data = new ProducerData<String, Message><br /></li><li> (eventTopic, new Message(tempBody));<br /></li><li> <br /></li><li> long startTime = System.nanoTime();<br /></li><li> logger.debug(eventTopic+"++++"+eventBody);<br /></li><li>        producer.send(data);<br /></li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>} else {<br /></li><li>long processedEvents = 0;<br /></li><li>for (; processedEvents < batchSize; processedEvents += 1) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event == null) {<br /></li><li> break;<br /></li><li> }<br /></li><li><br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li>          eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li>        eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> logger.debug("event #{}", processedEvents);<br /></li><li> }<br /></li><li><br /></li><li> // create a message and add to buffer<br /></li><li>        ProducerData<String, String> data = new ProducerData<String, String><br /></li><li> (eventTopic, eventBody);<br /></li><li>        messageList.add(data);<br /></li><li>}<br /></li><li><br /></li><li>// publish batch and commit.<br /></li><li> if (processedEvents > 0) {<br /></li><li> long startTime = System.nanoTime(); </li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>}<br /></li><li><br /></li><li>transaction.commit();<br /></li><li>} catch (Exception ex) {<br /></li><li>String errorMsg = "Failed to publish events";<br /></li><li>logger.error("Failed to publish events", ex);<br /></li><li>status = Status.BACKOFF;<br /></li><li>if (transaction != null) {<br /></li><li>try {<br /></li><li>transaction.rollback(); </li><li>} catch (Exception e) {<br /></li><li>logger.error("Transaction rollback failed", e);<br /></li><li>throw Throwables.propagate(e);<br /></li><li>}<br /></li><li>}<br /></li><li>throw new EventDeliveryException(errorMsg, ex);<br /></li><li>} finally {<br /></li><li>if (transaction != null) {<br /></li><li>transaction.close();<br /></li><li>}<br /></li><li>}<br /></li><li><br /></li><li>return status; </li></ol>
ログイン後にコピー
下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:

<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink<br /> </li><li>producer.sinks.r.brokerList = bigdata-node00:9092<br /></li><li>producer.sinks.r.requiredAcks = 1<br /></li><li>producer.sinks.r.batchSize = 100<br /></li><li>#producer.sinks.r.kafka.producer.type=async<br /></li><li>#producer.sinks.r.kafka.customer.encoding=UTF-8<br /></li><li>producer.sinks.r.topic = testFlume1</li></ol>
ログイン後にコピー
type指向kafkasink所在的完整路径
下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数

现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

新しいレポートは、噂のSamsung Galaxy S25、Galaxy S25 Plus、Galaxy S25 Ultraのカメラアップグレードのひどい評価を提供します 新しいレポートは、噂のSamsung Galaxy S25、Galaxy S25 Plus、Galaxy S25 Ultraのカメラアップグレードのひどい評価を提供します Sep 12, 2024 pm 12:23 PM

ここ数日、Ice Universeは、サムスンの次期主力スマートフォンであると広く信じられているGalaxy S25 Ultraの詳細を着実に明らかにしている。とりわけ、リーカーはサムスンがカメラのアップグレードを1つだけ計画していると主張した

Samsung Galaxy S25 Ultraの最初のレンダリング画像がリークされ、噂のデザイン変更が明らかに Samsung Galaxy S25 Ultraの最初のレンダリング画像がリークされ、噂のデザイン変更が明らかに Sep 11, 2024 am 06:37 AM

OnLeaks は、X (旧 Twitter) のフォロワーから 4,000 ドル以上を集めようとして失敗した数日後、Android Headlines と提携して Galaxy S25 Ultra のファーストルックを提供しました。コンテキストとして、h の下に埋め込まれたレンダリング イメージ

IFA 2024 | TCLのNXTPAPER 14は、パフォーマンスではGalaxy Tab S10 Ultraに匹敵しませんが、サイズではほぼ匹敵します IFA 2024 | TCLのNXTPAPER 14は、パフォーマンスではGalaxy Tab S10 Ultraに匹敵しませんが、サイズではほぼ匹敵します Sep 07, 2024 am 06:35 AM

TCLは、2つの新しいスマートフォンの発表に加えて、NXTPAPER 14と呼ばれる新しいAndroidタブレットも発表しました。その巨大な画面サイズはセールスポイントの1つです。 NXTPAPER 14 は、TCL の代表的なブランドであるマット LCD パネルのバージョン 3.0 を搭載しています。

Vivo Y300 Pro は、7.69 mm のスリムなボディに 6,500 mAh のバッテリーを搭載 Vivo Y300 Pro は、7.69 mm のスリムなボディに 6,500 mAh のバッテリーを搭載 Sep 07, 2024 am 06:39 AM

Vivo Y300 Pro は完全に公開されたばかりで、大容量バッテリーを備えた最もスリムなミッドレンジ Android スマートフォンの 1 つです。正確に言うと、このスマートフォンの厚さはわずか 7.69 mm ですが、6,500 mAh のバッテリーを搭載しています。これは最近発売されたものと同じ容量です

Samsung Galaxy S24 FEは、4色と2つのメモリオプションで予想よりも低価格で発売されると請求されています Samsung Galaxy S24 FEは、4色と2つのメモリオプションで予想よりも低価格で発売されると請求されています Sep 12, 2024 pm 09:21 PM

サムスンは、ファンエディション(FE)スマートフォンシリーズをいつアップデートするかについて、まだ何のヒントも提供していない。現時点では、Galaxy S23 FE は 2023 年 10 月初めに発表された同社の最新版のままです。

新しいレポートは、噂のSamsung Galaxy S25、Galaxy S25 Plus、Galaxy S25 Ultraのカメラアップグレードのひどい評価を提供します 新しいレポートは、噂のSamsung Galaxy S25、Galaxy S25 Plus、Galaxy S25 Ultraのカメラアップグレードのひどい評価を提供します Sep 12, 2024 pm 12:22 PM

ここ数日、Ice Universeは、サムスンの次期主力スマートフォンであると広く信じられているGalaxy S25 Ultraの詳細を着実に明らかにしている。とりわけ、リーカーはサムスンがカメラのアップグレードを1つだけ計画していると主張した

Xiaomi Redmi Note 14 Pro Plusは、Light Hunter 800カメラを搭載した初のQualcomm Snapdragon 7s Gen 3スマートフォンとして登場します Xiaomi Redmi Note 14 Pro Plusは、Light Hunter 800カメラを搭載した初のQualcomm Snapdragon 7s Gen 3スマートフォンとして登場します Sep 27, 2024 am 06:23 AM

Redmi Note 14 Pro Plusは、昨年のRedmi Note 13 Pro Plus(Amazonで現在375ドル)の直接の後継者として正式に発表されました。予想通り、Redmi Note 14 Pro Plusは、Redmi Note 14およびRedmi Note 14 Proと並んでRedmi Note 14シリーズをリードします。李

iQOO Z9 Turbo Plus: 強化されたシリーズフラッグシップの予約開始 iQOO Z9 Turbo Plus: 強化されたシリーズフラッグシップの予約開始 Sep 10, 2024 am 06:45 AM

OnePlus の姉妹ブランドである iQOO の製品サイクルは 2023 年から 4 年で、ほぼ終わりに近づいている可能性があります。それにもかかわらず、ブランドはまだZ9シリーズの開発を終えていないと宣言しました。その最終、そしておそらく最高エンドとなる Turbo+ バリアントが、予測どおりに発表されました。 T

See all articles