Node&RabbitMQ系列六 保证消费

来都来了 2021-01-10 10:54:34 ⋅ 23 阅读

 

 


上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息

Quonum Queue不适用场景适用场景代码实现RePublish

处理包含几层含义

  1. 成功ack;

  2. 失败nack-requeue: true or false;

  3. 扔到死信队列

试想以下场景

消费者收到消息,业务执行异常, 重试N次,如果N次内成功--[ack]; 否则丢弃或扔到死信队列;

github上问了该问题,博主的回答

  1. Use a quorum queue (requires RabbitMQ 3.8.0 or higher) and specify a redelivery limit

  2. Count message redeliveries in an external store (you need a unique way to identify the message if you do this)

  3. Instead of nacking the message, re-publish it with a new header which counts the number of redeliveries, then acknowledge the original. This can lead to duplicate messages if you application crashes between the publish and the acknowledgment however


  1. quorum-queue: 设置delivery-limit;

  2. 重新publish;

  3. 生成header头retrynumbuer;

  4. 发布到原来的交换机

  5. 丢弃消息

  6. 比如采用redis记录重试次数,

Quonum Queue

RabbitMQ--Quorum Queues

Quorum Queues 也是RabbitMQ的队列类型,自RabbitMQ 3.8.0起可用。默认生成的queue type 是 Classic;

  • 宗旨为:将数据安全放在首位

  • Classic VS Quorum

Feature Classic Mirrored Quorum
Non-durable queues <可以非持久> yes no
Exclusivity yes no
Per message persistence per message always
Membership changes automatic manual
TTL yes no
Queue length limits yes partial (drop-head strategy only)
Lazy behaviour yes partial (see Memory Limit)
Message priority<优先级> yes no
Consumer priority<优先级> yes no
Dead letter exchanges yes yes
Adheres to policies<策略> yes partial (dlx, queue length limits)
Reacts to memory alarms yes partial (truncates log)
Poison message handling no yes
Global QoS Prefetch yes no

Poison message handling: 翻译成异常消息处理,估计没啥问题。

不适用场景

  1. 临时队列;

  2. 低延迟;

  3. 数据安全性没那么高的

  4. 非常长的队列---因为quonum一直将数据存储在内存,如可以最大的消息数量、限制消息体的大小。

适用场景

  1. 对数据安全性要求高的, 例如销售系统中的接收订单或选举系统中的投票,其中可能丢失的消息将对系统的正确性和功能产生重大影响。

  2. 发布者确认收到,消费者手动ack, 配合quonum保证数据安全性。

代码实现

// send.ts
import * as amqp from 'amqplib'

(async function publish() {
    const url = `amqp://localhost:5672`;
    // 默认交换机-队列-路由
    const exchange = '6.delivery.limit.exchange';
    const queueExName = '6.delivery.limit.queue';
    const routingKey = '6.delivery.limit.routerkey';


    const connect = await amqp.connect(url);
    const channel = await connect.createChannel();
    // 默认交换机
    await channel.assertExchange(exchange, 'direct', { durablefalse });
    // 发布消息
    channel.publish(exchange, routingKey, Buffer.from('hello world'), {
        expiration3000
    });
    await sleep(1);
    await connect.close();
    process.exit(0);
})();

function sleep(time: number{
    return new Promise((resolve) => setTimeout(resolve, time*1000));
}
import * as amqp from 'amqplib'

/** 
 * 消费者 确认消息
 * 
 * 1. 消费者重试3次,《采用quonum queue + delivery-limit: 3》
 * 2. 
*/


(async function consumer() {

    // 交换机-路由
    const url = `amqp://localhost:5672`;
    // 死信交换机-路由
    const deadLetterExchange = '6.dlx.exchange';
    const deadLetterRoutingKey = '6.dlx.routingkey' 
    const deadLetterQueue = '6.dlx.queue' 
    // 默认交换机-队列-路由
    const exchange = '6.delivery.limit.exchange';
    const queueExName = '6.delivery.limit.queue';
    const routingKey = '6.delivery.limit.routerkey';


    const connect = await amqp.connect(url);
    const channel = await connect.createChannel();
    await channel.assertExchange(exchange, 'direct', { durablefalse });

    // 死信队列相关,为了web ui 展示数据
    await channel.assertExchange(deadLetterExchange, 'direct', { durablefalse });
    const queueDL = await channel.assertQueue(deadLetterQueue)
    await channel.bindQueue(queueDL.queue, deadLetterExchange, deadLetterRoutingKey);


    const queueEX = await channel.assertQueue(queueExName, {
        exclusivefalse,
        deadLetterExchange,
        deadLetterRoutingKey,
        arguments: {
            'x-queue-type''quorum',
            'x-delivery-limit'3
        }
    });
    await channel.bindQueue(queueEX.queue, exchange, routingKey);
    await channel.consume(queueEX.queue, msg => {
        console.log("消费队列", msg);
        channel.nack(msg, falsetrue)
    });
})();
➜  6Consumer git:(main) ✗ ts-node receive.ts
消费队列 { fields:
   { consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
     deliveryTag: 1,
     redelivered: false,
     exchange: '6.delivery.limit.exchange',
     routingKey: '6.delivery.limit.routerkey' },
  properties:
   { contentType: undefined,
     contentEncoding: undefined,
# 这里
     headers: {},
     deliveryMode: undefined,
     priority: undefined,
     correlationId: undefined,
     replyTo: undefined,
     expiration: '3000',
     messageId: undefined,
     timestamp: undefined,
     type: undefined,
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
   { consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
# 看这里
     deliveryTag: 2,
     redelivered: true,
     exchange: '6.delivery.limit.exchange',
     routingKey: '6.delivery.limit.routerkey' },
  properties:
   { contentType: undefined,
     contentEncoding: undefined,
# 这里
     headers: { 'x-delivery-count': 1 },
     deliveryMode: undefined,
     priority: undefined,
     correlationId: undefined,
     replyTo: undefined,
     expiration: '3000',
     messageId: undefined,
     timestamp: undefined,
     type: undefined,
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
   { consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
     deliveryTag: 3,
     redelivered: true,
     exchange: '6.delivery.limit.exchange',
     routingKey: '6.delivery.limit.routerkey' },
  properties:
   { contentType: undefined,
     contentEncoding: undefined,
     headers: { 'x-delivery-count': 2 },
     deliveryMode: undefined,
     priority: undefined,
     correlationId: undefined,
     replyTo: undefined,
     expiration: '3000',
     messageId: undefined,
     timestamp: undefined,
     type: undefined,
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }

RePublish

 


全部评论: 0

    我有话说:

    Node&RabbitMQ系列保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未处理—>通知相关人员⏰ 总的来说,满足A...

    Node模块之fs模块()

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    Node实战篇:Express 中间件 cookie-parser()

    cookieParser()实际上是对http传入的cookie进行解析后赋值给req.cookies,使得中间件可用

    Node模块之Events模块(五)

    Node模块之Events模块(五)

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node.js 实战篇--微信支付系列(二)

    接上一篇首先我们看一下整体上微信小程序的开发流程图

    Node.js 实战篇--微信支付系列(一)

    真正的无知不是知识的贫乏, 而是拒绝获取知识!——波普尔 (哲学家 思想家)鉴于微信支付文档内容详实

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL

    Node异步式I/O和异步式编程(三)

     Node.js 最大的特点就是异步式 I/O(或者非阻塞I/O)与事件紧密结合的编程模式。 第一部分: I/O 1.阻塞I/O与非阻塞I/O概念 1.1阻塞I/O(同步I/O) 线程在