声明:本站文章均为作者个人原创,图片均为实际截图。如有需要请收藏网站,禁止转载,谢谢配合!!!

为什么要使用交换机?

当发布者将消息直接放入队列中以后,一旦消费者对此消息使用,则消息将会从队列中删除,导致其他消费者拿不到队列中得消息。

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列

  • Direct:定向,把消息交给符合指定routing key 的队列

  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

发布订阅模型与之前队列发送的区别就是通过交换机(exchange)允许将同一条消息发给多个消费者
发布者 - 交换机 - 队列1、队列2、队列3等 - 消费者…

交换机

  • 只负责将消息转发给队列,不存储消息
  • 发送到交换机的消息都会被转发到该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的
  • Fanout Exchange交换机可以简单的理解为广播站。

本文讲解FanoutExchange
FanoutExchange 会将接收到的消息发送到每一个跟其绑定的队列

1.在发布者publisher服务中向交换机发送消息

交换机名称是badianboke.fanout

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    //发送到交换机
    @Test
    public void testSendFandoutExchange(){
        String exchangeName = "badianboke.fanout";
        String message = "hello badianboke.com";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}

2.在消费者consumer服务中声明交换机、队列并将两者绑定

cn/badianboke/mq/config/FanoutConfig.java

package cn.badianboke.mq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("badianboke.fanout");
    }
    //声明队列1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    //声明队列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    //将交换机与队列1绑定
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    //将交换机与队列2绑定
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

打开RabbitMq控制台,可以看到有一个交换机 badianboke.fanout

3.在消费者consumer服务中监听两个队列并接收消息

cn/badianboke/mq/listener/SpringRabbitListener.java

package cn.badianboke.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) throws InterruptedException {
        System.out.println("Consumer1 Received fanout.queue1 Message: " + message + " / " + LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) throws InterruptedException {
        System.err.println("Consumer2 Received fanout.queue2 Message: " + message + " / " + LocalTime.now());
    }
}

打开RabbitMq控制台,可以看到两个队列已经绑定到交换机上

4.运行服务可以看到消费者接收到消息