Node&RabbitMQ系列五 保证投递

来都来了 2020-12-18 16:20:02 ⋅ 109 阅读

 

网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的

  1. 发送端如何确保发送到了交换机
  2. 消费端针对不同情况处理消息–后续再说

本文围绕发布者相关内容,目标

  •  发布者消息确认【100%】
  •  mandatory

消息属性详解

{ fields:
{ consumerTag: 'amq.ctag-_zo-n-crGPvrunQmICBoqA',
deliveryTag: 1,
redelivered: false,
exchange: 'delayed-exchange',
routingKey: 'delayed-routingKey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
headers: { 'x-delay': 1000, 'x-death': [Array] },
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: undefined,
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content:
<Buffer 7b 22 6d 73 67 22 3a 32 2c 22 74 74 6c 22 3a 31 30 30 30 7d>
}
 

字段 类型 说明
content-type 消息体类型HIMI类型 如:application/ json
content-encodeing 压缩消息 如:utf8
message-id 应用级别,跟踪消息的流动情况 如销售订单或支持类请; 允许255个字节的utf8
correlation-id 应用级别,跟踪消息的流动情况 如:传送关联消息的事务ID或其他类似数据;允许255个字节的utf8
timestamp 应用级别 如:诊断消息发生意外、性能、是否处理-丢弃、监控报警等
expiration 消息自动过期, 整数或整数时间戳的 字符串 注意
过期的消息会被直接丢弃
delivery-mode 是否持久化 1 表示非持久化消息, 2 表示持久化消息
app-id 收集统计数据,如校验版本、平台 追踪恶意消息
user-id 识别用户,根据业务判断是否使用  
type 消息类型名称 如:不采用json,采用protobuf,就可以不必自行设置type,在次设置传输即可
reply-to 使用 reply-to 可以构建一个用来回复消息的私有响应队列 “在请 求消息中使用私有响应队列时 , 它可以保存私有响应队 列的名称” 。 这个定义中 有太 多的不明确性,所以应该谨慎使用这个属性。
headers 自定义头 根据需要设置key-value即可
priority 优先级 0-9之间的整数

消息持久化 VS 队列持久化

  1. 消息的 delivery-mode 才 会向 RabbitMQ 指定消息是否应该被持久化
  2. 队列的持久性属性告诉 RabbitMQ 队列的定 义在 重新启 动 RabbitMQ 服务器或群集之后是否仍然有效
  3. 队列可能包含持久化未持久化的消息;

delivery-mode 指定为 1 将会尽可能降低消息投递的延迟性。侧重:速度,即便丢失不影响业务
delivery-mode 指定为 2 专注于保证消息的可靠投递而不是消息吞吐量。侧重:可靠,非速度

生产端

投递速度 VS 可靠性

说实在的,真的没有考虑这么多种情况。。。

主要元凶

  1. 网络
  2. 硬件
  3. 系统
  1. 消费发布时保证消息进入队列的重要性有多高?
  2. 如果消息无法路由,是否应将消息返回给发布者?
  3. 如果消息无法路由,是否应该将其发送到其他地方稍后进行重新路由
  4. 如果 RabbitMQ 服务器崩溃,可以接受信息丢失吗?
  5. RabbitMQ 在处理新消息时是否应该确认它已经为发布者执行了所有请求的路由和持久化任务?
  6. 消息发布者是否可以批量投递消息,然后从 RabbitMQ 收到一个确认用于表明所有请求的路由和持久化任务己经批量应用到所有的消息中?
  7. 如果你要批量发布消息,而这些消息需要确认路由和持久化,那么对每一条消息是否需要对目标队列实现真正意义上的原子提交
  8. 在可靠投递方面是否有可接受的平衡性,你的发布者可以使用它来实现更高的性能和消息吞吐量吗 ?
  9. 消息发布还有哪些方面会影响消息吞吐量和性能

确保投递成功–code


(async function testConfirmChannel() {
const url = `amqp://localhost`;
const connect = await amqp.connect(url)
let index = 0;
/**
*
* 1. 发布4条消息
* 2. 如果成功则打印成功的消息
* 3. 如果因为某种原因失败,则重试发布3次,最终记录失败消息
*
* 失败原因
* 1. 网络,connect.close、channel.close
* 2. 其他故障
*/
while (index < 4) {
index++;
await retryTest(index, 3);
}
/**
* 发送消息
* @param index 发送的消息
* @param retryTime 单个消息的 重试次数
*/
async function retryTest(index: number, retryTime: number) {
try {
console.log(`发送消息${index}`);
await publish(`发送消息${index}`, connect);
} catch (error) {
if (retryTime > 0) {
await sleep(3);
console.log(`${index}重试, 次数为${retryTime}`)
await retryTest(index, retryTime - 1)
} else {
// ! 如果单条消息重试后仍旧失败,则记录📝特殊处理
console.error(`🚩🚩🚩 ${index} 需要特殊处理`);
}
}
}
process.exit(0);
})();
 
/**
* 内容:发布【确认】
* 1. 问题:如何确保消息发送到了交换机?
*
* 2. createConfirmChannel
*
* @param msg 发布的消息
* @param connect rabbitmq connect
*/
async function publish(msg: string, connect: amqp.Connection) {
const exchange = '5.confirm.exchange';
const exchangeType = 'direct';
const routingKey = '5.confirm.routingKey';

// 接受确认的channel
const channel = await connect.createConfirmChannel();
await channel.assertExchange(exchange, exchangeType, { durable: false })
const content = JSON.stringify({ msg });
const random = Math.random() < 0.4;
console.log('随机出现❌', random);
if (random) {
// 为了演示发送不到交换机
channel.close()
// connect.close();
}
channel.publish(exchange, routingKey, Buffer.from(content), {}, (err, ok) => {
if (err !== null) {
console.log('发布消息-交换机-失败', err);
} else {
console.log('发布消息-交换机-确认', err, ok, content);
}
});
await channel.waitForConfirms()
await channel.close();
}
 
#结果 全部成功
➜ git:(main) ✗ ts-node send.ts
发送消息1
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息1"}
发送消息2
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息2"}
发送消息3
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息3"}
发送消息4
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息4"}
 
➜  6test git:(main) ✗ ts-node send.ts
发送消息1
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息1"}
发送消息2
随机出现❌ true
发布消息-交换机-失败 Error: channel closed
at ConfirmChannel.<anonymous> (/Users/mw/Desktop/Github/Integration/Rabbitmq/node_modules/amqplib/lib/channel.js:39:18)
...
at process._tickCallback (internal/process/next_tick.js:63:19)
2重试, 次数为3
发送消息2
随机出现❌ true
发布消息-交换机-失败 Error: channel closed
at ConfirmChannel.<anonymous> (/Users/mw/Desktop/Github/Integration/Rabbitmq/node_modules/amqplib/lib/channel.js:39:18)
...
at process._tickCallback (internal/process/next_tick.js:63:19)
2重试, 次数为2
发送消息2
随机出现❌ true
发布消息-交换机-失败 Error: channel closed
at ConfirmChannel.<anonymous> (/Users/mw/Desktop/Github/Integration/Rabbitmq/node_modules/amqplib/lib/channel.js:39:18)
...
at process._tickCallback (internal/process/next_tick.js:63:19)
2重试, 次数为1
发送消息2
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息2"}
发送消息3
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息3"}
发送消息4
随机出现❌ false
发布消息-交换机-确认 null undefined {"msg":"发送消息4"}
 

mandatory

mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。

The immediate and mandatory fields are part of the AMQP specification, and are also covered in the RabbitMQ FAQ to clarify how its implementers interpreted their meaning:

Mandatory

This flag tells the server how to react if a message cannot be routed to a queue. Specifically, if mandatory is set and after running the bindings the message was placed on zero queues then the message is returned to the sender (with a basic.return). If mandatory had not been set under the same circumstances the server would silently drop the message.

Or in my words, "Put this message on at least one queue. If you can't, send it back to me."

immediate
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。


全部评论: 0

    我有话说:

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎

    Node&RabbitMQ系列保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish 处理包含几层含义 成功ack; 失败nac...

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    Node模块之Events模块()

    Node模块之Events模块()

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未处理—>通知相关人员⏰ 总的来说,满足A...

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    Node实战篇:Express 中间件-body-parser()

    body-parser是什么?body-parser是一个HTTP请求体解析中间件,使用这个模块可以解析JS

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node.js 实战篇--微信支付系列(二)

    接上一篇首先我们看一下整体上微信小程序的开发流程图

    Node.js 实战篇--微信支付系列(一)

    真正的无知不是知识的贫乏, 而是拒绝获取知识!——波普尔 (哲学家 思想家)鉴于微信支付文档内容详实

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL

    Node异步式I/O和异步式编程(三)

     Node.js 最大的特点就是异步式 I/O(或者非阻塞I/O)与事件紧密结合的编程模式。 第一部分: I/O 1.阻塞I/O与非阻塞I/O概念 1.1阻塞I/O(同步I/O) 线程在

    Node实战篇:Express中间件与request(四)

    Express 是一个路由和中间件 Web 框架,其自身只具有最低程度的功能:Express 应用程序基本上是一系列中间件函数调用。