Kafka原理和集群测试
Kafka是一个消息系统,由LinkedIn贡献给Apache基金会,称为Apache的一个顶级项目。Kafka最初用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基
Kafka是一个消息系统,由LinkedIn贡献给Apache基金会,称为Apache的一个顶级项目。Kafka最初用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。它具有可扩展、吞吐量大和可持久化等特征,以及非常好的分区、复制和容错特征。
Kafka的关键设计决策
1). Kafka在设计之时为就将持久化消息作为通常的使用情况进行了考虑。
2). Kafka主要的设计约束是吞吐量,而不是功能。
3). Kafka有关哪些数据已经被使用了的状态信息保存为数据使用者(consumer)的一部分,而不是保存在服务器之上。
4). Kafka是一种显式的分布式系统。它假设,数据生产者(producer)、代理(brokers)和数据使用者(consumer)分散于多台机器之上。
而相比而言,传统的消息队列不能很好的支持(如超长的未处理数据、不能有效持久化)。对于数据的可用性,Kafka提供了两个保证:
(1). 生产者发送到Topic的分区上消息将会按照它们发送的顺序,而消费者收到的消息也是此顺序
(2). 如果一个Topic配置了复制因子( replication facto)为N, 那么可以允许N-1服务器当掉而不丢失任何已经增加的消息
Kafka中几个关键术语
Topic:Kafka将消息种子(Feed)分门别类, 每一类的消息称之为话题(Topic).
Producer:发布消息的对象称之为话题生产者(Kafka topic producer)
Consumer:订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
Kafka中的Topic
Topic是发布的消息的类别或者种子Feed名。对于每一个Topic, Kafka集群维护这一个分区的log,就像下图中的示例:Kafka集群
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
Kafka集群保持所有的消息,直到它们过期,无论消息是否被消费了。
实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。
可以看到这种设计对消费者来说操作自如, 一个消费者的操作不会影响其它消费者对此log的处理。
再说说分区。Kafka中采用分区的设计有几个目的。
一、可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以进行扩展,并处理更多的数据。
二、分区可以作为并行处理的单元。
Topic的分区Log被分布到集群中的多个服务器上。每个服务器处理它持有的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。
每个分区有一个leader,零或多个replica。Leader处理此分区的所有的读写请求而replica被动的复制数据。如果leader当机,其它的一个replica会被推举为新的leader。
一台服务器可能同时是一个分区的leader,另一个分区的replica。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。
关于复制原理,参考下面官档翻译:
Kafka 的集群复制设计
Kafka的集群部署
Kafka中主要有三种模式,
单机broker模式
单机多broker模式(伪分布式)
多机多broker模式(集群)
和hadoop一样,前两种多用于开发测试。第三种才是实际生产中可用的部署模式,下面介绍一下三节点kafka集群的部署流程
软件的安装直接解压缩即可:
tar xzvf kafka_2.10-0.8.1.1.tgz mkdir /var/kafka && mkdir /var/zookeeper
关键参数的解释,可以参考http://debugo.com/kafka-params/
vim kafka_2.10-0.8.1.1/config/server.properties
#在默认的配置上,我只修改了3个地方。三个主机debugo01,debugo02,debugo03分别对应id为1,2,3 broker.id=3 log.dirs=/var/kafka zookeeper.connect=debugo01:2181,debugo02:2181,debugo03:2181 配置zookeeper,修改DataDir并加入集群参数 vim kafka_2.10-0.8.1.1/config/zookeeper.properties initLimit=5 syncLimit=2 server.1=debugo01:2888:3888 server.2=debugo02:2888:3888 server.3=debugo03:2888:3888 dataDir=/var/zookeeper #分别将1,2,3写入三个主机的myid文件 echo "1" >> /var/zookeeper/myid
在debugo01,debugo02,debugo03上分别启动zookeeper和kafka Server
bin/zookeeper-server-start.sh config/zookeeper.properties # 启动kafka Server bin/kafka-server-start.sh config/server.properties
这时可以在log中找到,新的broker已经将数据注册到znode中。
#####debugo01##### [2014-12-07 20:54:20,506] INFO Awaiting socket connections on debugo01:9092. (kafka.network.Acceptor) [2014-12-07 20:54:20,521] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer) [2014-12-07 20:54:20,649] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-12-07 20:54:20,725] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-12-07 20:54:20,876] INFO Registered broker 1 at path /brokers/ids/1 with address debugo01:9092. (kafka.utils.ZkUtils$) [2014-12-07 20:54:20,907] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [2014-12-07 20:54:20,993] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) #####debugo02##### [2014-12-07 20:54:35,896] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2014-12-07 20:54:35,913] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer) [2014-12-07 20:54:36,073] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-12-07 20:54:36,179] INFO conflict in /controller data: {"version":1,"brokerid":2,"timestamp":"1417956876081"} stored data: {"version":1,"brokerid":1,"timestamp":"1417956860689"} (kafka.utils.ZkUtils$) [2014-12-07 20:54:36,398] INFO Registered broker 2 at path /brokers/ids/2 with address debugo02:9092. (kafka.utils.ZkUtils$) [2014-12-07 20:54:36,420] INFO [Kafka Server 2], started (kafka.server.KafkaServer) #####debugo03##### [2014-12-07 20:54:43,535] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2014-12-07 20:54:43,549] INFO [Socket Server on Broker 3], Started (kafka.network.SocketServer) [2014-12-07 20:54:43,728] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2014-12-07 20:54:43,783] INFO conflict in /controller data: {"version":1,"brokerid":3,"timestamp":"1417956883737"} stored data: {"version":1,"brokerid":1,"timestamp":"1417956860689"} (kafka.utils.ZkUtils$) [2014-12-07 20:54:43,999] INFO Registered broker 3 at path /brokers/ids/3 with address debugo03:9092. (kafka.utils.ZkUtils$) [2014-12-07 20:54:44,018] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
Topic的分区和复制
1. 创建debugo01,这个topic分区数为3,复制为1(不复制)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可
bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 1 --partitions 3 --topic debugo01 Created topic "debugo01".
2. 创建debugo02,这个topic分区数为1,复制为3(每个主机都有一份)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可
bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 3 --partitions 1 --topic debugo02
3. 列出topic信息
[root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181 debugo01 debugo02
4. 列出topic描述信息
[root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo01 Topic:debugo01 PartitionCount:3 ReplicationFactor:1 Configs: Topic: debugo01 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: debugo01 Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Topic: debugo01 Partition: 2 Leader: 3 Replicas: 3 Isr: 3
5. 检查log目录,对于topic debugo01,debugo01为0号分区,debugo02为1号分区。而topic debugo02则复制了3份,都为0号分区
[root@debugo01 kafka]# ll total 24 drwxr-xr-x 2 root root 4096 Dec 7 21:15 debugo01-0 drwxr-xr-x 2 root root 4096 Dec 7 21:16 debugo02-0 [root@debugo02 kafka]# ll total 24 drwxr-xr-x 2 root root 4096 Dec 7 21:15 debugo01-1 drwxr-xr-x 2 root root 4096 Dec 7 21:16 debugo02-0 #而每个分区下面都生成了index和log文件 [root@debugo01 debugo01-0]# ls 00000000000000000000.index 00000000000000000000.log
6. 下面topic debugo03,replication-factor为2,partition为3.那么broker id为1的debugo01会如下面describe所示,保存0号分区和1号分区。
而0号分区的repica leader为broker id = 3,包含3和1两个replicas。
bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 2 --partitions 3 --topic debugo03 Created topic "debugo03". bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03 [root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03 Topic:debugo03 PartitionCount:3 ReplicationFactor:2 Configs: Topic: debugo03 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: debugo03 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: debugo03 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 [root@debugo01 kafka_2.10-0.8.1.1]# ll /var/kafka/debugo03* /var/kafka/debugo03-0: total 0 -rw-r--r-- 1 root root 10485760 Dec 7 21:34 00000000000000000000.index -rw-r--r-- 1 root root 0 Dec 7 21:34 00000000000000000000.log /var/kafka/debugo03-1: total 0 -rw-r--r-- 1 root root 10485760 Dec 7 21:34 00000000000000000000.index -rw-r--r-- 1 root root 0 Dec 7 21:34 00000000000000000000.log
消息的产生和消费
两个终端分别打开producer和consumer进行测试
<terminal> bin/kafka-console-producer.sh --broker-list debugo01:9092 --topic debugo03 hello kafka hello debugo <terminal> bin/kafka-console-consumer.sh --zookeeper debugo01:2181 --from-beginning --topic debugo03 hello kafka hello debugo</terminal></terminal>
下面使用perf命令来测试几个topic的性能,需要先下载kafka-perf_2.10-0.8.1.1.jar,并拷贝到kafka/libs下面。
50W条消息,每条1000字节,batch大小1000,topic为debugo01,4个线程(message size设置太大需要调整相关参数,否则容易OOM)。只用了13秒完成,kafka在多分区支持下吞吐量是非常给力的。
bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo01 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092 start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec 2014-12-07 22:07:56:038, 2014-12-07 22:08:09:413, 0, 1000, 1000, 476.84, 35.6514, 500000, 37383.1776
同样的参数测试debugo02, 由于但分区加复制(replicas-factor=3),用时39秒。所以,适当加大partition数量和broker相关线程数量会极大的提高性能。
bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo02 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092 start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec 2014-12-07 22:13:28:840, 2014-12-07 22:14:07:819, 0, 1000, 1000, 476.84, 12.2332, 500000, 12827.4199
同样的参数测试debugo03,用时30秒。
bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo03 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092 start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec 2014-12-07 22:16:04:895, 2014-12-07 22:16:34:715, 0, 1000, 1000, 476.84, 15.9905, 500000, 16767.2703
同理,测试comsumer的性能。
bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo01 --threads 3 start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 2014-12-07 22:19:04:527, 2014-12-07 22:19:17:184, 1048576, 476.8372, 62.2747, 500000, 65299.7257 bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo02 --threads 3 start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec [2014-12-07 22:19:59,938] WARN [perf-consumer-78853_debugo01-1417961999315-4a5941ef], No broker partitions consumed by consumer thread perf-consumer-78853_debugo01-1417961999315-4a5941ef-1 for topic debugo02 (kafka.consumer.ZookeeperConsumerConnector) [2014-12-07 22:19:59,938] WARN [perf-consumer-78853_debugo01-1417961999315-4a5941ef], No broker partitions consumed by consumer thread perf-consumer-78853_debugo01-1417961999315-4a5941ef-2 for topic debugo02 (kafka.consumer.ZookeeperConsumerConnector) 2014-12-07 22:20:01:008, 2014-12-07 22:20:08:971, 1048576, 476.8372, 160.9305, 500000, 168747.8907 bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo03 --threads 3 start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec ?2014-12-07 22:21:27:421, 2014-12-07 22:21:39:918, 1048576, 476.8372, 63.6037, 500002, 66693.6108
^^
参考
http://blog.csdn.net/smallnest/article/details/38491483
http://www.350351.com/jiagoucunchu/xiaoxixitong/46720.html
http://kafka.apache.org/documentation.html
http://backend.blog.163.com/blog/static/202294126201431723734212/
http://www.inter12.org/archives/842
原文地址:Kafka原理和集群测试, 感谢原作者分享。

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

通用矩阵乘法(GeneralMatrixMultiplication,GEMM)是许多应用程序和算法中至关重要的一部分,也是评估计算机硬件性能的重要指标之一。通过深入研究和优化GEMM的实现,可以帮助我们更好地理解高性能计算以及软硬件系统之间的关系。在计算机科学中,对GEMM进行有效的优化可以提高计算速度并节省资源,这对于提高计算机系统的整体性能至关重要。深入了解GEMM的工作原理和优化方法,有助于我们更好地利用现代计算硬件的潜力,并为各种复杂计算任务提供更高效的解决方案。通过对GEMM性能的优

消息已发出但被对方拒收是所发送的信息已经成功地从设备发送出去,但由于某种原因,对方并没有接收到这条信息。更具体地说,这通常是因为对方已经设置了某些权限或采取了某些操作,导致你的信息无法被正常接收。

7月29日,在AITO问界第四十万台新车下线仪式上,华为常务董事、终端BG董事长、智能汽车解决方案BU董事长余承东出席发表演讲并宣布,问界系列车型将于今年8月迎来华为干昆ADS3.0版本的上市,并计划在8月至9月间陆续推送升级。 8月6日即将发布的享界S9将首发华为ADS3.0智能驾驶系统。华为干昆ADS3.0版本在激光雷达的辅助下,将大幅提升智驾能力,具备融合端到端的能力,并采用GOD(通用障碍物识别)/PDP(预测决策规控)全新端到端架构,提供车位到车位智驾领航NCA功能,并升级CAS3.0全

furmark怎么看?1、在主界面中设置“运行模式”和“显示模式”,还能调整“测试模式”,点击“开始”按钮。2、等待片刻后,就会看到测试结果,包含了显卡各种参数。furmark怎么算合格?1、用furmark烤机,半个小时左右看一下结果,基本上在85度左右徘徊,峰值87度,室温19度。大号机箱,5个机箱风扇口,前置两个,上置两个,后置一个,不过只装了一个风扇。所有配件都没有超频。2、一般情况下,显卡的正常温度应该在“30-85℃”之间。3、就算是大夏天周围环境温度过高,正常温度也是“50-85℃

nohup的作用及原理解析在Unix和类Unix操作系统中,nohup是一个常用的命令,用于在后台运行命令,即便用户退出当前会话或关闭终端窗口,命令仍然能够继续执行。在本文中,我们将详细解析nohup命令的作用和原理。一、nohup的作用后台运行命令:通过nohup命令,我们可以让需要长时间运行的命令在后台持续执行,而不受用户退出终端会话的影响。这在需要运行

4月11日,华为官方首次宣布HarmonyOS4.2百机升级计划,此次共有180余款设备参与升级,品类覆盖手机、平板、手表、耳机、智慧屏等设备。过去一个月,随着HarmonyOS4.2百机升级计划的稳步推进,包括华为Pocket2、华为MateX5系列、nova12系列、华为Pura系列等多款热门机型也已纷纷展开升级适配,这意味着会有更多华为机型用户享受到HarmonyOS带来的常用常新体验。从用户反馈来看,华为Mate60系列机型在升级HarmonyOS4.2之后,体验全方位跃升。尤其是华为M

苹果16系统中版本最好的是iOS16.1.4,iOS16系统的最佳版本可能因人而异添加和日常使用体验的提升也受到了很多用户的好评。苹果16系统哪个版本最好答:iOS16.1.4iOS16系统的最佳版本可能因人而异。根据公开的消息,2022年推出的iOS16被认为是一个非常稳定且性能优越的版本,用户对其整体体验也相当满意。此外,iOS16中新功能的添加和日常使用体验的提升也受到了很多用户的好评。特别是在更新后的电池续航能力、信号表现和发热控制方面,用户的反馈都比较积极。然而,考虑到iPhone14

新派幻想仙侠MMORPG《诛仙2》“无为测试”即将于4月23日开启,在原著千年后的诛仙大陆,会发生怎样的全新仙侠冒险故事?六境仙侠大世界,全日制修仙学府,自由自在的修仙生活,仙界中的万般妙趣都在等待着仙友们亲自前往探索!“无为测试”预下载现已开启,仙友们可前往官网下载,开服前无法登录游戏服务器,激活码可在预下载安装完成后使用。《诛仙2》“无为测试”开放时间:4月23日10:00——5月6日23:59诛仙正统续作全新仙侠冒险篇章《诛仙2》以《诛仙》小说为蓝图,在继承原著世界观的基础上,将游戏背景设
