Node&RabbitMQ系列四 X场景解构

来都来了 2020-11-01 15:34:37 ⋅ 728 阅读

目标

先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒

三段式场景: A-B-C
可以延伸至:

  1. 发起购票—>时间+未付款—>短信提醒⏰、取消订单
  2. 发起退款—>时间+未处理—>通知相关人员⏰
总的来说,满足A,到期时间后进一步处理。

思路

案例

该案例来自–RabbitMQ最佳实践

假设有个订单(Order)系统,用户下单后需要向用户发送短信通知,而所有对订单的数据显示采用了CQRS架构,即将订单的读模型和写模型分离,即所有订单的更新都通过事件发到rabbitmq,然后专门有个consumer接收这些消息用于更新订单的读模型

在次基础上,做了少许更改: 创建订单,5分钟后如果没有支付,则发短信通知。, 整个流程:

  1. 触发订单,统一放进 order.exchange 交换机,并且采用 rabbitmq delay 插件、采用topic规则;
  2. 一个订单消息,同时发到两个队列
    1. 处理订单: order.# 【后期update可以沿用该路由】
    2. 处理短信:order.create
  3. 发布消息
    1. 采用手工ack的方式,记录日志,进一步保证可靠性;
    2. 采用延迟插件
      1. order.#: 直接发送
      2. order.create: 延迟N分钟,
  4. 队列
    1. order.notification 绑定 order.create, 处理短信
    2. order.summary: 绑定order.#, 处理订单业务相关
  5. 死信队列
    1. 模拟,短信服务、订单业务异常,扔到死信队列,以供之后相关操作。

待完善

  1. 可靠投递;注:先把流程走通,实现了再完善;
  2. 重试投递;requeue问题
  3. 当然还有更多问题待发掘。

代码部分

1. rabbitmq 订单相关的交换机、队列等的名称

// 订单相关mq
export enum ORDER {
// 相关业务一个交换机
EXCHANE = 'order.exchange',
// 路由key
ROUTERKEY_CREATE = 'order.create',
ROUTERKEY_SUMMARY = 'order.#',
// 两个队列
QUEUE_NOTIFICATION = 'order.notification',
QUEUE_SUMMARY = 'order.summary',
// 死信
ELX_EXCHANGE = 'order.dlx.exchange',
ELK_QUEUE = 'order.dlq.queue'
}
 

2. 生产端

import * as rabbimq from "./rabbitmq";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';

export async function create() {
const connct = rabbimq.connection();
const channel = await connct.createConfirmChannel();
// @ts-ignore
// 统一业务,用一个交换机: 延迟+topic
await channel.assertExchange(ORDER.EXCHANE, EX_TYPE.DELAY, {
durable: true,
arguments: {'x-delayed-type': EX_TYPE.TOPIC }
});
// 模拟一些假的数据
let i = 0;
while(i < 3){
i++;
// N分钟后,对没支付的进行通知
const time = mock.Random.integer(500, 2000);
const expiration = 1000 * 60 * 60;
const content = { id: mock.Random.integer(1000), number: mock.Random.integer(1, 3), time };
const msg = JSON.stringify(content);

// !场景1:创建订单,如果三分钟不付款,则发短信通知
channel.publish(ORDER.EXCHANE, ORDER.ROUTERKEY_CREATE, Buffer.from(msg), {
expiration,
contentType: 'application/json',
headers: {
'x-delay': time, // 一定要设置,否则按一般的交换机
},
// 消息持久化
deliveryMode: 2
}, (err, ok) => {
// 生产端监听消息是否被ack;比如,记录日志啥的
// 如果消费端,nack, 则不会再次到这里
// console.log("是否被ack----ok: ", err, ok);
if (err !== null) {
console.warn('【SMS】Message nacked!');
} else {
console.log('【SMS】Message acked');
}
});
// !场景2, 发送后,供消费端消费
channel.publish(ORDER.EXCHANE, ORDER.QUEUE_SUMMARY, Buffer.from(msg), {}, (err, ok) => {
if (err !== null) {
console.warn('【summary】Message nacked!');
} else {
console.log('【summary】Message acked');
}
})

}
// 如果创建需要确认的channel,需要等待
// 生产者消息确认,一旦消息被投递到指定交换机,broker就会ack
await channel.waitForConfirms()
await channel.close();
};
 

3. 消费端

import * as rabbimq from "./rabbitmq";
import { Channel, Message } from "amqplib";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';


// 死信队列,针对无效信息的归属地
export async function orderDlq() {
const connect = rabbimq.connection();
const channel: Channel = await connect.createChannel();
await channel.assertExchange(ORDER.ELX_EXCHANGE, EX_TYPE.TOPIC, { durable: true })
const queueConsumer = await channel.assertQueue(ORDER.ELK_QUEUE, {
exclusive: true,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, ORDER.ELX_EXCHANGE, 'order.#');

channel.consume(queueConsumer.queue, async msg => {
// console.info('【死信队列】收到的消息', msg.content.toString());
channel.ack(msg, false);
});
}

// 针对没有支付的,发送短信,
export async function orderSms() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_CREATE, queueName: ORDER.QUEUE_NOTIFICATION,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};
consumer(args, async (msg, channel) => {
const { content } = msg;
// console.log(`【短信】获取消息: ${content}`);
// 根据消息ID,查询对应用户是否支付,如果支付ack, 否则发送短信&扔到死信队列,之后再说
const success = mock.Random.boolean();
if (success) {
console.info(`【短信】无需发送短信 ${content}`, )
channel.ack(msg, false);
} else {
// !可以扔到短信队列
console.info(`【短信】该用户尚未支付,发送短信中----- ${content}`);
channel.nack(msg, false, false);
}
})
}

// 创建订单,针对订单相关信息进行处理
export async function orderSummery() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_SUMMARY, queueName: ORDER.QUEUE_SUMMARY,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};

consumer(args, async (msg, channel) => {
const { content, fields: { deliveryTag }, properties: { headers: { retry } } } = msg;
// console.log(`【订单-消费】获取消息: ${content}`);
// 模拟业务
const success = await mock.Random.boolean();
if (success) {
console.info('【订单-消费】消费成功', content.toString());
// broker 从内存磁盘中删除
channel.ack(msg, false);
} else {
// 仍旧保留
console.info(`【订单-消费】放入死信队列`);
channel.nack(msg, false, false);
// 最大重试次数 [加入redis 或 其他队列]
// if () {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,尝试重试`);
// const requeue = true;
// channel.nack(msg, false, requeue);
// } else {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,放入死信队列`);
// const requeue = false;
// channel.nack(msg, false, requeue);
// }
}
})
}


/**
* 公用消费端
* @param exchange
* @param exchangeType
* @param routerKey
* @param elx 死信交换机
* @param elk 死信队列
*/
async function consumer(args: { exchange: string, exchangeType: any, routerKey: string, queueName: string, elx: string, elk: string },
callback: (msg: Message, channel: Channel) => {}) {
const { exchange, exchangeType, routerKey, queueName, elx, elk } = args;
const connect = rabbimq.connection();
const channel: Channel = await connect.createConfirmChannel();

// ! topic + delay
await channel.assertExchange(exchange, exchangeType, { durable: true, arguments: { 'x-delayed-type': EX_TYPE.TOPIC } })

const queueConsumer = await channel.assertQueue(queueName, {
exclusive: true,
deadLetterExchange: elx,
deadLetterRoutingKey: elk,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, exchange, routerKey);
channel.consume(queueConsumer.queue, async msg => {
// console.info('统一收到的消息', msg);
callback(msg, channel);
}, { noAck: false });
}
 

4. github

github—https://github.com/simuty/Node_Demo/tree/main/rabbitmq


全部评论: 0

    我有话说:

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish

    Node&RabbitMQ系列五 保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标  发布者消息确认【100%】  ma...

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统

    Node实战篇:Express中间件与request()

    Express 是一个路由和中间件 Web 框架,其自身只具有最低程度的功能:Express 应用程序基本上是一系列中间件函数调用。

    Node模块()

    模块化分工、各司其职

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Node模块之Events模块(五)

    Node模块之Events模块(五)

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    Redis系列一 基本用法&应用场景

        说明 redis的最基本使用方法以及使用场景。 字符串 // stringasync function stringFun() { const [key

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node.js 实战篇--微信支付系列(二)

    接上一篇首先我们看一下整体上微信小程序的开发流程图

    Node.js 实战篇--微信支付系列(一)

    真正的无知不是知识的贫乏, 而是拒绝获取知识!——波普尔 (哲学家 思想家)鉴于微信支付文档内容详实

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL

    Redis系列 GEO附近的人

    分成4个部分。 我们先将平面切割成个正方形,然...