博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
阅读量:6162 次
发布时间:2019-06-21

本文共 8943 字,大约阅读时间需要 29 分钟。

一、RocketMQ

1、架构图片

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

2、角色分类

(1)、Broker

RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。

(2)、NameServer

消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。

热备份:

NamServer可以部署多个,相互之间独立,其他角色同时向多个NameServer 机器上报状态信息。

心跳机制:

NameServer 中的 Broker、 Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, NameServer会认为某个机器出故障不可用。

(3)、Producer

消息的生成者,最常用的producer类就是DefaultMQProducer。

(4)、Consumer

消息的消费者,常用Consumer类

DefaultMQPushConsumer
收到消息后自动调用传入的处理方法来处理,实时性高
DefaultMQPullConsumer
用户自主控制 ,灵活性更高。

3、通信机制

(1)、Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer更新Topic路由信息。

(2)、Producer发送消息时候,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息会从NameServer重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3)、Consumer消费消息时候,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。

二、代码实现案例

1、项目结构图

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

版本描述

2.1.3.RELEASE
4.3.0

2、配置文件

rocketmq:  # 生产者配置  producer:    isOnOff: on    # 发送同一类消息的设置为同一个group,保证唯一    groupName: FeePlatGroup    # 服务地址    namesrvAddr: 10.1.1.207:9876    # 消息最大长度 默认1024*4(4M)    maxMessageSize: 4096    # 发送消息超时时间,默认3000    sendMsgTimeout: 3000    # 发送消息失败重试次数,默认2    retryTimesWhenSendFailed: 2  # 消费者配置  consumer:    isOnOff: on    # 官方建议:确保同一组中的每个消费者订阅相同的主题。    groupName: FeePlatGroup    # 服务地址    namesrvAddr: 10.1.1.207:9876    # 接收该 Topic 下所有 Tag    topics: FeePlatTopic~*;    consumeThreadMin: 20    consumeThreadMax: 64    # 设置一次消费消息的条数,默认为1条    consumeMessageBatchMaxSize: 1# 配置 Group  Topic  Tagfee-plat:  fee-plat-group: FeePlatGroup  fee-plat-topic: FeePlatTopic  fee-account-tag: FeeAccountTag

3、生产者配置

import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RocketMQ 生产者配置 */@Configurationpublic class ProducerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;    @Value("${rocketmq.producer.groupName}")    private String groupName;    @Value("${rocketmq.producer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.producer.maxMessageSize}")    private Integer maxMessageSize ;    @Value("${rocketmq.producer.sendMsgTimeout}")    private Integer sendMsgTimeout;    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")    private Integer retryTimesWhenSendFailed;    @Bean    public DefaultMQProducer getRocketMQProducer() {        DefaultMQProducer producer;        producer = new DefaultMQProducer(this.groupName);        producer.setNamesrvAddr(this.namesrvAddr);        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName        if(this.maxMessageSize!=null){            producer.setMaxMessageSize(this.maxMessageSize);        }        if(this.sendMsgTimeout!=null){            producer.setSendMsgTimeout(this.sendMsgTimeout);        }        //如果发送消息失败,设置重试次数,默认为2次        if(this.retryTimesWhenSendFailed!=null){            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);        }        try {            producer.start();        } catch (MQClientException e) {            e.printStackTrace();        }        return producer;    }}

4、消费者配置

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/** * RocketMQ 消费者配置 */@Configurationpublic class ConsumerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;    @Value("${rocketmq.consumer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.consumer.groupName}")    private String groupName;    @Value("${rocketmq.consumer.consumeThreadMin}")    private int consumeThreadMin;    @Value("${rocketmq.consumer.consumeThreadMax}")    private int consumeThreadMax;    @Value("${rocketmq.consumer.topics}")    private String topics;    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")    private int consumeMessageBatchMaxSize;    @Resource    private RocketMsgListener msgListener;    @Bean    public DefaultMQPushConsumer getRocketMQConsumer(){        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);        consumer.setNamesrvAddr(namesrvAddr);        consumer.setConsumeThreadMin(consumeThreadMin);        consumer.setConsumeThreadMax(consumeThreadMax);        consumer.registerMessageListener(msgListener);        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);        try {            String[] topicTagsArr = topics.split(";");            for (String topicTags : topicTagsArr) {                String[] topicTag = topicTags.split("~");                consumer.subscribe(topicTag[0],topicTag[1]);            }            consumer.start();        }catch (MQClientException e){            e.printStackTrace();        }        return consumer;    }}

5、消息监听配置

import com.rocket.queue.service.impl.ParamConfigService;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import javax.annotation.Resource;import java.util.List;/** * 消息消费监听 */@Componentpublic class RocketMsgListener implements MessageListenerConcurrently {    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;    @Resource    private ParamConfigService paramConfigService ;    @Override    public ConsumeConcurrentlyStatus consumeMessage(List
list, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(list)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOG.info("接受到的消息为:"+new String(messageExt.getBody())); int reConsume = messageExt.getReconsumeTimes(); // 消息已经重试了3次,如果不需要再次消费,则返回成功 if(reConsume ==3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){ String tags = messageExt.getTags() ; switch (tags){ case "FeeAccountTag": LOG.info("开户 tag == >>"+tags); break ; default: LOG.info("未匹配到Tag == >>"+tags); break; } } // 消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}

6、配置参数绑定

import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;@Servicepublic class ParamConfigService {    @Value("${fee-plat.fee-plat-group}")    public String feePlatGroup ;    @Value("${fee-plat.fee-plat-topic}")    public String feePlatTopic ;    @Value("${fee-plat.fee-account-tag}")    public String feeAccountTag ;}

7、消息发送测试

import com.rocket.queue.service.FeePlatMqService;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Servicepublic class FeePlatMqServiceImpl implements FeePlatMqService {    @Resource    private DefaultMQProducer defaultMQProducer;    @Resource    private ParamConfigService paramConfigService ;    @Override    public SendResult openAccountMsg(String msgInfo) {        // 可以不使用Config中的Group        defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup);        SendResult sendResult = null;        try {            Message sendMsg = new Message(paramConfigService.feePlatTopic,                                          paramConfigService.feeAccountTag,                                         "fee_open_account_key", msgInfo.getBytes());            sendResult = defaultMQProducer.send(sendMsg);        } catch (Exception e) {            e.printStackTrace();        }        return sendResult ;    }}

本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

需要详细架构师思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

转载于:https://blog.51cto.com/14230003/2405833

你可能感兴趣的文章
MySQL Online DDL 方案剖析
查看>>
java源码 - ReentrantLock之FairSync
查看>>
Java入门系列-24-实现网络通信
查看>>
Nginx 配置
查看>>
eclipse错误及解决方法
查看>>
WPF自适应可关闭的TabControl 类似浏览器的标签页
查看>>
程序员技能图谱分享--极客时间
查看>>
iOS开发之通过代理逆向传值
查看>>
每天学点SpringCloud(八):使用Apollo做配置中心
查看>>
Jmockit单元测试MockUp调用原始方法
查看>>
顺风车Android性能优化之View布局优化
查看>>
Windows 7 延长支持服务价格曝光:一台电脑最低25美元
查看>>
谷歌发布自然问答数据集 Natural Questions
查看>>
高通-物理专线2.0新版资费计划
查看>>
Python3基础——字典、其他常用操作
查看>>
Jenkins 教程(一)实现自动化打包及邮件通知
查看>>
敬业签电脑手机云同步便签及安卓手机和苹果手机云同步桌面便签
查看>>
六大核心!突破自动驾驶和智慧交通的必由之路
查看>>
数据科学即将迎来“无代码”时代
查看>>
阿里云成立技术脱贫联盟,要用技术助力脱贫
查看>>