Kafka-Java生产者API
wenking 8/8/2023 kafka
# 生产者基本配置
public class KafkaProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.128:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>("topic1", "hello"));
producer.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 同步发送
producer.send(new ProducerRecord<>("topic1", "hello")).get();
1
# 异步回调发送
producer.send(new ProducerRecord<>("topic1", "hello"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());
}
}
});
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 分区器
producer发送 拦截器 序列化器 分区器
分区器优点:
- 便于合理使用存储资源,把海量数据切割分布在多台服务器上
- 提高并行度
粘性分区:选择一个分区,并尽可能的一直使用该分区,直到该分区的batch已满,kafka在随机选取一个和上次不同的分区进行使用。
自定义分区器
public class MyPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String s = value.toString();
int partition;
if (s.contains("学习")) {
partition = 0;
} else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
注册分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartition.class.getName());
1
# 提高发送效率
提高发送效率
- buffer.memory: 发送缓冲区大小
- batch.size: 数据量达到后才会发送数据,默认16k;推荐32k
- linger.ms: 等待指定量后发送数据,默认0; 推荐 0-100ms
- compression.type: snappy
// 配置kafka发送缓冲区大小为32m
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 配置kafka发送批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 间隔多少秒传输一次
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩类型
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 生产者数据可靠性
// 确认类型配置
properties.put(ProducerConfig.ACKS_CONFIG, "1");
// 重试次数配置
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
1
2
3
4
5
2
3
4
5
Leader维护了一个动态的ISR集合,即和Leader保持同步的Follower集合,如果Follower长时间未发送ack给leader,则将follower提出去。该时间长度默认30s,通过replica.lag.time.max.ms
参数配置
正真的可靠性保证: ACK设置-1,分区副本大于2,并且ISR的最小副本数量大于等于2
- 至少一次:ACK设置-1,分区副本大于2,并且ISR的最小副本数量大于等于2
- 最多一次:ACK设置为0
- 精确一次:幂等性 + (ACK设置-1,分区副本大于2,并且ISR的最小副本数量大于等于2)
重复数据判断标准:具有和<PID, Partition, SeqNumber>相同的消息,提交时都只会持久化一次;PID表示生产者每次重启都会唯一分配一个;Partition表示分区号;Sequence Number是单调递增的; 所以幂等性只能保证单分区单会话内不重复。
// 开启幂等性, 默认开启的
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
1
2
2