0%

RocketMQ和SpringBoot的整合1和定时任务

RocketMQ和SpringBoot的整合

POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

启动类

生产者和消费者的启动类都不需要为RocketMQ做更改

Producer

1
2
3
4
5
6
7
8
9
10
11
12
#application.yml
#服务端口

server:
port: 9001

#配置mq服务地址和producer.group名
rocketmq:
name-server: localhost:9876
producer:
group: producer
send-message-timeout: 30000

Consumer

1
2
3
4
5
6
7
8
9
#application.yml
#服务端口

server:
port: 9002

#配置mq地址
rocketmq:
name-server: localhost:9876

普通消息和延迟消息

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ProducerSendTest {

@Autowired
private RocketMQTemplate rocketMQTemplate;

// 普通消息
@Test
public void test01() throws Exception {
// 准备消息
String msg = "message";
// 投递消息 此处的normal-message即为消费者处的topic 这两处要对应
rocketMQTemplate.convertAndSend("normal-message", msg);
}

// 延迟消息
@Test
public void test02() throws Exception {

System.out.println(new Date().toLocaleString() + "producer sent message");

// 准备消息
String msg = "message delay";
GenericMessage message = new GenericMessage(msg);

// 投递消息
// 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
/*
参数一:topic 主题
参数二:消息内容
参数三:超时时间
参数四:延迟级别

此处例子是超时时间5秒,延迟时间30秒,即延迟级别4
与普通消息不同的地方为发送消息的类型要封装为GenericMessage,发送时的方法为syncSend,并需要指定超时时间和延迟级别
*/
rocketMQTemplate.syncSend("normal-message", message, 5000, 4);
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 普通消息监听器 
// 延时消息监听器与普通消息的一样,区别在于因为延时生产者会延迟发送所以消费者会延迟接收到
// consumerGroup: 每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
// topic要与生产者处的指定字段相对应
// 消息监听器要继承RocketMQListener并设置正确的泛型 此泛型是接收消息的对象类型
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "normal-message")
public class BaseMQListener implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println(System.currentTimeMillis());
System.out.println("message received :" + s);
}
}

顺序消息

  • 在业务中许多场景需要按照发送顺序来消费,比如创建转账 转账付款 操作完成的业务在消费时必须按顺序消费才正确
  • RocketMQ是通过将相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理来实现顺序消息 。

Producer

  • Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的队列,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
  • 需要自己编写消息选择队列的代码,这里用 orderid 对 队列总数 取模:long index = orderId % mqs.size();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

//顺序消息
@Test
public void testOrderSend() {
for (int i = 1; i <= 20; i++) {
//消息队列选择器,作用就是将同一类型(hashkey相等)的消息放入同一个队列
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
//保证hashkey相等的消息取出的对应一致
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
Long hashKey = Long.valueOf(o.toString());
Long queueIndex = hashKey % list.size();
return list.get(queueIndex.intValue());
}
});

rocketMQTemplate.syncSendOrderly("message-order", i + "create", i + "");
rocketMQTemplate.syncSendOrderly("message-order", i + "pay", i + "");
rocketMQTemplate.syncSendOrderly("message-order", i + "completed", i + "");
}
}

Consumer

ConsumeMode.ORDERLY :顺序消费模式
ConsumeMode.CONCURRENTLY:并发消费模式

1
2
3
4
5
6
7
8
9
@Component
@RocketMQMessageListener(consumerGroup = "consumer-message-order",topic = "message-order",consumeMode = ConsumeMode.ORDERLY)
public class OrderListener implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println(Thread.currentThread()+"接收了消息,内容是:"+s);
}
}

广播消息

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ProducerSendTest {

@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送广播消息 广播消息生产者这里的代码和普通消息生产者一样
@Test
public void test01() throws Exception {
// 准备消息
String msg = "message";
// 投递消息 此处的normal-message-broadcast即为消费者处的topic 这两处要对应
rocketMQTemplate.convertAndSend("normal-message-broadcast", msg);
}
}

Consumer

MessageModel.BROADCASTING:广播模式

MessageModel.CLUSTERING:集群模式

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13

// 消费者1
// 广播消息监听器1
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-broadcasting", topic = "normal-message-broadcast",messageModel = MessageModel.BROADCASTING)
public class SmsMQListener implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("listener 1 received:" + s);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
// 消费者2
// 广播消息监听器2
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-broadcasting", topic = "normal-message-broadcast",messageModel = MessageModel.BROADCASTING)
public class SmsMQListener implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("listener 2 received:" + s);
}
}

定时任务

启动类

1
2
3
4
5
6
7
8
9

@SpringBootApplication
@EnableScheduling // 开启定时任务注解支持
public class SpringTaskApplication {
public static void main(String[] args) {
SpringApplication.run(SpringTaskApplication.class);
}
}

定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

@Component
public class MyJob {
/**
* 每间隔1秒钟,输出当前时间
* 这里的cron表达式的使用方法和Linux中的Crontab的使用方法一样
* initialDelay:该属性的作用是第一次执行延迟时间
* fixedDelay:上一次任务执行完后多久再执行,参数类型为 long,单位 ms
* fixedRate:按一定的时间频率执行任务,参数类型为 long,单位 ms
*
*/
// @Scheduled(fixedRate = 1000)
// @Scheduled(fixedDelay = 1000,initialDelay = 10000) // 表示项目启动后,第一个任务延迟到10秒后执行
@Scheduled(cron = "0/1 * * * * ?")
public void showTime() {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
System.out.println(new Date().toLocaleString());
}
}

参考

http://rocketmq.apache.org/docs/simple-example/
https://www.baeldung.com/apache-rocketmq-spring-boot

Welcome to my other publishing channels