Node&RabbitMQ系列三 重连

来都来了 2020-10-29 18:09:10 ⋅ 872 阅读

 




前提

前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎,如:重连、限流。

本文目标:

  1. 单独抽离rabbitmq配置,便于之后写插件
  2. 考虑异常,比如:重联,<之前为了实现API,不想考虑>
  3. 消费端限流,为啥,因为遇到过bug…
  4.  

有了前边三篇的基础,就直接上代码了

代码篇

重点


// !应该写断言的。。。下次
import * as assert from 'assert'
import * as amqp from 'amqplib'
import * as consumer from './consumer'


// 连接配置:https://www.squaremobius.net/amqp.node/channel_api.html#connect
// url | config
const config = {
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
// 最大连接数,0:无限
// the size in bytes of the maximum frame allowed over the connection. 0 means no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1); I default it to 0x1000, i.e. 4kb, which is the allowed minimum, will fit many purposes, and not chug through Node.JS’s buffer pooling.
frameMax: 0,
// 心跳周期
heartbeat: 0,
}

let connect: amqp.Connection;
// 最大连接次数...
let maxConnectTimes = 0;
let isConnect = false;
export const init = async () => {
try {
connect = await amqp.connect(config);
// 监听error\close,重新连接
connect.on('error', err => {
reconnect(err, 'error');
});
// 什么时候会触发?网络异常、服务异常、管理后台删除
connect.on('close', err => {
reconnect(err, 'close');
});
console.info('[x]Rabbitmq connect success');
// !注册执行消费者
// 可以根据需求,多写几个?
consumer.run(connect);
return connect;
} catch (error) {
reconnect(error, 'catch');
}

}

const reconnect = (err, event) => {
// 因为后台删除连接,会同时触发error、close, 为了不一次创建两个,所以做个限制
if (!isConnect) {
isConnect = true;
maxConnectTimes++;
console.error(`[x]Lost connection to RMQ. reconnectingCount: ${maxConnectTimes}. Reconnecting in 10 seconds...`);
console.error('[x]Rabbitmq close: ', event, err);
// 5秒连接一次
return setTimeout(init, 1000 * 5);
}
}

// 公用这个连接
export const connection = () => {
return connect;
}
 

启动文件

import * as http from 'http'
import * as rabbitmq from './rabbitmq';
import * as producer from './producer';

/**
* 实现功能
* 1. 启动node服务 && 初始化rabbitmq <包含:重联、启动消费端>
* 2. 接口请求:http://127.0.0.1:3000/producer; 触发【生产者生产信息】
* 3. 【消费端】
* 1. 监听队列进行消费
* 2. 限流,其实设置一下参数就行了
* 3. 待完成:根据业务,扔到死信队列 ?
*/

http.createServer((req, res) => {
if (req.url === '/producer') {
producer.publish();
}
res.end('hello world')
}).listen(3000, () => {
rabbitmq.init();
console.log('开启端口3000')
})
 

消费端

async function consumer(args: {exchange, queue, routingKey, connection}, cb: (msg: any, channel: any) => void){
// 常规操作
const channel = await args.connection.createChannel();
await channel.assertExchange(args.exchange, 'direct', {durable: false});
const queueA = await channel.assertQueue(args.queue, {exclusive: false});
await channel.bindQueue(queueA.queue, args.exchange, args.routingKey);
// !消费端限流
await channel.prefetch(1, false);
// 消费队列
await channel.consume(queueA.queue, msg => {
cb(msg, channel);
});
}


export const run = (connection) => {
consumer({
exchange: 'order.exchange',
routingKey: 'order.routingKey',
queue: 'order.queue',
connection,
}, async (msg, channel) => {
const data = msg.content.toString();
console.info(`${(new Date()).getMinutes()}:${(new Date()).getSeconds()} consumer msg:%j`, data);
console.log('msg: ', msg)
return setTimeout(function () {
try {
/**
* 针对队列信息进行业务操作
* 1. 直接消费
* 2. 重回队列
* 3. 扔到死信队列
*/
channel.ack(msg);
// if(Number(data) < 6) {
// // 手动ack
// channel.ack(msg);
// } else {
// // !1. 重回队列
// channel.nack(msg);
// // !2. 扔到死信队列
// // 下个demo再整理。
// }
} catch (err) {
console.error('消息 Ack Error:', err)
}
// 每隔1s执行一个任务
}, 1000);
})
}
 

github

# 启动服务,node应该都会,要不也不会看这个
$ ts-node index.ts
 

代码地址:https://github.com/simuty/Node_Demo

 

 

参考链接
How to reestablish connection after a failure? #153
一次 RabbitMQ 生产故障引发的服务重连限流思考
Node + MQ 限流小计
Java秒杀系统实战系列~秒杀逻辑优化之RabbitMQ接口限流二
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
amqp wiki


全部评论: 0

    我有话说:

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行了

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,天内未登陆,则短信提醒。 段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未

    Node&RabbitMQ系列五 保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标  发布者消息确认【100%】  ma...

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish 处理包含几层含义 成功ack; 失败nac...

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统

    Node实战篇:Express路由()

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    Node异步式I/O和异步式编程()

     Node.js 最大的特点就是异步式 I/O(或者非阻塞I/O)与事件紧密结合的编程模式。 第一部分: I/O 1.阻塞I/O与非阻塞I/O概念 1.1阻塞I/O(同步I/O) 线程在

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Node模块之Events模块(五)

    Node模块之Events模块(五)

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node.js 实战篇--微信支付系列(二)

    接上一篇首先我们看一下整体上微信小程序的开发流程图

    Node.js 实战篇--微信支付系列(一)

    真正的无知不是知识的贫乏, 而是拒绝获取知识!——波普尔 (哲学家 思想家)鉴于微信支付文档内容详实

    Node实战篇:Nodejs 链接 Mariadb 实例

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL

    Node实战篇:Express中间件与request(四)

    Express 是一个路由和中间件 Web 框架,其自身只具有最低程度的功能:Express 应用程序基本上是一系列中间件函数调用。

    为什么要使用 Node.js?这几点你必须知道!

    经过这几年的发展,前端普遍进入了技术深水区,只会Web页面开发已经难以满足企业的需求,Node逐渐成为了刚性技能。 但Node在业务上的使用还没有那么普及,有的时候想用老板还不同意,本文将从4个角度

    NodeJS 10.5.0 中的线程:实用介绍

    几天前,Node.js版本10.5.0发布,其中包含的主要功能之一是添加了线程支持。