架构实战篇(九):Spring Boot 集成 RocketMQ

编写代码边撸猫 2018-04-24 13:36:54 ⋅ 933 阅读

Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。本文快速入门,RocketMQ消息系统的安装部署,发送,和接收消息,监控消息,的详细说明。

环境需要

64位操作系统,建议使用Linux / Unix /

  • CentOs7.3

  • 64bit JDK 1.8+

  • Maven 3.2.x

  • Git 1.8.3.1

环境安装

请参考我的另一篇文章

搭建 Apache RocketMQ 单机环境

http://www.ymq.io/2018/02/01/RocketMQ-install

新加项目

新建一个 maven 项目,这里就不详细操作了,大家都会的

不过也可以下载我的示例源码,下载地址如下

GitHub 源码:https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq

添加依赖

在POM 中添加如下依赖

 
  1. <!-- RocketMq客户端相关依赖 -->

  2. <dependency>

  3.    <groupId>org.apache.rocketmq</groupId>

  4.    <artifactId>rocketmq-client</artifactId>

  5.    <version>4.1.0-incubating</version>

  6. </dependency>

  7. <dependency>

  8.    <groupId>org.apache.rocketmq</groupId>

  9.    <artifactId>rocketmq-common</artifactId>

  10.    <version>4.1.0-incubating</version>

  11. </dependency>

配置文件

在配置文件 application.properties 添加一下内容

 
  1. # 消费者的组名

  2. apache.rocketmq.consumer.PushConsumer=PushConsumer

  3. # 生产者的组名

  4. apache.rocketmq.producer.producerGroup=Producer

  5. # NameServer地址

  6. apache.rocketmq.namesrvAddr=192.168.252.121:9876

消息生产者

 
  1. @Component

  2. public class Producer {

  3.    /**

  4.     * 生产者的组名

  5.     */

  6.    @Value("${apache.rocketmq.producer.producerGroup}")

  7.    private String producerGroup;

  8.    /**

  9.     * NameServer 地址

  10.     */

  11.    @Value("${apache.rocketmq.namesrvAddr}")

  12.    private String namesrvAddr;

  13.    @PostConstruct

  14.    public void defaultMQProducer() {

  15.        //生产者的组名

  16.        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

  17.        //指定NameServer地址,多个地址以 ; 隔开

  18.        producer.setNamesrvAddr(namesrvAddr);

  19.        try {

  20.            /**

  21.             * Producer对象在使用之前必须要调用start初始化,初始化一次即可

  22.             * 注意:切记不可以在每次发送消息时,都调用start方法

  23.             */

  24.            producer.start();

  25.            for (int i = 0; i < 100; i++) {

  26.                String messageBody = "我是消息内容:" + i;

  27.                String message = new String(messageBody.getBytes(), "utf-8");

  28.                //构建消息

  29.                Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag  */, "key_" + i /* Keys */, message.getBytes());

  30.                //发送消息

  31.                SendResult result = producer.send(msg);

  32.                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

  33.            }

  34.        } catch (Exception e) {

  35.            e.printStackTrace();

  36.        } finally {

  37.            producer.shutdown();

  38.        }

  39.    }

  40. }

消息消费者

 
  1. @Component

  2. public class Consumer {

  3.    /**

  4.     * 消费者的组名

  5.     */

  6.    @Value("${apache.rocketmq.consumer.PushConsumer}")

  7.    private String consumerGroup;

  8.    /**

  9.     * NameServer地址

  10.     */

  11.    @Value("${apache.rocketmq.namesrvAddr}")

  12.    private String namesrvAddr;

  13.    @PostConstruct

  14.    public void defaultMQPushConsumer() {

  15.        //消费者的组名

  16.        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

  17.        //指定NameServer地址,多个地址以 ; 隔开

  18.        consumer.setNamesrvAddr(namesrvAddr);

  19.        try {

  20.            //订阅PushTopic下Tag为push的消息

  21.            consumer.subscribe("PushTopic", "push");

  22.            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

  23.            //如果非第一次启动,那么按照上次消费的位置继续消费

  24.            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  25.            consumer.registerMessageListener(new MessageListenerConcurrently() {

  26.                @Override

  27.                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {

  28.                    try {

  29.                        for (MessageExt messageExt : list) {

  30.                            System.out.println("messageExt: " + messageExt);//输出消息内容

  31.                            String messageBody = new String(messageExt.getBody(), "utf-8");

  32.                            System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容

  33.                        }

  34.                    } catch (Exception e) {

  35.                        e.printStackTrace();

  36.                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试

  37.                    }

  38.                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功

  39.                }

  40.            });

  41.            consumer.start();

  42.        } catch (Exception e) {

  43.            e.printStackTrace();

  44.        }

  45.    }

  46. }

启动服务

 
  1. @SpringBootApplication

  2. public class SpringBootRocketmqApplication {

  3.    public static void main(String[] args) {

  4.        SpringApplication.run(SpringBootRocketmqApplication.class, args);

  5.    }

  6. }

控制台会有响应

 
  1. 发送响应:MsgId:0AFF015E556818B4AAC208A0504F0063,发送状态:SEND_OK

  2. messageExt: MessageExt [queueId=0, storeSize=195, queueOffset=113824, sysFlag=0, bornTimestamp=1517559124047, bornHost=/192.168.252.1:62165, storeTimestamp=1517559135052, storeHost=/192.168.252.121:10911, msgId=C0A8FC7900002A9F00000000056F499C, commitLogOffset=91179420, bodyCRC=1687852546, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=113825, KEYS=key_99, CONSUME_START_TIME=1517559124049, UNIQ_KEY=0AFF015E556818B4AAC208A0504F0063, WAIT=true, TAGS=push}, body=21]]

  3. 消费响应:Msg: 0AFF015E556818B4AAC208A0504F0063,msgBody: 我是消息内容:99

  4. ...

监控服务

RocketMQ web界面监控RocketMQ-Console-Ng部署

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

下载并且编译

下载并且 maven 编译

 
  1. git clone https://github.com/apache/rocketmq-externals.git

  2. rocketmq-externals/rocketmq-console/

  3. mvn clean package -Dmaven.test.skip=true

启动监控服务

rocketmq.config.namesrvAddr NameServer 地址,默认启动端口8080

 
  1. nohup java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876

访问监控服务

GitHub 源码:https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rocketmq

Gitee 源码:https://gitee.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

关注我们

如果需要源码可以关注“IT实战联盟”公众号并留言也可以加入交流群和作者互撩哦~~~

合作伙伴

  • 作者:鹏磊

  • 出处:http://www.ymq.io/2018/02/02/spring-boot-rocketmq-example

  • Email:admin@souyunku.com

  • 版权归作者所有,转载请注明出处

  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享


全部评论: 0

    我有话说:

    微服务架构实战(六):Spring boot2.x 集成阿里大鱼短信接口详解与Demo

    Spring boot2.x 集成阿里大鱼短信接口,发送短信验证码及短信接口详解。

    架构实战(十一):Spring Boot 集成企业级搜索引擎 SolrCloud

    Solr是以Lucene为基础实现的文本检索应用服务。Solr部署方式有单机方式、多机Master-Slaver方式、Cloud方式。

    架构实战(十七):Spring Boot Assembly 整合 thymeleaf

    如何让服务器上的 sprig boot 项目升级变的方便快捷

    架构实战(三)-Spring Boot架构搭建RESTful API案例

    之前分享了Spring Boot 整合Swagger 让API可视化和前后端分离架构 受到了大家一致好评 ,本节就接着上节的代码做了详细的查询代码的补充和完善并搭建RESTful API架构案例。

    架构实战(七):Spring Boot Data JPA 快速入门

    Spring Data JPA 是Spring Data 的一个子项目,它通过提供基于JPA的Repository极大了减少了操作JPA的代码。

    架构实战(六):Spring Boot RestTemplate的使用

    RestTemplate是Spring提供的用于访问Rest服务的客户端,RestTemplate提供了多种便捷访问远程Http服务的方法,能够大大提高客户端的编写效率。

    架构实战(一)-Spring Boot+MyBatis基础架构搭建

    Spring的追求一定是简单点简单点,让java的开发变得更加简单、容易。瞧瞧的告诉你们直接copy就能用哦~~~

    架构实战(十五):Spring Boot 解耦之事件驱动

    通过使用spring 事件来解决业务代码的耦合

    架构实战(十):Spring Boot 集成 Dubbo

    Dubbo是阿里巴巴SOA服务化治理方案的核心框架,一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案。

    码云推荐:一个优秀的分布式spring boot/Spring Cloud API限流框架,特别适合微服务架构

    一个优秀的分布式spring boot/Spring Cloud API限流框架,特别适合微服务架构.

    SpringBoot+zk+dubbo架构实践(二):SpringBoot 集成 zookeeper

    不啰嗦,本完成两件事:1、搭建SpringBoot 框架;2、基于spring boot框架访问zookeeper。

    架构实战(八):Spring Boot 集成 Druid 数据源监控

    Druid是目前最好的数据库连接池,在功能、性能、扩展性方面,都超过其他数据库连接池,包括DBCP、C3P0、BoneCP、Proxool、JBoss DataSource。

    架构实战(十四):Spring Boot 多缓存实战

    多场景下的不同缓存策略解决方案

    微服务架构学习笔记:gRPC Spring Boot Starter 2.2.0 发布,及使用步骤

    gRPC Spring Boot Starter 项目是一个 gRPC 的 Spring Boot 模块。内嵌一个 gRPC Server 对外提供服务,并支持 Spring Cloud 的服务发现

    架构实战(十三):Spring Boot Logback 邮件通知

    日志对于应用程序来说是非常重要的,当你的程序报错了,而你又不知道是多么可怕的一件事情,本文使用logback把程序报错信息邮件到开发者

    架构实战(四):Spring Boot整合 Thymeleaf

    Thymeleaf 是一种模板语言。那模板语言或模板引擎是什么?