RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?

程序员修炼之路 2019-05-31 16:22:30 ⋅ 653 阅读

1. 前言

截止目前,我们能够保证消息成功地被生产者发送到RabbitMQ服务器,也能保证RabbitMQ服务器发生异常(重启,宕机等)后消息不会丢失,也许你认为现在消息应该很安全了吧?其实还不够安全,不信你接着往下看。

2. 本篇概要

其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。

那么如何解决这种问题呢?

为了保证消息被消费者成功的消费,RabbitMQ提供了消息确认机制(message acknowledgement),本文主要讲解RabbitMQ中,如何使用消息确认机制来保证消息被消费者成功的消费,避免因为消费者突然宕机而引起的消息丢失。

3. 开启显式Ack模式

在第1篇博客RabbitMQ使用教程(一)RabbitMQ环境安装配置及Hello World示例中,我们开启一个消费者的代码是这样的:

// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException
{
String message = new String(body, "UTF-8");
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

这里的重点是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2个参数,让我们先看下basicConsume()的源码:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return this.basicConsume(queue, autoAck, "", callback);
}

这里的autoAck参数指的是是否自动确认,如果设置为ture,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者接收到消息是否处理成功;如果设置为false,RabbitMQ会等待消费者显式的回复确认信号后才会从内存(或者磁盘)中删除。

建议将autoAck设置为false,这样消费者就有足够的时间处理消息,不用担心处理消息过程中消费者宕机造成消息丢失。

此时,队列里的消息就分成了2个部分:

  1. 等待投递给消费者的消息(下图中的Ready部分)

  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息(下图中的Unacked部分)

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

为了便于理解,我们举个具体的例子,生产者的话的我们延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 发送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

// 关闭频道和连接
channel.close();
connection.close();
}
}

然后新建一个消费者AckConsumer类:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
private final static String QUEUE_NAME = "durable-queue";

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException
{
String message = new String(body, "UTF-8");
int result = 1 / 0;
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

我们先将autoAck参数设置为ture,即自动确认,并在消费消息时故意写个异常,然后先运行生产者客户端将消息写入队列中,然后运行消费者客户端,发现消息未消费成功但是却消失了:

然后我们将autoAck设置为false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次运行生产者客户端将消息写入队列中,然后运行消费者客户端,此时虽然消费者客户端仍然代码异常,但是消息仍然在队列中:

然后我们删除掉消费者客户端中的异常代码,重新启动消费者客户端,发现消息消费成功了,但是消息一直未Ack:

手动停掉消费者客户端,发现消息又到了Ready状态,准备重新投递:

之所以消费掉消息,却一直还是Unacked状态,是因为我们没在代码中添加显式的Ack代码:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的编号,它是一个64位的长整形值。

此时运行消费者客户端,发现消息消费成功,并且在队列中被移除:

4. 源码

源码地址:https://github.com/zwwhnly/springboot-action.git,欢迎下载。

作者:申城异乡人  出处:https://www.cnblogs.com/zwwhnly/


---------------END----------------

后续的内容同样精彩

长按关注“IT实战联盟”哦




全部评论: 0

    我有话说:

    Node&RabbitMQ系列六 保证消费

        上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者角度:确保处理了对应消息 Quonum Queue不适用场景适用场景代码实现RePublish

    Node&RabbitMQ系列 保证投递

      网上又很多确保消息100%投递相关文章,但总来说还是分为几种情况 发送端如何确保发送到了交换机 消费端针对不同情况处理消息–后续再说 本文围绕发布者相关内容,目标

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目中采用ts+eggjs结合方式,针对定时任务,采用schedule,随着业务增多,觉得缺点啥,可能就是缺消息队列吧。上一篇文章,针对rabbitmq基本语法进行了

    消息队列常见问题(二):消息队列产生大量消息堆积怎么解决?

    上一节列举了生产上消息队列产生大量消息堆积会有哪些后果,那相对应解决方法有哪些呢?1、消息丢弃情况如果要实现防止消息过期问题,最好不要设置过期时间!那设置了过期时间导致消息丢失怎么补救呢?答案

    消息队列常见问题(一):生产上消息队列产生大量消息堆积会有什么后果?

    大多数消息堆积原因是Consumer出现了问题,并且没有运维/开发监控到即使修复问题,导致大量消息都积压在 MQ 中,那么会造成哪些后果呢?1、消息丢弃例如 RabbitMQ一条消息设置

    Springboot项目redisTemplate实现轻量级消息队列

    redisTemplate实现轻量级消息队列,代码奉上

    Node rabbitmq 入门就够了

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠消息传递机制进行与平台无关数据交流, 并基于数据通信来进行分布式系统

    Node&RabbitMQ系列三 重连

      前提 前两篇对rabbitmq基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况正常操作,针对复杂多变网络环境,先不说投递可靠性,首先服务可用性就是第一个拦路虎

    腾讯自研高吞吐消息队列组件TubeMQ升级为 TubeHub

    TubeMQ简介 TubeMQ 项目始于 2013 年,是腾讯自研高吞吐消息队列组件。项目团队于 2019 年将 TubeMQ 捐赠给 Apache 基金会,成为腾讯首个 Apache 基金会

    『黑科技』开源 IP 地址定位库 ip2region,99.9%准确率

    ip2region 是什么?ip2region 是准确率 99.9% IP 地址定位库,0.0x毫秒级查

    CherryTree 0.99.17 发布

    CherryTree 0.99.17 现已发布。CherryTree 是一个支持无限层级分类笔记软件,Python 编写,支持富文本编辑和代码高亮,支持 Linux 和

    SourceTreeGitFlow使用

    使用,比较担心是管理工作稍微繁琐一点。操作倒不复...

    程序员笔记 CherryTree 0.99.28 发布

    CherryTree 0.99.28 现已发布。CherryTree 是一个支持无限层级分类笔记软件,Python 编写,支持富文本编辑和代码高亮,支持 Linux 和 Windows

    程序员笔记 CherryTree 0.99.32 发布

    CherryTree 0.99.32 现已发布。CherryTree 是一个支持无限层级分类笔记软件,Python 编写,支持富文本编辑和代码高亮,支持 Linux 和 Windows

    Node&RabbitMQ系列四 X场景解构

    目标 先设想购票场景:用户注册成功后,三天内未登陆,则短信提醒。 三段式场景: A-B-C可以延伸至: 发起购票—>时间+未付款—>短信提醒⏰、取消订单 发起退款—>时间+未

    HHVM 4.93 发布,Facebook 开发开源虚拟机

    HHVM 4.93 正式发布,这个版本发布就意味着 4.84 版本不再继续支持与维护;HHVM 4.86–4.92,以及 4.56 和 4.80 LTS 还会继续获得支持。 HHVM(HipHop