RabbitMQ五种工作模式
1、简单模式
简单模式的特点是一个生产者,一个队列,一个消费者,这种模式不需要进行任何的交换机的binding绑定。
工具类:
/**
* 功能说明:连接工厂创建信道的工具类
*
* @author yangxu
* @date 2022/2/19
*/
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("host"); //按照实际情况填写自己的ip
factory.setUsername("admin"); //按照实际情况填写自己的用户名
factory.setPassword("pwd"); //按照实际情况填写自己的密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
生产者:
/**
* 功能说明:生产者,发送消息
*
* @author yangxu
* @date 2022/2/19
*/
public class Producer {
// 队列名称
public static final String QUEUE_NAME="hello";
// 发送消息
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂ip 连接rabbitmq的队列
factory.setHost("host");
// 用户名
factory.setUsername("admin");
// 密码
factory.setPassword("pwd");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/**
* 创建队列
* 1、队列名称
* 2、队列里面的消息是否持久化,默认存储在内存中
* 3、该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费
* 4、是否自动删除,最后一个消费者断开连接以后,该队列是否自动删除,false不自动删除
* 5、其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发消息
String message = "hello world"; // 初次使用
/**
* 发送一个消息
* 1、发送到那个交换机
* 2、路由的配置是那个 routingKey
* 3、其他消息参数
* 4、发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
消费者:
/**
* 功能说明: 消费者 接收消息
*
* @author yangxu
* @date 2022/2/19
*/
public class Consumer {
// 队列名称
public static final String QUEUE_NAME="hello";
// 接收消息
public static void main(String[] args) throws Exception {
//创建连接工厂
Channel channel = RabbitMqUtils.getChannel();
// 声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消费消息被中断");
};
/**
* 消费者消费消费
* 1、消费那个队列
* 2、消费成功之后是否要自动应答 true 代表自动应答
* 3、消费者成功的回调
* 4、消费者取消消费的回调
* */
channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
}
}
2、Work模式
work模式的特点是一个生产这,一个队列,多个消费者,每个消费者获取到的消息是唯一的。
生产者:
/**
* 功能说明:work模式 生产者
*
* @author yangxu
* @date 2022/2/19
*/
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
try {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1:
/**
* 功能说明: 消费者1
*
* @author yangxu
* @date 2022/2/19
*/
public class Worker01 {
// 队列的名称
public static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMqUtils.getChannel();
// 声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1接收到的消息:" + new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消费消息被中断");
};
System.out.println("C1等待接收消息");
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
消费者2:
/**
* 功能说明: 消费者2
*
* @author yangxu
* @date 2022/2/19
*/
public class Worker02 {
// 队列的名称
public static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws Exception {
//获取信道
Channel channel = RabbitMqUtils.getChannel();
// 声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C2接收到的消息:" + new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消费消息被中断");
};
System.out.println("C2等待接收消息");
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
3、Fanout发布订阅模式
这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。生产者发送的消息会被多个消费者获取。
生产者:
/**
* 功能说明: 发送消息
*
* @author yangxu
* @date 2022/2/20
*/
public class EmitLog {
// 交换机的名称
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scan = new Scanner(System.in);
while (scan.hasNext()){
String message = scan.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
}
}
消费者1
/**
* 功能说明:
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogs01 {
// 交换机的名称
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明一个临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定
channel.queueBind(queue,EXCHANGE_NAME,"",null);
System.out.println("C1等待接收消息:");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
// 消费消息
channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
System.out.println("C1取消接收消息");
});
}
}
消费者2:
/**
* 功能说明:
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogs02 {
// 交换机的名称
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明一个临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定
channel.queueBind(queue,EXCHANGE_NAME,"",null);
System.out.println("C2等待接收消息:");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
// 消费消息
channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
System.out.println("C2取消接收消息");
});
}
}
4、Direct路由模式
路由模式的特点是:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定绑定key,当绑定key和路由key完全匹配时,消费者才能消费消息,一个队列可以绑定多个key。
生产者:
/**
* 功能说明:
*
* @author yangxu
* @date 2022/2/20
*/
public class DirectLogs {
// 交换机的名称
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scan = new Scanner(System.in);
while (scan.hasNext()){
String message = scan.next();
// 修改routingKey参数 即可想要发送到指定的消费方
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
}
}
消费者1:
/**
* 功能说明: 消费消息
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogsDirect01 {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
// 队列名称
private static final String QUEUE_NAME = "console";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
//消费消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println("C1取消接收消息");
});
}
}
消费者2:
/**
* 功能说明: 消费消息
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogsDirect02 {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
// 队列名称
private static final String QUEUE_NAME = "console";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
//消费消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println("C2取消接收消息");
});
}
}
5、Topic通配符模式
该模式的特点是:用通配符进行匹配,#匹配0个或多个单词,*匹配一个单词。
生产者:
/**
* 功能说明: 话题模式-生产者
*
* @author yangxu
* @date 2022/2/20
*/
public class TopicLogs {
// 交换机的名称
private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Scanner scan = new Scanner(System.in);
while (scan.hasNext()){
String message = scan.next();
// 修改routingKey参数 即可想要发送到指定的消费方
/**
* routingKey实例:
* quick.orange.rabbit
* lazy.orange.elephant
* quick.orange.fox
* lazy.brown.fox
*/ channel.basicPublish(EXCHANGE_NAME,"quick.orange.fox",null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
}
}
消费者1:
/**
* 功能说明: 主题交换机 消费消息
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogsTopic01 {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
// 队列名称
private static final String QUEUE_NAME = "q1";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:"+QUEUE_NAME+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
//消费消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println("C1取消接收消息");
});
}
}
消费者2:
/**
* 功能说明: 消费消息
*
* @author yangxu
* @date 2022/2/20
*/
public class ReceiveLogsTopic02 {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
// 队列名称
private static final String QUEUE_NAME = "q2";
public static void main(String[] args) throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
// 消费消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:"+QUEUE_NAME+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
//消费消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println("C2取消接收消息");
});
}
}
评论