# engine-starter-rocketmq 使用教程
> 这是一个基于 `rocketmq-spring-boot-starter` 的扩展项目
## 特性
- [X] `YmRocketMqClient` 消息发送工具类
## 快速开始
- **注意:** `qifu-saas-parent >= 2.0.0-SNAPSHOT`
### 添加依赖
```xml
com.yuanmeng.engine
engine-starter-rocketmq
2.0.0-SNAPSHOT
```
### 配置 RocketMQ
```yaml
rocketmq:
name-server: rocketmq.qifu.com:30938
producer:
group: qifu-saas-xxx
namespace: dev
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
consumer:
group: qifu-saas-xxx
namespace: dev
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
```
### 创建需要的 Topic 和 ConsumerGroup
- Rocketmq 的 topic 和 consumer group 都需要手动创建
- 通过 RocketMQ 控制台创建
## 工具使用
### 发送消息
```java
@RestController
@RequestMapping("/demo")
public class DemoController {
@GetMapping("/send")
public String send() {
// 发送消息,会自动添加 SkyWalking 链路追踪信息
DemoMessage message = new DemoMessage()
SendResult result = YmRocketMqClient.sendSync("demo-topic", "demo-tag", message);
return "发送成功:" + result;
}
}
```
### 消费消息
```java
@Component
@RocketMQMessageListener(
topic = "demo-topic",
selectorExpression = "demo-tag",
consumerGroup = "${yuanmeng.rocketmq.consumer-group.demo:demo-consumer-group}"
)
public class DemoConsumer implements RocketMQListener {
@Override
public void onMessage(MessageExt message) {
DemoMessage demoMessage = JSONObject.parseObject(message.getBody(), DemoMessage.class);
log.info("处理消费消息结果:{}, tag = {}, messageId = {}", demoMessage, message.getTags(), message.getMsgId());
// 处理业务逻辑...
}
}
```
## 完整配置
```yaml
yuanmeng:
rocketmq:
enable: true
```