SpringBoot整合RocketMQ
条评论集成RocketMQ
基本实现
我们先来回顾一下前面提过的基本用法。
生产者
// 启动Producer,可共用 |
消费者
// 创建消息者 |
官方Starter
RocketMQ在2018年给出了官方集成Spring Boot实现 rocketmq-spring-boot-starter
用法如下:
- 引入pom
<dependency> |
- 配置
spring.rocketmq.name-server=192.168.75.159:9876 |
- 加注解
|
consumer
4j |
从文档和代码都可以看出,官方starter还处于起步阶段,目前只实现了consumer的消息触发机制,还没有封装producer,配置也非常简单,没有幂等性等处理,所以还是等到它成熟以后再用吧。
我的Starter
我做的starter在这里 ,目前比官方的功能多一些,也提供Test例子代码。
Github上readme写的挺详细的,这里就不再复述了,欢迎使用、打star、提issue。
下面说一下主要思路和重点。
Producer
先说生产者,生产者相对简单一些。思路就是创建一个单例的DefaultMQProducer,在系统启动时start(),以后@Autowired注入以后就可以使用了,非常简单。剩下的就是配置DefaultMQProducer参数的问题了。
DefaultMQProducer本身是支持异步消息发送机制的,这里我只使用了同步发送消息机制,默认重试3次。
Consumer
消费者要比生产者复杂,一个AbstractMQProducer对应一个DefaultMQProducer,但是因为Tag的存在,可能多个AbstractMQConsumer对应一个DefaultMQPushConsumer,如果看代码,这里是比较绕的地方。
首先,官方提供了DefaultMQPushConsumer和DefaultMQPullConsumer。虽然从名字上看,应该一个是push,另外一个是pull,但实际上都是pull。我选择的是DefaultMQPushConsumer,用上去更像push。
细心的人可能注意到,消息回调方法consumeMessage()的参数是List<MessageExt> list ,但我们是一个一个消息处理的,这样没有问题吗?现在没有问题是因为默认consumer.setConsumeMessageBatchMaxSize(1); 也就是说这个list的默认长度是1,只有1个元素,如果我们调整了consumeMessageBatchMaxSize参数,那么实现逻辑就需要修改了。
还有,ConsumeFromWhere也是非常重要的一个参数,我现在固定使用CONSUME_FROM_MAX_OFFSET,也就是从最新数据开始消费;实际上还支持CONSUME_FROM_FIRST_OFFSET和CONSUME_FROM_TIMESTAMP,也就是从头消费或者从固定时间点开始消费,现在没有实现。
最后就是扫描注解是如何实现的,其实就一行代码
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQConsumer.class); |