软件编程
位置:首页>> 软件编程>> java编程>> springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

作者:Scarlet-Max  发布时间:2021-10-03 22:01:22 

标签:springboot,RabbitMq,Topic,主题模式

1.application.yml


server:
 port: 8184
spring:
 application:
   name: rabbitmq-demo
 rabbitmq:
   host: 127.0.0.1 # ip地址
   port: 5672
   username: admin # 连接账号
   password: 123456 # 连接密码
   template:
     retry:
       enabled: true # 开启失败重试
       initial-interval: 10000ms # 第一次重试的间隔时长
       max-interval: 300000ms # 最长重试间隔,超过这个间隔将不再重试
       multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
     exchange: topic.exchange # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
   publisher-confirm-type: correlated # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
   publisher-returns: true
   listener:
     type: simple
     simple:
       acknowledge-mode: manual
       prefetch: 1 # 限制每次发送一条数据。
       concurrency: 3 # 同一个队列启动几个消费者
       max-concurrency: 3 # 启动消费者最大数量
       # 重试策略相关配置
       retry:
         enabled: true # 是否支持重试
         max-attempts: 5
         stateless: false
         multiplier: 1.0 # 时间策略乘数因子
         initial-interval: 1000ms
         max-interval: 10000ms
       default-requeue-rejected: true

2.pom.xml引入依赖


<!-- rabbitmq -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

3.常量类创建


/**
* @author kkp
* @ClassName RabbitMqConstants
* @date 2021/11/3 14:16
* @Description
*/
public class RabbitMqConstants {
   public final static String TEST1_QUEUE = "test1-queue";

public final static String TEST2_QUEUE = "test2-queue";

public final static String EXCHANGE_NAME = "test.topic.exchange";
   /**
    * routingKey1
    */
   public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";

public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";
   /**
    * routingKey1
    */
   public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";

public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";
}

4.配置Configuration


import com.example.demo.common.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
* @author kkp
* @ClassName RabbitMqConfig
* @date 2021/11/3 14:16
* @Description
*/
@Slf4j
@Configuration
public class RabbitMqConfig {
   @Autowired
   private CachingConnectionFactory connectionFactory;

/**
    *  声明交换机
    */
   @Bean(RabbitMqConstants.EXCHANGE_NAME)
   public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
       // Topic模式
       //return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
       //发布订阅模式
       return ExchangeBuilder.fanoutExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
   }

/**
    *  声明队列
    *  new Queue(QUEUE_EMAIL,true,false,false)
    *  durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
    *  auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
    *  exclusive  表示该消息队列是否只在当前connection生效,默认是false
    */
   @Bean(RabbitMqConstants.TEST1_QUEUE)
   public Queue esQueue() {
       return new Queue(RabbitMqConstants.TEST1_QUEUE);
   }

/**
    *  声明队列
    */
   @Bean(RabbitMqConstants.TEST2_QUEUE)
   public Queue gitalkQueue() {
       return new Queue(RabbitMqConstants.TEST2_QUEUE);
   }

/**
    *  TEST1_QUEUE队列绑定交换机,指定routingKey
    */
   @Bean
   public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue,
                            @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();
   }

/**
    *  TEST2_QUEUE队列绑定交换机,指定routingKey
    */
   @Bean
   public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue,
                                @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();
   }

/**
    * 如果需要在生产者需要消息发送后的回调,
    * 需要对rabbitTemplate设置ConfirmCallback对象,
    * 由于不同的生产者需要对应不同的ConfirmCallback,
    * 如果rabbitTemplate设置为单例bean,
    * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。
    * @return
    */
   @Bean
   @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
   public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
       return template;
   }
}

5.Rabbit工具类创建


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;
/**
* @author kkp
* @ClassName RabbitMqUtils
* @date 2021/11/3 14:21
* @Description
*/
@Slf4j
@Component
public class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

private RabbitTemplate rabbitTemplate;

/**
    * 构造方法注入
    */
   @Autowired
   public RabbitMqUtils(RabbitTemplate rabbitTemplate) {
       this.rabbitTemplate = rabbitTemplate;
       //这是是设置回调能收到发送到响应
       rabbitTemplate.setConfirmCallback(this);
       //如果设置备份队列则不起作用
       rabbitTemplate.setMandatory(true);
       rabbitTemplate.setReturnCallback(this);
   }

/**
    * 回调确认
    */
   @Override
   public void confirm(CorrelationData correlationData, boolean ack, String cause) {
       if(ack){
           log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
       }else{
           log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
       }
   }

/**
    * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效
    * @param message
    * @param replyCode
    * @param replyText
    * @param exchange
    * @param routingKey
    */
   @Override
   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
       log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
   }

/**
    * 发送到指定Queue
    * @param queueName
    * @param obj
    */
   public void send(String queueName, Object obj){
       CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
       this.rabbitTemplate.convertAndSend(queueName, obj, correlationId);
   }

/**
    * 1、交换机名称
    * 2、routingKey
    * 3、消息内容
    */
   public void sendByRoutingKey(String exChange, String routingKey, Object obj){
       CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
       this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);
   }
}

6.service创建


public interface TestService {

String sendTest1(String content);

String sendTest2(String content);
}

7.impl实现


import com.example.demo.common.RabbitMqConstants;
import com.example.demo.util.RabbitMqUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author kkp
* @ClassName TestServiceImpl
* @date 2021/11/3 14:24
* @Description
*/
@Service
@Slf4j
public class TestServiceImpl implements TestService {

@Autowired
   private RabbitMqUtils rabbitMqUtils;

@Override
   public String sendTest1(String content) {
       rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
               RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);
       log.info(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+"***************发送成功*****************");
       return "发送成功!";
   }

@Override
   public String sendTest2(String content) {
       rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
               RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);
       log.info(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+"***************发送成功*****************");
       return "发送成功!";
   }
}

8.监听类


import com.example.demo.common.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
* @author kkp
* @ClassName RabbitMqListener
* @date 2021/11/3 14:22
* @Description
*/

@Slf4j
@Component
public class RabbitMqListener {

@RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE)
   public void test1Consumer(Message message, Channel channel) {
       try {
           //手动确认消息已经被消费
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
           log.info("Counsoum1消费消息:" + message.toString() + "。成功!");
       } catch (Exception e) {
           e.printStackTrace();
           log.info("Counsoum1消费消息:" + message.toString() + "。失败!");
       }
   }

@RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE)
   public void test2Consumer(Message message, Channel channel) {
       try {
           //手动确认消息已经被消费
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
           log.info("Counsoum2消费消息:" + message.toString() + "。成功!");
       } catch (Exception e) {
           e.printStackTrace();
           log.info("Counsoum2消费消息:" + message.toString() + "。失败!");
       }
   }

}

9.Controller测试


import com.example.demo.server.TestService;
import jdk.nashorn.internal.objects.annotations.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

/**
* @author kkp
* @ClassName TestController
* @date 2021/11/3 14:25
* @Description
*/
@Slf4j
@RestController
@RequestMapping("/enterprise")
public class TestController {

@Autowired
   private TestService testService;

@GetMapping("/finance")
   public String hello3(@RequestParam(required = false) Map<String, Object> params) {
       return testService.sendTest2(params.get("entId").toString());
   }
   /**
    * 发送消息test2
    * @param content
    * @return
    */
   @PostMapping(value = "/finance2")
   public String sendTest2(@RequestBody String content) {
       return testService.sendTest2(content);
   }

}

springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

来源:https://blog.csdn.net/weixin_44907173/article/details/121124048

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com