软件编程
位置:首页>> 软件编程>> java编程>> Springboot 整合 RocketMQ 收发消息的配置过程

Springboot 整合 RocketMQ 收发消息的配置过程

作者:liqiangbk  发布时间:2023-01-22 22:49:28 

标签:Springboot,整合,RocketMQ,收发消息

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。


<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.1.0</version>
</dependency>

yml 配置

application.yml


rocketmq:
 name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名


rocketmq:
 producer:
   group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名


rocketmq:
 producer:
   group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者


package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
   @Autowired
   private RocketMQTemplate t ;
   public  void send(){
       //发送同步消息
       t.convertAndSend("Topic1:TagA", "Hello world! ");
       //发送spring的Message
       Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
       t.send("Topic1:TagA",message);
       //发送异步消息
       t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
           @Override
           public void onSuccess(SendResult sendResult) {
               System.out.println("发送成功");
           }
           @Override
           public void onException(Throwable throwable) {
               System.out.println("发送失败");
           }
       });
       //发送顺序消息
       t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
       t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
       t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
   }
}

消费者


package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer  implements RocketMQListener<String> {
   @Override
   public void onMessage(String s) {
       System.out.println("收到"+s);
   }
}

主类


package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
   public static void main(String[] args) {
       SpringApplication.run(Main.class, args);
   }
}

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")


package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
   @Autowired
   private  Producer producer;
   @Test
   public void test1(){
       producer.send();
       try {
           Thread.sleep(5000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

demo 2

发送事务消息

生产者


package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component

public class Producer {

@Autowired
   private RocketMQTemplate t;

public void send(){
       Message<String> message = MessageBuilder.withPayload("Hello world").build();
       //一旦发送消息,则执行 *
       t.sendMessageInTransaction("Topic2",message,null);
   }
   @RocketMQTransactionListener
   class Lis implements RocketMQLocalTransactionListener {
       @Override
       public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
           System.out.println("执行本地事务");
           return RocketMQLocalTransactionState.UNKNOWN;
       }

@Override
       public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
           System.out.println("执行事务回查");
           return RocketMQLocalTransactionState.COMMIT;
       }
   }

}

消费者


package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
   @Override
   public void onMessage(String s) {
       System.out.println("收到"+s);
   }
}

主类


package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
   public static void main(String[] args) {
       SpringApplication.run(Main.class, args);
   }
}

测试类


package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
   @Autowired
   private  Producer producer;
   @Test
   public void  test1(){
       producer.send();
       //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
       try {
           Thread.sleep(30000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

来源:https://www.cnblogs.com/liqbk/p/13677137.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com