
kafka消息堆积并不断重平衡导致重复消费
最近接手了一个积分服务,发现测试环境有一个提供给营销使用的topic,消费速度巨慢无比,5000条消费了半天,还有3000多条没有消费,并且kafka在不断重平衡。
查看代码,发现一个消息体中携带了50条数据,并且在一个for循环中一条一条处理。
消息体大概格式如下:
{
"field1": 1,
"dataList": [
{
"userId": 1
},
{
"userId": 2
}
// ...一共有50条
]
}
具体看了下这个topic的代码逻辑,收到消息时使用了一个for循环依次处理dataList中的数据,每次循环都会生成一条积分记录,查询数据库发现每分钟产生了接近3000条数据,而一个消息体中有50条数据,也就是每分钟实际消费了60条消息。
理论上按照这个速度50分钟能消费完,但是实际上过了半个小时仍然堆积了3000条消息没有消费。
查询数据库发现了不对劲的地方,这半个小时产生的积分记录竟然大部分都是重复的,我意识到了什么,赶紧检查下代码,果然代码中没有做幂等校验。
结合kafka一直在发生重平衡的情况,应该是消费出问题了。
回顾kafka重复消费的几种原因:
1、消费者宕机、重启等。导致消息已经消费但是没有提交offset。
2、消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance(重平衡)。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
3、消息处理耗时,或者消费者拉取的消息量太多,处理耗时,超过了max.poll.interval.ms的配置时间,导致认为当前消费者已经死掉,触发再均衡。
由于目前是发生大量的重复消费,所以大概率不会是前两种情况,于是我把目光聚集在了第三种情况。
检查kafka配置,max.poll.interval.ms和max-poll-records两个参数都没有显示配置,那么使用的是默认配置。
打开源码查看其默认值:
max-poll-records参数是每次拉取的最大数量,默认值是500,也就是一次最多拉取500条消息。
与之配套的超时参数是max.poll.interval.ms,默认值是300000ms,也就是5分钟。
max.poll.interval.ms参数的解释是:
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a static consumer which has shutdown.
在使用消费者组管理时,poll() 调用之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间设定了一个上限。如果在超时之前没有调用 poll(),则认为消费者失败,并且组将重新平衡以将分区重新分配给另一个成员。对于使用非空 <code>group.instance.id</code> 的消费者,如果达到此超时,分区不会立即重新分配。相反,消费者将停止发送心跳,分区将在 <code>session.timeout.ms</code> 超时后重新分配。这反映了静态消费者关闭的行为。
所以如果使用的是默认的max-poll-records和max.poll.interval.ms配置,那么心里一定要有一个底,处理500条数据是否会超过5分钟,平均算下来也就是单条数据处理时间不能超过600ms,否则会不断触发重平衡,并且不会更新offset。
结合这些信息以及之前查询到每分钟产生3000左右数据,也就是大概60条消息,一条消息大概需要执行1s,超过了600ms,那么就会触发重平衡,并且offset提交失败,重平衡后又重复消费相同的数据,代码中又没有幂等校验,导致一直在消费重复的消息。
这是一个非常严重的问题!
解决措施:
1、修改max-poll-records参数:一条消息大需要1s,那么5分钟最多消费300条数据,考虑到网络抖动留点缓冲时间,最终配置100;
2、增加幂等校验:即使不会因为处理500条数据超时而导致重复消费,但是也会有其他情况发生重复消费的情况,所以幂等校验是必不可少的。
- 感谢你赐予我前进的力量