RocketMQ-从RocketMQ底层原理分析消息为什么会重复消费

无天有壁纸 2024-05-05 22:38:20
系统发送消息到MQ的时候会重复吗?可能有的同学乍一看觉得应该不可能,但是其实在生产环境中运行的系统,显然是有可能把整个消息重复发两次的。 首先假设用户在支付成功之后,我们的订单系统收到了一个支付成功的通知,接着他就向MQ发送了一条订单支付成功的消息,这个大家都知道没有什么问题。但是偏偏可能因为不知道什么原因,你的订单系统处理的速度有点慢,然后可能因为你订单系统处理的速度慢了,这就导致支付系统跟你订单系统之间的请求出现了超时,此时有可能支付系统再次重试调用了你订单系统得到接口去通知你,这个订单支付成功了,然后你的订单系统这个时候可能又推送了一条消息到MQ中,相当于是一个订单支付成功的消息,你重复推了两次到MQ,此时相当于是MQ里就会对一个订单的支付成功消息,共有两条。那如果你订单系统对一个订单重复推送了两次支付成功的消息到MQ,MQ里对一个订单有两条重复的支付成功消息,优惠券系统必然会消费到一个订单的两条重复的支付成功消息,也必然会针对这个订单给用户重复的派发两个优惠券。接着我们来考虑第二种情况,假设支付系统没有对一个订单重复调用你的订单系统的接口,而是你订单系统自己可能就重复发送消息到MQ里去,那这是一个什么情况呢?假设我们的订单系统为了保证消息一定能投递到MQ里去,因此采用了重试的代码,这种重试的方式就可能导致消息重复发送。假设你发送了一条消息到MQ了,其实MQ已经接收到了这条消息了,结果MQ返回响应给你的时候,网络有问题超时了,就是你没能及时接收到MQ返回给你的响应。这个时候,代码里可能会发现一个网络超时的异常,然后就会进行重试再次发送这个消息到MQ里去,然后MQ必然会收到一条一模一样的消息,进而导致你的消息重复发送了。接着我们继续看,即使你没有重复发送消息到MQ,哪怕MQ里就一条消息,下游优惠券系统也可能会重复进行消费。假设你的优惠券系统拿到了一条订单支付成功的消息,然后都已经进行处理了,也就是说都已经对这个订单给你发了一张优惠券了,之前有介绍过,这个时候他应该返回一个CONSUME_SUCCESS的状态,然后提交消费进度offset到broker的。但是不巧的是,你刚刚发完优惠券,还没来得及提交offset到broker呢,优惠券系统就进行了一次重启,这时因为你没提交这条消息的offset给broker,broker并不知道你已经处理完这条消息了,然后优惠券系统重启之后,broker就会再次把这条消息交给你,让你再一次进行处理,然后你会再一次发送一张优惠券,导致重复发送了两次优惠券。对订单系统核心流程引入 幂等性机制,保证数据不会重复到底什么是幂等性机制 前面我们详细分析了MQ的消息出现重复的问题,接下来就要看看到底该如何去避免MQ中的消息进行重复处理。 要解决这个问题,需要先了解一下幂等性,所谓幂等性机制,就是用来避免对同一个请求或者同一条消息进行重复处理的机制,意思就是比如你的一个接口,如果别人对一次请求重试了多次来调用你的接口,你的接口必须保证自己系统的数据是正常的,不能多出来一些重复的数据,这就是幂等性的意思。 那么对于我们的MQ而言,就是你从MQ里获取消息的时候,要保证对同一个消息只能处理一次,不能重复处理多次,导致出现重复的数据。 因此要解决MQ的消息重复问题,关键就是要引入幂等性机制。 发送消息到MQ的时候如何保证幂等性 我们都知道,订单系统的接口可能会被重复调用导致发送重复的消息到MQ去,也可能自己有重试机制导致发送重复得到消息到MQ,那么如果想要订单系统别发送重复的消息到MQ去,应该怎么做呢? 大体上来说,实现的方案有两种: 第一个方案就是业务判断法,就是说你的大订单系统必须要知道自己是否发送过消息到MQ去,消息到底是否已经在MQ里了。我们举个例子,当支付系统重试调用你的订单系统的接口时,你需要发送一个请求到MQ去,查询一下当前MQ里是否存在针对这个订单的支付消息?如果MQ告诉你,针对id=1000这个订单的支付成功消息,在我这里已经有了,你之前已经写入进来了,那么订单系统就可以不要再次发送这条消息到MQ去了。这个业务判断法的核心就在于,你的消息肯定是存在于MQ里的,到底发没发送过,只有MQ知道,如果没发送过这条消息,MQ里肯定没有这个消息,如果发送过这个消息,MQ里肯定有这个消息。所以当你的订单系统的接口被重试调用的时候,你这个接口上来应该就发送请求到MQ里去查询一下,如果有的话,就不再重复发送消息了。第二种方法就是状态判断法,基于Redis缓存的幂等性机制来实现。这个方法的核心在于你需要引入Redis缓存来存储你是否发送过消息的状态,如果你成功发送了一个消息到MQ里去,你得在Redis缓存里写一条数据,标记这个消息已经发送过。当接口被重复调用的时候,只需要根据订单的id去缓存里查询一下,这个订单的支付消息是否已经发送给MQ了,如果发送过了,就不要再次发送了。两种幂等性机制都是很常用的,但是要注意一个事情,就是对于基于Redis的状态判断法,可能没有办法完全做到幂等性。 例如你的支付系统发送请求给订单系统,然后已经发送消息到MQ了,但是此时订单系统突然崩溃了,还没来得及把消息发送的状态写入Redis。这个时候如果你的订单系统在其他机器上部署了,或者重启了,那么这个时候订单系统被重试调用的时候,就会去redis里查询消息发送状态,会以为没有发送过,然后会再次发送重复消息到MQ里。 有没有必要在订单系统环节保证消息不重复发送? 如果要在订单系统环境保证消息不重复发送,有两种方案: 通过直接查询MQ来判断消息是否发过,通过引入Redis来保证消息发送状态其实这两种方案都不是太好。因为RocketMQ虽然是支持你查询某个消息是否存在的,但是性能不是太好,会影响你的接口的性能。 另外基于Redis的消息发送状态的方案,在极端情况下无法100%保证幂等性,所以也不是一个好的方案。 所以建议不用再发送消息的时候保证幂等性,也就是可以允许他会发送重复的消息到MQ里去。 消费者系统如何保证消息处理的幂等性? 其实可以直接基于业务信息来判断就可以了,因为消费者系统每次拿到一条消息后,会做一些列的业务逻辑,很大的可能是会对数据库进行一系列的操作,那么在数据库中肯定可以有能唯一标识的一些字段。只要去数据库中查询一下字段是否唯一就可以了。
0 阅读:3

无天有壁纸

简介:感谢大家的关注