博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 的路由模式 Topic模式
阅读量:7085 次
发布时间:2019-06-28

本文共 8657 字,大约阅读时间需要 28 分钟。

模型

生产者

1 package cn.wh; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import cn.util.RabbitMqConnectionUtil; 7  8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 11 public class Send {12     13     private static final String EXCHANGE_NAME="test_exchange_direct";14     15     public static void main(String[] args) throws IOException, TimeoutException {16         17         18         Connection connection = RabbitMqConnectionUtil.getConnection();19         20         Channel channel = connection.createChannel();21         22         //exchange23         channel.exchangeDeclare(EXCHANGE_NAME, "direct");24         25         String  msg="hello direct!";26         27         28         String routingKey="error";29         channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());30         31         System.out.println("send "+msg);32         33         channel.close();34         connection.close();35     }36 }

消费者

 

1 package cn.wh; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6  7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv1 {16     private static final String EXCHANGE_NAME = "test_exchange_direct";17     private static final String QUEUE_NAME = "test_queue_direct_1";18 19     public static void main(String[] args) throws IOException, TimeoutException {20 21         Connection connection = RabbitMqConnectionUtil.getConnection();22         final Channel channel = connection.createChannel();23 24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);25         26     27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");28         29         channel.basicQos(1);30         31         //定义一个消费者32         Consumer consumer=new DefaultConsumer(channel){33             //消息到达 触发这个方法34             @Override35             public void handleDelivery(String consumerTag, Envelope envelope,36                     BasicProperties properties, byte[] body) throws IOException {37              38                 String msg=new String(body,"utf-8");39                 System.out.println("[1] Recv msg:"+msg);40                 41                 try {42                     Thread.sleep(2000);43                 } catch (InterruptedException e) {44                     e.printStackTrace();45                 }finally{46                     System.out.println("[1] done ");47                     channel.basicAck(envelope.getDeliveryTag(), false);48                 }49             }50         };51         52         boolean autoAck=false;//自动应答 false53         channel.basicConsume(QUEUE_NAME,autoAck , consumer);54     }55         56 }

消费者2

1 package cn.wh; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6  7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv2 {16     private static final String EXCHANGE_NAME = "test_exchange_direct";17     private static final String QUEUE_NAME = "test_queue_direct_2";18 19     public static void main(String[] args) throws IOException, TimeoutException {20 21         Connection connection = RabbitMqConnectionUtil.getConnection();22         final Channel channel = connection.createChannel();23 24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);25         26     27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");28         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");29         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");30         31         channel.basicQos(1);32         33         //定义一个消费者34         Consumer consumer=new DefaultConsumer(channel){35             //消息到达 触发这个方法36             @Override37             public void handleDelivery(String consumerTag, Envelope envelope,38                     BasicProperties properties, byte[] body) throws IOException {39              40                 String msg=new String(body,"utf-8");41                 System.out.println("[2] Recv msg:"+msg);42                 43                 try {44                     Thread.sleep(2000);45                 } catch (InterruptedException e) {46                     e.printStackTrace();47                 }finally{48                     System.out.println("[2] done ");49                     channel.basicAck(envelope.getDeliveryTag(), false);50                 }51             }52         };53         54         boolean autoAck=false;//自动应答 false55         channel.basicConsume(QUEUE_NAME,autoAck , consumer);56     }57         58 }

Topic模型

1 public class Send { 2  private final static String EXCHANGE_NAME = "test_exchange_topic"; 3  public static void main(String[] argv) throws Exception { 4  // 获取到连接以及mq通道 5  Connection connection = ConnectionUtils.getConnection(); 6  Channel channel = connection.createChannel(); 7  // 声明exchange 8  channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 9  // 消息内容10  String message = "id=1001";11  channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());12  System.out.println(" [x] Sent '" + message + "'");13  channel.close();14  connection.close();15  }16 }

消费者

1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 获取到连接以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 声明队列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);10 // 绑定队列到交换机11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");13 // 同一时刻服务器只会发一条消息给消费者14 channel.basicQos(1);15 // 定义队列的消费者16 Consumer consumer = new DefaultConsumer(channel) {17 // 消息到达 触发这个方法18 @Override19 public void handleDelivery(String consumerTag, Envelope envelope,20 BasicProperties properties, byte[] body) throws IOException {21 String msg = new String(body, "utf-8");22 System.out.println("[2] Recv msg:" + msg);23 try {24 Thread.sleep(1000);25 } catch (InterruptedException e) {26 e.printStackTrace();27 } finally {28 System.out.println("[2] done ");29 // 手动回执30 channel.basicAck(envelope.getDeliveryTag(), false);31 }32 }33 };34 boolean autoAck = false;35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);36 }37 }

消费者2

1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 获取到连接以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 声明队列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);10 // 绑定队列到交换机11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");13 // 同一时刻服务器只会发一条消息给消费者14 channel.basicQos(1);15 // 定义队列的消费者16 Consumer consumer = new DefaultConsumer(channel) {17 // 消息到达 触发这个方法18 @Override19 public void handleDelivery(String consumerTag, Envelope envelope,20 BasicProperties properties, byte[] body) throws IOException {21 String msg = new String(body, "utf-8");22 System.out.println("[2] Recv msg:" + msg);23 try {24 Thread.sleep(1000);25 } catch (InterruptedException e) {26 e.printStackTrace();27 } finally {28 System.out.println("[2] done ");29 // 手动回执30 channel.basicAck(envelope.getDeliveryTag(), false);31 }32 }33 };34 boolean autoAck = false;35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);36 }37 }

Exchanges(转发器|交换机)

转发器一方面它接受生产者的消息,另一方面向队列推送消息

Nameless exchange(匿名转发)

之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码

Fanout Exchange

不处理路由键。你只需要将队列绑定到交换机上。发送消息到交换机都会被转发到与该交换机绑定的所有队列

 

Direct Exchange

处理路由键。

需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发 dog.puppy,也不会转发dog.guard,只会转发 dog。
                                                    

Topic Exchange

将路由键和某模式进行匹配。

此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

 

转载于:https://www.cnblogs.com/wh1520577322/p/10071505.html

你可能感兴趣的文章
<rhel6+pptpd+freeradius+mysql>
查看>>
前端有哪些优质资源可以利用?
查看>>
[ASP.NET]跨页面传值
查看>>
名词:topology、architecture和struct,究竟什么才是架构?
查看>>
极速理解设计模式系列:20.模板方法模式(Template Method Pattern)
查看>>
如何使用mysql(lamp)分离环境搭建dedecms织梦网站及apache服务器常见的403http状态码及其解决方法...
查看>>
CentOS6.5 keepalived详解及实现Nginx服务的高可用性
查看>>
OSPF路由过滤测试
查看>>
Linux基础命令小结(上)-Linux学习日记
查看>>
SMS 2003 系列 —OSD部署指南
查看>>
乾颐堂HCIE1 OSPF基础和Hello报文以及邻居的基本排错
查看>>
对VS2008生成智能win32程序简单理解
查看>>
Oracle DG 最大保护(Maximize Protection)和最高可用性(Maximize Availability)异同
查看>>
java中的类修饰符、成员变量修饰符、方法修饰符。
查看>>
IT顾问成长分享沙龙
查看>>
Spring resource bundle多语言,单引号format异常
查看>>
AIX中不小心删除了inittab文件
查看>>
微软企业级加解密解决方案MBAM利用门户查询恢复密码(用户自助和管理门户)...
查看>>
牛津大学人类未来研究所:万字长文谈AI新职场方向-政策研究
查看>>
swift UI专项训练40 用swift实现打电话和发短信功能
查看>>