關於 rabbitmq,你想知道的都在這裏(上)
rabbitmq 在 window 上安裝 依賴 erlang 環境,首先現在 erlang
https://www.erlang.org/downloads
然後下載 rabbitmq 的 windows 版本
https://www.rabbitmq.com/download.html
首先我們瞭解下消息隊列是由交換機 exchange 和隊列組合構成的,有三種形式
-
直連型:一個交換機關聯一個隊列,指定一個路由 key,消息通過交換機名稱和路由 key 發送到指定隊列,發送一個,隊列裏面就多一個消息。
-
扇型:一個交換機關聯多個隊列。消息通過交換機名稱發送,所有關聯了這個交換機的隊列都將收到消息,發送一個消息再 N 個消息隊列產生 N 個一模一樣的消息數據。
-
主題型:一個交換機根據規則關聯多個隊列。這種類型與扇型的很像,但是主題型會根據動態路由 key 決定消息的投遞到哪個隊列。這裏的路由規則很像正則表達式。會根據事先設定的路由規則動態將消息投遞到隊列,可能投遞到一個隊列也可能投遞到多個隊列。
首先添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
yml 文件指定 rabbitmq 連接信息
server:
port: 8021
spring:
#給項目來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: localhost
port: 5672
# username: skywalker
# password: skywalker
# #虛擬host 可以不設置,使用server默認host
# virtual-host: skywalker-virtualhost
#確認消息已發送到交換機(Exchange)
publisher-confirm-type: correlated
#確認消息已發送到隊列(Queue)
publisher-returns: true
注意:我們需要創建兩個工程,一個生產者
producer
、一個消費者comsumer
,生產者用來生產消息,消費者用來消費生產者將消息投遞到 rabbitmq 中的消息。
兩個工程中的 pom 依賴一樣,yml 也一樣,只需要將
server.port
設置成不同的端口即可。這裏我們將生產者設置爲8021
端口,消費者設置爲8022
端口。
直連型:
從上面的講解中我們知道,有交換機exchange
,有隊列queue
,有路由routing
,因此我們需要在生產者端將三者關聯起來,然後發送消息,這樣消費端才能收到消息。
綁定關聯
@Configuration
public class DirectRabbitConfig {
public static String directRouting = "directRouting";
public static String directQueue = "directQueue";
public static String directExchange = "directExchange";
@Bean
public Queue DirectQueue() {
return new Queue(DirectRabbitConfig.directQueue,true); //true 是否持久
}
@Bean
DirectExchange DirectExchange() {
return new DirectExchange(DirectRabbitConfig.directExchange);
}
@Bean
Binding bindingDirect() {
// BindingBuilder.bind(隊列A).to(交換機B).with(路由) 將隊列A綁定到交換機B,使用路由C傳遞消息
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(directRouting);
}
}
發送消息
@Autowired
private RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法
@GetMapping("/sendDirectMsg")
public String sendDirectMsg() {
Map<String,Object> map=new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am direct msg!");
map.put("datetime",System.currentTimeMillis());
//交換機 路由 消息(發送消息的時候不需要管隊列,因爲隊列已經在DirectRabbitConfig中配置了,隊列應該是消費者關心的事情)
rabbitTemplate.convertAndSend(DirectRabbitConfig.directExchange, DirectRabbitConfig.directRouting, map);
return "ok";
}
注意:在發送完消息後,通過控制檯
http://localhost:15672
(默認用戶名密碼都是guest
),查看是否產生了消息,如下界面:
接收消息
@Component
@RabbitListener(queues = "directQueue")//監聽的隊列名稱 directQueue,不需要管路由和交換機,因爲這些是生產者管理的事情。消費者只需要關心隊列即可
public class DirectReceiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("directReceiver消費者收到消息 : " + testMessage.toString());
}
}
扇型
配置關聯
@Configuration
public class FanoutRabbitConfig {
public static String fanoutQueue1="fanoutQueue1";
public static String fanoutQueue2="fanoutQueue2";
public static String fanoutQueue3="fanoutQueue3";
public static String fanoutExchange = "fanoutExchange";
@Bean
public Queue queue1() {
return new Queue(FanoutRabbitConfig.fanoutQueue1);
}
@Bean
public Queue queue2() {
return new Queue(FanoutRabbitConfig.fanoutQueue2);
}
@Bean
public Queue queue3() {
return new Queue(FanoutRabbitConfig.fanoutQueue3);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FanoutRabbitConfig.fanoutExchange);
}
@Bean
Binding bindingExchange1() {
//將隊列fanoutQueue1 綁定到 fanoutExchange
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
Binding bindingExchange2() {
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
@Bean
Binding bindingExchange3() {
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
}
發送消息
@GetMapping("/sendFanoutMsg")
public String sendFanoutMsg() {
Map<String, Object> map = new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am fanout msg!");
map.put("datetime",System.currentTimeMillis());
rabbitTemplate.convertAndSend(FanoutRabbitConfig.fanoutExchange, null, map);//扇型不需要路由key,設置了也無效
return "ok";
}
接收消息
@Component
@RabbitListener(queues = "fanoutQueue1")
public class FanoutReceiver1 {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("FanoutReceiver1消費者收到消息 : " +testMessage.toString());
}
}
注意:扇型我們配置了 3 個隊列,此處指列出了一個隊列的名稱,自行加 2 和 3 即可,使用 postman 發送消息後再控制檯可以看到:
三個隊列都接收到了消息,消費沒有先後順序。
主題型
配置
@Configuration
public class TopicRabbitConfig {
//綁定鍵
public static String topicQueue1 = "topicQueue1";
public static String topicQueue2 = "topicQueue2";
public static String topicExchange = "topicExchange";
public static String topicRoutingApple = "fruit.apple";
// * 表示1~n個字符 (必須出現的)
// # 表示0~n個字符 (可能不出現)
// 若隊列綁定爲#,則無視消息路由,接收所有消息
// 當*和#都未出現時,就相當於直連direct
public static String topicRoutingFruit = "fruit.#";
@Bean
public Queue topicQueue1() {
return new Queue(TopicRabbitConfig.topicQueue1);
}
@Bean
public Queue topicQueue2() {
return new Queue(TopicRabbitConfig.topicQueue2);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(TopicRabbitConfig.topicExchange);
}
//只有攜帶路由key 爲fruit.apple,纔會分發到該隊列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(topicQueue1()).to(exchange()).with(TopicRabbitConfig.topicRoutingApple);
}
//只要是消息攜帶的路由鍵是以fruit.開頭,都會分發到該隊列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(topicQueue2()).to(exchange()).with(TopicRabbitConfig.topicRoutingFruit);
}
}
發送消息
@GetMapping("/sendTopicMsg1")
public String sendTopicMsg1() {
Map<String, Object> map = new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am topic msg from apple!");
map.put("datetime",System.currentTimeMillis());
rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, TopicRabbitConfig.topicRoutingApple, map);
return "ok";
}
@GetMapping("/sendTopicMsg2")
public String sendTopicMsg2() {
Map<String, Object> map = new HashMap<String,Object>();
map.put("id",UUID.randomUUID().toString());
map.put("data","hello,i am topic msg from fruit!");
map.put("datetime",System.currentTimeMillis());
rabbitTemplate.convertAndSend(TopicRabbitConfig.topicExchange, "fruit.orange", map);
return "ok";
}
接收消息
@Component
@RabbitListener(queues = "topicQueue1")
public class Topic1Receiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("Topic1Receiver消費者收到消息 : " + testMessage.toString());
}
}
@Component
@RabbitListener(queues = "topicQueue2")
public class Topic2Receiver {
@RabbitHandler
public void handler(Map testMessage) {
System.out.println("Topic2Receiver消費者收到消息 : " + testMessage.toString());
}
}
好了,這邊基本介紹就到這裏了,回顧下,我們講解了消息隊列的三種形式,及各自的應用。另外 topic 主題型隊列是一種特殊的隊列。下回我們擴展講解下消息的持久化,消息回調(生產端和消費端)
文章已經同步更新到 Java 實驗室官方站點:
https://javawu.com/archives/2917
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/1Tot-sGFTmVfwrBkp2YI3w