./bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1
./bin/kafka-console-consumer.sh --topic test --from-beginning
./bin/kafka-console-producer.sh --topic test
./bin/kafka-console-consumer.sh --topic test --from-beginning
在Linux中快速安装Kafka并进行入门:详细步骤指南
Linux环境下安装Kafka的详细步骤
1. 前提条件
- 操作系统:Linux(推荐使用Ubuntu或CentOS)
- Java:JDK 8或更高版本
- ZooKeeper:版本3.4或更高版本
- Kafka:最新稳定版本
2. 安装Java
sudo apt-get update sudo apt-get install default-jdk
3. 安装ZooKeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz tar -xvf zookeeper-3.4.14.tar.gz cd zookeeper-3.4.14 ./configure make sudo make install
4. 配置ZooKeeper
sudo mkdir /var/lib/zookeeper sudo chown zookeeper:zookeeper /var/lib/zookeeper
编辑/etc/zookeeper/conf/zoo.cfg
文件,并添加以下内容:/etc/zookeeper/conf/zoo.cfg
文件,并添加以下内容:
dataDir=/var/lib/zookeeper clientPort=2181
启动ZooKeeper:
sudo service zookeeper start
5. 安装Kafka
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz tar -xvf kafka_2.13-2.8.0.tgz cd kafka_2.13-2.8.0
6. 配置Kafka
编辑/etc/kafka/server.properties
broker.id=0 listeners=PLAINTEXT://:9092 zookeeper.connect=localhost:2181
./bin/kafka-server-start.sh config/server.properties
5. 安装Kafka./bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1
登录后复制
./bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1
6. 配置Kafka
编辑/etc/kafka/server.properties
文件,并添加以下内容:./bin/kafka-console-producer.sh --topic test
登录后复制启动Kafka:./bin/kafka-console-consumer.sh --topic test --from-beginning
登录后复制
./bin/kafka-console-producer.sh --topic test
./bin/kafka-console-consumer.sh --topic test --from-beginning
7. 创建主题import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test", "Hello, Kafka!");
producer.send(record);
producer.close();
}
}
登录后复制
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> record = new ProducerRecord<>("test", "Hello, Kafka!"); producer.send(record); producer.close(); } }
8. 生产数据import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ": " + record.value());
}
}
consumer.close();
}
}
登录后复制9. 消费数据rrreee
🎜快速入门指南🎜🎜🎜🎜1. 创建一个Producer🎜🎜rrreee🎜🎜2. 创建一个Consumer🎜🎜rrreee
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + ": " + record.value()); } } consumer.close(); } }
以上是在Linux中快速安装Kafka并进行入门:详细步骤指南的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Win11系统无法安装中文语言包的解决方法随着Windows11系统的推出,许多用户开始升级他们的操作系统以体验新的功能和界面。然而,一些用户在升级后发现他们无法安装中文语言包,这给他们的使用体验带来了困扰。在本文中,我们将探讨Win11系统无法安装中文语言包的原因,并提供一些解决方法,帮助用户解决这一问题。原因分析首先,让我们来分析一下Win11系统无法

您可能无法在OracleVirtualBox中将来宾添加安装到虚拟机。当我们点击Devices>;InstallGuestAdditionsCDImage时,它只会抛出一个错误,如下所示:VirtualBox-错误:无法插入虚拟光盘C:将FilesOracleVirtualBoxVBoxGuestAdditions.iso编程到ubuntu机器中在这篇文章中,我们将了解当您无法在VirtualBox中安装来宾添加组件时该怎么办。无法在VirtualBox中安装来宾添加如果您无法在Virtua

如果你已经成功下载了百度网盘的安装文件,但是无法正常安装,可能是软件文件的完整性发生了错误或者是残留文件和注册表项的问题,下面就让本站来为用户们来仔细的介绍一下百度网盘下载成功但是安装不了问题解析吧。 百度网盘下载成功但是安装不了问题解析 1、检查安装文件完整性:确保下载的安装文件完整且没有损坏。你可以重新下载一次,或者尝试使用其他可信的来源下载安装文件。 2、关闭杀毒软件和防火墙:某些杀毒软件或防火墙程序可能会阻止安装程序的正常运行。尝试将杀毒软件和防火墙禁用或退出,然后重新运行安装

在Linux上安装安卓应用一直是许多用户所关心的问题,尤其是对于喜欢使用安卓应用的Linux用户来说,掌握如何在Linux系统上安装安卓应用是非常重要的。虽然在Linux系统上直接运行安卓应用并不像在Android平台上那么简单,但是通过使用模拟器或者第三方工具,我们依然可以在Linux上愉快地享受安卓应用的乐趣。下面将为大家介绍在Linux系统上安装安卓应

了解LinuxBashrc:功能、配置与使用方法在Linux系统中,Bashrc(BourneAgainShellruncommands)是一个非常重要的配置文件,其中包含了系统启动时自动运行的各种命令和设置。Bashrc文件通常位于用户的家目录下,是一个隐藏文件,它的作用是为用户自定义设置Bashshell的环境。一、Bashrc的功能设置环境

如果您使用过Docker,则必须了解守护进程、容器及其功能。守护进程是在容器已在任何系统中使用时在后台运行的服务。Podman是一个免费的管理工具,用于管理和创建容器,而不依赖于任何守护程序,如Docker。因此,它在管理集装箱方面具有优势,而不需要长期的后台服务。此外,Podman不需要使用根级别的权限。本指南详细讨论了如何在Ubuntu24上安装Podman。更新系统我们首先要进行系统更新,打开Ubuntu24的Terminalshell。在安装和升级过程中,我们都需要使用命令行。一种简单的

在高中学习的时候,有些学生做的笔记非常清晰准确,比同一个班级的其他人都做得更多。对于一些人来说,记笔记是一种爱好,而对于其他人来说,当他们很容易忘记任何重要事情的小信息时,则是一种必需品。Microsoft的NTFS应用程序对于那些希望保存除常规讲座以外的重要笔记的学生特别有用。在这篇文章中,我们将描述Ubuntu24上的Ubuntu应用程序的安装。更新Ubuntu系统在安装Ubuntu安装程序之前,在Ubuntu24上我们需要确保新配置的系统已经更新。我们可以使用Ubuntu系统中最著名的“a

Win7电脑上安装Go语言的详细步骤Go(又称Golang)是一种由Google开发的开源编程语言,其简洁、高效和并发性能优秀,适合用于开发云端服务、网络应用和后端系统等领域。在Win7电脑上安装Go语言,可以让您快速入门这门语言并开始编写Go程序。下面将会详细介绍在Win7电脑上安装Go语言的步骤,并附上具体的代码示例。步骤一:下载Go语言安装包访问Go官
