RabbitMQ基础知识五

SpringBoot和Rabbitmq整合

  • 1、引入依赖
<!--RabbitMQ 依赖-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 2、修改配置文件
spring:
  rabbitmq:
    addresses: localhost # ip
    username: rabbitmq # 用户名
    password: rabbitmq # 密码
    publisher-confirms: true # 是否开启发布确认模式
    virtual-host: server  # 虚拟主机
    publisher-returns: true # 消息没有被路由到指定的queue时将消息返回,不丢弃
    listener:
      simple:
        acknowledge-mode: manual # 手工确认
        #最小消费者数量
        concurrency: 1
        #最大的消费者数量
        max-concurrency: 5

如果配置acknowledge-mode为manual则需要在消费消费的地方调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);进行确认

2、具体案例rabbtimq集合Springboot实现TTL

2.1集体架构

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:

2.2配置类

/*
 * 软件版权: 杨旭
 * 修改记录:
 * 修改日期  修改人员   修改说明
 * 2022/2/20   27440   新增
 */
package com.xuzaiya.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 功能说明:
 *
 * @author yangxu
 * @date 2022/2/20
 */
@Configuration
public class RabbitmqConfig {

  // 普通交换机的名称
  public static final String X_EXCHANGE = "X";

  // 死信交换机的名称
  public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

  // 普通队列的名字
  public static final String QUEUE_A = "QA";
  public static final String QUEUE_B = "QB";


  // 死信队列的名称
  public static final String DEAD_LETTER_QUEUE = "QD";

  // 声明x交换机
  @Bean("xExchange")
  public DirectExchange xExchange(){
    return new DirectExchange(X_EXCHANGE);
  }

  // 声明y交换机
  @Bean("yExchange")
  public DirectExchange yExchange(){
    return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
  }

  // 声明QA队列
  @Bean("queueA")
  public Queue queueA(){

    Map<String,Object> arguments = new HashMap<>();
    // 设置死信队列
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    // 设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","YD");
    // 设置TTL 单位是ms
    arguments.put("x-message-ttl",10000);

    return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
  }


  // 声明QB队列
  @Bean("queueB")
  public Queue queueB(){

    Map<String,Object> arguments = new HashMap<>();
    // 设置死信队列
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    // 设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","YD");
    // 设置TTL 单位是ms
    arguments.put("x-message-ttl",40000);

    return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
  }



  //声明死信队列
  @Bean("queueD")
  public Queue queueD(){
    return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
  }

  // 绑定
  @Bean
  public Binding queueABindingXA(@Qualifier("queueA") Queue queueA,
                                 @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueA).to(xExchange).with("XA");

  }

  // 绑定
  @Bean
  public Binding queueABindingXB(@Qualifier("queueB") Queue queueB,
                                 @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueB).to(xExchange).with("XB");

  }

  // 绑定
  @Bean
  public Binding queueABindingYD(@Qualifier("queueD") Queue queueD,
                                 @Qualifier("yExchange") DirectExchange yExchange){
    return BindingBuilder.bind(queueD).to(yExchange).with("YD");

  }


}

生产者:

/*
 * 软件版权: 杨旭
 * 修改记录:
 * 修改日期  修改人员   修改说明
 * 2022/2/20   27440   新增
 */
package com.xuzaiya.springbootrabbitmq.controller;

import com.xuzaiya.springbootrabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 功能说明:
 * 发送延迟消息
 * @author yangxu
 * @date 2022/2/20
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  //开始发消息
  @GetMapping("/sendMsg/{message}")
  public void sendMsg(@PathVariable String message){
    log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
    rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+message);
    rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列"+message);
  }

}

消费者:

/*
 * 软件版权: 杨旭
 * 修改记录:
 * 修改日期  修改人员   修改说明
 * 2022/2/20   27440   新增
 */
package com.xuzaiya.springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 功能说明:
 * 队列TTL 的消费者
 * @author yangxu
 * @date 2022/2/20
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {
  //接收消息
  @RabbitListener(queues = "QD")
  public void receiveD(Message message, Channel channel){
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
  }
}

2.3 延时队列的优化

在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间,设置每条消息的延时间:

新增QC队列的配置:

// 新的普通队列,
  public static final String QUEUE_C = "QC";

// 声明QC队列
  @Bean("queueC")
  public Queue queueC(){
    Map<String,Object> arguments = new HashMap<>();
    // 设置死信交换机
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    // 设置死信队列
    arguments.put("x-dead-letter-routing-key","YD");
    return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();

  }

// 绑定
  @Bean
  public Binding queueABindingX(@Qualifier("queueC") Queue queueC,
                                 @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");

  }

生产者:发送消息时,指定延时时间:

 //开始发消息
  @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
  public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
    log.info("当前时间:{},发送一条时长是:{}毫秒给队列QC:{}",new Date().toString(),ttlTime,message);
    rabbitTemplate.convertAndSend("X","XC",message,(msg -> {
      // 设置发送消息的延时时长
      msg.getMessageProperties().setExpiration(ttlTime);
      return msg;
    }));
  }

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。 可以使用基于插件的延迟队列进行优化。

2.4 总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

end
  • 作者:旭仔(联系作者)
  • 发表时间:2022-03-27 10:15
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论