All checks were successful
Publish to Confluence / confluence (push) Successful in 1m0s
2.6 KiB
2.6 KiB
engine-starter-rocketmq 使用教程
这是一个基于
rocketmq-spring-boot-starter
的扩展项目
特性
YmRocketMqClient
消息发送工具类
快速开始
- 注意:
qifu-saas-parent >= 2.0.0-SNAPSHOT
添加依赖
<dependency>
<groupId>com.yuanmeng.engine</groupId>
<artifactId>engine-starter-rocketmq</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
配置 RocketMQ
rocketmq:
name-server: rocketmq.qifu.com:30938
producer:
group: qifu-saas-xxx
namespace: dev
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
consumer:
group: qifu-saas-xxx
namespace: dev
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
创建需要的 Topic 和 ConsumerGroup
- Rocketmq 的 topic 和 consumer group 都需要手动创建
- 通过 RocketMQ 控制台创建
工具使用
发送消息
@RestController
@RequestMapping("/demo")
public class DemoController {
@GetMapping("/send")
public String send() {
// 发送消息,会自动添加 SkyWalking 链路追踪信息
DemoMessage message = new DemoMessage()
SendResult result = YmRocketMqClient.sendSync("demo-topic", "demo-tag", message);
return "发送成功:" + result;
}
}
消费消息
@Component
@RocketMQMessageListener(
topic = "demo-topic",
selectorExpression = "demo-tag",
consumerGroup = "${yuanmeng.rocketmq.consumer-group.demo:demo-consumer-group}"
)
public class DemoConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
DemoMessage demoMessage = JSONObject.parseObject(message.getBody(), DemoMessage.class);
log.info("处理消费消息结果:{}, tag = {}, messageId = {}", demoMessage, message.getTags(), message.getMsgId());
// 处理业务逻辑...
}
}
完整配置
yuanmeng:
rocketmq:
enable: true