首頁 資料庫 mysql教程 Kafka+Storm+HDFS整合实践

Kafka+Storm+HDFS整合实践

Jun 07, 2016 pm 04:39 PM
基於 實踐 整合

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
登入後複製
登入後複製

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
登入後複製

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:

broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
登入後複製

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:

zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
登入後複製

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:

cd /usr/local/zookeeper
bin/zkCli.sh
登入後複製

在ZooKeeper执行如下命令创建chroot路径:

create /kafka ''
登入後複製

这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
登入後複製

最后,在h2、h3节点上配置,执行如下命令:

cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
登入後複製

并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:

broker.id=1  # 在h1修改
broker.id=2  # 在h2修改
登入後複製

因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:

bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
登入後複製

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
登入後複製

查看创建的Topic,执行如下命令:

bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
登入後複製

结果信息如下所示:

Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
     Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
     Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
登入後複製

上面Leader、Replicas、Isr的含义如下:

Partition: 分区
Leader   : 负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
登入後複製

我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:

bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
登入後複製

在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
登入後複製

可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
登入後複製
登入後複製

首先,在h1节点上,执行如下命令安装:

cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
登入後複製

然后,修改配置文件conf/storm.yaml,内容如下所示:

 storm.zookeeper.servers:
     - "h1"
     - "h2"
     - "h3"
storm.zookeeper.port: 2181
#
nimbus.host: "h1"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.local.dir: "/tmp/storm"
登入後複製

将配置好的安装文件,分发到其他节点上:

scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
登入後複製

最后,在h2、h3节点上配置,执行如下命令:

cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
登入後複製

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:

bin/storm nimbus &
bin/storm supervisor &
登入後複製

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:

bin/storm ui &
登入後複製

这样可以通过访问http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:

          <dependency>
               <groupid>org.apache.storm</groupid>
               <artifactid>storm-core</artifactid>
               <version>0.9.2-incubating</version>
               <scope>provided</scope>
          </dependency>
          <dependency>
               <groupid>org.apache.storm</groupid>
               <artifactid>storm-kafka</artifactid>
               <version>0.9.2-incubating</version>
          </dependency>
          <dependency>
               <groupid>org.apache.kafka</groupid>
               <artifactid>kafka_2.9.2</artifactid>
               <version>0.8.1.1</version>
               <exclusions>
                    <exclusion>
                         <groupid>org.apache.zookeeper</groupid>
                         <artifactid>zookeeper</artifactid>
                    </exclusion>
                    <exclusion>
                         <groupid>log4j</groupid>
                         <artifactid>log4j</artifactid>
                    </exclusion>
               </exclusions>
          </dependency>
登入後複製

下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:

package org.shirdrn.storm.examples;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class MyKafkaTopology {
     public static class KafkaWordSplitter extends BaseRichBolt {
          private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
          private static final long serialVersionUID = 886149197481637894L;
          private OutputCollector collector;
          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;              
          }
          @Override
          public void execute(Tuple input) {
               String line = input.getString(0);
               LOG.info("RECV[kafka -> splitter] " + line);
               String[] words = line.split("\\s+");
               for(String word : words) {
                    LOG.info("EMIT[splitter -> counter] " + word);
                    collector.emit(input, new Values(word, 1));
               }
               collector.ack(input);
          }
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("word", "count"));         
          }
     }
     public static class WordCounter extends BaseRichBolt {
          private static final Log LOG = LogFactory.getLog(WordCounter.class);
          private static final long serialVersionUID = 886149197481637894L;
          private OutputCollector collector;
          private Map<string atomicinteger> counterMap;
          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;    
               this.counterMap = new HashMap<string atomicinteger>();
          }
          @Override
          public void execute(Tuple input) {
               String word = input.getString(0);
               int count = input.getInteger(1);
               LOG.info("RECV[splitter -> counter] " + word + " : " + count);
               AtomicInteger ai = this.counterMap.get(word);
               if(ai == null) {
                    ai = new AtomicInteger();
                    this.counterMap.put(word, ai);
               }
               ai.addAndGet(count);
               collector.ack(input);
               LOG.info("CHECK statistics map: " + this.counterMap);
          }
          @Override
          public void cleanup() {
               LOG.info("The final result:");
               Iterator<entry atomicinteger>> iter = this.counterMap.entrySet().iterator();
               while(iter.hasNext()) {
                    Entry<string atomicinteger> entry = iter.next();
                    LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
               }
          }
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("word", "count"));         
          }
     }
     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
          String zks = "h1:2181,h2:2181,h3:2181";
          String topic = "my-replicated-topic5";
          String zkRoot = "/storm"; // default zookeeper root configuration for storm
          String id = "word";
          BrokerHosts brokerHosts = new ZkHosts(zks);
          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
          spoutConf.forceFromStart = false;
          spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
          spoutConf.zkPort = 2181;
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
          builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
          builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
          Config conf = new Config();
          String name = MyKafkaTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               // Nimbus host name passed from command line
               conf.put(Config.NIMBUS_HOST, args[0]);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }
}
</string></entry></string></string>
登入後複製

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:

cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
登入後複製

然后,就可以提交我们开发的Topology程序了:

bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
登入後複製

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:

spoutConf.forceFromStart = false;
登入後複製

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:

package org.shirdrn.storm.examples;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class StormToHDFSTopology {
     public static class EventSpout extends BaseRichSpout {
          private static final Log LOG = LogFactory.getLog(EventSpout.class);
          private static final long serialVersionUID = 886149197481637894L;
          private SpoutOutputCollector collector;
          private Random rand;
          private String[] records;
          @Override
          public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
               this.collector = collector;    
               rand = new Random();
               records = new String[] {
                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
                         "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
                         "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
               };
          }
          @Override
          public void nextTuple() {
               Utils.sleep(1000);
               DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
               Date d = new Date(System.currentTimeMillis());
               String minute = df.format(d);
               String record = records[rand.nextInt(records.length)];
               LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
               collector.emit(new Values(minute, record));
          }
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("minute", "record"));         
          }
     }
     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
          // use "|" instead of "," for field delimiter
          RecordFormat format = new DelimitedRecordFormat()
                  .withFieldDelimiter(" : ");
          // sync the filesystem after every 1k tuples
          SyncPolicy syncPolicy = new CountSyncPolicy(1000);
          // rotate files 
          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
          FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                  .withPath("/storm/").withPrefix("app_").withExtension(".log");
          HdfsBolt hdfsBolt = new HdfsBolt()
                  .withFsUrl("hdfs://h1:8020")
                  .withFileNameFormat(fileNameFormat)
                  .withRecordFormat(format)
                  .withRotationPolicy(rotationPolicy)
                  .withSyncPolicy(syncPolicy);
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("event-spout", new EventSpout(), 3);
          builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
          Config conf = new Config();
          String name = StormToHDFSTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               conf.put(Config.NIMBUS_HOST, args[0]);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }
}
登入後複製

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:

               <plugin>
                   <groupid>org.apache.maven.plugins</groupid>
                   <artifactid>maven-shade-plugin</artifactid>
                   <version>1.4</version>
                   <configuration>
                       <createdependencyreducedpom>true</createdependencyreducedpom>
                   </configuration>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               <transformers>
                                   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
                                   <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                       <mainclass></mainclass>
                                   </transformer>
                               </transformers>
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
登入後複製

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:

package org.shirdrn.storm.examples;
import java.util.Arrays;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class DistributeWordTopology {
     public static class KafkaWordToUpperCase extends BaseRichBolt {
          private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
          private static final long serialVersionUID = -5207232012035109026L;
          private OutputCollector collector;
          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;              
          }
          @Override
          public void execute(Tuple input) {
               String line = input.getString(0).trim();
               LOG.info("RECV[kafka -> splitter] " + line);
               if(!line.isEmpty()) {
                    String upperLine = line.toUpperCase();
                    LOG.info("EMIT[splitter -> counter] " + upperLine);
                    collector.emit(input, new Values(upperLine, upperLine.length()));
               }
               collector.ack(input);
          }
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
               declarer.declare(new Fields("line", "len"));         
          }
     }
     public static class RealtimeBolt extends BaseRichBolt {
          private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
          private static final long serialVersionUID = -4115132557403913367L;
          private OutputCollector collector;
          @Override
          public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
               this.collector = collector;              
          }
          @Override
          public void execute(Tuple input) {
               String line = input.getString(0).trim();
               LOG.info("REALTIME: " + line);
               collector.ack(input);
          }
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
          }
     }
     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
          // Configure Kafka
          String zks = "h1:2181,h2:2181,h3:2181";
          String topic = "my-replicated-topic5";
          String zkRoot = "/storm"; // default zookeeper root configuration for storm
          String id = "word";
          BrokerHosts brokerHosts = new ZkHosts(zks);
          SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
          spoutConf.forceFromStart = false;
          spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
          spoutConf.zkPort = 2181;
          // Configure HDFS bolt
          RecordFormat format = new DelimitedRecordFormat()
                  .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
          SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
          FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
          FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                  .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
          HdfsBolt hdfsBolt = new HdfsBolt()
                  .withFsUrl("hdfs://h1:8020")
                  .withFileNameFormat(fileNameFormat)
                  .withRecordFormat(format)
                  .withRotationPolicy(rotationPolicy)
                  .withSyncPolicy(syncPolicy);
          // configure & build topology
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
          builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
          builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
          builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
          // submit topology
          Config conf = new Config();
          String name = DistributeWordTopology.class.getSimpleName();
          if (args != null && args.length > 0) {
               String nimbus = args[0];
               conf.put(Config.NIMBUS_HOST, nimbus);
               conf.setNumWorkers(3);
               StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
          } else {
               conf.setMaxTaskParallelism(3);
               LocalCluster cluster = new LocalCluster();
               cluster.submitTopology(name, conf, builder.createTopology());
               Thread.sleep(60000);
               cluster.shutdown();
          }
     }
}
登入後複製

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:

bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
登入後複製

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

参考链接

  • http://kafka.apache.org/
  • http://kafka.apache.org/documentation.html
  • https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
  • http://storm.apache.org/
  • http://storm.apache.org/documentation/Tutorial.html
  • http://storm.apache.org/documentation/FAQ.html
  • https://github.com/ptgoetz/storm-hdfs
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1664
14
CakePHP 教程
1423
52
Laravel 教程
1317
25
PHP教程
1268
29
C# 教程
1246
24
如何阻止Outlook自動將事件新增到我的日曆 如何阻止Outlook自動將事件新增到我的日曆 Feb 26, 2024 am 09:49 AM

作為電子郵件管理器應用程序,MicrosoftOutlook允許我們安排活動和約會。它透過提供在Outlook應用程式中建立、管理和追蹤這些活動(也稱為事件)的工具,使我們能夠保持有序。然而,有時會將不需要的事件加入Outlook中的日曆中,這會對使用者造成混亂,並向日曆發送垃圾郵件。在本文中,我們將探討可協助我們防止Outlook自動將事件新增至我的日曆中的各種方案和步驟。 Outlook活動-簡要概述Outlook事件具有多種用途,並具有許多有用的功能,具體如下:日曆整合:在Outlook

深入探討Struts框架的原理與實踐 深入探討Struts框架的原理與實踐 Feb 18, 2024 pm 06:10 PM

Struts框架的原理解析與實務探索Struts框架作為JavaWeb開發中常用的MVC框架,具有良好的設計模式和可擴展性,廣泛應用於企業級應用程式開發中。本文將對Struts框架的原理進行解析,並結合實際程式碼範例進行探索,幫助讀者更好地理解和應用該框架。一、Struts框架的原理解析1.MVC架構Struts框架是基於MVC(Model-View-Con

PHP編碼實踐:拒絕使用goto語句的替代方案 PHP編碼實踐:拒絕使用goto語句的替代方案 Mar 28, 2024 pm 09:24 PM

PHP編碼實踐:拒絕使用goto語句的替代方案近年來,隨著程式語言的不斷更新和迭代,程式設計師開始更加重視編碼規範和最佳實踐。在PHP程式設計中,goto語句作為一種控制流語句存在已久,但在實際應用中往往會導致程式碼的可讀性和可維護性下降。本文將分享一些替代方案,幫助開發人員拒絕使用goto語句,提升程式碼品質。一、為什麼拒絕使用goto語句?首先,讓我們來思考一下為

C++反射機制實踐:實現靈活的運行時類型信息 C++反射機制實踐:實現靈活的運行時類型信息 Nov 27, 2023 pm 01:11 PM

C++反射機制實踐:實作靈活的運行時類型資訊導語:C++是一門強型別語言,不像其他語言直接提供反射機制以取得類別的型別資訊。然而,透過一些技巧和技術手段,我們也可以在C++中實現類似的反射功能。本文將介紹如何利用模板元編程和巨集定義來實現靈活的運行時類型資訊。一、什麼是反射機制?反射機制是指在運行時獲取類別的類型信息,如類別的名稱、成員函數、成員變數以及其他屬性

織夢CMS站群實作分享 織夢CMS站群實作分享 Mar 18, 2024 am 10:18 AM

織夢CMS站群實務分享近年來,隨著網路的快速發展,網站建置變得越來越重要。在建立多個網站時,站群技術成為了一個非常有效的方法。而在眾多網站建立工具中,織夢CMS憑藉其靈活性和易用性成為了不少站群愛好者的首選。本文將分享一些關於織夢CMS站群的實務經驗,以及一些具體的程式碼範例,希望能為正在探索站群技術的讀者提供一些幫助。 1.什麼是織夢CMS站群?織夢CMS

使用Golang進行流量管理的最佳實踐 使用Golang進行流量管理的最佳實踐 Mar 07, 2024 am 08:27 AM

Golang是一種強大且高效的程式語言,廣泛應用於建立網路服務和應用程式。在網路服務中,流量管理是至關重要的一環,它可以幫助我們控制和優化網路上的資料傳輸,保障服務的穩定性和效能。本文將介紹使用Golang進行流量管理的最佳實踐,並提供具體的程式碼範例。 1.使用Golang的net包進行基本的流量管理Golang的net包提供了處理網路數

利用PyCharm實現遠端開發的實用指南 利用PyCharm實現遠端開發的實用指南 Feb 25, 2024 pm 07:18 PM

利用PyCharm進行遠端開發是一種高效的方式,可以讓開發者在本機環境中輕鬆地對遠端伺服器上的程式碼進行編輯、偵錯和運行。本文將介紹如何利用PyCharm進行遠端開發實踐,並結合具體的程式碼範例來幫助讀者更好地理解和應用這項技術。什麼是PyCharmPyCharm是一款由JetBrains公司開發的Python整合開發環境(IDE),提供了豐富的功能和工具來幫

使用 Golang 實現頁面跳躍的最佳實踐 使用 Golang 實現頁面跳躍的最佳實踐 Mar 05, 2024 pm 01:18 PM

使用Golang實現頁面跳轉的最佳實踐在開發web應用程式時,頁面跳轉是一個常見的功能需求。在Golang中,我們可以使用一些函式庫來實現頁面跳轉,例如使用Gin框架來處理路由和頁面跳轉。本文將介紹如何在Golang中實現頁面跳轉的最佳實踐,同時給出具體的程式碼範例。 Gin框架簡介Gin是一個用Go語言寫的web框架,功能強大且容易

See all articles