Node&RabbitMQ系列二 延迟|死信队列

来都来了 2020-10-28 15:17:41 ⋅ 275 阅读

 

前提

目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了学习。缺乏具体的使用场景,今天找到一个文章 Node.js结合RabbitMQ延迟队列实现定时任务
,基于这篇文章学习一下死信队列相关内容,逐步加深对mq的理解与认知。

可能很多摘录自上文,但代码是自己跑过的。

实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消每天定时拉取数据等,在Node.js中系统层面提供了setTimeout、setInterval两个API或通过node-schedule这种第三方库来实现。
通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。

  1. 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
  2. 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
  3. setTimeout、setInterval会存在时间误差,对于时间精度要求较高的是不行的。

RabbitMQ TTL+DLX 实现定时任务

RabbitMQ本身是不支持的,可以通过它提供的两个特性Time-To-Live and ExpirationDead Letter Exchanges来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。

死信队列产生几种情况

  1. 消息被拒绝
  2. 消息TTL过期
  3. 队列达到最大长度

设置DLX的两个参数:

  1. deadLetterExchange: 设置DLX,当正常队列的消息成为死信后会被路由到DLX中
  2. deadLetterRoutingKey: 设置DLX指定的路由键
注意:Dead-Letter-Exchange也是一种普通的Exchange

消息TTL

消息的TTL指的是消息的存活时间,RabbitMQ支持消息队列两种方式设置TTL,分别如下:

  1. 消息设置TTL:对消息的设置是在发送时进行TTL设置,通过 x-message-ttl 或 expiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同
  2. 队列设置TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。
注意:如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。

问题汇总

缺失插件

$ ts-node producer.ts 

/Users/mw/Desktop/Node_Demo/rabbitmq/node_modules/amqplib/lib/connection.js:91
var e = new Error(emsg);
^
Error: Connection closed: 503 (COMMAND-INVALID) with message "COMMAND_INVALID - unknown exchange type 'x-delayed-message'"
 
# 1. 罗列插件列表
$ rabbitmq-plugins list

# 2. 去官网下载插件: rabbitmq_delayed_message_exchange
# https://www.rabbitmq.com/community-plugins.html

# 3. 找到插件目录
$ ps -ef | grep rabbitmq
501 723 1 0 Fri10AM ?? 0:00.03 /bin/sh /usr/local/opt/rabbitmq/sbin/rabbitmq-server
501 1125 723 0 Fri10AM ?? 64:35.98 /usr/local/Cellar/erlang/22.2/lib/erlang/erts-10.6/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/local/Cellar/erlang/22.2/lib/erlang -progname erl -- -home /Users/mw -- -pa /usr/local/Cellar/rabbitmq/3.8.2/ebin -noshell -noinput -s rabbit boot -sname rabbit@localhost -boot /usr/local/opt/erlang/lib/erlang/bin/start_clean -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"127.0.0.1",5672}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/usr/local/var/log/rabbitmq" -rabbit lager_default_file "/usr/local/var/log/rabbitmq/rabbit@localhost.log" -rabbit lager_upgrade_file "/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log" -rabbit feature_flags_file "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-feature_flags" -rabbit enabled_plugins_file "/usr/local/etc/rabbitmq/enabled_plugins"
-rabbit plugins_dir "/usr/local/Cellar/rabbitmq/3.8.2/plugins" -rabbit plugins_expand_dir "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost" -ra data_dir "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost/quorum" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 --

501 96364 96362 0 Fri03PM ?? 0:00.65 /usr/bin/ssh git@github.com git-upload-pack 'Quinton/egg-rabbitmq.git'
501 72100 11727 0 4:14PM ttys011 0:00.00 grep rabbitmq

# 4. 将下载好的插件,放入 -rabbit plugins_dir "/usr/local/Cellar/rabbitmq/3.8.2/plugins"
# 5. 开启插件
$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 

代码–延迟队列

// producer.ts
import * as amqp from 'amqplib'

const url = `amqp://localhost:5672`
async function publish(msg: string, ttl: number) {
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message'; // x-delayed-message 交换机的类型
const routingKey = 'my-delayed-routingKey';

const connect = await amqp.connect(url);
const channel = await connect.createChannel();
await channel.assertExchange(exchange, exchangeType, { durable: true, arguments: {'x-delayed-type': 'direct' }})
console.log('发布消息', msg, ttl, routingKey);
channel.publish(exchange, routingKey, Buffer.from(msg), {
headers: {
'x-delay': ttl, // 一定要设置,否则无效
}
});
channel.close();
}

(async function test(){
await publish('msg0 1S Expire', 1000);
await publish('msg0 2S Expire', 2000);
await publish('msg0 3S Expire', 3000);
// 最后一个不会触发
process.exit(0);
})();
 
// consumer.ts
import * as amqp from 'amqplib'

const url = `amqp://localhost:5672`;

(async function publish() {
const exchange = 'my-delayed-exchange';
const exchangeType = 'x-delayed-message';
const routingKey = 'my-delayed-routingKey';
const queueName = 'my-delayed-queue';
try {
const connect = await amqp.connect(url);
const channel = await connect.createChannel();
await channel.assertExchange(exchange, exchangeType, { durable: true, arguments: { 'x-delayed-type': 'direct' } })
const queueA = await channel.assertQueue(queueName);
console.log(queueA);

await channel.bindQueue(queueA.queue, exchange, routingKey);
await channel.consume(queueA.queue, msg => {
console.log("接受到的消息", msg.content.toString());
}, { noAck: true });
} catch (error) {
console.log(error);
}
})();
 
mw$ ts-node producer.ts 
发布消息 msg0 1S Expire 1000 delayed-routingKey
发布消息 msg0 2S Expire 2000 delayed-routingKey
发布消息 msg0 3S Expire 3000 delayed-routingKey

$ ts-node consumer.ts
{ queue: 'delayed-queue', messageCount: 0, consumerCount: 0 }
接受到的消息 msg0 1S Expire
接受到的消息 msg0 2S Expire
 

代码–死信队列

流程

  1. 生产者X发消息,队列A消费
  2. 设置死信队列B;
  3. 启动 X、A、B
  4. X-A,形成生产-消费;X->A
  5. kill A, 生产消息,过期后进入死信队列,由B进行消费,形成 X-B
// producerDLX.ts
import * as amqp from 'amqplib'
/**
* 1. 正常发送消息到交换机
* 2. 如果超时,发送到死信队列
* 3. 对应 consumerDLX.ts 、consumerEX.ts
* 4. consumerEX.ts 消费生产的消息
* 5. consumerDLX.ts 消费 生产的消息 被会收到 死信队列的消息
*/

(async function publish() {

const url = `amqp://localhost:5672`;
// 默认交换机-队列-路由
const exchange = 'ex.exchange';
const routingKey = 'ex.routerkey';

const connect = await amqp.connect(url);
const channel = await connect.createChannel();
// 默认交换机
await channel.assertExchange(exchange, 'direct', { durable: false });
// 发布消息
channel.publish(exchange, routingKey, Buffer.from('hello world'), {
expiration: 3000
});
await sleep(1);
await connect.close();
process.exit(0);
})();


function sleep(time: number) {
return new Promise((resolve) => setTimeout(resolve, time*1000));
}
 

import * as amqp from 'amqplib'

/**
* 对应producerDLX.ts 发送的消息进行消费
*/

(async function consumer() {

// 交换机-路由
const url = `amqp://localhost:5672`;
// 死信交换机-路由
const deadLetterExchange = 'dlx.exchange';
const deadLetterRoutingKey = 'dlx.routingkey'
// 默认交换机-队列-路由
const exchange = 'ex.exchange';
const queueExName = 'ex.queue';
const routingKey = 'ex.routerkey';

const connect = await amqp.connect(url);
const channel = await connect.createChannel();
await channel.assertExchange(exchange, 'direct', { durable: false });
const queueEX = await channel.assertQueue(queueExName, {
exclusive: false,
deadLetterExchange,
deadLetterRoutingKey,
});
await channel.bindQueue(queueExName, exchange, routingKey);
await channel.consume(queueEX.queue, msg => {
console.log("消费队列", msg);
}, {noAck: true});
})();
 
import * as amqp from 'amqplib'
/**
* 针对死信队列,进行消费
* 对应producerDLX.ts
*/

(async function consumer() {

const url = `amqp://localhost:5672`;
// 死信交换机-路由
const deadLetterExchange = 'dlx.exchange';
const deadLetterRoutingKey = 'dlx.routingkey'
const deadLetterQueue = 'dlx.queue';

const connect = await amqp.connect(url);
const channel = await connect.createChannel();
// 默认交换机
await channel.assertExchange(deadLetterExchange, 'direct', { durable: false });
// 队列,超时发动到死信队列
const queueDLX = await channel.assertQueue(deadLetterQueue, {exclusive: false});
await channel.bindQueue(deadLetterQueue, deadLetterExchange, deadLetterRoutingKey);
await channel.consume(queueDLX.queue, msg => {
console.log("消费死信队列", msg);
}, {noAck: true});
})();
 

代码–小结

  1. 延迟发送需要下载插件
  2. x-delayed-type 支持 famout\direct\topic 原生交换机类型
  3. process.exit(0) 不会触发的原因,你觉得呢?
  4. routingKey等自定义字段,最好是常量。
  5. 看管理界面

生产者只管生产消息即可,至于如何消费,都交给消费端去处理。
具体的应用场景,下节再整。
把每个小块学会,再组合起来。

加油,打工人!!!

代码地址:https://github.com/simuty/Node_Demo

参考链接

Node.js结合RabbitMQ延迟队列实现定时任务


全部评论: 0

    我有话说:

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未

    Node&RabbitMQ系列五 保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标  发布者消息确认【100%】  ma...

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish 处理包含几层含义 成功ack; 失败nac...

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统

    Node.js 实战篇--微信支付系列

    接上一篇首先我们看一下整体上微信小程序的开发流程图

    Node包管理NPM()

    NPM是什么? [NPM官网](https://docs.npmjs.com/)给出解释如下: ``` Use npm to install, share, and distribute code; manage dependencies i...

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Node模块之Events模块(五)

    Node模块之Events模块(五)

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    Kafka系列

    目的是:提供负载均衡,实现系统的高伸缩性【Sca...

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    消息队列常见问题():消息队列产生大量的消息堆积怎么解决?

    上一节列举了生产上消息队列产生大量的消息堆积会有哪些后果,那相对应的解决方法有哪些呢?1、消息被丢弃情况如果要实现防止消息过期问题,最好不要设置过期时间!那设置了过期时间导致消息丢失怎么补救呢?答案

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node.js 实战篇--微信支付系列(一)

    真正的无知不是知识的贫乏, 而是拒绝获取知识!——波普尔 (哲学家 思想家)鉴于微信支付文档内容详实

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL