Kafka-核心概念

7/11/2023 kafka

# 消息一致性解决方案

采用消息表的方式,我们执行事务操作同时将需要发送的消息写入消息表记录,并记录消息状态为初始态(0),事务执行完成后发送消息。

  • 消息发送成功(异步发送,确认收到ack确认)记录为已发送态(1)。
  • 启动定时任务轮询初始态(0)的消息,并尝试发送,如果发送成功,修改为已发送态(1)。
  • 依据业务需求,或多次尝试,如果消息仍然异常发不出去,可以修改消息状态为失败态(2)。

# 消息可靠性解决方案

  • 生产端:通过ack确认机制解决;异步处理生产端消息响应ack确保消息被真确发送。
  • 服务端:高可用和分区;解决消息刚到达broker数据还有没落盘由于单节点故障导致致使数据丢失。
  • 消费端:幂等操作和手动提交;即使有重复消息也能确保业务有且仅被执行一次。

# 消息不重复消费

  • 生产端:开启幂等性配置(enable.idempotence),默认为true,客户端启动将会分配一个随机的ProducerID,并且生产者对于每个分区都会维护一个SequenceNumber,(ProducerID, PartitionNumber, SequenceNumber)三者唯一确定消息唯一性。
  • 消费端:业务需要确保幂等性,即多次执行相同的业务操作只有第一次有影响。

# kafka 存储机制

  1. 一个topic对应多个partition (每个主题对应物理文件夹为 topic-0, topic-1, ...)
  2. 一个partition对应多个segment
  3. 一个segment对应4个文件(xxx.index, xxx.log, xxx.timeindex, xxx.snapshot)

# 基于消息中间件实现TCC分布式事务解决方案

对于每个三方接口调用时,需要提供三个接口:try执行接口(用于主节点调用),confrim执行接口以及cancel执行接口(可以主节点调用,也可以消息异步执行)。

TCC中准备阶段(T阶段)所执行的所有操作,均不会影响最终结果,意味着我们本地数据库事务修改的数据或者三方接口调用中事务修改的数据都会存储在一个临时的地方(比如存储在一个流水表中,如转账业务将转账金额存储到转账流水表)

由于T阶段所有操作都执行成功,进入确认阶段(第一个C阶段)。该阶段会真实的修改数据库数据,如果失败会进行反复重试,通常实现方案是业务发起方发送一个确认消息,所有三方系统都订阅该消息监听并执行。

由于T阶段部分操作执行失败,进入回滚阶段(第二个C阶段)。该阶段对有修改数据库临时数据的操作会进行回滚(或者依据有限状态机修改流水状态设置失败);对没有修改数据库临时数据的三方接口则不需进行理会。

# 执行流程:

正常执行本地事务,或者进行三方接口调用:

  • 如果所有操作都执行成功,进入confirm阶段;协调节点发送消息,所有关联的三方应用都订阅该消息,并且清空临时数据或者修改流水数据为确认态,真实修改数据。
  • 如果中间有部分操作执行失败,进行cancel阶段;对于已经执行的本地事务进行回滚,对应已经执行的三方调用进行发送cancel消息,将执行结果发送给三方应用进行回滚,未执行的三方应用则无需理会该消息

# 日志文件结构

index 文件采用稀疏矩阵进行存储, 结构大概类似如下

topic offset position
330 1225
380 1456
420 1928

第一行存储kafka 的消息偏移, 第二行存储 物理消息相对文件头偏移。

log 文件

message offset position
1 330 1225
2 380 1456
3 420 1928

参考文档

https://mp.weixin.qq.com/s?__biz=Mzg3MTcxMDgxNA==&mid=2247488841&idx=1&sn=2ea884012493403ab45b450271708fc8&source=41#wechat_redirect

# kafka实现延迟队列(供参考)

生产者发送消息到broker中,并附上需要延迟执行的时间, 消费者从broker中拉取消息,获取消息发送时间, 加上延迟执行时间,然后把消息放到延迟队列中执行。

# 消费端自动提交

kafka消费端在配置消息提交时,可以选择手动提交和自动提交,如果选择自动提交,将涉及到2个参数:

  • enable.auto.commit:true,标识开启自动提交
  • auto.commit.interval.ms: 自动提交offset的时间间隔毫秒数

当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,并且距离上次提交offset的时间间隔大于auto.commit.interval.ms, 会提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。

# 日志清除策略

在 kafka 中, 存在2种日志, 数据日志和操作日志。

数据日志: kafka的topic中存储的数据;配置项为 server.properties下的 log.dirs, 默认使用 配置项 log.dir

操作日志:kafka中记录操作的日志

# 数据日志清理

日志删除

按照指定的策略直接删除不符合条件的日志。

  1. 按时间删除
log.retention.ms
log.retention.minutes
log.retention.hours=168  #默认会保留7天
1
2
3
  1. 按size删除
log.retention.bytes=-1 # 无限制
1

Kafka的日志删除策略并不是非常严格的(比如如果log.retention.bytes设置了10G的话,并不是超过10G的部分就会立刻删除,只是被标记为待删除,Kafka会在恰当的时候再真正删除);