Kafka 集群管理

Reference

Kafka Tools 管理工具

Note: 不同版本间参数变动较大,命令具体参数使用需要查看相对应版本的命令使用,例如老版本(0.9之前) offset/group metadata 是默认存储在 zookeeper 中的(i.e. offsets.storage=zookeeper),因此指定地址的时候使用--zookeeper指定 zookeeper 地址,而新版本(0.9+)将 offset 信息存储到 kafka 集群上就需要--bootstrap-server指定 kafka 集群地址,下面的命令使用的版本是 Apache Kafka 2.3

$KAFKA_HOME/bin目录下,放着kafka常用的一些工具脚本,这些类的实现都是放在源码的kafka/core/src/main/scala/kafka/tools/路径下。

想查看这些脚本的使用参数说明的话,直接运行脚本。例如查看kafka-topics.sh的使用参数说明:

$ ./kafka-topics.sh
# 下面就会详细的参数介绍
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--config <String: name=value$            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
......

常用命令

管理 topics

# 添加一个新的 topic,指定分区、副本数量
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
        --partitions 20 --replication-factor 3 --config x=y

# 增加分区数量,Kafka 分区数目前只能增加,且增加分区后对已有数据不会重新分区,如果使用 hash % partitionNum 分区,只对新数据生效
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40

# 查看集群上的 topics
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --list

# 查看某个 topic 的详细信息,比如分区在节点上的分布情况,副本情况
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --topic my_topic_name --describe

# 删除指定的 topic,需要保证参数`delete.topic.enable=true`
$ bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
# Note: 如果没有配置`delete.topic.enable=true`,那么此时并不是真正的删除,而只是把 topic marked for deletion
# 还需要手动删除一下 Zookeeper 中的 Topic 记录
$ ./zkCli.sh -server 10.4.121.218:2181
# 该路径下存储了所有 marked for deleteion 的 topics
$ ls /admin/delete_tpoics
[mytopic]
$ delete /admin/delete_tpoics/mytopic
# 该路径存储了所有的 topics 信息
$ ls /brokers/topics/
[mytopic]
$ rmr /brokers/topics/mytopic

# 添加一个 config
$ bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics \
        --entity-name my_topic_name --alter --add-config x=y

# 删除一个 config
$ bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics \
        --entity-name my_topic_name --alter --delete-config x

优雅的关闭集群

平衡 Leader

管理消费者组

# 查看所有的消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 检查某个消费者组的具体消费情况,Note:老版本使用 kafka-consumer-offset-checker.sh,现在已经被 remove 了
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

# 查看消费者组中的消费成员信息 --members,查看成员的具体的 topic 分区分配信息 --verbose
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
# #PARTITIONS 是该 member 被分配的总分区数量
CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -

# 删除消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

控制台生产者消费者

# 创建生产者
$ ./kafka-console-producer.sh \
--broker-list 10.4.121.218:9092 \
--topic mytopic

# 创建消费者
# 每次都是从开始位置消费,在生产环境下不建议这样使用。
$ ./kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--topic mytopic \
--from-beginning 
# 可以通过 --offset 指定开始消费的记录,可以指定一个非负整数,
# 或者`earliest`表示从第一条开始消费
# 或者`latest`表示从最后一条开始消费
$ ./kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--topic mytopic \
--offset earliest

其他

# The deprecated tool kafka-consumer-offset-checker.sh has been removed. Use kafka-consumer-groups.sh to get consumer group details.
# 显示出Consumer的Group、Topic、分区ID、分区对应已经消费的Offset、logSize大小,Lag以及Owner等信息。
# 使用脚本:kafka-consumer-offset-checker.sh
$ ./kafka-consumer-offset-checker.sh --broker-info \
--zookeeper 10.4.121.218:3333,10.4.121.202:3333,10.4.121.203:3333 \
--topic mytopic \
--group xb_id

# 有时候我们需要验证日志索引是否有损坏(corrupt),或者仅仅想从log文件中直接打印消息。
# 使用脚本:kafka-run-class.sh
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments 
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log \
--files /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log 

# 导出Zookeeper中Group相关的偏移量。有时候我们需要导出某个Consumer group各个分区的偏移量。
# 使用脚本:kafka-run-class.sh
$ ./kafka-run-class.sh kafka.tools.ExportZkOffsets
$ ./kafka-run-class.sh kafka.tools.ExportZkOffsets \
--group test_group \
--zkconnect 10.4.121.218:3333 \
--output-file ~/offset
$ vim ~/offset

# 这个工具主要作用是从一个Kafka集群里面读取指定Topic的消息,并将这些消息发送到其他集群的指定topic中。
# 使用脚本:./kafka-replay-log-producer.sh

# kafka-simple-consumer-shell.sh工具主要是使用Simple Consumer API从指定Topic的分区读取数据并打印在终端。
$ ./kafka-simple-consumer-shell.sh \
--broker-list 10.4.121.218:9092 \
--topic mytopic \
--partition 0

# kafka.tools.UpdateOffsetsInZK工具可以更新Zookeeper中指定Topic所有分区的偏移量,可以指定成 earliest或者latest
$ ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

# 最后再注意kafka的启动和停止。
# 启动kafka: 
./kafka-server-start.sh /kafka/config/server.properties $/dev/null 2$&1 &
# 停止kafka: 直接kill掉进程就行。
$ ps aux | grep server-1.properties
root     11552  2.5  2.0 8028360 336356 pts/3  Sl   10:13   1:36 /usr/java/latest/bin/java......
$ kill -9  11552

通过 kafka connect 导入/导出数据

当需要从其他数据源导入kafka或者将kafka中的数据导入其他数据源,除了通过代码实现,还可以使用kafka connect来导入导出数据,它是一个可扩展的工具,通过运行connector,实现与外部系统交互的自定义逻辑。

# 造数据
$ echo -e "foo\nbar" > test.txt

# 在standalone模式下,启动两个connectors,需要配置三个配置文件。
# connect-standalone.properties配置kafka connect process,例如要连接到哪个broker,数据的序列化格式等。
# 其他的每个配置文件,都代表了要创建的每个connector,例如指定每个connector唯一的name,要实例化哪个connector class,和一些connector需要的其他的配置项。

# 修改 connect-standalone.properties
$ vim config/connect-standalone.properties
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=ambari0:9093

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# org.apache.kafka.connect.converters.ByteArrayConverter
# org.apache.kafka.connect.converters.DoubleConverter
# org.apache.kafka.connect.converters.FloatConverter
# org.apache.kafka.connect.converters.IntegerConverter
# org.apache.kafka.connect.converters.LongConverter
# org.apache.kafka.connect.converters.ShortConverter
# org.apache.kafka.connect.json.JsonConverter
# org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# 导出 kafka 数据到 file
$ vim config/connect-file-sink.properties
name=kafka_to_file
connector.class=FileStreamSink
tasks.max=5
file=invert_index.txt
topics=invert_index
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties

# 导入 file 数据到 kafka
$ vim config/connect-file-source.properties
name=file_to_kafka
connector.class=FileStreamSource
tasks.max=5
file=invert_index.txt
topics=invert_index
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

Kafka-Manager

download

# WARN: require jdk1.8,使用 jdk11 的时候会出现 NullPointerException

# 添加 sbt 镜像源,默认 jcenter 连不上
$ vim ~/.sbt/repositories
[repositories]
  local
  aliyun: http://maven.aliyun.com/nexus/content/groups/public/
  central: http://repo1.maven.org/maven2/
  
$ cd kafka-manager-master
# 编译为 zip
$ ./sbt clean dist
# 编译完的 zip 包在 ./target/universal/kafka-manager-1.3.3.22.zip

# 编译为 rpm
# 确认当前机器安装了 rpm(rpmbuild),参考 https://github.com/yahoo/kafka-manager/issues/51
# 安装了 rpm 之后,rpmbuild 就可以使用了
$ brew install rpm
$ ./sbt rpm:packageBin
# 编译完成的 rpm 包在 ./target/rpm/RPMS/noarch/kafka-manager-1.3.3.22-1.noarch.rpm

# 安装,rpm 默认安装目录 /usr/share/kafka-manager
$ rpm -ivh kafka-manager-1.3.3.22-1.noarch.rpm

# 修改配置文件,添加 zk 地址,并注释掉 kafka-manager.zkhosts
$ vim /etc/kafka-manager/application.conf
ZK_HOSTS="hadoop1:2181,hadoop2:2181,hadoop3:2181"
#kafka-manager.zkhosts="kafka-manager-zookeeper:2181"

# 修改日志输出位置
$ vim /etc/kafka-manager/logger.xml
$ vim /etc/kafka-manager/logback.xml
# 把 ${application.home} -> /var/log/kafka-manager
# 把 ${application.home} -> /var/log/kafka-manager
:%s/\${application.home}/\/usr\/share\/kafka-manager/g

# 设置开机启动,注意 /etc/systemd/system/multi-user.target.wants/kafka-manager.service 才是新创建的软连接!
$ systemctl enable kafka-manager
Created symlink from /etc/systemd/system/multi-user.target.wants/kafka-manager.service to /usr/lib/systemd/system/kafka-manager.service.

# 如果不想修改上面的文件,也可以自己启动 kafkamanager 指定 application.home
$ bin/kafka-manager \
-Dconfig.file=/usr/share/kafka-manager/conf/application.conf \
-Dapplication.home=/usr/share/kafka-manager \
-Dlogger.file=/etc/kafka-manager/logger.xml \
-Dhttp.port=1911 >kafka-manager.log 2>&1 &

# 也可以修改 systemd service 启动参数,比如修改端口
$ cat /usr/lib/systemd/system/kafka-manager.service
...
[Service]
Type=simple
WorkingDirectory=/usr/share/kafka-manager
EnvironmentFile=/etc/default/kafka-manager
...

$ vim /etc/default/kafka-manager
JAVA_OPTS="-Dpidfile.path=/var/run/kafka-manager.pid -Dconfig.file=/etc/kafka-manager/application.conf -Dlogger.file=/etc/kafka-manager/logger.xml -Dhttp.port=1911"

kafka-manager.service 起不来

$ systemctl status kafka-manager
...
loaded: loaded(/usr/lib/systemd/system/kafka-manager.service)
...

$ vim /usr/lib/systemd/system/kafka-manager.service
...
WorkingDirectory=/usr/share/kafka-manager
EnvironmentFile=/etc/default/kafka-manager
...

$ vim /etc/default/kafka-manager
...
PIDFILE="/var/run/kafka-manager.pid"
...

$ rm -f /var/run/kafka-manager.pid

Kafka Tool

Start

# Start with SASL Plaintext
$ kafkatool.exe -J-Djava.security.auth.login.config=C:\Users\zhangqiang\kafka_client_jaas.conf

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,一毛也是爱

打开支付宝扫一扫,即可进行扫码打赏哦