Node rabbitmq 入门就够了

来都来了 2020-10-23 16:11:32 ⋅ 1071 阅读

 

消息中间件

消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统的集成。 通过提供消息传递和消息排队模型 ,它可以在分布式环境下扩展进程间的通信。一般有两种传递模式:点对 点 ( P2P, Point-to-Point )模式和发布/订阅( Pub/Sub )模式

MQ的作用
1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

RabbitMQ

概念

RabbitMQ 整体上是一个生产者与消费者模型, 主要负责接收、存储和转发消息。 可以把消 息传递的过程想象成: 当你将一个包裹送到邮局, 邮局会暂存并最终将邮件通过邮递员送到收 件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。 从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型。

1
2
3
4
5
6
7
8
9
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, 
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 

Broker:消息中间件的服务节点

对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点, 或者 RabbitMQ 服务实例。 大多数情况下也可以将一个 RabbitMQ Broker 看作一 台 RabbitMQ 服务器 。

Exchange:交换机

生产者将消息发送到 Exchange ,由交换器将消息路由到一个或者多个队列中。

RoutingKey 与 Binding Key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

  1. fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
  2. direct:把消息投递到那些binding key与routing key完全匹配的队列中。
  3. topic:将消息路由到binding key与routing key模式匹配的队列中。

示例代码

由官方教程翻译而来,用ts实现,因为觉得ts好使
https://www.rabbitmq.com/tutorials

能搜索rabbitmq的一般都会了解一些概念,就直接上代码吧。

$ node -v
v8.9.3
# 为了执行ts代码
$ ts-node -v
v9.0.0
$ brew install rabbitmq
$ brew services start rabbitmq
 

访问 http://127.0.0.1:15672, 进入rabbitmq后台管理界面。

访问 http://tryrabbitmq.com/ 进入模拟 生产-交换机-队列-消费者

以上为准备阶段与可视化阶段

direct

以下代码主要实现生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// emit_log_direct.ts 生产者
import * as amqp from 'amqplib'

const url = `amqp://localhost`;

(async function publish(){
const exchange = 'direct_logs';
const msg = 'hello world';
const routingKeys = ['info', 'error', 'warning'];
// 1. 创建链接
const connect = await amqp.connect(url);
// 2. 创建channel
const channel = await connect.createChannel();
// 3. 创建or连上 交换机
// 3.1 直连方式;
await channel.assertExchange(exchange, 'direct', { durable: false });

let i = 0;
while(i<1){
const index = random(3);
// 4. 消息发给交换机
channel.publish(exchange, routingKeys[index], Buffer.from(msg));
console.log(`[x] Sent ${msg}-- ${routingKeys[index]}`);
i++;
}
await sleep(1);
await connect.close();
process.exit(0);
})();

function sleep(time: number) {
return new Promise((resolve) => setTimeout(resolve, time*1000));
}

function random(max: number){
return Math.floor(Math.random() * Math.floor(max));
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// receive_logs_direct.ts 消费者
import * as amqp from 'amqplib'
const url = `amqp://localhost`;

(async function receive(){
const connect = await amqp.connect(url);
const channel = await connect.createChannel();
const exchange = 'direct_logs';
await channel.assertExchange(exchange, 'direct', { durable: false });


const routingKeys = ['info', 'error', 'warning'];


const queueA = await channel.assertQueue('queueA');
await channel.bindQueue(queueA.queue, exchange, routingKeys[0]);
await channel.bindQueue(queueA.queue, exchange, routingKeys[1]);
await channel.bindQueue(queueA.queue, exchange, routingKeys[2]);

const queueB = await channel.assertQueue('queueB');
await channel.bindQueue(queueB.queue, exchange, routingKeys[1]);

channel.consume(queueA.queue, msg => {
console.log("队列AAAAA:", msg.content.toString())
}, { noAck: true });
channel.consume(queueB.queue, msg => {
console.log("队列BBBBBBBB:", msg.content.toString())
}, { noAck: true });
// await connect.close();
})();
 
1
2
3
4
5
6
7
8
9
10
# 执行
$ ts-node emit_log_direct.ts
[x] Sent hello world-- error
[x] Sent hello world-- info

$ ts-node receive_logs_direct.ts
队列AAAAA: hello world
队列BBBBBBBB: hello world

队列AAAAA: hello world
 

topic

发送到主题交换机的消息 必须是单词列表,以.分割, routing_key无效。通常它们指定与消息相关的某些功能。 一些有效的路由关键示例:“stock.usd.nyse”,“ nyse.vmw”,“ quick.orange.rabbit”。 路由关键字中可以包含任意多个单词,最多255个字节。

1
2
* (star) : 匹配一个单词
# (hash) : 匹配 0 或 更多单词
 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// emit_log_topic.ts
import * as amqp from 'amqplib'

const url = `amqp://localhost`;

(async function publish(){
const exchange = 'topic_logs';
const msg = 'hello world';
// bothqueue、firstqueue、secondqueue
const keys = ['quick.orange.rabbit', 'quick.orange.fox', 'lazy.brown.fox'];
// 1. 创建链接
const connect = await amqp.connect(url);
// 2. 创建channel
const channel = await connect.createChannel();
// 3. 创建or连上 交换机
// 3.1 直连方式;
await channel.assertExchange(exchange, 'topic', { durable: false });

let i = 0;
while(i<1){
const index = random(3);
// 4. 消息发给交换机
channel.publish(exchange, keys[index], Buffer.from(msg));
console.log(`[x] Sent ${msg}-- ${keys[index]}`);
i++;
}
await sleep(1);
await connect.close();
process.exit(0);
})();

function sleep(time: number) {
return new Promise((resolve) => setTimeout(resolve, time*1000));
}

function random(max: number){
return Math.floor(Math.random() * Math.floor(max));
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

// receive_logs_topic.ts
import * as amqp from 'amqplib'
const url = `amqp://localhost`;

(async function receive(){
const connect = await amqp.connect(url);
const channel = await connect.createChannel();
const exchange = 'topic_logs';
await channel.assertExchange(exchange, 'topic', { durable: false });


const queueA = await channel.assertQueue('queue_topic_A');
// # 匹配多个单词
await channel.bindQueue(queueA.queue, exchange, '*.orange.*');

const queueB = await channel.assertQueue('queue_topic_B');
// * 可以替代一个单词
await channel.bindQueue(queueB.queue, exchange, '#');

channel.consume(queueA.queue, msg => {
console.log("队列AAAAA:", msg.content.toString())
}, { noAck: true });
channel.consume(queueB.queue, msg => {
console.log("队列BBBBBBBB:", msg.content.toString())
}, { noAck: true });
// await connect.close();
})();
 
1
2
3
4
5
6
7
8
9
10
# 执行
$ ts-node emit_log_topic.ts
[x] Sent hello world-- error
[x] Sent hello world-- info

$ ts-node receive_logs_topic.ts
队列AAAAA: hello world
队列BBBBBBBB: hello world

队列AAAAA: hello world
 
GitHub地址--[https://github.com/simuty/Node_Demo](https://github.com/simuty/Node_Demo)

参考

rabbitmq官网教程

 

全部评论: 0

    我有话说:

    Node&RabbitMQ系列五 保证投递

      网上又很多确保消息100%投递相关的文章,但总的来说还是分为几种情况的 发送端如何确保发送到交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合的方式,针对定时任务,采用schedule,随着业务的增多,觉得缺点啥,可能是缺消息队列吧。上一篇文章,针对rabbitmq的基本语法进行

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq的基本概念与延迟队列、死信队列进行代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性是第一个拦路虎

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去,这篇文章主要以消费者的角度:确保处理对应的消息 Quonum Queue不适用场景适用场景代码实现RePublish

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未处理—>通知相关人员⏰ 总的来说,满足A...

    Node模块之Events模块(五)

    Node模块之Events模块(五)

    Node异步式I/O和异步式编程(三)

     Node.js 最大的特点是异步式 I/O(或者非阻塞I/O)与事件紧密结合的编程模式。 第一部分: I/O 1.阻塞I/O与非阻塞I/O概念 1.1阻塞I/O(同步I/O) 线程在

    Nodejs视频服务器 切片ffmpeg

    Node 视频服务器 切片ffmpeg

    Node实战篇:Express--jade模板引擎(七)

    Jade(Pug) — Node Template Engine,一个高性能的模板引擎,专为 Node 而做......

    为什么要使用 Node.js?这几点你必须知道!

    经过这几年的发展,前端普遍进入技术深水区,只会Web页面开发已经难以满足企业的需求,Node逐渐成为刚性技能。 但Node在业务上的使用还没有那么普及,有的时候想用老板还不同意,本文将从4个角度

    NodeJS 10.5.0 中的线程:实用介绍

    几天前,Node.js版本10.5.0发布,其中包含的主要功能之一是添加线程支持。

    Node模块之fs模块(六)

    屏幕快照 2017-08-08 上午10.53.21.png 第一部分 概述 Node.js 提供一组类似UNIX(POSIX)标准的文件操作API,Node.js中操作文件的模块是fs(File

    Node实战篇:Express路由(三)

    Express 是一个基于 Node.js 平台的极简、灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用。

    这样学习正则表达式轻松

    在日常工作中,经常会用到正则操作。但是对于大多数人来说,操作正则表达式简直是抓瞎。本篇文章主要整理正则表

    2018年8个技巧来构建更好的Node.js应用程序

    2018年8个技巧来构建更好的Node.js应用程序

    Node 模块之 util URL queryString path(八)

    第一部分 util util是一个Node.js核心模块,util模块设计的主要目的是为了满足Node内部API的需求。其中包括:格式化字符串、对象的序列化、实现对象继承等常用方法。要使用util

    Node实战篇:使用joi来验证数据模型 (十)

    Joi 是 hapijs 自带的数据校验模块,他已经高度封装常用的校验功能,本文是介绍如何优雅地使用 joi 对数据进行校验......

    Nodejs神兽之--Promise

    言必信,行必果。 没毛病,这是Promise!

    Node初识与配置安装(一)

    JavaScript 发展的一个重要里程碑,标志动态...