RocketMQ和SpringBoot的整合
POM
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.6.0</version>
</dependency>

<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
</dependency>
启动类
生产者和消费者的启动类都不需要为RocketMQ做更改
Producer
#application.yml
#服务端口
server:
  port: 9001

#配置mq服务地址和producer.group名
rocketmq:
  name-server: localhost:9876
  producer:
    group: producer
    send-message-timeout: 30000
Consumer
#application.yml
#服务端口
server:
  port: 9002

#配置mq地址
rocketmq:
  name-server: localhost:9876
普通消息和延迟消息
Producer
/**
 * Sends one plain message and one delayed message to the {@code normal-message} topic
 * through {@link RocketMQTemplate}.
 *
 * <p>NOTE(review): no Spring test annotations are visible on this class in the snippet. The
 * {@code @Autowired} template is only populated when the tests run inside a Spring context;
 * confirm the runner configuration.
 */
public class ProducerSendTest {

    /** Send timeout, in milliseconds, handed to {@code syncSend}. */
    private static final long SEND_TIMEOUT_MILLIS = 5000L;

    /**
     * Delay level handed to {@code syncSend}. With the broker's default
     * {@code messageDelayLevel} table ("1s 5s 10s 30s 1m ...") level 4 should correspond to
     * 30 seconds; verify against the broker configuration in use.
     */
    private static final int DELAY_LEVEL = 4;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Sends a plain string payload to the {@code normal-message} topic. */
    @Test
    public void test01() throws Exception {
        String msg = "message";
        rocketMQTemplate.convertAndSend("normal-message", msg);
    }

    /**
     * Prints the send time, then synchronously sends a delayed message to the
     * {@code normal-message} topic so the delay can be compared with the consumer's output.
     */
    @Test
    public void test02() throws Exception {
        System.out.println(new Date().toLocaleString() + "producer sent message");
        String msg = "message delay";
        // Parameterized instead of the raw GenericMessage type to avoid unchecked warnings.
        GenericMessage<String> message = new GenericMessage<>(msg);
        rocketMQTemplate.syncSend("normal-message", message, SEND_TIMEOUT_MILLIS, DELAY_LEVEL);
    }
}
Consumer
/**
 * Listener registered for topic {@code normal-message} in consumer group
 * {@code consumer-group}. For every message it prints the current time in milliseconds
 * followed by the message body.
 */
@Component
@RocketMQMessageListener(topic = "normal-message", consumerGroup = "consumer-group")
public class BaseMQListener implements RocketMQListener<String> {

    /**
     * Prints the receive timestamp and the payload to standard output.
     *
     * @param s the message body delivered by the listener container
     */
    @Override
    public void onMessage(String s) {
        long receivedAtMillis = System.currentTimeMillis();
        System.out.println(receivedAtMillis);

        String line = "message received :" + s;
        System.out.println(line);
    }
}
顺序消息
在业务中许多场景需要按照发送顺序来消费,比如同一笔订单的"创建"、"付款"、"完成"三条消息,必须按发送顺序消费才正确。
RocketMQ通过将相同ID的消息发送到同一个队列,并且一个队列的消息只由一个消费者处理,来实现顺序消息。
Producer
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的队列,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
需要自己编写消息选择队列的代码,这里用 orderid 对 队列总数 取模:long index = orderId % mqs.size();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Test public void testOrderSend () { for (int i = 1 ; i <= 20 ; i++) { rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() { @Override public MessageQueue select (List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) { Long hashKey = Long.valueOf(o.toString()); Long queueIndex = hashKey % list.size(); return list.get(queueIndex.intValue()); } }); rocketMQTemplate.syncSendOrderly("message-order" , i + "create" , i + "" ); rocketMQTemplate.syncSendOrderly("message-order" , i + "pay" , i + "" ); rocketMQTemplate.syncSendOrderly("message-order" , i + "completed" , i + "" ); } }
Consumer
ConsumeMode.ORDERLY:顺序消费模式
ConsumeMode.CONCURRENTLY:并发消费模式
1 2 3 4 5 6 7 8 9 @Component @RocketMQMessageListener(consumerGroup = "consumer-message-order",topic = "message-order",consumeMode = ConsumeMode.ORDERLY) public class OrderListener implements RocketMQListener <String > { @Override public void onMessage (String s) { System.out.println(Thread.currentThread()+"接收了消息,内容是:" +s); } }
广播消息
Producer
/**
 * Sends a single string message to the {@code normal-message-broadcast} topic.
 *
 * <p>NOTE(review): no Spring test annotations are visible on this class in the snippet;
 * the {@code @Autowired} template requires the test to run inside a Spring context. Confirm.
 */
public class ProducerSendTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Publishes the payload {@code "message"} to the broadcast topic. */
    @Test
    public void test01() throws Exception {
        final String topic = "normal-message-broadcast";
        final String payload = "message";
        rocketMQTemplate.convertAndSend(topic, payload);
    }
}
Consumer
MessageModel.BROADCASTING:广播模式
MessageModel.CLUSTERING:集群模式
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @RocketMQMessageListener(consumerGroup = "consumer-group-broadcasting", topic = "normal-message-broadcast",messageModel = MessageModel.BROADCASTING) public class SmsMQListener implements RocketMQListener <String > { @Override public void onMessage (String s) { System.out.println("listener 1 received:" + s); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 @Component @RocketMQMessageListener(consumerGroup = "consumer-group-broadcasting", topic = "normal-message-broadcast",messageModel = MessageModel.BROADCASTING) public class SmsMQListener implements RocketMQListener <String > { @Override public void onMessage (String s) { System.out.println("listener 2 received:" + s); } }
定时任务
启动类
1 2 3 4 5 6 7 8 9 @SpringBootApplication @EnableScheduling public class SpringTaskApplication { public static void main (String[] args) { SpringApplication.run(SpringTaskApplication.class); } }
定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Component public class MyJob { @Scheduled(cron = "0/1 * * * * ?") public void showTime () { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace();} System.out.println(new Date().toLocaleString()); } }
参考
http://rocketmq.apache.org/docs/simple-example/
https://www.baeldung.com/apache-rocketmq-spring-boot