基本概念
- Broker: 服务代理。实质上就是kafka集群的一个物理节点。
- Topic: 特定类型的消息流。”消息”是字节的有效负载(payload),话题是消息的分类的名或种子名
- Partition: Topic的子概念。一个Topic可以有多个Partition,但一个Partition只属于一个Topic。此外,Partition则是Consumer消费的基本单元。消费时。每个消费线程最多只能使用一个Partition。一个topic中partition的数量,就是每个user group中消费该topic的最大并行数量。
- UserGroup: 为了便于实现MQ中多播,重复消费等引入的概念。如果ConsumerA和ConsumerB属于同一个UserGroup,那么对于ConsumerA消费过的数据,ConsumerB就不能再消费了。也就是说,同一个user group中的consumer使用同一套offset
- Offset: Offset是专门对于Partition和UserGroup而言的,用于记录某个UserGroup在某个Partition中当前已经消费到达的位置。
- Producer: 生产者,能够发布消息到话题的任何对象。直接向某topic下的某partition发送数据。leader负责主备策略、写入数据、发送ack。
- Consumer: 消费者。可以订阅一个或者多个话题,通过fetch的方式从Broker中拉取数据,从而消费这些已经发布的信息的对象。kafka server不直接负责每个consumer消费到了哪,所以需要client和zk联合维护每个partition读到了哪里,即offset。
下载
下载 kafka_2.11-0.11.0.1.tgz
安装
$ tar -zxvf kafka_2.11-0.11.0.1.tgz
配置、启动
$ cd kafka_2.11-0.11.0.1.tgz
# 启动zookeeper,如果集群中已经存在zookeeper可以忽略这一步
# 如果出现端口占用,可以修改zookeeper.properties文件中的默认端口号
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
# 新添加两个kafka节点
$ cd config
$ cp server.properties server-1.properties
$ cp server.properties server-2.properties
# 修改 broker.id、port、datadir
$ vim server-1.properties
broker.id=1
listeners=PLAINTEXT://10.4.121.218:9093
log.dir=/tmp/kafka-logs-1
$ vim server-2.properties
broker.id=2
listeners=PLAINTEXT://10.4.121.218:9094
log.dir=/tmp/kafka-logs-2
# 启动kafka集群
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-server-start.sh config/server-1.properties &
$ bin/kafka-server-start.sh config/server-2.properties &
# 查看启动情况
$ jobs
# 启动情况如下
[1] Running cd .. && bin/zookeeper-server-start.sh config/zookeeper.properties & (wd: /usr/zhangqiang/kafka_2.11-0.11.0.1/config)
[2] Running bin/kafka-server-start.sh config/server.properties &
[3]- Running bin/kafka-server-start.sh config/server-1.properties &
[4]+ Running bin/kafka-server-start.sh config/server-2.properties &
测试
# 创建一个有3个副本和一个分区的话题 my-replicated-topic
$ bin/kafka-topics.sh --create \
--zookeeper 10.4.121.218:3333 \
--replication-factor 3 \
--partitions 1 \
--topic my-replicated-topic
# 注意:新版本用 --bootstrap-server 指定Kafka broker地址,--zookeeper 被弃用了)
# 因此所有 --zookeeper 都可以用 --bootstrap-server 替代,如下:
$ bin/kafka-topics.sh --create \
--bootstrap-server 10.4.121.218:9093,10.4.121.218:9094 \
--replication-factor 3 \
--partitions 1 \
--topic my-replicated-topic
# 查看broker的情况
$ bin/kafka-topics.sh --describe \
--zookeeper 10.4.121.218:3333 \
--topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
# 配置情况:分区数据在节点broker.id=0、主节点是broker.id=1、副本集是broker.id=1,0,2、isr是broker.id=1,0,2
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# 发布消息到话题
$ bin/kafka-console-producer.sh \
--broker-list 10.4.121.218:9092 \
--topic my-replicated-topic
...
my test message 1
my test message 2
^C
# 消费消息
$ bin/kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--from-beginning \
--topic my-replicated-topic
...
my test message 1
my test message 2
^C
# 测试集群容错,干掉主节点broker.id=1
$ ps aux|grep server-1.properties
root 11552 2.5 2.0 8028360 336356 pts/3 Sl 10:13 1:36 /usr/java/latest/bin/java......
$ kill -9 11552
$ bin/kafka-topic.sh --describe --zookeeper 10.4.121.218:3333 -topic my-replicated-topic
# 可以发现leader变成了broker.id=0,而且broker.id=1也不在Isr中了
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
# 依然可以正常消费消息
$ bin/kafka-console-consumer.sh \
--bootstrap-server 10.4.121.218:9092 \
--from-beginning \
--topic my-replicated-topic
...
my test message 1
my test message 2
^C
# 重新启动broker.id=1
$ bin/kafka-server-start.sh config/server-1.properties &
$ bin/kafka-topic.sh --describe \
--zookeeper 10.4.121.218:3333 \
--topic my-replicated-topic
# Isr中又有了broker.id=1
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
异常情况
zookeeper 只能本地访问,其他节点通过 ip 无法连接
这是因为 zookeeper 默认是通过hostname
监听连接的,检查下/etc/hosts
,如果你的hostname
指向了127.0.0.1
,那么就只能本地访问了。
# 查看本机名称
$ hostname
hadoop1
$ vim /etc/hosts
# 下面这一行应该改为 192.168.51.21 hadoop1
127.0.0.1 hadoop1