【kafka】kafka命令大全

2023-09-15 10:25:50

概述

本文将分享一些kafka经常使用的一些命令,不断完善中。

管理

创建主题,3个分区,2个副本

对使用了zookeeper的kafka
kafka-topics.sh --create --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --replication-factor 2 --partitions 3 --topic test2
Created topic test2.

对kafka版本 >= 2.2,可以将--zookeeper替换成--bootstrap-server参数即可,如下:后续的其他命令类似。
kafka-topics.sh --create --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --replication-factor 2 --partitions 3 --topic test2

分区扩容(注意只能扩,不能缩,我测试时至少是这样的)
kafka版本 < 2.2
kafka-topics.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --alter --topic test2 --partitions 4

kafka版本 >= 2.2
kafka-topics.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --alter --topic test2 --partitions 3

删除某一个topic
kafka-topics.sh --delete  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2

注意,在实际生产使用过程中,–zookeeper或者–bootstrap-server后面的参数值最好写完整,避免因某个节点异常而不能正常使用,后面同是,但是使用bootstrap-server时,成功执行时不会返回结果。其次建议使用–bootstrap-server,–zookeeper后续会成过去式

查询

查询集群描述,结果中包括所有的topic分区主从情况
kafka-topics.sh --describe --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181

topic列表查询,结果中只有topic的名称
kafka-topics.sh --list --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181

topic列表查询(支持0.9版本+)
kafka-topics.sh --list  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092

消费者列表查询(存储在zk中的)
kafka-consumer-groups.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --list

消费者列表查询(支持0.9版本+)
kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --list

新消费者列表查询(支持0.10版本+)
kafka-consumer-groups.sh    --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092  --list
connect-local-file-sink

显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --describe --group group1
显示某个消费组的消费详情(0.10.1.0版本+)
kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --describe --group group1

kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092  --delete --group test-consumer-group

修改删除topic

kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
#可以执行以下命令验证结果
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --describe
Copy
#移除:
kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

生产和消费

生产者发送数据,注意参数是--broker-list
kafka-console-producer.sh --broker-list 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2

消费数据,注意参数是--bootstrap-server,不能使用--zookeeper
kafka-console-consumer.sh  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning

消费者(支持0.9版本+)
kafka-console-consumer.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning --consumer.config config/consumer.properties

消费数据到某个文件,如下的kafka.txt
kafka-console-consumer.sh  --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --topic test2 --from-beginning > kafka.txt

kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName


查看消费者消费主题的情况
kafka-consumer-groups.sh --bootstrap-server 192.168.2.140:9092,192.168.2.141:9092,192.168.2.142:9092 --group connect-local-file-sink --describe
Consumer group 'connect-local-file-sink' has no active members.

主题	分区	当前位置	结束位置	剩余	消费者id	主机	客户端id
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-test    0          3               3               0               -               -               -

对topic进行负载均衡

kafka版本 <= 2.4
kafka-preferred-replica-election.sh  --zookeeper localhost:2181
kafka新版本
kafka-preferred-replica-election.sh  --bootstrap-server 192.168.2.140:9092
查看目前的消费组
kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092
查看某个消费者的详情
kafka-consumer\-groups.sh  --bootstrap-server localhost:9092 --describe --group test-consumer-group

kafka自带压测命令

kafka-producer-perf-test.sh --topic test --num-records 1000 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
501 records sent, 100.2 records/sec (0.00 MB/sec), 28.5 ms avg latency, 794.0 ms max latency.
1000 records sent, 99.860196 records/sec (0.00 MB/sec), 16.42 ms avg latency, 794.00 ms max latency, 3 ms 50th, 28 ms 95th, 481 ms 99th, 794 ms 99.9th.

调整主题

副本扩容

kafka-topics.sh  --alter --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --topic test --partitions 3
kafka-topics.sh --alter --topic topic_name --partitions 5 --bootstrap-server localhost:9092

分区调整

创建规则json,注意replicas后面的数字为kafka的broker.id。

{"version":1,
"partitions":[
{"topic":"test","partition":0,"replicas":[0,1,2]},
{"topic":"test","partition":1,"replicas":[1,2,0]},
{"topic":"test","partition":2,"replicas":[2,0,1]}
]}

执行

kafka-reassign-partitions.sh --zookeeper  192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181  --reassignment-json-file 1.json --execute 

验证

kafka-reassign-partitions.sh --zookeeper 192.168.2.140:2181,192.168.2.141:2181,192.168.2.142:2181 --reassignment-json-file 1.json --verify
Status of partition reassignment: 
Reassignment of partition test-0 completed successfully
Reassignment of partition test-1 completed successfully
Reassignment of partition test-2 completed successfully

kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

 kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 1000000

查看消费者主题

Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

查看消费者主题的相关信息命令如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

优雅的关闭kafka

在配置文件中添加以下配置:

controlled.shutdown.enable=true

kafka在数据迁移期间限制带宽的使用

#可以改变速率限制多次执行
kafka-reassign-partitions.sh --zookeeper localhost:2181--execute --reassignment-json-file 1.json —throttle 50000000
#确认限制被移除
kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json

更多关于kafka的知识分享,请前往博客主页。编写过程中,难免出现差错,敬请指出

更多推荐

关于String、StringBuffer、StringBuilder

1.String可以被继承吗?String类由final修饰,所以不能被继承。扩展阅读在Java中,String类被设计为不可变类,主要表现在它保存字符串的成员变量是final的。Java9之前字符串采用char[]数组来保存字符,即privatefinalchar[]value;Java9做了改进,采用byte[]数

【人工智能】企业如何使用 AI与人工智能的定义、研究价值、发展阶段的深刻讨论

前言人工智能(ArtificialIntelligence),英文缩写为AI。它是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。人工智能是新一轮科技革命和产业变革的重要驱动力量。📕作者简介:热爱跑步的恒川,致力于C/C++、Java、Python等多编程语言,热爱跑步,喜爱音乐

Maven 直接依赖、间接依赖、依赖冲突、依赖仲裁

文章目录直接依赖和间接依赖依赖冲突Maven的依赖仲裁最短路径优先先声明优先手动解决依赖冲突直接依赖和间接依赖在项目中直接引入的依赖叫做直接依赖,而那些被动引入的就叫间接依赖比如上图中,A是我们的项目,我们在项目中直接引入了B模块,所以B和A的关系就是直接依赖,而B工程内部引入了C,所以B和C也是直接依赖关系,如果B工

Python编程指南:利用HTTP和HTTPS适配器实现智能路由

嗨,爬虫大佬们!今天我要为大家分享一篇关于如何利用HTTP和HTTPS适配器来实现智能路由的Python编程指南。在现代互联网应用中,路由功能起着至关重要的作用,而利用Python编程语言实现智能路由则可以为我们的应用带来更高的灵活性和性能优化。接下来,让我们一起深入了解这个令人激动的主题吧!1、了解HTTP和HTTP

简单易上手,亚马逊云科技Amazon CodeWhisperer个性化辅助功能成为开发者好帮手

AmazonCodeWhisperer介绍AmazonCodeWhisperer是亚马逊云科技出品的一款基于机器学习的通用代码生成器,可实时提供代码建议。类似Cursor和GithubCopilot编码工具。在编写代码时,它会自动根据您现有的代码和注释生成建议。从单行代码建议到完整的函数,它可为您提供各种大小和范围的个

关于React Hooks的面试题及其答案

请解释一下ReactHooks是什么,以及它的优点和缺点是什么?Hooks是React16.8版本引入的一种新特性,它允许你在不写class的情况下操作state和其他React特性。Hooks是一种特殊的函数,可以让你“钩入”React的特性。它的优点是让编写组件更简单方便,同时可以自定义hook把公共的逻辑提取出来

面试中的技术趋势:如何展示你跟进最新技术的能力

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🦄博客首页——🐅🐾猫头虎的博客🎐🐳《面试题大全专栏》🦕文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》🐾学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》🐅学会Gol

小程序测试基础知识分享,获取专业测试报告就找卓码软件测评

近年来,随着互联网的快速发展,小程序测试成为了一个重要的环节。而小程序测试的内容以及注意事项则会直接影响到产品的质量和用户体验。卓码软件测评作为专业的软件测试公司,在软件测试方面有着丰富的经验。下面将从多个角度来详细描述小程序测试的内容和注意事项,并分析测试对产品的作用。一、小程序测试的内容1、功能测试:各项功能的稳定

如何查看mysql的存储引擎

要查看MySQL中的存储引擎,可以使用以下两种方法:1.使用SQL查询:您可以使用SQL查询来查看MySQL中的存储引擎。打开MySQL客户端,并连接到您的MySQL服务器,然后运行以下SQL查询:SHOWTABLESTATUS;这将列出所有数据库中的表以及与每个表相关的信息,包括存储引擎。在结果中,可以查看"Engi

腾讯云16核服务器配置大全_CVM和轻量服务器汇总

腾讯云16核CPU服务器有哪些配置可以选择?可以选择标准型S6、标准型SA3、计算型C6或标准型S5等,目前标准型S5云服务器有优惠活动,性价比高,计算型C6云服务器16核性能更高,轻量16核32G28M带宽优惠价3468元15个月,腾讯云百科分享腾讯云16核CPU服务器可以选择的云服务器CVM规格列表:目录腾讯云16

二蛋赠书三期:《C#入门经典(第9版)》

文章目录前言活动规则参与方式本期赠送书籍介绍作者介绍内容简介读者对象获奖名单结语前言大家好!我是二蛋,一个热爱技术、乐于分享的工程师。在过去的几年里,我一直通过各种渠道与大家分享技术知识和经验。我深知,每一位技术人员都对自己的技能提升和职业发展有着热切的期待。因此,我非常感激大家一直以来对我的关注和支持。为了回馈大家的

热文推荐