關於 rabbitmq,你想知道的都在這裏(上)

rabbitmq 在 window 上安裝 依賴 erlang 環境,首先現在 erlang

https://www.erlang.org/downloads

然後下載 rabbitmq 的 windows 版本

https://www.rabbitmq.com/download.html

首先我們瞭解下消息隊列是由交換機 exchange 和隊列組合構成的,有三種形式

  1. 直連型:一個交換機關聯一個隊列,指定一個路由 key,消息通過交換機名稱和路由 key 發送到指定隊列,發送一個,隊列裏面就多一個消息。

  2. 扇型:一個交換機關聯多個隊列。消息通過交換機名稱發送,所有關聯了這個交換機的隊列都將收到消息,發送一個消息再 N 個消息隊列產生 N 個一模一樣的消息數據。

  3. 主題型:一個交換機根據規則關聯多個隊列。這種類型與扇型的很像,但是主題型會根據動態路由 key 決定消息的投遞到哪個隊列。這裏的路由規則很像正則表達式。會根據事先設定的路由規則動態將消息投遞到隊列,可能投遞到一個隊列也可能投遞到多個隊列。

首先添加依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

yml 文件指定 rabbitmq 連接信息

server:
  port: 8021
spring:
  #給項目來個名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服務器
  rabbitmq:
    host: localhost
    port: 5672
#    username: skywalker
#    password: skywalker
#    #虛擬host 可以不設置,使用server默認host
#    virtual-host: skywalker-virtualhost
    #確認消息已發送到交換機(Exchange)
    publisher-confirm-type: correlated
    #確認消息已發送到隊列(Queue)
    publisher-returns: true

注意:我們需要創建兩個工程,一個生產者producer、一個消費者comsumer,生產者用來生產消息,消費者用來消費生產者將消息投遞到 rabbitmq 中的消息。

兩個工程中的 pom 依賴一樣,yml 也一樣,只需要將server.port設置成不同的端口即可。這裏我們將生產者設置爲8021端口,消費者設置爲8022端口。

直連型:

從上面的講解中我們知道,有交換機exchange,有隊列queue,有路由routing,因此我們需要在生產者端將三者關聯起來,然後發送消息,這樣消費端才能收到消息。

綁定關聯
@Configuration
public class DirectRabbitConfig {
    public static String directRouting = "directRouting";
    public static String directQueue = "directQueue";
    public static String directExchange = "directExchange";
    @Bean
    public Queue DirectQueue() {
        return new Queue(DirectRabbitConfig.directQueue,true);  //true 是否持久
    }
    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange(DirectRabbitConfig.directExchange);
    }
    @Bean
    Binding bindingDirect() {
//        BindingBuilder.bind(隊列A).to(交換機B).with(路由) 將隊列A綁定到交換機B,使用路由C傳遞消息
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(directRouting);
    }
}
發送消息
@Autowired
private RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法

@GetMapping("/sendDirectMsg")
public String sendDirectMsg() {
    Map<String,Object> map=new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am direct msg!");
    map.put("datetime",System.currentTimeMillis());
    //交換機 路由 消息(發送消息的時候不需要管隊列,因爲隊列已經在DirectRabbitConfig中配置了,隊列應該是消費者關心的事情)
    rabbitTemplate.convertAndSend(DirectRabbitConfig.directExchange, DirectRabbitConfig.directRouting, map);
    return "ok";
}

注意:在發送完消息後,通過控制檯http://localhost:15672 (默認用戶名密碼都是guest),查看是否產生了消息,如下界面:

接收消息
@Component
@RabbitListener(queues = "directQueue")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectReceiver {
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("directReceiver消費者收到消息  : " + testMessage.toString());
    }
}

扇型

配置關聯
@Configuration
public class FanoutRabbitConfig {
    public static String fanoutQueue1="fanoutQueue1";
    public static String fanoutQueue2="fanoutQueue2";
    public static String fanoutQueue3="fanoutQueue3";
    public static String fanoutExchange = "fanoutExchange";
 
    @Bean
    public Queue queue1() {
        return new Queue(FanoutRabbitConfig.fanoutQueue1);
    }
    @Bean
    public Queue queue2() {
        return new Queue(FanoutRabbitConfig.fanoutQueue2);
    }
    @Bean
    public Queue queue3() {
        return new Queue(FanoutRabbitConfig.fanoutQueue3);
    }
    
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FanoutRabbitConfig.fanoutExchange);
    }

    @Bean
    Binding bindingExchange1() {
        //將隊列fanoutQueue1 綁定到 fanoutExchange 
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    @Bean
    Binding bindingExchange2() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
    @Bean
    Binding bindingExchange3() {
        return BindingBuilder.bind(queue3()).to(fanoutExchange());
    }
}
發送消息
@GetMapping("/sendFanoutMsg")
public String sendFanoutMsg() {
    Map<String, Object> map = new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am fanout msg!");
    map.put("datetime",System.currentTimeMillis());
    rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange, null, map);//扇型不需要路由key,設置了也無效
    return "ok";
}
接收消息
@Component
@RabbitListener(queues = "fanoutQueue1")
public class FanoutReceiver1 {
 
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("FanoutReceiver1消費者收到消息  : " +testMessage.toString());
    }
 
}

注意:扇型我們配置了 3 個隊列,此處指列出了一個隊列的名稱,自行加 2 和 3 即可,使用 postman 發送消息後再控制檯可以看到:

三個隊列都接收到了消息,消費沒有先後順序。

主題型

配置
@Configuration
public class TopicRabbitConfig {
    //綁定鍵
    public static String topicQueue1 = "topicQueue1";
    public static String topicQueue2 = "topicQueue2";
    public static String topicExchange = "topicExchange";
    public static String topicRoutingApple = "fruit.apple";
//    *  表示1~n個字符 (必須出現的)
//    #  表示0~n個字符 (可能不出現)
//    若隊列綁定爲#,則無視消息路由,接收所有消息
//    當*和#都未出現時,就相當於直連direct
    public static String topicRoutingFruit = "fruit.#";

    @Bean
    public Queue topicQueue1() {
        return new Queue(TopicRabbitConfig.topicQueue1);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TopicRabbitConfig.topicQueue2);
    }
 
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TopicRabbitConfig.topicExchange);
    }
 
 
    //只有攜帶路由key 爲fruit.apple,纔會分發到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(topicQueue1()).to(exchange()).with(TopicRabbitConfig.topicRoutingApple);
    }
    //只要是消息攜帶的路由鍵是以fruit.開頭,都會分發到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(topicQueue2()).to(exchange()).with(TopicRabbitConfig.topicRoutingFruit);
    }
}
發送消息
@GetMapping("/sendTopicMsg1")
public String sendTopicMsg1() {
    Map<String, Object> map = new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am topic msg from apple!");
    map.put("datetime",System.currentTimeMillis());
    rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, TopicRabbitConfig.topicRoutingApple, map);
    return "ok";
}

@GetMapping("/sendTopicMsg2")
public String sendTopicMsg2() {
    Map<String, Object> map = new HashMap<String,Object>();
    map.put("id",UUID.randomUUID().toString());
    map.put("data","hello,i am topic msg from fruit!");
    map.put("datetime",System.currentTimeMillis());
    rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "fruit.orange", map);
    return "ok";
}
接收消息
@Component
@RabbitListener(queues = "topicQueue1")
public class Topic1Receiver {
 
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("Topic1Receiver消費者收到消息  : " + testMessage.toString());
    }
}
@Component
@RabbitListener(queues = "topicQueue2")
public class Topic2Receiver {
 
    @RabbitHandler
    public void handler(Map testMessage) {
        System.out.println("Topic2Receiver消費者收到消息  : " + testMessage.toString());
    }
}

好了,這邊基本介紹就到這裏了,回顧下,我們講解了消息隊列的三種形式,及各自的應用。另外 topic 主題型隊列是一種特殊的隊列。下回我們擴展講解下消息的持久化,消息回調(生產端和消費端)

文章已經同步更新到 Java 實驗室官方站點:

https://javawu.com/archives/2917

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/1Tot-sGFTmVfwrBkp2YI3w