RabbitMQ基础知识二

RabbitMQ五种工作模式

1、简单模式

简单模式的特点是一个生产者,一个队列,一个消费者,这种模式不需要进行任何的交换机的binding绑定。

工具类:

/**
 * 功能说明:连接工厂创建信道的工具类
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class RabbitMqUtils {
  //得到一个连接的 channel
  public static Channel getChannel() throws Exception {
    //创建一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("host"); //按照实际情况填写自己的ip
    factory.setUsername("admin"); //按照实际情况填写自己的用户名
    factory.setPassword("pwd"); //按照实际情况填写自己的密码
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    return channel;
  }
}

生产者:

/**
 * 功能说明:生产者,发送消息
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class Producer {
  // 队列名称
  public static final String QUEUE_NAME="hello";
  // 发送消息
  public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 工厂ip 连接rabbitmq的队列
    factory.setHost("host");
    // 用户名
    factory.setUsername("admin");
    // 密码
    factory.setPassword("pwd");
    // 创建连接
    Connection connection = factory.newConnection();
    // 获取信道
    Channel channel = connection.createChannel();
    /**
     * 创建队列
     * 1、队列名称
     * 2、队列里面的消息是否持久化,默认存储在内存中
     * 3、该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费
     * 4、是否自动删除,最后一个消费者断开连接以后,该队列是否自动删除,false不自动删除
     * 5、其他参数
     */
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 发消息
    String message = "hello world"; // 初次使用
    /**
     * 发送一个消息
     * 1、发送到那个交换机
     * 2、路由的配置是那个 routingKey
     * 3、其他消息参数
     * 4、发送消息的消息体
     */
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println("消息发送完毕");
  }
}

消费者:

/**
 * 功能说明: 消费者 接收消息
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class Consumer {
  // 队列名称
  public static final String QUEUE_NAME="hello";

  // 接收消息
  public static void main(String[] args) throws Exception {
    //创建连接工厂
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println(new String(message.getBody()));
    };

    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    /**
     * 消费者消费消费
     * 1、消费那个队列
     * 2、消费成功之后是否要自动应答 true 代表自动应答
     * 3、消费者成功的回调
     * 4、消费者取消消费的回调
     * */
    channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
  }
}

2、Work模式

work模式的特点是一个生产这,一个队列,多个消费者,每个消费者获取到的消息是唯一的。

生产者:

/**
 * 功能说明:work模式 生产者
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class Task01 {
  private static final String QUEUE_NAME = "hello";
  public static void main(String[] args) throws Exception {
    try {
      Channel channel = RabbitMqUtils.getChannel();
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      //从控制台当中接受信息
      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNext()) {
        String message = scanner.next();
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送消息完成:" + message);
      }
    } catch (Exception e) {
        e.printStackTrace();
    }
  }
}

消费者1:

/**
 * 功能说明: 消费者1
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class Worker01 {
  // 队列的名称
  public static final String QUEUE_NAME="hello";
  //接收消息
  public static void main(String[] args) throws Exception {
    //获取信道
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1接收到的消息:" + new String(message.getBody()));
    };
    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    System.out.println("C1等待接收消息");
    channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
  }
}

消费者2:

/**
 * 功能说明: 消费者2
 *
 * @author yangxu
 * @date 2022/2/19
 */
public class Worker02 {
  // 队列的名称
  public static final String QUEUE_NAME="hello";
  //接收消息
  public static void main(String[] args) throws Exception {
    //获取信道
    Channel channel = RabbitMqUtils.getChannel();
    // 声明 接收消息
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2接收到的消息:" + new String(message.getBody()));
    };
    //取消消息时的回调
    CancelCallback cancelCallback = (consumerTag)->{
      System.out.println("消费消息被中断");
    };
    System.out.println("C2等待接收消息");
    channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);


  }
}

3、Fanout发布订阅模式

这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。生产者发送的消息会被多个消费者获取。

生产者:

/**
 * 功能说明: 发送消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class EmitLog {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1

/**
 * 功能说明:
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogs01 {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 声明一个临时队列
    String queue = channel.queueDeclare().getQueue();
    // 绑定
    channel.queueBind(queue,EXCHANGE_NAME,"",null);
    System.out.println("C1等待接收消息:");
    // 接收消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    // 消费消息
    channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

/**
 * 功能说明:
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogs02 {
  // 交换机的名称
  private static final String EXCHANGE_NAME="logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 声明一个临时队列
    String queue = channel.queueDeclare().getQueue();
    // 绑定
    channel.queueBind(queue,EXCHANGE_NAME,"",null);
    System.out.println("C2等待接收消息:");
    // 接收消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    // 消费消息
    channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}

4、Direct路由模式

路由模式的特点是:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定绑定key,当绑定key和路由key完全匹配时,消费者才能消费消息,一个队列可以绑定多个key。

生产者:

/**
 * 功能说明:
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class DirectLogs {
  // 交换机的名称
  private static final String EXCHANGE_NAME="direct_logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      // 修改routingKey参数 即可想要发送到指定的消费方
      channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1:

/**
 * 功能说明: 消费消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogsDirect01 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "direct_logs";
  // 队列名称
  private static final String QUEUE_NAME = "console";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

/**
 * 功能说明: 消费消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogsDirect02 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "direct_logs";
  // 队列名称
  private static final String QUEUE_NAME = "console";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}

5、Topic通配符模式

该模式的特点是:用通配符进行匹配,#匹配0个或多个单词,*匹配一个单词。

生产者:

/**
 * 功能说明: 话题模式-生产者
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class TopicLogs {
  // 交换机的名称
  private static final String EXCHANGE_NAME="topic_logs";
  public static void main(String[] args) throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    // 声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    Scanner scan = new Scanner(System.in);
    while (scan.hasNext()){
      String message = scan.next();
      // 修改routingKey参数 即可想要发送到指定的消费方
      /**
       * routingKey实例:
       * quick.orange.rabbit
       * lazy.orange.elephant
       * quick.orange.fox
       * lazy.brown.fox
       */ channel.basicPublish(EXCHANGE_NAME,"quick.orange.fox",null,message.getBytes());
      System.out.println("生产者发出消息:"+message);
    }
  }
}

消费者1:

/**
 * 功能说明: 主题交换机 消费消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogsTopic01 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "topic_logs";
  // 队列名称
  private static final String QUEUE_NAME = "q1";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C1控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
      System.out.println("接收队列:"+QUEUE_NAME+"  绑定键:"+message.getEnvelope().getRoutingKey());
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C1取消接收消息");
    });
  }
}

消费者2:

/**
 * 功能说明: 消费消息
 *
 * @author yangxu
 * @date 2022/2/20
 */
public class ReceiveLogsTopic02 {
  // 交换机名称
  private static final String EXCHANGE_NAME = "topic_logs";
  // 队列名称
  private static final String QUEUE_NAME = "q2";
  public static void main(String[] args) throws Exception {
    // 获取信道
    Channel channel = RabbitMqUtils.getChannel();
    //声明一个交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    // 声明一个队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
    // 消费消息的回调
    DeliverCallback deliverCallback = (consumerTag, message)->{
      System.out.println("C2控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
      System.out.println("接收队列:"+QUEUE_NAME+"  绑定键:"+message.getEnvelope().getRoutingKey());
    };
    //消费消息
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{
      System.out.println("C2取消接收消息");
    });
  }
}
end
  • 作者:旭仔(联系作者)
  • 发表时间:2022-03-19 22:50
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论