1、前文
最近项目中使用消息队列,好处是可以解耦、削峰、异步,具体的大家可以查一下详细说明哈,话不多说,开始上代码。
2、引用依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3、配置yml
server: port: 9204 spring: application: name: test-rabbitmq rabbitmq: host: 127.0.0.1 port: 15672 username: guest password: guest virtualHost: /
4、配置Config类,此处最为关键
package com.ccse.semiphysical.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author: lizj * @CreateTime: 2023-12-26 14:31 * @Description: TODO */ @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_ENVPARAMS = "queue_inform_envparams"; public static final String QUEUE_INFORM_ENVRETURN = "queue_inform_envreturn"; public static final String QUEUE_INFORM_CLOSETASK = "queue_inform_closetask"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform_env"; public static final String ROUTINGKEY_ENVPARAMS="inform.#.environmentalparams.#"; public static final String ROUTINGKEY_SEND="inform.environmentalparams"; public static final String ROUTINGKEY_ENVRETURN="inform.#.environmentalreturn.#"; public static final String ROUTINGKEY_CLOSETASK="inform.#.closetask.#"; public static final String ROUTINGKEY_RETURN="inform.environmentalreturn"; public static final String ROUTINGKEY_CLOSE_TASK="inform.closetask"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_ENVPARAMS队列 @Bean(QUEUE_INFORM_ENVPARAMS) public Queue QUEUE_INFORM_ENVPARAMS(){ return new Queue(QUEUE_INFORM_ENVPARAMS); } //声明QUEUE_INFORM_ENVRETURN队列 @Bean(QUEUE_INFORM_ENVRETURN) public Queue QUEUE_INFORM_ENVRETURN(){ return new Queue(QUEUE_INFORM_ENVRETURN); } //声明QUEUE_INFORM_CLOSETASK队列 @Bean(QUEUE_INFORM_CLOSETASK) public Queue QUEUE_INFORM_CLOSETASK(){ return new Queue(QUEUE_INFORM_CLOSETASK); } //ROUTINGKEY_ENVPARAMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_ENVPARAMS(@Qualifier(QUEUE_INFORM_ENVPARAMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_ENVPARAMS).noargs(); } //ROUTINGKEY_ENVRETURN队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_ENVRETURN(@Qualifier(QUEUE_INFORM_ENVRETURN) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_ENVRETURN).noargs(); } @Bean public Binding BINDING_ROUTINGKEY_CLOSETASK(@Qualifier(QUEUE_INFORM_CLOSETASK) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_CLOSETASK).noargs(); } }
5、生产者
@Autowired RabbitTemplate rabbitTemplate; //使用rabbitTemplate发送消息 //交换机名称,routikey,发送的消息 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, RabbitmqConfig.ROUTINGKEY_SEND, JSON.toJSONString(retrunData));
6、消费者
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_ENVPARAMS}) public void receive_envparams(@Payload String msg){ //此处省略业务代码 }