Kafka Java Consumer API
wenking 8/10/2023 kafka
# Basic consumer configuration

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.128:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // a consumer group id is required
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-1");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe to topics
        List<String> topics = new ArrayList<>();
        topics.add("test");
        consumer.subscribe(topics);
        while (true) {
            // pull one batch, waiting up to one second
            ConsumerRecords<String, String> batchRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : batchRecords) {
                System.out.println(record);
            }
        }
    }
}
```
# Consuming specific partitions

```java
// instead of subscribing to topics, assign specific topic partitions
List<TopicPartition> partitions = new ArrayList<>();
TopicPartition partition = new TopicPartition("test", 0);
partitions.add(partition);
consumer.assign(partitions);
```
# Partition assignment and rebalancing

Purpose: decides which partitions each consumer in a group reads.

`partition.assignment.strategy` selects the assignment strategy; the default is Range + CooperativeSticky. The available strategies are:

- Range
- RoundRobin
- Sticky
- CooperativeSticky

A rebalance is triggered when:

- By default each consumer sends a heartbeat to the coordinator every 3s; if no heartbeat arrives within `session.timeout.ms` (45s by default), the consumer is removed from the group and a rebalance is triggered;
- A consumer that takes longer than `max.poll.interval.ms` (5 minutes by default) between polls also triggers a rebalance.
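The strategy and both timeouts are ordinary consumer properties. A minimal sketch of setting them explicitly (the timeout values shown are the defaults; the strategy is switched to round-robin purely for illustration):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

Properties properties = new Properties();
// switch the assignment strategy away from the default Range + CooperativeSticky
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RoundRobinAssignor.class.getName());
// heartbeat every 3s; removed from the group after 45s without a heartbeat
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
// a gap between poll() calls longer than 5 minutes also triggers a rebalance
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
```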
Example rebalance log output:

```
[main] 2023-08-10 10:28:19.772 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Request joining group due to: group is already rebalancing
[main] 2023-08-10 10:28:19.775 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Revoke previously assigned partitions test-1, test-0
[main] 2023-08-10 10:28:19.775 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] (Re-)joining group
[main] 2023-08-10 10:28:19.779 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Successfully joined group with generation Generation{generationId=8, memberId='consumer-c-group-1-a6520723-e869-44ca-9b19-8575617d4a7d', protocol='range'}
[main] 2023-08-10 10:28:19.784 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Successfully synced group in generation Generation{generationId=8, memberId='consumer-c-group-1-a6520723-e869-44ca-9b19-8575617d4a7d', protocol='range'}
[main] 2023-08-10 10:28:19.786 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Notifying assignor about the new Assignment(partitions=[test-1])
[main] 2023-08-10 10:28:19.786 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Adding newly assigned partitions: test-1
[main] 2023-08-10 10:28:19.793 INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-c-group-1, groupId=c-group] Setting offset for partition test-1 to the committed offset FetchPosition{offset=85, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[192.168.1.128:9092 (id: 0 rack: null)], epoch=0}}
```
- `Revoke previously assigned partitions test-1, test-0`: the consumer previously owned partitions 0 and 1
- `protocol='range'`: the strategy used to assign partitions
- `Adding newly assigned partitions: test-1`: after the rebalance this consumer owns partition 1
# range

Definition: the partitions of a topic are sorted by partition number and the consumers are sorted alphabetically; each consumer receives `partitions / consumers` partitions, and when the division leaves a remainder, the consumers at the front each take one extra partition.

Problem: if the group subscribes to several topics whose partition counts are not divisible by the consumer count, every topic hands its extra partition to the consumers at the front, so the first few consumers can end up with a much higher load than the rest.
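The division rule above can be sketched as plain arithmetic (the class and method names here are illustrative, not part of the Kafka API):

```java
import java.util.Arrays;

public class RangeAssignmentSketch {
    // Under Range, each consumer gets partitions/consumers partitions,
    // and the first (partitions % consumers) consumers take one extra.
    static int[] rangeCounts(int numPartitions, int numConsumers) {
        int quota = numPartitions / numConsumers;
        int remainder = numPartitions % numConsumers;
        int[] counts = new int[numConsumers];
        for (int i = 0; i < numConsumers; i++) {
            counts[i] = quota + (i < remainder ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 7 partitions across 3 consumers: the first consumer gets the extra one
        System.out.println(Arrays.toString(rangeCounts(7, 3)));
    }
}
```

With two topics of 7 partitions each, the first consumer ends up with 6 partitions while the others hold 4, which is exactly the skew described above.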
# roundrobin

Across all topics the group subscribes to, list every partition and every consumer, sort them by hashcode, and then hand out the partitions to the consumers one by one in round-robin fashion.
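The round-robin hand-out itself can be sketched as follows (the hashcode sorting step is skipped for clarity, and all names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
    // Distribute the (already sorted) partitions to consumers one by one.
    static Map<String, List<String>> roundRobin(List<String> partitions, List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(consumers.get(i % consumers.size())).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("t0-0", "t0-1", "t0-2", "t1-0", "t1-1");
        System.out.println(roundRobin(partitions, Arrays.asList("c0", "c1")));
    }
}
```

Because the partitions of all topics are pooled before distribution, no single consumer accumulates the per-topic remainders the way it does under range.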
# sticky

Definition: produces an assignment that is as balanced as possible, but unlike range the partitions a consumer receives do not follow a fixed order and can look random; on a rebalance it also tries to keep as many of the existing assignments in place as possible.
# Automatic offset commits

- `enable.auto.commit`: whether offsets are committed automatically; enabled by default
- `auto.commit.interval.ms`: how often offsets are committed automatically; 5s by default

```java
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
```
# Offset reset configuration

- `auto.offset.reset`: latest | earliest | none
  - latest: if the group has a committed offset, resume from it; otherwise start from the end of the log (only new messages)
  - earliest: if the group has a committed offset, resume from it; otherwise start from the beginning of the log
  - none: if the group has a committed offset, resume from it; otherwise throw an exception

```java
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```
# Reading a partition's current offset

```java
TopicPartition partition = new TopicPartition("test", 2);
// position() returns the offset of the next record that will be fetched
long position = consumer.position(partition);
log.info("{}", position);
```
# Manual offset commits

# Synchronous commit

```java
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer.subscribe(Collections.singleton("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info(record.toString());
    }
    // manually commit the offsets of the batch just polled;
    // commitSync() blocks until the commit completes
    consumer.commitSync();
}
```
# Asynchronous commit

```java
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer.subscribe(Collections.singleton("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info(record.toString());
    }
    // manually commit the batch's offsets; commitAsync() returns
    // immediately and does not retry on failure
    consumer.commitAsync();
}
```
# Committing after each record

```java
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
        if (record.offset() == 10) {
            consumer.close();
            return;
        }
        // commit synchronously per record; the committed value is the offset
        // of the NEXT record to consume, hence record.offset() + 1
        Map<TopicPartition, OffsetAndMetadata> meta = new HashMap<>();
        meta.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(meta);
    }
}
```
# Consuming from a specific offset

```java
// get the partitions currently assigned to this consumer
Set<TopicPartition> assignments = consumer.assignment();
// wait until the assignment is ready, i.e. group initialization has finished
while (assignments.isEmpty()) {
    consumer.poll(Duration.ofSeconds(1));
    assignments = consumer.assignment();
}
// seek every assigned partition to offset 100 and consume from there
for (TopicPartition partition : assignments) {
    consumer.seek(partition, 100);
}
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info(record.toString());
    }
}
```
# Consuming from a timestamp

```java
List<TopicPartition> partitions = new ArrayList<>();
// assign the partition to consume
TopicPartition partition = new TopicPartition("test", 2);
partitions.add(partition);
consumer.assign(partitions);
// get the partitions currently assigned to this consumer
Set<TopicPartition> assignments = consumer.assignment();
// wait until the assignment is ready
while (assignments.isEmpty()) {
    consumer.poll(Duration.ofSeconds(1));
    assignments = consumer.assignment();
}
// map each partition to the target timestamp: 24 hours ago
Map<TopicPartition, Long> map = new HashMap<>();
for (TopicPartition tp : assignments) {
    map.put(tp, System.currentTimeMillis() - 24 * 60 * 60 * 1000);
}
// look up the earliest offset whose timestamp is >= the target
Map<TopicPartition, OffsetAndTimestamp> tfmap = consumer.offsetsForTimes(map);
for (TopicPartition tp : assignments) {
    OffsetAndTimestamp offsetAndTimestamp = tfmap.get(tp);
    // the lookup yields null when no message at or after the timestamp exists
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
}
```