1、死信队列
1.1死信队列的概念
死信,顾名思义就是无法被消费的消息,生产者将消息投递到borker或者队列,消费者从队列取出消息消费,由于某些特殊的原因,导致队列中的某些消息无法被消费,如果这些消息没有得到处理,即变成了死信。
1.2死信消息的来源
- 1、消息过期
- 2、队列满了,无法在添加消息
- 3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
1.3代码架构图和代码
代码架构图:
代码分析:
- 1、消息TTL过期 生产者:
public class Producer {
/**
* 功能说明: 生产消息
*
* @author yangxu
* @date 2022/2/20
*/
public class Producer {
// 普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 死信消息,设置TTL时间 单位是毫秒
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
}
}
}
消费者1:
/**
* 功能说明:
*
* @author yangxu
* @date 2022/2/20
*/
public class Consumer01 {
// 普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列的名字
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列的名字
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明死信和普通交换机, 类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明普通队列
Map<String,Object> arguments = new HashMap<>();
//过期时间 单位毫秒 也可以在发消息的一方设置
// arguments.put("x-message-ttl",10000);
// 队列长度限制
// arguments.put("x-max-length",6);
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
// ----------------------------------------------------------------------
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 绑定, 普通交换机和普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
// 绑定 死信队列和死信交换机
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
String msg = new String(message.getBody(),"UTF-8");
if ("info5".equals(msg)){
// 拒绝消费消息
System.out.println("C1接收的消息是:"+msg+" 此消息是被拒绝的");
// 第二个参数 false:表示不放回队列
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("C1接收的消息是:"+msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,(consumerTag)->{});
}
}
消费者2:
/**
* 功能说明:死信队列消费者
*
* @author yangxu
* @date 2022/2/20
*/
public class Consumer02 {
// 死信队列的名字
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1接收的消息是"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});
}
}
2、延迟队列
###2.1延迟队列的概念 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 ###2.2延迟队列的使用场景 1.订单在十分钟之内未支付则自动取消 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 ###2.3设置TTL
- 1、针对每一条消息设置TTL
// 4 发送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
String msg = "RabbitMQ: TTL message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
- 2、设置队列的TTL
Map<String , Object> arguments = new HashMap<>();
arguments.put("x-message-ttl" , 10000);
channel.queueDeclare(queueName , true , false , false , arguments);
评论