使用flume+kafka+storm构建实时日志分析系统_PHP教程
使用flume+kafka+storm构建实时日志分析系统
本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客1. flume安装使用
下载flume安装包http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -C /opt/flume
flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。
1)配置flume
进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字
$ cp flume-conf.properties.template flume.conf
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>agent.sources = seqGenSrc</li><li>agent.channels = memoryChannel</li><li>agent.sinks = loggerSink</li><li></li><li># For each one of the sources, the type is defined</li><li>agent.sources.seqGenSrc.type = exec</li><li>agent.sources.seqGenSrc.command = tail -F /data/mongodata/mongo.log</li><li>#agent.sources.seqGenSrc.bind = 172.168.49.130</li><li></li><li># The channel can be defined as follows.</li><li>agent.sources.seqGenSrc.channels = memoryChannel</li><li></li><li># Each sink's type must be defined</li><li>agent.sinks.loggerSink.type = file_roll</li><li>agent.sinks.loggerSink.sink.directory = /data/flume</li><li></li><li>#Specify the channel the sink should use</li><li>agent.sinks.loggerSink.channel = memoryChannel</li><li></li><li># Each channel's type is defined.</li><li>agent.channels.memoryChannel.type = memory</li><li></li><li># Other config values specific to each type of channel(sink or source)</li><li># can be defined as well</li><li># In this case, it specifies the capacity of the memory channel</li><li>agent.channels.memoryChannel.capacity = 1000</li><li>agent.channels.memory4log.transactionCapacity = 100</li></ol>
切换到bin目录下,运行一下命令:
$ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -Dflume.root.logger=INFO,console
在/data/flume目录下可以看到生成的日志文件。
2. 结合kafka
由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink
可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的
这里只提供核心代码,process()内容。
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>Sink.Status status = Status.READY;<br /> </li><li><br /></li><li>Channel ch = getChannel();<br /></li><li>Transaction transaction = null;<br /></li><li>Event event = null;<br /></li><li>String eventTopic = null;<br /></li><li>String eventKey = null;<br /></li><li><br /></li><li>try {<br /></li><li>transaction = ch.getTransaction();<br /></li><li>transaction.begin();<br /></li><li>messageList.clear();<br /></li><li><br /></li><li>if (type.equals("sync")) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event != null) {<br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> }<br /></li><li> <br /></li><li> ProducerData<String, Message> data = new ProducerData<String, Message><br /></li><li> (eventTopic, new Message(tempBody));<br /></li><li> <br /></li><li> long startTime = System.nanoTime();<br /></li><li> logger.debug(eventTopic+"++++"+eventBody);<br /></li><li> producer.send(data);<br /></li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>} else {<br /></li><li>long processedEvents = 0;<br /></li><li>for (; processedEvents < batchSize; processedEvents += 1) {<br /></li><li>event = ch.take();<br /></li><li><br /></li><li> if (event == null) {<br /></li><li> break;<br /></li><li> }<br /></li><li><br /></li><li> byte[] tempBody = event.getBody();<br /></li><li> String eventBody = new String(tempBody,"UTF-8");<br /></li><li> Map<String, String> headers = event.getHeaders();<br /></li><li><br /></li><li> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {<br /></li><li> eventTopic = topic;<br /></li><li> }<br /></li><li><br /></li><li> eventKey = headers.get(KEY_HDR);<br /></li><li><br /></li><li> if (logger.isDebugEnabled()) {<br /></li><li> logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "<br /></li><li> + eventBody);<br /></li><li> logger.debug("event #{}", processedEvents);<br /></li><li> }<br /></li><li><br /></li><li> // create a message and add to buffer<br /></li><li> ProducerData<String, String> data = new ProducerData<String, String><br /></li><li> (eventTopic, eventBody);<br /></li><li> messageList.add(data);<br /></li><li>}<br /></li><li><br /></li><li>// publish batch and commit.<br /></li><li> if (processedEvents > 0) {<br /></li><li> long startTime = System.nanoTime(); </li><li> long endTime = System.nanoTime(); </li><li> }<br /></li><li>}<br /></li><li><br /></li><li>transaction.commit();<br /></li><li>} catch (Exception ex) {<br /></li><li>String errorMsg = "Failed to publish events";<br /></li><li>logger.error("Failed to publish events", ex);<br /></li><li>status = Status.BACKOFF;<br /></li><li>if (transaction != null) {<br /></li><li>try {<br /></li><li>transaction.rollback(); </li><li>} catch (Exception e) {<br /></li><li>logger.error("Transaction rollback failed", e);<br /></li><li>throw Throwables.propagate(e);<br /></li><li>}<br /></li><li>}<br /></li><li>throw new EventDeliveryException(errorMsg, ex);<br /></li><li>} finally {<br /></li><li>if (transaction != null) {<br /></li><li>transaction.close();<br /></li><li>}<br /></li><li>}<br /></li><li><br /></li><li>return status; </li></ol>
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink<br /> </li><li>producer.sinks.r.brokerList = bigdata-node00:9092<br /></li><li>producer.sinks.r.requiredAcks = 1<br /></li><li>producer.sinks.r.batchSize = 100<br /></li><li>#producer.sinks.r.kafka.producer.type=async<br /></li><li>#producer.sinks.r.kafka.customer.encoding=UTF-8<br /></li><li>producer.sinks.r.topic = testFlume1</li></ol>
下面的参数都是kafka的一系列参数,最重要的是brokerList和topic参数
现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

Video Face Swap
Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Ces derniers jours, Ice Universe n'a cessé de révéler des détails sur le Galaxy S25 Ultra, qui est largement considéré comme le prochain smartphone phare de Samsung. Entre autres choses, le fuyard a affirmé que Samsung prévoyait d'apporter une seule mise à niveau de l'appareil photo.

OnLeaks s'est désormais associé à Android Headlines pour offrir un premier aperçu du Galaxy S25 Ultra, quelques jours après une tentative infructueuse de générer plus de 4 000 $ auprès de ses abonnés X (anciennement Twitter). Pour le contexte, les images de rendu intégrées ci-dessous h

En plus d'annoncer deux nouveaux smartphones, TCL a également annoncé une nouvelle tablette Android appelée NXTPAPER 14, et sa taille d'écran massive est l'un de ses arguments de vente. Le NXTPAPER 14 est doté de la version 3.0 de la marque emblématique de panneaux LCD mats de TCL.

Le Vivo Y300 Pro vient d'être entièrement dévoilé et c'est l'un des téléphones Android de milieu de gamme les plus fins avec une grande batterie. Pour être exact, le smartphone ne fait que 7,69 mm d'épaisseur mais dispose d'une batterie de 6 500 mAh. C'est la même capacité que le lancement récent

Samsung n'a pas encore donné d'indications sur la date à laquelle il mettrait à jour sa série de smartphones Fan Edition (FE). Dans l’état actuel des choses, le Galaxy S23 FE reste l’édition la plus récente de la société, ayant été présentée début octobre 2023. Cependant, de nombreux

Ces derniers jours, Ice Universe n'a cessé de révéler des détails sur le Galaxy S25 Ultra, qui est largement considéré comme le prochain smartphone phare de Samsung. Entre autres choses, le fuyard a affirmé que Samsung prévoyait d'apporter une seule mise à niveau de l'appareil photo.

Le Redmi Note 14 Pro Plus est désormais officiel en tant que successeur direct du Redmi Note 13 Pro Plus de l'année dernière (375 $ sur Amazon). Comme prévu, le Redmi Note 14 Pro Plus est en tête de la série Redmi Note 14 aux côtés du Redmi Note 14 et du Redmi Note 14 Pro. Li

La marque sœur de OnePlus, iQOO, a un cycle de produits 2023-4 qui pourrait être presque terminé ; néanmoins, la marque a déclaré qu'elle n'en avait pas encore fini avec sa série Z9. Sa variante Turbo+ finale, et peut-être la plus haut de gamme, vient d'être annoncée comme prévu. T
