RocketMQ-Consumer消息零丢失方案

无天有壁纸 2024-05-04 23:01:38
通过前面的方案,我们可以保证消息一定会达到MQ中,也确保了MQ中的消息不会丢失,只要做到这一点,我们就可以保证下游消费者系统一定可以获取到消息,但是即使下游消费者获取了消息,这条消息数据就一定不会丢失吗? 答案是未必的,假设下游消费者系统已经获取了消息,但是消息目前还在他的内存里,还没有执行业务逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,然后这个时候下游消费者系统突然就宕机了,内存里的消息没有了,业务逻辑也没有执行,结果broker已经收到他提交的消息offset了,还以为他已经处理完这个消息了。等消费者系统重启后,就不会再次消费这个消息了,还是会出现数据丢失。 consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });上面代码中的MessageListenerConcurrently这个类是注册一个监听器,当获取到一批消息之后,就会回调你得到这个监听器函数,让你来处理这一批消息。 然后当你处理完毕之后,返回consumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了,RocketMQ会提交这批消息的offset到broker去。 所以如果对一批消息处理完毕了,同时提交消息的offset给broker,即使消费者系统宕机了,此时是不会丢失消息的。 如果一批消息还没处理完,没返回consumeConcurrentlyStatus.CONSUME_SUCCESS这个状态呢,此时消费者宕机了,RocketMQ其实会感知到这个个consumer已经挂了,会把没处理完的消息交给其他机器去处理,所以在这种情况下,消息也绝对不会丢失的。 需要警惕的地方:不能异步消费消息 我们不能在代码中对消息进行异步处理,因为一旦开启了异步处理,消费者的Listener就会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态。这就可能出现异步处理逻辑还没处理完消息,就已经提交这批消息的offset给broker了,认为已经处理结束了。这时如果异步处理逻辑报错,那消息同样会丢失了。
0 阅读:0

无天有壁纸

简介:感谢大家的关注