模型
生产者
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import cn.util.RabbitMqConnectionUtil; 7 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 11 public class Send {12 13 private static final String EXCHANGE_NAME="test_exchange_direct";14 15 public static void main(String[] args) throws IOException, TimeoutException {16 17 18 Connection connection = RabbitMqConnectionUtil.getConnection();19 20 Channel channel = connection.createChannel();21 22 //exchange23 channel.exchangeDeclare(EXCHANGE_NAME, "direct");24 25 String msg="hello direct!";26 27 28 String routingKey="error";29 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());30 31 System.out.println("send "+msg);32 33 channel.close();34 connection.close();35 }36 }
消费者
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv1 {16 private static final String EXCHANGE_NAME = "test_exchange_direct";17 private static final String QUEUE_NAME = "test_queue_direct_1";18 19 public static void main(String[] args) throws IOException, TimeoutException {20 21 Connection connection = RabbitMqConnectionUtil.getConnection();22 final Channel channel = connection.createChannel();23 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null);25 26 27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");28 29 channel.basicQos(1);30 31 //定义一个消费者32 Consumer consumer=new DefaultConsumer(channel){33 //消息到达 触发这个方法34 @Override35 public void handleDelivery(String consumerTag, Envelope envelope,36 BasicProperties properties, byte[] body) throws IOException {37 38 String msg=new String(body,"utf-8");39 System.out.println("[1] Recv msg:"+msg);40 41 try {42 Thread.sleep(2000);43 } catch (InterruptedException e) {44 e.printStackTrace();45 }finally{46 System.out.println("[1] done ");47 channel.basicAck(envelope.getDeliveryTag(), false);48 }49 }50 };51 52 boolean autoAck=false;//自动应答 false53 channel.basicConsume(QUEUE_NAME,autoAck , consumer);54 }55 56 }
消费者2
1 package cn.wh; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 7 import cn.util.RabbitMqConnectionUtil; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.Connection;10 import com.rabbitmq.client.Consumer;11 import com.rabbitmq.client.DefaultConsumer;12 import com.rabbitmq.client.Envelope;13 import com.rabbitmq.client.AMQP.BasicProperties;14 15 public class Recv2 {16 private static final String EXCHANGE_NAME = "test_exchange_direct";17 private static final String QUEUE_NAME = "test_queue_direct_2";18 19 public static void main(String[] args) throws IOException, TimeoutException {20 21 Connection connection = RabbitMqConnectionUtil.getConnection();22 final Channel channel = connection.createChannel();23 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null);25 26 27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");28 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");29 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");30 31 channel.basicQos(1);32 33 //定义一个消费者34 Consumer consumer=new DefaultConsumer(channel){35 //消息到达 触发这个方法36 @Override37 public void handleDelivery(String consumerTag, Envelope envelope,38 BasicProperties properties, byte[] body) throws IOException {39 40 String msg=new String(body,"utf-8");41 System.out.println("[2] Recv msg:"+msg);42 43 try {44 Thread.sleep(2000);45 } catch (InterruptedException e) {46 e.printStackTrace();47 }finally{48 System.out.println("[2] done ");49 channel.basicAck(envelope.getDeliveryTag(), false);50 }51 }52 };53 54 boolean autoAck=false;//自动应答 false55 channel.basicConsume(QUEUE_NAME,autoAck , consumer);56 }57 58 }
Topic模型
1 public class Send { 2 private final static String EXCHANGE_NAME = "test_exchange_topic"; 3 public static void main(String[] argv) throws Exception { 4 // 获取到连接以及mq通道 5 Connection connection = ConnectionUtils.getConnection(); 6 Channel channel = connection.createChannel(); 7 // 声明exchange 8 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 9 // 消息内容10 String message = "id=1001";11 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());12 System.out.println(" [x] Sent '" + message + "'");13 channel.close();14 connection.close();15 }16 }
消费者
1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 获取到连接以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 声明队列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);10 // 绑定队列到交换机11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");13 // 同一时刻服务器只会发一条消息给消费者14 channel.basicQos(1);15 // 定义队列的消费者16 Consumer consumer = new DefaultConsumer(channel) {17 // 消息到达 触发这个方法18 @Override19 public void handleDelivery(String consumerTag, Envelope envelope,20 BasicProperties properties, byte[] body) throws IOException {21 String msg = new String(body, "utf-8");22 System.out.println("[2] Recv msg:" + msg);23 try {24 Thread.sleep(1000);25 } catch (InterruptedException e) {26 e.printStackTrace();27 } finally {28 System.out.println("[2] done ");29 // 手动回执30 channel.basicAck(envelope.getDeliveryTag(), false);31 }32 }33 };34 boolean autoAck = false;35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);36 }37 }
消费者2
1 public class Recv { 2 private final static String QUEUE_NAME = "test_queue_topic_1"; 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 public static void main(String[] argv) throws Exception { 5 // 获取到连接以及mq通道 6 Connection connection = ConnectionUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 // 声明队列 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);10 // 绑定队列到交换机11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");13 // 同一时刻服务器只会发一条消息给消费者14 channel.basicQos(1);15 // 定义队列的消费者16 Consumer consumer = new DefaultConsumer(channel) {17 // 消息到达 触发这个方法18 @Override19 public void handleDelivery(String consumerTag, Envelope envelope,20 BasicProperties properties, byte[] body) throws IOException {21 String msg = new String(body, "utf-8");22 System.out.println("[2] Recv msg:" + msg);23 try {24 Thread.sleep(1000);25 } catch (InterruptedException e) {26 e.printStackTrace();27 } finally {28 System.out.println("[2] done ");29 // 手动回执30 channel.basicAck(envelope.getDeliveryTag(), false);31 }32 }33 };34 boolean autoAck = false;35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);36 }37 }
Exchanges(转发器|交换机)
转发器一方面它接受生产者的消息,另一方面向队列推送消息
Nameless exchange(匿名转发)
之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码
Fanout Exchange
不处理路由键。你只需要将队列绑定到交换机上。发送消息到交换机都会被转发到与该交换机绑定的所有队列
Direct Exchange
处理路由键。
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发 dog.puppy,也不会转发dog.guard,只会转发 dog。![](https://img2018.cnblogs.com/blog/1216909/201812/1216909-20181205160134910-722880906.png)
Topic Exchange
将路由键和某模式进行匹配。
此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。