Kafka 0.10 常用运维命令

引言

Kafka是由LinkedIn开发的一个分布式的消息系统,它以可水平扩展和高吞吐率而被广泛使用,现在已经是Apache的项目。

Kafka系统自带了丰富的运维管理工具,都是基于命令行的,本文主要介绍一些常用的命令。

读者需要对Kafka已经有入门级的了解。

常用命令

以下命令都是在Kafka的主目录下执行的。

启动Kafka

启动命令需要指定配置文件

1
bin/kafka-server-start.sh config/server.properties

默认的启动方式并不是守护进程,可以添加’nohup’和’&’让进程保持在后台运行,即使断开SSH终端连接。

1
nohup bin/kafka-server-start.sh config/server.properties > ~/kafka-server-start.out 2>&1 &

topic 相关

列出 topic

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

–zookeeper:指定zookeeper地址。

创建 topic

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic test_topic

–replication-factor:备份数,就是数据保存几份,这里设置为3,表示该topic的数据会在3个节点上各保存一份。

–partitions:分区数

topic 详情

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

删除 topic

1
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic

需要开启一个配置项才可以删除topic。

配置项在server.properties中:
delete.topic.enable = true

ps: 有个特殊的topic叫 __consumer_offsets,是Kafka内部使用的,不允许删除的。

生产者消费者相关

生产者

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

–broker-list:kafka broker地址

消费者

1
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic

分区相关

增加分区

1
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --partitions 4

–partitions:指明将分区增加至多少个。
分区不能减少只能增加。

改变分区分布和副本数

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test_topic.json --execute

–reassignment-json-file:指定一个分区方案文件。

分区方案文件是一个json格式的文本文件,需要自己编写。

举个例子,test_topic.json内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"version": 1,
"partitions": [
{
"topic": "test_topic",
"partition": 0,
"replicas": [1,2]
},
{
"topic": "test_topic",
"partition": 1,
"replicas": [2,3]
},
{
"topic": "test_topic",
"partition": 2,
"replicas": [3,1]
},
{
"topic": "test_topic",
"partition": 3,
"replicas": [1,2]
}
]
}

解释其中一部分,剩下的自然也会明白。

1
2
3
"topic": "test_topic", //topic名称
"partition": 0, //分区编号,从0开始
"replicas": [1,2] //指定保存在哪个broker,这里是填写broker id。写两个id,意思就是有两份备份啦。

PS:这个方法不能用来增加分区数。第一个replicas就是默认leader。

改变分区和备份数的操作并不能立即完成,而是需要一段时间,内部操作会在后台运行。
所以需要…

检查进度

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test_topic.json --verify

以上。