测试文章内增加广告

小晨Maste 2020-11-03 09:22:35 ⋅ 0 阅读

目标

先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒

三段式场景: A-B-C
可以延伸至:

  1. 发起购票—>时间+未付款—>短信提醒⏰、取消订单
  2. 发起退款—>时间+未处理—>通知相关人员⏰
总的来说,满足A,到期时间后进一步处理。

思路

案例

该案例来自–RabbitMQ最佳实践

假设有个订单(Order)系统,用户下单后需要向用户发送短信通知,而所有对订单的数据显示采用了CQRS架构,即将订单的读模型和写模型分离,即所有订单的更新都通过事件发到rabbitmq,然后专门有个consumer接收这些消息用于更新订单的读模型

在次基础上,做了少许更改: 创建订单,5分钟后如果没有支付,则发短信通知。, 整个流程:

  1. 触发订单,统一放进 order.exchange 交换机,并且采用 rabbitmq delay 插件、采用topic规则;
  2. 一个订单消息,同时发到两个队列
    1. 处理订单: order.# 【后期update可以沿用该路由】
    2. 处理短信:order.create
  3. 发布消息
    1. 采用手工ack的方式,记录日志,进一步保证可靠性;
    2. 采用延迟插件
      1. order.#: 直接发送
      2. order.create: 延迟N分钟,
  4. 队列
    1. order.notification 绑定 order.create, 处理短信
    2. order.summary: 绑定order.#, 处理订单业务相关
  5. 死信队列
    1. 模拟,短信服务、订单业务异常,扔到死信队列,以供之后相关操作。

待完善

  1. 可靠投递;注:先把流程走通,实现了再完善;
  2. 重试投递;requeue问题
  3. 当然还有更多问题待发掘。

 

代码部分

1. rabbitmq 订单相关的交换机、队列等的名称

// 订单相关mq
export enum ORDER {
// 相关业务一个交换机
EXCHANE = 'order.exchange',
// 路由key
ROUTERKEY_CREATE = 'order.create',
ROUTERKEY_SUMMARY = 'order.#',
// 两个队列
QUEUE_NOTIFICATION = 'order.notification',
QUEUE_SUMMARY = 'order.summary',
// 死信
ELX_EXCHANGE = 'order.dlx.exchange',
ELK_QUEUE = 'order.dlq.queue'
}
 

2. 生产端

import * as rabbimq from "./rabbitmq";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';

export async function create() {
const connct = rabbimq.connection();
const channel = await connct.createConfirmChannel();
// @ts-ignore
// 统一业务,用一个交换机: 延迟+topic
await channel.assertExchange(ORDER.EXCHANE, EX_TYPE.DELAY, {
durable: true,
arguments: {'x-delayed-type': EX_TYPE.TOPIC }
});
// 模拟一些假的数据
let i = 0;
while(i < 3){
i++;
// N分钟后,对没支付的进行通知
const time = mock.Random.integer(500, 2000);
const expiration = 1000 * 60 * 60;
const content = { id: mock.Random.integer(1000), number: mock.Random.integer(1, 3), time };
const msg = JSON.stringify(content);

// !场景1:创建订单,如果三分钟不付款,则发短信通知
channel.publish(ORDER.EXCHANE, ORDER.ROUTERKEY_CREATE, Buffer.from(msg), {
expiration,
contentType: 'application/json',
headers: {
'x-delay': time, // 一定要设置,否则按一般的交换机
},
// 消息持久化
deliveryMode: 2
}, (err, ok) => {
// 生产端监听消息是否被ack;比如,记录日志啥的
// 如果消费端,nack, 则不会再次到这里
// console.log("是否被ack----ok: ", err, ok);
if (err !== null) {
console.warn('【SMS】Message nacked!');
} else {
console.log('【SMS】Message acked');
}
});
// !场景2, 发送后,供消费端消费
channel.publish(ORDER.EXCHANE, ORDER.QUEUE_SUMMARY, Buffer.from(msg), {}, (err, ok) => {
if (err !== null) {
console.warn('【summary】Message nacked!');
} else {
console.log('【summary】Message acked');
}
})

}
// 如果创建需要确认的channel,需要等待
// 生产者消息确认,一旦消息被投递到指定交换机,broker就会ack
await channel.waitForConfirms()
await channel.close();
};
 

3. 消费端

import * as rabbimq from "./rabbitmq";
import { Channel, Message } from "amqplib";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';


// 死信队列,针对无效信息的归属地
export async function orderDlq() {
const connect = rabbimq.connection();
const channel: Channel = await connect.createChannel();
await channel.assertExchange(ORDER.ELX_EXCHANGE, EX_TYPE.TOPIC, { durable: true })
const queueConsumer = await channel.assertQueue(ORDER.ELK_QUEUE, {
exclusive: true,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, ORDER.ELX_EXCHANGE, 'order.#');

channel.consume(queueConsumer.queue, async msg => {
// console.info('【死信队列】收到的消息', msg.content.toString());
channel.ack(msg, false);
});
}

// 针对没有支付的,发送短信,
export async function orderSms() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_CREATE, queueName: ORDER.QUEUE_NOTIFICATION,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};
consumer(args, async (msg, channel) => {
const { content } = msg;
// console.log(`【短信】获取消息: ${content}`);
// 根据消息ID,查询对应用户是否支付,如果支付ack, 否则发送短信&扔到死信队列,之后再说
const success = mock.Random.boolean();
if (success) {
console.info(`【短信】无需发送短信 ${content}`, )
channel.ack(msg, false);
} else {
// !可以扔到短信队列
console.info(`【短信】该用户尚未支付,发送短信中----- ${content}`);
channel.nack(msg, false, false);
}
})
}

// 创建订单,针对订单相关信息进行处理
export async function orderSummery() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_SUMMARY, queueName: ORDER.QUEUE_SUMMARY,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};

consumer(args, async (msg, channel) => {
const { content, fields: { deliveryTag }, properties: { headers: { retry } } } = msg;
// console.log(`【订单-消费】获取消息: ${content}`);
// 模拟业务
const success = await mock.Random.boolean();
if (success) {
console.info('【订单-消费】消费成功', content.toString());
// broker 从内存磁盘中删除
channel.ack(msg, false);
} else {
// 仍旧保留
console.info(`【订单-消费】放入死信队列`);
channel.nack(msg, false, false);
// 最大重试次数 [加入redis 或 其他队列]
// if () {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,尝试重试`);
// const requeue = true;
// channel.nack(msg, false, requeue);
// } else {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,放入死信队列`);
// const requeue = false;
// channel.nack(msg, false, requeue);
// }
}
})
}


/**
* 公用消费端
* @param exchange
* @param exchangeType
* @param routerKey
* @param elx 死信交换机
* @param elk 死信队列
*/
async function consumer(args: { exchange: string, exchangeType: any, routerKey: string, queueName: string, elx: string, elk: string },
callback: (msg: Message, channel: Channel) => {}) {
const { exchange, exchangeType, routerKey, queueName, elx, elk } = args;
const connect = rabbimq.connection();
const channel: Channel = await connect.createConfirmChannel();

// ! topic + delay
await channel.assertExchange(exchange, exchangeType, { durable: true, arguments: { 'x-delayed-type': EX_TYPE.TOPIC } })

const queueConsumer = await channel.assertQueue(queueName, {
exclusive: true,
deadLetterExchange: elx,
deadLetterRoutingKey: elk,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, exchange, routerKey);
channel.consume(queueConsumer.queue, async msg => {
// console.info('统一收到的消息', msg);
callback(msg, channel);
}, { noAck: false });
}
 

4. github

github—https://github.com/simuty/Node_Demo/tree/main/rabbitmq


全部评论: 0

    我有话说:

    单元测试增强工具TestableMock 让Mock返璞归真

    简介 阿里巴巴研发效能团队开源的Java单元测试增强工具,换种思路写Mock,让单元测试更简单。 无需初始化,不挑测试框架,甭管要换的方法是被测类的私有方法、静态方法还是其他任何类的成员方法,也甭管

    「开源资讯」Atom 1.52.0 和1.53.0-beta0发布,跨平台文本编辑器

    Atom 同时发布了 1.52.0 和 1.53.0-beta0 版本。Atom 是 GitHub 专门为程序员推出的一个跨平台文本编辑器。具有简洁和直观的图形用户界面,并有很多有趣的特点:支持

    精品推荐:Docker与自动化测试及其测试实践

    Docker 本身并不会直接加速测试执行。在串行执行测试时,在容器中执行测试反而会带来约 5% 左右的性能衰减。

    前端实战篇-聊聊JavaScript

    内存生命周期、分配存、使用分配的内存(读与写操作),当应用程序不再需要时,释放掉已分配的

    TestableMock v0.6.0 发布,新增测试参数快速构造工具

    项目介绍 TestableMock是一款由阿里效能团队开源的Java单元测试增强工具,提供四项具有针对性的辅助能力: 快速Mock任意调用:解决传统Mock工具使用繁琐的问题 访问被测类私有成员

    推荐一款 RabbitMQTest 性能测试工具

    软件简介 RabbitMQTest 用于 RabbitMQ 性能测试,可提供对单个队列写入,消费以及对多个队列进行同时读写操作的测试. 可配置连接数,通道数 GitHub

    CKEditor 5 v26.0.0 发布:具有可扩展的构建、联部件样式和注释指南

    CKEditor 5 v26.0.0 已经发布,本次更新包括支持创建一个定制的编辑器构建、Mac上的按键管理、设计联部件、编辑器占位符和一组关于使用协作特性注释的新指南等内容。 通过 DLL

    ngrok网穿透服务部署记录

    ngrok,一个用于实现网穿透服务,golang写的,已经很久远的一个东西了,可自己部署的版本最后一个版本是1.7.1,很久也没更新了,但他还是比较稳妥的

    BeetlSQL 3.0.10 发布,置 sega 事务支持

    本次发布主要增加了分布式Sega事务支持,适合多数据源 按照社区建议,修改了了springboot 的 yml配置方式 修改了@Jackson和@UpdateTime,本来是用来作为例子,但社区

    开源文档管理系统 Wizard 1.2.6 发布

    服务,如果你的团队想要使用他,你必须在自己公司...

    京东技术:APP的UI自动化测试框架及平台化探索

    UI自动化测试,即通过模拟手动操作用户UI界面的方式,以代码方式实现自动操作和验证的一种自动化测试手段。

    BlackArch Linux 2020.12.01 发布,渗透测试发行版

    时隔半年,BlackArch Linux 发布了2020年的最后一次版本更新 2020.12.01,新版本增加了超过 100 款工具,官方表示 BlackArch Linux

    Atom 1.56.0 发布,GitHub 官方文本编辑器

    Atom 是由 GitHub 开发的自由及开放源代码的文字与代码编辑器,支持 macOS、Windows 和 Linux 操作系统,支持 Node.js 所写的插件,并置由 Github 提供的

    iOS实战篇:[译]iOS扩充--OCR光学字符识别(附项目GitHub地址)

    OCR(Optical Character Recognition) 光学字符识别, 是从图像中电子扫描提取文本的过程, 可以在文档编辑等多种形式重用它,例如: 文本搜索/压缩等用途。

    MongoDB 查询文档(五)

    第一部分 前期准备1.1 插入测试数据db.test1.insertMany

    精品推荐:【CKEditor】全球最优秀的网页在线文字编辑器之一

    CKEditor是新一代的FCKeditor,是一个重新开发的版本。CKEditor是全球最优秀的网页在线文字编辑器之一,因其惊人的性能与可扩展性而广泛的被运用于各大网站。

    SpringBoot+zk+dubbo架构实践(四):sb+zk+dubbo框架搭建(附源码GitHub地址)

    本篇案例模拟了一个provider服务提供方和PC、Web两个服务消费方附GitHub源码......

    HQChart 1.9370 版本发布,增加分时图画图工具

    , 小程序 更新日志 筹码图增加黑色风格 HQ...