「中间件」RocketMQ解决消息顺序和重复性消费问题整理

Java范思哲 2019-07-09 08:44:26 ⋅ 154 阅读

前言

现在越来越多的产品采用的是分布式架构,部署的时候也同样是分布式部署,那么各个应用间的异步通信大多选择消息中间件MQ来处理,那么就回避不了两个问题:

1. 发送消息的顺序性
2. 消息被重复消费

目前在生产环境,使用较多的消息队列中间件有ActiveMQ,RabbitMQ,Kafka,RocketMQ等,本文的设计是以RocketMQ为例来解决这两个问题。

一、发送消息的顺序性

1、 什么是顺序消息?

顺序消息即有序消息,发送者(Producer)按照顺序发送消息,消费者(Consumer)按照消息的发送顺序进行消费。例如:我们在某宝购买一款笔记本电脑,需要下单、支付和订单完成这3个流程,相对应的产生3条消息,分别是创建订单——订单支付——订单完成,为了保证业务的完整性肯定需要按照这个顺序依次消费才能达到预期目的。

2、 第一种模型

但是在生产环境MQ肯定是集群部署,例如多Master模式、多Master多Slave模式(异步复制)、多Master多Slave模式(同步双写)等模式。为了保证消息的顺序模型可能是这样的:


第一种模型

M1:创建订单、M2:订单支付、M3:订单完成
由于MQ Service是集群部署,假设M1发送到MQ Service1,M2发送到MQ Service2,依次类推。如果要保证M1最先被消费,那么需要M1到达消费端被消费后,通知MQ Service2,然后MQ Service2再将M2发送到消费端,M2被消费后,再通知MQ Service3,将M3发送到消费端。

问题:三条消息分别发送到三台或者其中两台Service上,就很难保证M1第一个到达MQ集群,也不能保证最先被消费。加入M2、M3其中任意一个优先于M1到达MQ集群,并且优先于M1被消费,那么就没有顺序可言了。综合分析这个架构模式并不能保证消息被MQ顺序消费。

3、 第二种模型

基于第一种模型分析来看要想保证M1、M2、M3能够顺序消费,首先要保证能够顺序发送到同一个MQ Service中,改进后模型如下:


第二种模型

如上图所示,将三条消息按照顺序发送到同一个MQ Service中,基于先到先被消费原则,依次消费的顺序为M1 > M2 > M3,这样就保证了消息的顺序性。
如果使用这种设计在正常情况下是没问题的,但是在实际场景中很可能会遇到下面的问题:


网络延迟问题

生产者、MQ集群和消费者不可能发布在同一台服务器中,那么消息在传输过程中就会遇到网络延迟问题。如上图所示,M1和M2在发送给“消费者1”的过程中遇到了延迟问题,M3先于M1和M2被消费,那问题又回到了原点,这种方案依然不能解决消息被顺序消费的问题。

4、 第三种模型

第二种模型宣告失败,接着分析,我们保证了生产者将3个消息按照顺序发送给同一个 MQ Service这个逻辑是没问题的,那么为了解决上方网络延迟问题,那我们就把3个消息发送给同一个“消费者”被消费呢?即使遇到网络问题或者消费者响应问题,M1被消费失败,为了保证消息一定会被消费,肯定会选择重发消息到另一个“消费者”端,如下图所示:


第三种模型

如上图所示,将3条消息发送给“消费者1”,M1被消费的时候遇到问题,没有被消费成功,那么会将消息发送给“消费者2”进行重试,这样就保证了消息的顺序性。
但是可能会遇到另一个问题,“消费者1”没有响应有两种情况,一种是M1在网络传输过程中丢失,另一种是“消费者1”已经消费成功了但是返回的响应信息没有被MQ Service收到。如果是第二种情况重发M1给“消费者2”就会造成M1被重复消费,也就引发了文章开头的第二个消息重复消费问题。
我们总结一下要保证消息严格的按照顺序消费,最可行的办法就是:

保证生产者 —— MQService —— 消费者 是一对一对一的关系

5、 MessageQueueSelector实现顺序发送和消费

上述办法虽然可行性最高,但是也存在更加严重的问题,例如:

1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

看到这里挠挠头又掉了一大把头发,过度设计将会造成效率低下,甚至浪费更多的资源。换种思路,从业务角度来看,保证消息的顺序性不仅仅是依靠消息系统,那就寻找更加合理的方式来解决。
RocketMQ本身具有发送顺序消息功能,那么通过源码角度来分析一下:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

备注:send方法带有参数MessageQueueSelector,MessageQueueSelector是让用户自己决定消息发送到哪一个队列,如果是局部消息的话,用来决定消息与队列的对应关系。

6、 源码示例

接下来我们就用代码模拟一下MessageQueueSelector如何使用。
1、 创建一个生产者(Producer)

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for(int i = 0;i < 6 ;i++){
int orderId=(int)((Math.random()*9+1)*10000000);
for(int j = 0;j < 3 ;j++){
Message msg = new Message("AAA","TagA",("推送的订单ID为="+orderId).getBytes());
try {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},orderId);
System.out.println(sendResult);
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

上述生产者代码中我们看到在发送消息的时候使用了两个for循环来模拟场景,第一个for循环是产生6个订单,按照MQ的负载策略6个订单将分别发送到不同的消费者端。第二个for循环是每个订单里面产生3条有序的订单消息(M1、M2、M3),订单id是随机生成不重复的9位数字(生产场景使用不同的规则)。

2、 创建两个消费者(Consumer)
消费者1:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("AAA","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list){
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("接收到消息1:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}

消费者2:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("AAA","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list){
try {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("接收到消息2:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer1 Started.");
}
}

3、 运行测试
先启动两个消费者,启动成功后再启动生产者,运行结果如下:
生产者执行信息

SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0061910000, offsetMsgId=3452373400002A9F000000000006325B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=397]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00621C0001, offsetMsgId=3452373400002A9F0000000000063315, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=398]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062610002, offsetMsgId=3452373400002A9F00000000000633CF, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=399]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062A00003, offsetMsgId=3452373400002A9F0000000000063489, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=919]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062DF0004, offsetMsgId=3452373400002A9F0000000000063543, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=920]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00631F0005, offsetMsgId=3452373400002A9F00000000000635FD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=921]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00635F0006, offsetMsgId=3452373400002A9F00000000000636B7, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=348]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00639E0007, offsetMsgId=3452373400002A9F0000000000063771, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=349]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0063DE0008, offsetMsgId=3452373400002A9F000000000006382B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=350]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064210009, offsetMsgId=3452373400002A9F00000000000638E5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=327]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006462000A, offsetMsgId=3452373400002A9F000000000006399F, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=328]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064A2000B, offsetMsgId=3452373400002A9F0000000000063A59, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=329]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064E5000C, offsetMsgId=3452373400002A9F0000000000063B13, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=330]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006525000D, offsetMsgId=3452373400002A9F0000000000063BCD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=331]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006565000E, offsetMsgId=3452373400002A9F0000000000063C87, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=332]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065A7000F, offsetMsgId=3452373400002A9F0000000000063D41, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=922]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065E60010, offsetMsgId=3452373400002A9F0000000000063DFB, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=923]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0066250011, offsetMsgId=3452373400002A9F0000000000063EB5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=924]

一共产生了6组数据,每组3条消息,共发送了18条消息。

消费者1执行信息:

接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420

消费者1接收到了3组消息,共9条信息。

消费者2执行信息:

接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694

消费者1接收到了3组消息,共9条信息。

备注:通过运行结果可以看出生产者发送的消息会根据MessageQueueSelector实现的算法来选择一个队列,那么相同的策orderId消息自然是发送到了同一个队列,那么在消费的时候也会被一起消费。

二、消息被重复消费

在上面设计订单顺序消费的时候有抛出一个新的问题,就是消息重复。但是熟读RocketMQ API的朋友应该都知道,RocketMQ是不提供重复消费问题的解决方案。那么先来了解一下什么是重复性消费:

1、 什么是消息重复消费?

在网络不可达的情况下,只要通过网络今夕数据交换,就不可避免的产生同一条消息被不同两个或两个以上的消费者消费。

2、 解决方案

如果消费端收到两条一样的信息,怎么处理呢?

  1. 消费端处理消息的业务逻辑保持幂等性

  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条的原理是,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的msgId,如果新到的msgId已经在日志表中,那么就不再处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

三、总结

要实现顺序消费根据文中的方案就可以解决掉这个问题,但是在实际应用场景中肯定不会这么简单。跟其他朋友在聊这个话题的时候,他们还采用了其他的解决方案,比如有先后业务逻辑耦合的消息不通过MQ实现,通过业务代码实现,吞吐瓶颈可以通过多线程解决,当然体量过大还是需要MQ的。
由于RocketMQ并不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重,这样问题始终在可控范围内。



全部评论: 0

    我有话说:

    消息队列常见问题(二):消息队列产生大量的消息堆积怎么解决

    上一节列举了生产上消息队列产生大量的消息堆积会有哪些后果,那相对应的解决方法有哪些呢?1、消息被丢弃情况如果要实现防止消息过期问题,最好不要设置过期时间!那设置了过期时间导致消息丢失怎么补救呢?答案

    Kafka 慌了!这个中间,要火了?

    你知道吗?在消息中间的编年史上,RocketMQ可谓独当一面。作为Apache 顶级项目(TLP),Apache RocketMQ 是国内首个非 Hadoop 生态体系的顶级项目,开源至今被全球

    消息队列常见问题(一):生产上消息队列产生大量的消息堆积会有什么后果?

    大多数消息堆积原因是Consumer出现了问题,并且没有被运维/开发监控到即使修复问题,导致大量的消息都积压在 MQ 中,那么会造成哪些后果呢?1、消息被丢弃例如 RabbitMQ 中的一条消息设置

    架构实战篇(九):Spring Boot 集成 RocketMQ

    快速集成阿里开源消息队列 RocketMQ

    Node实战篇:Express 中间-body-parser(五)

    body-parser是什么?body-parser是一个HTTP请求体解析中间,使用这个模块可以解析JS

    Node实战篇:Express 中间 cookie-parser(六)

    cookieParser()实际上是对http传入的cookie进行解析后赋值给req.cookies,使得中间可用

    Node实战篇:Express中间与request(四)

    Express 是一个路由中间 Web 框架,其自身只具有最低程度的功能:Express 应用程序基本上是一系列中间函数调用。

    「转载」蘑菇街消息系统上云实践

    小编又来啦~本周要推荐给大家的是一篇跟中间上云相关的技术文章,这里面详细的记录了,蘑菇街自研消息系统上云的全过程,也是市面上开放出来为数不多的企业自研组件上云实践。有相关需求的同学可以好好学习下

    「轻阅读」消息总线(MQ)知多少

    消息总线(Message Queue,MQ),是一种跨进程的通信机制,用于在上下游之间传递消息。MQ是一种常见的上下游“逻辑解耦+物理解耦”的消息通信服务,消息发送上游只需要依赖MQ,逻辑上物理上

    Springboot项目redisTemplate实现轻量级消息队列

    redisTemplate实现轻量级消息队列,代码奉上

    Linux 下安装 Elasticsearch5.6.x 详细步骤以及踩坑解决方案

    网上有各种ES版本的安装步骤问题解决方案,但是在安装过程中还是遇到了许多问题,那么今天来整理一份详细的安装过程以及碰到的问题心得;有什么不对的问题希望大家留言一起讨论。

    「开源资讯」陌陌安全团队开源Java静态代码审计插

    陌陌安全本次开源的Java静态代码安全审计插,侧重于在编码过程中发现项目潜在的安全风险,并提供一键修复能力。 此插作为Java项目静态代码安全审计工具,侧重于在编码过程中发现项目潜在的安全风险

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish

    Openfire 4.6.2 发布,即时消息传输平台

    ) 服务器,它非常易于设置管理,提供良好的安全性...

    Node rabbitmq 入门就够了

      消息中间 消息队列中间(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统的

    腾讯自研高吞吐消息队列组件TubeMQ升级为 TubeHub

    TubeMQ简介 TubeMQ 项目始于 2013 年,是腾讯自研的高吞吐消息队列组。项目团队于 2019 年将 TubeMQ 捐赠给 Apache 基金会,成为腾讯首个被 Apache 基金会

    Spring Boot 解决跨域问题的 3 种方案!

    作者:telami telami.cn/2019/springboot-resolve-cors/ 前后端分离大势所趋,跨域问题更是老生常谈,随便用标题去google