上篇文章主要以生产者角度:确保消息发出去了
,这篇文章主要以消费者的角度:确保处理了对应的消息
Quonum Queue不适用场景适用场景代码实现RePublish
处理包含几层含义
-
成功
ack
; -
失败
nack
-requeue:true
orfalse
; -
扔到
死信队列
试想以下场景
消费者收到消息,业务执行
异常
,重试N次
,如果N次内成功--[ack]; 否则丢弃
或扔到死信队列
;
github上问了该问题,博主的回答
-
Use a quorum queue (requires RabbitMQ 3.8.0 or higher) and specify a redelivery limit
-
Count message redeliveries in an external store (you need a unique way to identify the message if you do this)
-
Instead of nacking the message, re-publish it with a new header which counts the number of redeliveries, then acknowledge the original. This can lead to duplicate messages if you application crashes between the publish and the acknowledgment however
-
quorum-queue: 设置delivery-limit;
-
重新publish;
-
生成header头retrynumbuer;
-
发布到原来的交换机
-
丢弃消息
-
比如采用redis记录重试次数,
Quonum Queue
RabbitMQ--Quorum Queues
Quorum Queues 也是RabbitMQ的队列类型,自RabbitMQ 3.8.0起可用。默认生成的queue type 是 Classic;
-
宗旨为:将数据安全放在首位
-
Classic VS Quorum
Feature | Classic Mirrored | Quorum |
---|---|---|
Non-durable queues <可以非持久> | yes | no |
Exclusivity | yes | no |
Per message persistence | per message | always |
Membership changes | automatic | manual |
TTL | yes | no |
Queue length limits | yes | partial (drop-head strategy only) |
Lazy behaviour | yes | partial (see Memory Limit) |
Message priority<优先级> | yes | no |
Consumer priority<优先级> | yes | no |
Dead letter exchanges | yes | yes |
Adheres to policies<策略> | yes | partial (dlx, queue length limits) |
Reacts to memory alarms | yes | partial (truncates log) |
Poison message handling | no | yes |
Global QoS Prefetch | yes | no |
Poison message handling: 翻译成异常消息处理,估计没啥问题。
不适用场景
-
临时队列;
-
低延迟;
-
数据安全性没那么高的
-
非常长的队列---因为quonum一直将数据存储在内存,如可以最大的消息数量、限制消息体的大小。
适用场景
-
对数据安全性要求高的, 例如销售系统中的接收订单或选举系统中的投票,其中可能丢失的消息将对系统的正确性和功能产生重大影响。
-
发布者确认收到,消费者手动ack, 配合quonum保证数据安全性。
代码实现
// send.ts
import * as amqp from 'amqplib'
(async function publish() {
const url = `amqp://localhost:5672`;
// 默认交换机-队列-路由
const exchange = '6.delivery.limit.exchange';
const queueExName = '6.delivery.limit.queue';
const routingKey = '6.delivery.limit.routerkey';
const connect = await amqp.connect(url);
const channel = await connect.createChannel();
// 默认交换机
await channel.assertExchange(exchange, 'direct', { durable: false });
// 发布消息
channel.publish(exchange, routingKey, Buffer.from('hello world'), {
expiration: 3000
});
await sleep(1);
await connect.close();
process.exit(0);
})();
function sleep(time: number) {
return new Promise((resolve) => setTimeout(resolve, time*1000));
}
import * as amqp from 'amqplib'
/**
* 消费者 确认消息
*
* 1. 消费者重试3次,《采用quonum queue + delivery-limit: 3》
* 2.
*/
(async function consumer() {
// 交换机-路由
const url = `amqp://localhost:5672`;
// 死信交换机-路由
const deadLetterExchange = '6.dlx.exchange';
const deadLetterRoutingKey = '6.dlx.routingkey'
const deadLetterQueue = '6.dlx.queue'
// 默认交换机-队列-路由
const exchange = '6.delivery.limit.exchange';
const queueExName = '6.delivery.limit.queue';
const routingKey = '6.delivery.limit.routerkey';
const connect = await amqp.connect(url);
const channel = await connect.createChannel();
await channel.assertExchange(exchange, 'direct', { durable: false });
// 死信队列相关,为了web ui 展示数据
await channel.assertExchange(deadLetterExchange, 'direct', { durable: false });
const queueDL = await channel.assertQueue(deadLetterQueue)
await channel.bindQueue(queueDL.queue, deadLetterExchange, deadLetterRoutingKey);
const queueEX = await channel.assertQueue(queueExName, {
exclusive: false,
deadLetterExchange,
deadLetterRoutingKey,
arguments: {
'x-queue-type': 'quorum',
'x-delivery-limit': 3
}
});
await channel.bindQueue(queueEX.queue, exchange, routingKey);
await channel.consume(queueEX.queue, msg => {
console.log("消费队列", msg);
channel.nack(msg, false, true)
});
})();
➜ 6Consumer git:(main) ✗ ts-node receive.ts
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
deliveryTag: 1,
redelivered: false,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
# 这里
headers: {},
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
# 看这里
deliveryTag: 2,
redelivered: true,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
# 这里
headers: { 'x-delivery-count': 1 },
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
deliveryTag: 3,
redelivered: true,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
headers: { 'x-delivery-count': 2 },
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
RePublish
注意:本文归作者所有,未经作者允许,不得转载