Reference
Kafka Tools 管理工具
Note: 不同版本间参数变动较大,命令具体参数使用需要查看相对应版本的命令使用,例如老版本(0.9之前) offset/group metadata 是默认存储在 zookeeper 中的(i.e.
offsets.storage=zookeeper
),因此指定地址的时候使用--zookeeper
指定 zookeeper 地址,而新版本(0.9+)将 offset 信息存储到 kafka 集群上就需要--bootstrap-server
指定 kafka 集群地址,下面的命令使用的版本是 Apache Kafka 2.3
在$KAFKA_HOME/bin
目录下,放着kafka常用的一些工具脚本,这些类的实现都是放在源码的kafka/core/src/main/scala/kafka/tools/
路径下。
想查看这些脚本的使用参数说明的话,直接运行脚本。例如查看kafka-topics.sh
的使用参数说明:
$ ./kafka-topics.sh
# 下面就会详细的参数介绍
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--config <String: name=value$ A topic configuration override for the
topic being created or altered.The
following is a list of valid
......
常用命令
管理 topics
# 添加一个新的 topic,指定分区、副本数量
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y
# 增加分区数量,Kafka 分区数目前只能增加,且增加分区后对已有数据不会重新分区,如果使用 hash % partitionNum 分区,只对新数据生效
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
--partitions 40
# 查看集群上的 topics
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --list
# 查看某个 topic 的详细信息,比如分区在节点上的分布情况,副本情况
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --topic my_topic_name --describe
# 删除指定的 topic,需要保证参数`delete.topic.enable=true`
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
# Note: 如果没有配置`delete.topic.enable=true`,那么此时并不是真正的删除,而只是把 topic marked for deletion
# 还需要手动删除一下 Zookeeper 中的 Topic 记录
$ ./zkCli.sh -server 10.4.121.218:2181
# 该路径下存储了所有 marked for deleteion 的 topics
$ ls /admin/delete_tpoics
[mytopic]
$ delete /admin/delete_tpoics/mytopic
# 该路径存储了所有的 topics 信息
$ ls /brokers/topics/
[mytopic]
$ rmr /brokers/topics/mytopic
# 添加一个 config
$ bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics \
--entity-name my_topic_name --alter --add-config x=y
# 删除一个 config
$ bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics \
--entity-name my_topic_name --alter --delete-config x
优雅的关闭集群
平衡 Leader
管理消费者组
# 查看所有的消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 检查某个消费者组的具体消费情况,Note:老版本使用 kafka-consumer-offset-checker.sh,现在已经被 remove 了
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
# 查看消费者组中的消费成员信息 --members,查看成员的具体的 topic 分区分配信息 --verbose
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
# #PARTITIONS 是该 member 被分配的总分区数量
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
# 删除消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
控制台生产者消费者
# 创建生产者
$ ./kafka-console-producer.sh \
--broker-list 10.4.121.218:9092 \
--topic mytopic
# 创建消费者
# 每次都是从开始位置消费,在生产环境下不建议这样使用。
$ ./kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--topic mytopic \
--from-beginning
# 可以通过 --offset 指定开始消费的记录,可以指定一个非负整数,
# 或者`earliest`表示从第一条开始消费
# 或者`latest`表示从最后一条开始消费
$ ./kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--topic mytopic \
--offset earliest
其他
# The deprecated tool kafka-consumer-offset-checker.sh has been removed. Use kafka-consumer-groups.sh to get consumer group details.
# 显示出Consumer的Group、Topic、分区ID、分区对应已经消费的Offset、logSize大小,Lag以及Owner等信息。
# 使用脚本:kafka-consumer-offset-checker.sh
$ ./kafka-consumer-offset-checker.sh --broker-info \
--zookeeper 10.4.121.218:3333,10.4.121.202:3333,10.4.121.203:3333 \
--topic mytopic \
--group xb_id
# 有时候我们需要验证日志索引是否有损坏(corrupt),或者仅仅想从log文件中直接打印消息。
# 使用脚本:kafka-run-class.sh
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log \
--files /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log
# 导出Zookeeper中Group相关的偏移量。有时候我们需要导出某个Consumer group各个分区的偏移量。
# 使用脚本:kafka-run-class.sh
$ ./kafka-run-class.sh kafka.tools.ExportZkOffsets
$ ./kafka-run-class.sh kafka.tools.ExportZkOffsets \
--group test_group \
--zkconnect 10.4.121.218:3333 \
--output-file ~/offset
$ vim ~/offset
# 这个工具主要作用是从一个Kafka集群里面读取指定Topic的消息,并将这些消息发送到其他集群的指定topic中。
# 使用脚本:./kafka-replay-log-producer.sh
# kafka-simple-consumer-shell.sh工具主要是使用Simple Consumer API从指定Topic的分区读取数据并打印在终端。
$ ./kafka-simple-consumer-shell.sh \
--broker-list 10.4.121.218:9092 \
--topic mytopic \
--partition 0
# kafka.tools.UpdateOffsetsInZK工具可以更新Zookeeper中指定Topic所有分区的偏移量,可以指定成 earliest或者latest
$ ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
# 最后再注意kafka的启动和停止。
# 启动kafka:
./kafka-server-start.sh /kafka/config/server.properties $/dev/null 2$&1 &
# 停止kafka: 直接kill掉进程就行。
$ ps aux | grep server-1.properties
root 11552 2.5 2.0 8028360 336356 pts/3 Sl 10:13 1:36 /usr/java/latest/bin/java......
$ kill -9 11552
Kafka-Manager
# WARN: require jdk1.8,使用 jdk11 的时候会出现 NullPointerException
# 添加 sbt 镜像源,默认 jcenter 连不上
$ vim ~/.sbt/repositories
[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public/
central: http://repo1.maven.org/maven2/
$ cd kafka-manager-master
# 编译为 zip
$ ./sbt clean dist
# 编译完的 zip 包在 ./target/universal/kafka-manager-1.3.3.22.zip
# 编译为 rpm
# 确认当前机器安装了 rpm(rpmbuild),参考 https://github.com/yahoo/kafka-manager/issues/51
# 安装了 rpm 之后,rpmbuild 就可以使用了
$ brew install rpm
$ ./sbt rpm:packageBin
# 编译完成的 rpm 包在 ./target/rpm/RPMS/noarch/kafka-manager-1.3.3.22-1.noarch.rpm
# 安装,rpm 默认安装目录 /usr/share/kafka-manager
$ rpm -ivh kafka-manager-1.3.3.22-1.noarch.rpm
# 修改配置文件,添加 zk 地址,并注释掉 kafka-manager.zkhosts
$ vim /etc/kafka-manager/application.conf
ZK_HOSTS="hadoop1:2181,hadoop2:2181,hadoop3:2181"
#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
# 修改日志输出位置
$ vim /etc/kafka-manager/logger.xml
$ vim /etc/kafka-manager/logback.xml
# 把 ${application.home} -> /var/log/kafka-manager
# 把 ${application.home} -> /var/log/kafka-manager
:%s/\${application.home}/\/usr\/share\/kafka-manager/g
# 设置开机启动,注意 /etc/systemd/system/multi-user.target.wants/kafka-manager.service 才是新创建的软连接!
$ systemctl enable kafka-manager
Created symlink from /etc/systemd/system/multi-user.target.wants/kafka-manager.service to /usr/lib/systemd/system/kafka-manager.service.
# 如果不想修改上面的文件,也可以自己启动 kafkamanager 指定 application.home
$ bin/kafka-manager \
-Dconfig.file=/usr/share/kafka-manager/conf/application.conf \
-Dapplication.home=/usr/share/kafka-manager \
-Dlogger.file=/etc/kafka-manager/logger.xml \
-Dhttp.port=1911 >kafka-manager.log 2>&1 &
# 也可以修改 systemd service 启动参数,比如修改端口
$ cat /usr/lib/systemd/system/kafka-manager.service
...
[Service]
Type=simple
WorkingDirectory=/usr/share/kafka-manager
EnvironmentFile=/etc/default/kafka-manager
...
$ vim /etc/default/kafka-manager
JAVA_OPTS="-Dpidfile.path=/var/run/kafka-manager.pid -Dconfig.file=/etc/kafka-manager/application.conf -Dlogger.file=/etc/kafka-manager/logger.xml -Dhttp.port=1911"
kafka-manager.service 起不来
$ systemctl status kafka-manager
...
loaded: loaded(/usr/lib/systemd/system/kafka-manager.service)
...
$ vim /usr/lib/systemd/system/kafka-manager.service
...
WorkingDirectory=/usr/share/kafka-manager
EnvironmentFile=/etc/default/kafka-manager
...
$ vim /etc/default/kafka-manager
...
PIDFILE="/var/run/kafka-manager.pid"
...
$ rm -f /var/run/kafka-manager.pid
Kafka Tool
Start
# Start with SASL Plaintext
$ kafkatool.exe -J-Djava.security.auth.login.config=C:\Users\zhangqiang\kafka_client_jaas.conf