如果消费者系统的数据库宕机,会怎么样?假设我们的MQ使用都没有问题,但是如果消费者系统的数据库挂了呢?因为我们一直都是假设了一个场景,就是生产者在处理完自己的逻辑之后会推消息到MQ,然后下游消费者系统从MQ里获取消息去执行后续的处理。
那么如果这个时候,消费者系统的数据库宕机了,同样会使消费者从MQ里获取到消息之后,消费线程就会挂掉,没办法继续进行处理。
所以针对这样的场景,消费者系统要怎么处理?应该如何重试?
数据库宕机的时候,你还可以返回CONSUME_SUCCESS吗?在下面代码片段中,我们注册了一个监听器回调函数,当consumer获取到消息之后,就会调用这个函数进行处理
consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { // 在这里对获取到的msgs订单消息进行处理 // 比如增加积分、发送优惠券、通知发货,等等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });我们可以在这个回调函数中对消息进行处理,处理完之后,就可以告诉RocketMQ Consumer这批消息的处理结果。
比如,如果返回的是CONSUME_SUCCESS,那么Consumer就知道这批消息处理完成了,就会提交这批消息的offset到broker上去,然后下次就会继续从broker上获取下一批消息来处理。
但是如果此时我们在上面的回调函数中,对一批消息进行处理的时候,因为数据库宕机了,导致处理逻辑无法完成,此时我们还能返回CONSUME_SUCCESS吗?如果你返回的话,下次就会处理下一批消息,但是这批消息其实没有处理成功,此时必然就导致这批消息丢失了。
如果对消息的处理有异常,返回RECONSUME_LATER状态如果因为数据库宕机,导致对这批消息处理是异常的,就应该返回一个RECONSUME_LATER状态。
告诉RocketMQ这批消息处理有异常,过段时间再次给我这批消息让我重新试一下。
所以我们的代码应该改成下面这样:
consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { try { // 在这里对获取到的msgs订单消息进行处理 // 比如增加积分、发送优惠券、通知发货,等等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 如果因为数据库宕机等问题,对消息处理失败了 // 此时返回一个稍后重试的状态 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } });RocketMQ是如何让你进行消费重试的那么RocketMQ在收到你返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?
RocketMQ有一个针对这个ConsumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他会把你这批消息放到这个消费组的重试队列中去。
比如你的消费者组的名称是"VoucherConsumerGroup",那么他会有一个“%RETRY%VoucherConsumerGroup”这个名字的重试队列。
重试队列中的消息会按照配置的时间再次给消费者,让消费者进行处理,如果再次失败,那么会再过一段时间让消费者进行处理,默认最多是重试16次,每次重试之间的间隔时间是可以配置的:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h如果连续重试16次还是无法处理消息,然后怎么办?那么如果在16次重试范围内消息处理成功了,自然就没问题了,但是如果你对一批消息重试了16次还是无法处理成功呢?这个时候会把消息放到死信队列中。
其实就是一批消息交给消费者去处理,消费者重试了16次还一直没有处理成功,就不要继续重试这批消息了,就可以认为他们死掉了就可以了,然后这批消息会自动进入死信队列。
死信队列的名字是"%DLQ%VoucherConsumerGroup"