编码
配置
在pom.xml引入rocketmq-client模块和rocketmq-common模块,选择合适的版本,这里我用的是3.1.4版本
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>3.1.4</version> </dependency>
|
发送消息
- 发送消息比较简单,首先创建消息生产者,指定组;然后启动生产者;
- 当需要发消息时调用生产者的send()方法即可,消息对象需要指定主题、标签、主键、消息体内容等信息;通过send()方法的返回值同步判断是否发送成功;
- 发送失败将会自动重试,重试次数和超时时间可以在创建生产者时进行设置。
package cn.waterlu.test.rocketmq; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message;
public class TestRocketMq { @Test public void testProducer() { String groupName = "group_producer"; String nameServer = "10.10.10.163:9876"; int retryTimes = 3; long timeout = 10000;
DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameServer); producer.setRetryTimesWhenSendFailed(retryTimes); producer.setSendMsgTimeout(timeout); producer.start(); Message message = new Message(topic, tag, key, body); SendResult sendResult = producer.send(message); SendStatus status = sendResult.getSendStatus(); if (status.equals(SendStatus.SEND_OK)) { logger.info("发送成功"); } else { logger.warn("发送失败"); } } }
|
接收消息
- 消费消息与生产消息类似,需要首先创建消费者,设置参数,最后启动消费者消费消息;
- 消费者和生产者一样需要指定NamerServer地址和消费组名称;
- 消费者启动前需要指定订阅的主题和标签,进行消息过滤;
- 消费者需要注册收到消息后的处理方法;
- 消费者分为Pull和Push两种模式,其本质都是拉去消息;
- Push模式把轮询过程封装了,对用户来说,感觉消息是被推送过来的;
- Pull模式用户需要自己拉起消息。
package cn.waterlu.test.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.ConsumeFromWhere;
public class TestRocketMq { @Test public void testConsumer() { String consumerGroupName = "group_consumer"; String nameServer = "10.10.10.163:9876"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName); consumer.setNamesrvAddr(nameServer); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(topic, tags); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); String topic = messageExt.getTopic(); String tag = messageExt.getTags(); String messageID = messageExt.getMsgId(); String messageKey = messageExt.getKeys(); byte[] messageBody = messageExt.getBody(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
consumer.start(); } }
|