在Centos 上安装Kafka集群
在Centos 上安装Kafka集群
安装准备:版本
Kafka版本:kafka_2.11-0.9.0.0
Zookeeper版本:zookeeper-3.4.7
Zookeeper 集群:bjrenrui0001 bjrenrui0002 bjrenrui0003
Zookeeper集群的搭建参见:在CentOS上安装ZooKeeper集群
物理环境
安装三台物理机:
192.168.100.200 bjrenrui0001(运行3个Broker)
192.168.100.201 bjrenrui0002(运行2个Broker)
192.168.100.202 bjrenrui0003(运行2个Broker)
该集群的创建主要分为三步,单节点单Broker,单节点多Broker,多节点多Broker
单节点单Broker
本节以bjrenrui0001上创建一个Broker为例
下载kafka:
下载路径:http://kafka.apache.org/downloads.html
cd /mq/
wget http://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
copyfiles.sh kafka_2.11-0.9.0.0.tgz bjyfnbserver /mq/
tar zxvf kafka_2.11-0.9.0.0.tgz -C /mq/
ln -s /mq/kafka_2.11-0.9.0.0 /mq/kafka
mkdir /mq/kafka/logs
配置
修改config/server.properties
vi /mq/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181GG
zookeeper.connection.timeout.ms=6000
启动Kafka服务:
cd /mq/kafka;sh bin/kafka-server-start.sh -daemon config/server.properties
或
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
netstat -ntlp|grep -E '2181|9092'
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::9092 :::* LISTEN 26903/java
tcp6 0 0 :::2181 :::* LISTEN 24532/java
创建Topic:
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --replication-factor 1 --partitions 1 --topic test
查看Topic:
sh /mq/kafka/bin/kafka-topics.sh --list --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
producer发送消息:
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list bjrenrui0001:9092 --topic test
first
message
consumer接收消息:
$ sh bin/kafka-console-consumer.sh --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic test --from-beginning
first
message
如果要最新的数据,可以不带--from-beginning参数即可。
单节点多个Broker
将上个章节中的文件夹再复制两份分别为kafka_2,kafka_3
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_2
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_3
ln -s /mq/kafka_2.11-0.9.0.0_2 /mq/kafka_2
ln -s /mq/kafka_2.11-0.9.0.0_3 /mq/kafka_3
分别修改kafka_2/config/server.properties以及kafka_3/config/server.properties 文件中的broker.id,以及port属性,确保唯一性
vi /mq/kafka_2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9093
port=9093
host.name=bjrenrui0001
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_2/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_3/config/server.properties
broker.id=3
listeners=PLAINTEXT://:9094
port=9094
host.name=bjrenrui0001
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_3/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
启动
启动另外两个Broker:
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
检查端口:
[dreamjobs@bjrenrui0001 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 24532/java
tcp6 0 0 :::9092 :::* LISTEN 26903/java
tcp6 0 0 :::9093 :::* LISTEN 28672/java
tcp6 0 0 :::9094 :::* LISTEN 28734/java
创建一个replication factor为3的topic:
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看Topic的状态:
$ sh /mq/kafka/bin/kafka-topics.sh --describe -zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
从上面的内容可以看出,该topic包含1个part,replicationfactor为3,且Node3 是leador
解释如下:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
再来看一下之前创建的test topic, 从下图可以看出没有进行replication
$ sh /mq/kafka/bin/kafka-topics.sh --describe --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
多个节点的多个Broker
在bjrenrui0002、bjrenrui0003上分别把下载的文件解压缩到kafka_4,kafka_5,kafka_6两个文件夹中,再将bjrenrui0001上的server.properties配置文件拷贝到这三个文件夹中
vi /mq/kafka_4/config/server.properties
broker.id=4
listeners=PLAINTEXT://:9095
port=9095
host.name=bjrenrui0002
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_4/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_5/config/server.properties
broker.id=5
listeners=PLAINTEXT://:9096
port=9096
host.name=bjrenrui0002
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_5/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_6/config/server.properties
broker.id=6
listeners=PLAINTEXT://:9097
port=9097
host.name=bjrenrui0003
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_6/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_7/config/server.properties
broker.id=7
listeners=PLAINTEXT://:9098
port=9098
host.name=bjrenrui0003
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_7/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
启动服务
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
sh /mq/kafka_4/bin/kafka-server-start.sh -daemon /mq/kafka_4/config/server.properties
sh /mq/kafka_5/bin/kafka-server-start.sh -daemon /mq/kafka_5/config/server.properties
sh /mq/kafka_6/bin/kafka-server-start.sh -daemon /mq/kafka_6/config/server.properties
sh /mq/kafka_7/bin/kafka-server-start.sh -daemon /mq/kafka_7/config/server.properties
检查:
$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
停服务:
sh /mq/kafka/bin/kafka-server-stop.sh
如果使用脚本停broker服务,会把单节点上的多broker服务都停掉,慎重!!!
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
到目前为止,三台物理机上的7个Broker已经启动完毕:
[dreamjobs@bjrenrui0001 bin]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 24532/java
tcp6 0 0 :::9092 :::* LISTEN 33212/java
tcp6 0 0 :::9093 :::* LISTEN 32997/java
tcp6 0 0 :::9094 :::* LISTEN 33064/java
[dreamjobs@bjrenrui0002 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 6899/java
tcp6 0 0 :::9095 :::* LISTEN 33251/java
tcp6 0 0 :::9096 :::* LISTEN 33279/java
[dreamjobs@bjrenrui0003 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 14562/java
tcp 0 0 0.0.0.0:9097 0.0.0.0:* LISTEN 23246/java
tcp 0 0 0.0.0.0:9098 0.0.0.0:* LISTEN 23270/java
producer发送消息:
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list bjrenrui0001:9092 --topic my-replicated-topic
consumer接收消息:
$ sh /mq/kafka_4/bin/kafka-console-consumer.sh --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic my-replicated-topic --from-beginning

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

最近幾天,Ice Universe 不斷披露有關 Galaxy S25 Ultra 的詳細信息,人們普遍認為這款手機將是三星的下一款旗艦智慧型手機。除此之外,洩密者聲稱三星只計劃升級一台相機

OnLeaks 現在與 Android Headlines 合作,首次展示了 Galaxy S25 Ultra,幾天前,他試圖從他的 X(以前的 Twitter)粉絲那裡籌集到 4,000 美元以上的資金,但失敗了。對於上下文,嵌入在 h 下面的渲染圖像

除了發布兩款新智慧型手機外,TCL 還發布了一款名為 NXTPAPER 14 的新 Android 平板電腦,其大螢幕尺寸是其賣點之一。 NXTPAPER 14 採用 TCL 標誌性品牌霧面液晶面板 3.0 版本

Vivo Y300 Pro剛剛全面亮相,它是最薄的中階Android手機之一,配備大電池。準確來說,這款智慧型手機厚度僅為 7.69 毫米,但配備 6,500 mAh 電池。這與最近推出的容量相同

三星尚未就何時更新其 Fan Edition (FE) 智慧型手機系列提供任何提示。目前來看,Galaxy S23 FE 仍然是該公司的最新版本,於 2023 年 10 月年初推出。

最近幾天,Ice Universe 不斷披露有關 Galaxy S25 Ultra 的詳細信息,人們普遍認為這款手機將是三星的下一款旗艦智慧型手機。除此之外,洩密者聲稱三星只計劃升級一台相機

Redmi Note 14 Pro Plus 現已正式成為去年 Redmi Note 13 Pro Plus 的直接後繼產品(亞馬遜售價 375 美元)。正如預期的那樣,Redmi Note 14 Pro Plus與Redmi Note 14和Redmi Note 14 Pro一起成為Redmi Note 14系列的主角。李

摩托羅拉今年發布了無數設備,儘管其中只有兩款是可折疊的。就上下文而言,雖然世界上大多數地區都收到了 Razr 50 和 Razr 50 Ultra,但摩托羅拉在北美提供了 Razr 2024 和 Razr 2
