Kafka-客户端命令

7/11/2023 kafka

# kafka启动命令

# 直接启动 zk
./zookeeper-server-start.sh ../config/zookeeper.properties
# 后台运行 zk
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 直接启动 kafka
./kafka-server-start.sh ../config/server.properties
# 后台运行 kafka
./kafka-server-start.sh -daemon ../config/server.properties
1
2
3
4
5
6
7
8

# 主题相关

  • 查看集群中所有主题
kafka-topics.sh --bootstrap-server localhost:9092 --list
1
  • 查看指定主题信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
1

查询结果:

Topic: test	TopicId: H4KfDOM7Q3OA10RyRlGysg	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
1
2
3
4
  • 创建topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 --partitions 3 --replication-factor 1
1
  • 删除topic
kafka-topics.sh --zookeeper localhost:2181  --delete --topic topic1
1
  • 修改topic
# 修改topic     修改分区数时,仅能增加分区个数。不能用来修改副本个数。
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3
# 修改 副本个数
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --config flush.messages=1
kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --delete-config flush.messages
1
2
3
4
5

# 消费者组相关

  • 查看所有消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
1
  • 查看消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group c1
1

查询结果:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                        HOST            CLIENT-ID
c2              asd             0          665             665             0               consumer-c2-1-2091869c-6fd2-48f9-a530-f65caf1062d8 /192.168.1.217  consumer-c2-1
1
2
  • 查询所有消费者组详情
for i in `./kafka-consumer-groups.sh --bootstrap-server 192.168.1.6:9092 --list`;
do
  echo "======================= group: $i ========================="
  ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.6:9092 --group $i --describe;
  echo ""
done
1
2
3
4
5
6

# 模拟操作

  • 模拟生产者
# 发送无 key 消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test

# 发送 有 key 消息, 默认以 tab 分割
kafka-console-producer.sh --bootstrap-server localhsot:9092 --topic topicName --property parse.key=true
1
2
3
4
5
  • 模拟消费者
# 指定主题从最新消息开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --group c-group-1

# 指定主题从头开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --group c-group-1 --from-beginning
1
2
3
4
5

# 消费者组offset操作


# 重置偏移为最大值   , 之前的都不消费
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092  --reset-offsets --group c1 --topic asd --to-latest --execute
# 重置偏移为0
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092  --reset-offsets --group c1 --topic asd --to-earliest --execute
# 重置偏移为 指定偏移值
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092  --reset-offsets --group c1 --topic asd --to-offset 400 --execute
# 重置偏移 到指定时间
kafka-consumer-groups.bat --bootstrap-server 192.168.1.133:9092  --reset-offsets --group c1 --topic asd --to-datetime 2022-12-16T10:22:44.000 --execute
#------------------------------------------------------------------------------------------------
1
2
3
4
5
6
7
8
9
10