深入學習 RabbitMQ 五種模式 -二-

1. 工作模式

工作模式也被稱爲任務模型(Task Queues)。當消息處理比較耗時的時候,可能生產消息的速度會遠遠大於消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用 work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重複執行。

這種模式只有一個生產者 Producer,一個用於存儲消息的隊列 Queue、多個消費者 Consumer 用於接收消息。

工作隊列模式的特點有三:

1.1. 創建生產者

生產者向隊列中發送 10 條消息

package com.olive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生產者(工作模式)
 */
public class WorkerProducer {

    /**隊列名稱*/
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        // 1、創建連接
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道
        Channel channel = connection.createChannel();
        // 3、聲明隊列 queueDeclare(隊列名稱,是否持久化,是否獨佔本連接,是否自動刪除,附加屬性參數)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 4、發送10條消息
        for (int i = 1; i <= 10; i++) {
            String msg = "Hello World RabbitMQ!!!" + i;
            System.out.println("生產者發送消息:" + msg);
            // basicPublish(交換機名稱-""表示不用交換機,隊列名稱或者routingKey, 消息的屬性信息, 消息內容的字節數組);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        //釋放資源
        channel.close();
        connection.close();
    }
}

1.2. 創建消費者

創建兩個消費者 WorkerConsumer1 和 WorkerConsumer2

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消費者1(工作模式)
 */
public class WorkerConsumer1 {

    /**隊列名稱*/
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        // 1、獲取連接對象
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道(頻道)
        Channel channel = connection.createChannel();

        // 3、創建隊列Queue,如果沒有一個名字叫work_queue的隊列,則會創建該隊列,如果有則不會創建.
        // 這裏可有可無,但是發送消息是必須得有該隊列,否則消息會丟失
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 4、監聽隊列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消費者標識, 消息包的內容, 屬性信息(生產者的發送時指定), 讀取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者獲取消息:" + new String(body));
                // 模擬消息處理延時,加個線程睡眠時間
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // basicConsume(隊列名稱, 是否自動確認, 回調對象)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        //注意,消費者這裏不建議關閉資源,讓程序一直處於讀取消息的狀態
    }
}
package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消費者2(工作模式)
 */
public class WorkerConsumer2 {

    /**隊列名稱*/
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        // 1、獲取連接對象
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道(頻道)
        Channel channel = connection.createChannel();

        // 3、創建隊列Queue,如果沒有一個名字叫work_queue的隊列,則會創建該隊列,如果有則不會創建.
        // 這裏可有可無,但是發送消息是必須得有該隊列,否則消息會丟失
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 4、監聽隊列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消費者標識, 消息包的內容, 屬性信息(生產者的發送時指定), 讀取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者獲取消息:" + new String(body));
                // 模擬消息處理延時,加個線程睡眠時間
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // basicConsume(隊列名稱, 是否自動確認, 回調對象)
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        //注意,消費者這裏不建議關閉資源,讓程序一直處於讀取消息的狀態
    }
}

消費者 2 與消費者 1 的代碼邏輯是一模一樣的

1.3. 驗證

首先分別啓動兩個消費者**(注意這裏一定要先啓動消費者)**

從 RabbitMQ 管理後臺查看,已經創建了work_queue隊列。

啓動生產者,分別查看消費者 1 與消費者 2 的控制檯的打印信息

消費者 1WorkerConsumer1

消費者 2WorkerConsumer2

從兩個消費者控制檯的打印結果看,兩個消費者消費的消息像是輪詢方式消費的。

上面實現的就是輪詢分發的方式。

現象:消費者 1 處理完消息之後,消費者 2 才能處理,它兩這樣輪着來處理消息,直到消息處理完成,這種方式叫輪詢分發 (round-robin),結果就是不管兩個消費者誰忙,數據總是你消費一個我消費一個,不管消費者處理數據的性能,此時 autoAck = true。

/**
* @param queue 隊列名稱
* @param autoAck 是否自動發送確認,true自動確認,表示接收完消息後,自動將消息在隊列中移除;false手動發送ack確認消息
* @param callback 回調對象
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

注意:autoAck 屬性設置爲 true,表示消息自動確認。消費者在消費時消息的確認模式可以分爲:自動確認和手動確認

自動確認:在隊列中的消息被消費者讀取之後會自動從隊列中刪除。不管消息是否被消費者消費成功,消息都會刪除。

手動確認:當消費者讀取消息後,消費端需要手動發送 ACK 用於確認消息已經消費成功了(也就是需要自己編寫代碼發送 ACK 確認),如果設爲手動確認而沒有發送 ACK 確認,那麼消息就會一直存在隊列中(前提是進行了持久化操作),後續就可能會造成消息重複消費,如果過多的消息堆積在隊列中,還可能造成內存溢出,手動確認消費者在處理完消息之後要及時發送 ACK 確認給隊列

使用輪詢分發的方式會有一個明顯的缺點,例如,消費者 1 處理數據的效率很慢,消費者 2 處理數據的效率很高,正常情況下消費者 2 處理的數據應該多一點纔對,而輪詢分發則不管你的性能如何,反正就是每次處理一個消息,對於這種情況可以使用公平分發的方式來解決。

要實現公平分發,需要做如下修改:

  1. 消費者:保證消息一次只分發一次

  2. 消費者:關閉自動確認,並且手動發送 ACK 給隊列

修改後再次運行,由於消費者 1 設置處理完一個消息後睡眠 2 秒,而消費者 2 爲 1 秒,所以期望輸出的結果爲:消費者 2 處理消息的速度大概是消費者 1 的兩倍左右,結果如下。

消費者 1

消費者 2

2. 發佈訂閱模式

發佈訂閱模式(Publish/Subscribe):該模式需要涉及到交換機了,也可以稱它爲廣播模式,消息通過交換機廣播到所有與其綁定的隊列中。

一個消費者將消息首先發送到交換機上(這裏的交換機類型爲 fanout),然後交換機綁定到多個隊列,這樣每個發到 fanout 類型交換器的消息會被分發到所有的隊列中,最後被監聽該隊列的消費者所接收並消費。如下圖所示:

package com.olive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生產者(發佈訂閱模式)
 */
public class PubSubProducer {

    // 交換機名稱
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 1、創建連接
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道
        Channel channel = connection.createChannel();
        // 3、連續發送10條消息
        for (int i = 1; i <= 10; i++) {
            String msg = "Hello World RabbitMQ!!!~~~" + i;
            System.out.println("生產者發送的消息:" + msg);
            //basicPublish(交換機名稱[默認Default Exchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,發送的消息內容)
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        }
        //關閉資源
        channel.close();
        connection.close();
    }
}

由於從這裏開始涉及到交換機了,使用這裏介紹一下四種交換機的類型:

  1. direct(直連):消息中的路由鍵(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交換器就將消息發到對應的隊列中。是基於完全匹配、單播的模式。

  2. fanout(廣播):把所有發送到 fanout 交換器的消息路由到所有綁定該交換器的隊列中,fanout 類型轉發消息是最快的。

  3. topic(主題):通過模式匹配的方式對消息進行路由,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。匹配規則:

① RoutingKey 和 BindingKey 爲一個 點號 '.' 分隔的字符串。比如: stock.usd.nyse;可以放任意的 key 在 routing_key 中,當然最長不能超過 255 bytes。

② BindingKey 可使用 * 和 # 用於做模糊匹配:* 匹配一個單詞,# 匹配 0 個或者多個單詞;

  1. headers:不依賴於路由鍵進行匹配,是根據發送消息內容中的 headers 屬性進行匹配,除此之外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。

消費者 1

注意:在發送消息前,RabbitMQ 服務器中必須的有隊列,否則消息可能會丟失,如果還涉及到交換機與隊列綁定,那麼就得先聲明交換機、隊列並且設置綁定的路由值 (Routing Key),以免程序出現異常,由於本例所有的聲明都是在消費者中,所以我們首先要啓動消費者。如果 RabbitMQ 服務器中已經存在了聲明的隊列或者交換機,那麼就不在創建,如果沒有則創建相應名稱的隊列或者交換機。

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消費者1(發佈訂閱模式)
 */
public class PubSubConsumer1 {

    // 隊列名稱
    private static final String QUEUE_NAME1 = "fanout_queue1";
    // 交換機名稱
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 1、獲取連接對象
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道(頻道)
        Channel channel = connection.createChannel();

        /* 3、聲明交換機
         * exchange  參數1:交換機名稱
         * type      參數2:交換機類型
         * durable   參數3:交換機是否持久化
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);

        // 4、聲明隊列Queue queueDeclare(隊列名稱,是否持久化,是否獨佔本連接,是否自動刪除,附加參數)
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);

        // 5、綁定隊列和交換機 queueBind(隊列名, 交換機名, 路由key[交換機的類型爲fanout ,routingKey設置爲""])
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");

        // 6、監聽隊列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //獲取交換機信息
                String exchange = envelope.getExchange();
                //獲取消息信息
                String message = new String(body, "utf-8");
                System.out.println("交換機名稱:" + exchange + ",消費者獲取消息: " + message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);

        //注意,消費者這裏不建議關閉資源,讓程序一直處於讀取消息的狀態
    }
}

消費者 2

消費者 1 基本一樣,只是隊列名稱不同

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消費者2(發佈訂閱模式)
 */
public class PubSubConsumer2 {

 // 隊列名稱
 private static final String QUEUE_NAME2 = "fanout_queue2";
 // 交換機名稱
 private static final String EXCHANGE_NAME = "fanout_exchange";

 public static void main(String[] args) throws Exception {
  // 1、獲取連接對象
  Connection connection = ConnectionUtils.getConnection();
  // 2、創建通道(頻道)
  Channel channel = connection.createChannel();
  // 3、聲明交換機,如果沒有名稱爲EXCHANGE_NAME的交換機則創建,有則不創建
  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
  // 4、聲明隊列Queue。channel.queueDeclare(隊列名稱,是否持久化,是否獨佔本連接,是否自動刪除,附加參數)
  channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
  // 5、綁定隊列和交換機。channel.queueBind(隊列名, 交換機名, 路由key[fanout交換機的routingKey設置爲""])
  channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
  // 6、監聽隊列,接收消息
  DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
     byte[] body) throws IOException {
    // 獲取交換機信息
    String exchange = envelope.getExchange();
    // 獲取消息信息
    String message = new String(body, "utf-8");
    System.out.println("交換機名稱:" + exchange + ",消費者獲取消息: " + message);
   }
  };
  channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
  // 注意,消費者這裏不建議關閉資源,讓程序一直處於讀取消息的狀態
 }
}

首先分別啓動所有消費者,然後使用生產者發送消息;在每個消費者對應的控制檯可以查看到生產者發送的所有消息;達到廣播的效果。

消費者 1

消費者 2

執行完測試代碼後,在 RabbitMQ 的管理後臺找到 Exchanges 選項卡,點擊fanout_exchange交換機,可以查看到如下的綁定:

fanout_exchange是代碼中定義的交換機的名稱;fanout_queue1fanout_queue2是代碼中消費者 1 和消費者 2 定義的兩個隊列的名稱

發佈訂閱模式引入了交換機的概念,所以相對前面的類型更加靈活廣泛一些。這種模式需要設置類型爲 fanout 的交換機,並且將交換機和隊列進行綁定,當消息發送到交換機後,交換機會將消息發送到所有綁定的隊列,最後被監聽該隊列的消費者所接收並消費。發佈訂閱模式也可以叫廣播模式,不需要 RoutingKey 的判斷。

發佈訂閱模式與工作隊列模式的區別:

  1. 工作隊列模式不用定義交換機,而發佈 / 訂閱模式需要定義交換機。

  2. 發佈 / 訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息 (底層使用默認交換機)。

  3. 發佈 / 訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁定到默認的交換機 。

參考 cnblogs.com/tanghaorong/p/14992330.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BQoAmCM13apdmOkV53ElZw