Kafka-客户端命令
wenking 7/11/2023 kafka
# kafka启动命令
# 直接启动 zk
./zookeeper-server-start.sh ../config/zookeeper.properties
# 后台运行 zk
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 直接启动 kafka
./kafka-server-start.sh ../config/server.properties
# 后台运行 kafka
./kafka-server-start.sh -daemon ../config/server.properties
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 主题相关
- 查看集群中所有主题
kafka-topics.sh --bootstrap-server localhost:9092 --list
1
- 查看指定主题信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
1
查询结果:
Topic: test TopicId: H4KfDOM7Q3OA10RyRlGysg PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
1
2
3
4
2
3
4
- 创建topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 --partitions 3 --replication-factor 1
1
- 删除topic
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic1
1
- 修改topic
# 修改topic 修改分区数时,仅能增加分区个数。不能用来修改副本个数。
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3
# 修改 副本个数
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --config flush.messages=1
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --delete-config flush.messages
1
2
3
4
5
2
3
4
5
# 消费者组相关
- 查看所有消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
1
- 查看消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group c1
1
查询结果:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
c2 asd 0 665 665 0 consumer-c2-1-2091869c-6fd2-48f9-a530-f65caf1062d8 /192.168.1.217 consumer-c2-1
1
2
2
- 查询所有消费者组详情
for i in `./kafka-consumer-groups.sh --bootstrap-server 192.168.1.6:9092 --list`;
do
echo "======================= group: $i ========================="
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.6:9092 --group $i --describe;
echo ""
done
1
2
3
4
5
6
2
3
4
5
6
# 模拟操作
- 模拟生产者
# 发送无 key 消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 发送 有 key 消息, 默认以 tab 分割
kafka-console-producer.sh --bootstrap-server localhsot:9092 --topic topicName --property parse.key=true
1
2
3
4
5
2
3
4
5
- 模拟消费者
# 指定主题从最新消息开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --group c-group-1
# 指定主题从头开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --group c-group-1 --from-beginning
1
2
3
4
5
2
3
4
5
# 消费者组offset操作
# 重置偏移为最大值 , 之前的都不消费
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092 --reset-offsets --group c1 --topic asd --to-latest --execute
# 重置偏移为0
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092 --reset-offsets --group c1 --topic asd --to-earliest --execute
# 重置偏移为 指定偏移值
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092 --reset-offsets --group c1 --topic asd --to-offset 400 --execute
# 重置偏移 到指定时间
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092 --reset-offsets --group c1 --topic asd --to-datetime 2022-12-16T10:22:44.000 --execute
#------------------------------------------------------------------------------------------------
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10