为什么要使用交换机?
当发布者将消息直接放入队列中以后,一旦消费者对此消息使用,则消息将会从队列中删除,导致其他消费者拿不到队列中得消息。
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
发布订阅模型与之前队列发送的区别就是通过交换机(exchange)允许将同一条消息发给多个消费者
发布者 - 交换机 - 队列1、队列2、队列3等 - 消费者…
交换机
- 只负责将消息转发给队列,不存储消息
- 发送到交换机的消息都会被转发到该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的
- Fanout Exchange交换机可以简单的理解为广播站。
本文讲解FanoutExchange
FanoutExchange 会将接收到的消息发送到每一个跟其绑定的队列
1.在发布者publisher服务中向交换机发送消息
交换机名称是badianboke.fanout
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
//发送到交换机
@Test
public void testSendFandoutExchange(){
String exchangeName = "badianboke.fanout";
String message = "hello badianboke.com";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
2.在消费者consumer服务中声明交换机、队列并将两者绑定
cn/badianboke/mq/config/FanoutConfig.java
package cn.badianboke.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("badianboke.fanout");
}
//声明队列1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//声明队列2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//将交换机与队列1绑定
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
//将交换机与队列2绑定
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
打开RabbitMq控制台,可以看到有一个交换机 badianboke.fanout
3.在消费者consumer服务中监听两个队列并接收消息
cn/badianboke/mq/listener/SpringRabbitListener.java
package cn.badianboke.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) throws InterruptedException {
System.out.println("Consumer1 Received fanout.queue1 Message: " + message + " / " + LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) throws InterruptedException {
System.err.println("Consumer2 Received fanout.queue2 Message: " + message + " / " + LocalTime.now());
}
}
打开RabbitMq控制台,可以看到两个队列已经绑定到交换机上