RabbitMQ基础知识四

1、死信队列

1.1死信队列的概念

死信,顾名思义就是无法被消费的消息,生产者将消息投递到borker或者队列,消费者从队列取出消息消费,由于某些特殊的原因,导致队列中的某些消息无法被消费,如果这些消息没有得到处理,即变成了死信。

1.2死信消息的来源

  • 1、消息过期
  • 2、队列满了,无法在添加消息
  • 3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

1.3代码架构图和代码

代码架构图:

代码分析:

  • 1、消息TTL过期 生产者:
public class Producer {
/**
 * 功能说明: 生产消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class Producer {

  // 普通交换机名称
  public static final String NORMAL_EXCHANGE = "normal_exchange";

  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 死信消息,设置TTL时间 单位是毫秒
//     AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

    for (int i = 0; i < 10; i++) {
      String message = "info" + i;
      channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
    }
  }
}

消费者1:

/**
 * 功能说明:
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class Consumer01 {
  // 普通交换机名称
  public static final String NORMAL_EXCHANGE = "normal_exchange";
  // 死信交换机的名称
  public static final String DEAD_EXCHANGE = "dead_exchange";
  // 普通队列的名字
  public static final String NORMAL_QUEUE = "normal_queue";
  // 死信队列的名字
  public static final String  DEAD_QUEUE = "dead_queue";

  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明死信和普通交换机, 类型为direct
    channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    // 声明普通队列
    Map<String,Object> arguments = new HashMap<>();
    //过期时间 单位毫秒 也可以在发消息的一方设置
    // arguments.put("x-message-ttl",10000);

    // 队列长度限制
    // arguments.put("x-max-length",6);

    // 正常队列设置死信交换机
    arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
    // 设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","lisi");
    channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

    // ----------------------------------------------------------------------
    //声明死信队列
    channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

    // 绑定, 普通交换机和普通队列
    channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    // 绑定 死信队列和死信交换机
    channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
    // 消费消息的回调
    DeliverCallback deliverCallback  = (consumerTag, message)->{
      String msg = new String(message.getBody(),"UTF-8");
      if ("info5".equals(msg)){
        // 拒绝消费消息
        System.out.println("C1接收的消息是:"+msg+" 此消息是被拒绝的");
        // 第二个参数 false:表示不放回队列
        channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
      }else{
        System.out.println("C1接收的消息是:"+msg);
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
      }
    };
    channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(consumerTag)->{});
  }
}

消费者2:

/**
 * 功能说明:死信队列消费者
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class Consumer02 {
  // 死信队列的名字
  public static final String  DEAD_QUEUE = "dead_queue";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 消费消息的回调
    DeliverCallback deliverCallback  = (consumerTag, message)->{
      System.out.println("C1接收的消息是"+new String(message.getBody(),"UTF-8"));
    };
    channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});
  }
}

2、延迟队列

###2.1延迟队列的概念 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 ###2.2延迟队列的使用场景 1.订单在十分钟之内未支付则自动取消 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 ###2.3设置TTL

  • 1、针对每一条消息设置TTL
// 4 发送消息
for (int i = 0; i < 5; i++) {
	AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
		.deliveryMode(2)
		.contentEncoding("UTF-8")
		.expiration("10000")
		.build();
	String msg = "RabbitMQ: TTL message" + i;
	channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}

  • 2、设置队列的TTL
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);

end
  • 作者:旭仔(联系作者)
  • 发表时间:2022-03-19 22:50
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论