RabbitMQ 延迟队列,消息延迟推送

脱发程序员 2019-06-03 17:01:30 ⋅ 863 阅读

https://www.cnblogs.com/haixiang/p/10966985.html

目录

  • 应用场景

  • 消息延迟推送的实现

  • 测试结果


应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。

  • 12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。

  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。

  • 使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

消息延迟推送的实现

在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列

在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载

首先我们创建交换机和消息队列,application.properties 中配置与上一篇文章相同。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
public static final String LAZY_QUEUE = "MQ.LazyQueue";
public static final String LAZY_KEY = "lazy.#";

@Bean
public TopicExchange lazyExchange(){
//Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
exchange.setDelayed(true);
return exchange;
}

@Bean
public Queue lazyQueue(){
return new Queue(LAZY_QUEUE, true);
}

@Bean
public Binding lazyBinding(){
return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
}

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

        //Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

@Autowired
private RabbitTemplate rabbitTemplate;

//confirmCallback returnCallback 代码省略,请参照上一篇

public void sendLazy(Object message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

//发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(6000);
return message;
}
}, correlationData);
}
}

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 s-delay。等同于我们手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
*/

public void setDelay(Integer delay) {
if (delay == null || delay < 0) {
this.headers.remove(X_DELAY);
}
else {
this.headers.put(X_DELAY, delay);
}
}

消费端进行消费

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

@RabbitListener(queues = "MQ.LazyQueue")
@RabbitHandler
public void onLazyMessage(Message msg, Channel channel) throws IOException{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("lazy receive " + new String(msg.getBody()));

}

测试结果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

@Autowired
private MQSender mqSender;

@Test
public void sendLazy() throws Exception {
String msg = "hello spring boot";

mqSender.sendLazy(msg + ":");
}
}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

---------------END----------------

后续的内容同样精彩

长按关注“IT实战联盟”哦




全部评论: 0

    我有话说:

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎

    3分钟学会 React-Native 消息【附源码】

    作为一个独立的APP应用怎么能没有消息呢?

    老板说:明天来加班写个FCM消息功能......

    基于Spring Boot集成Firebase实现FCM消息功能

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统的

    HTTP/2.0 服务器实现

    HTTP/2允许服务器在请求之前先响应信息到客户端(之前客户端有过请求),如果实现了HTTP缓存,的响应信息可以在客户端被缓存(可通过no-cache进行配置)。

    Springboot项目redisTemplate实现轻量级消息队列

    redisTemplate实现轻量级消息队列,代码奉上

    架构实战篇:使用MyBatis延迟加载模式为数据库减压,附演示实例

    MyBatis中的延迟加载,也称为懒加载,是指在进行关联查询时,按照设置延迟规则推迟对关联对象的select查询。延迟加载可以有效的减少数据库压力......

    Apache BookKeeper 4.13.0 发布,可扩展、容错、低延迟的存储服务

    Apache BookKeeper 是一个可扩展、容错、低延迟的存储服务,针对实时工作负载进行了优化。它已被用作构建可靠服务的基础服务。它也是 Apache DistributedLog 的日志段

    消息队列常见问题(二):消息队列产生大量的消息堆积怎么解决?

    上一节列举了生产上消息队列产生大量的消息堆积会有哪些后果,那相对应的解决方法有哪些呢?1、消息被丢弃情况如果要实现防止消息过期问题,最好不要设置过期时间!那设置了过期时间导致消息丢失怎么补救呢?答案

    消息队列常见问题(一):生产上消息队列产生大量的消息堆积会有什么后果?

    大多数消息堆积原因是Consumer出现了问题,并且没有被运维/开发监控到即使修复问题,导致大量的消息都积压在 MQ 中,那么会造成哪些后果呢?1、消息被丢弃例如 RabbitMQ 中的一条消息设置

    Node&RabbitMQ系列五 保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标

    腾讯自研高吞吐消息队列组件TubeMQ升级为 TubeHub

    TubeMQ简介 TubeMQ 项目始于 2013 年,是腾讯自研的高吞吐消息队列组件。项目团队于 2019 年将 TubeMQ 捐赠给 Apache 基金会,成为腾讯首个被 Apache 基金会

    转载:Kafka高可用,高吞吐量低延迟的高并发的特性背后实现机制

    1 概述 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式消息系统,Linkedin于2010年贡献给了Apache基金会并

    RabbitMQ 3.8.10 发布

    RabbitMQ 3.8.10 已发布,RabbitMQ 是一个 Advanced Message Queuing Protocol(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的