深入學習 RabbitMQ 五種模式 -一-

1. 安裝 erlang

下載otp_win64_25.3.exe

https://www.erlang.org/downloads

erlang 安裝完成,需要配置 erlang 環境變量

ERLANG_HOME=E:\software\Erlang OTP

PATH=%PATH%;%ERLANG_HOME%\bin;

2. 安裝 RabbitMQ

下載rabbitmq-server-3.11.13.exe

https://www.rabbitmq.com/download.html

進入安裝目錄下 sbin 目錄,安裝並運行服務

安裝服務: rabbitmq-service.bat install
刪除服務:rabbitmq-service.bat remove
啓動服務:rabbitmq-service.bat start
停止服務: rabbitmq-service.bat stop

安裝管理插件

安裝 RabbitMQ 的管理插件,方便在瀏覽器端管理 RabbitMQ

管理員身份打開 cmd,進入E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\sbin目錄,運行

rabbitmq-plugins.bat enable rabbitmq_management

執行結果

重啓一下 RabbitMQ

啓動成功,登錄 RabbitMQ

訪問地址http://127.0.0.1:15672/;初始賬號和密碼guest/guest

3. RabbitMQ 常用五種模式

3.1. 簡單模式

該模式是個一對一模式,只有一個生產者 Producer(用於生產消息),一個隊列 Queue(用於存儲消息),一個消費者 Consumer (用於接收消息)。

注:簡單模式也用到了交換機,使用的是默認的交換機 (AMQP default)。

pom.xml 引入以下依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.olive</groupId>
 <artifactId>rabbitmq-learn</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

 <dependencies>
  <!-- mq的依賴 -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.16.0</version>
  </dependency>
  <!-- 日誌處理 -->
  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
</project>

RabbitMQ 官方提供了amqp-clientjava 客戶端連接 RabbitMQ Server;倉庫地址如下

https://github.com/rabbitmq/rabbitmq-java-client
package com.olive;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 封裝連接工具類
 */
public class ConnectionUtils {
 
    public static Connection getConnection() throws Exception {
        // 1.定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2.設置服務器地址
        factory.setHost("127.0.0.1");
        // 3.設置協議端口號
        factory.setPort(5672);
        // 4.虛擬主機名稱;默認爲 /
        factory.setVirtualHost("/");
        // 5.設置用戶名稱
        factory.setUsername("admin");
        // 6.設置用戶密碼
        factory.setPassword("admin123");
        // 7.創建連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

在 RabbitMQ 管理後臺創建 admin 用戶;可以使用默認的 guest 用戶。

生產者負責創建消息並且將消息發送至指定的隊列中,簡單分爲 5 步:

創建連接 ——> 創建通道 ——> 創建(聲明)隊列 ——> 發送消息 ——> 關閉資源

package com.olive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生產者(簡單模式)
 */
public class SimpleProducer {

    /**隊列名稱*/
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        // 1、獲取連接
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道(頻道)
        Channel channel = connection.createChannel();
        // 3、聲明(創建)隊列
        /*
         * queue      參數1:聲明通道中對應的隊列名稱
         * durable    參數2:是否定義持久化隊列,當mq重啓之後隊列還在
         * exclusive  參數3:是否獨佔本次連接,爲true則只能有一個消費者監聽這個隊列
         * autoDelete 參數4:是否自動刪除隊列,如果爲true表示沒有消息也沒有消費者連接自動刪除隊列
         * arguments  參數5:隊列其它參數(額外配置)
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 4.發送消息
        /*
         * exchange   參數1:交換機名稱,如果沒有指定則使用默認Default Exchange
         * routingKey 參數2:隊列名稱或者routingKey,如果指定了交換機就是routingKey路由key,簡單模式可以傳遞隊列名稱
         * props      參數3:消息的配置信息
         * body       參數4:要發送的消息內容
         */
        String msg = "Hello World RabbitMQ!!!";
        System.out.println("生產者發送的消息:" + msg);
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        //關閉資源
        channel.close();
        connection.close();
    }
}

消費者實現和生產者實現過程差不多,但是沒有關閉通道和連接,因爲消費者要一直等待隨時可能發來的消息,大致分爲如下 3 步:

獲取連接 ——> 創建通道 ——> 監聽隊列,接收消息

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消費者(簡單模式)
 */
public class SimpleConsumer {

    /**隊列名稱*/
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        // 1、獲取連接對象
        Connection connection = ConnectionUtils.getConnection();
        // 2、創建通道(頻道)
        Channel channel = connection.createChannel();

        // 3. 創建隊列Queue,如果沒有一個名字叫simple_world的隊列,則會創建該隊列,如果有則不會創建.
        // 這裏可有可無,但是發送消息是必須得有該隊列,否則消息會丟失
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 4、監聽隊列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /*
             *  handleDelivery回調方法,當收到消息後,會自動執行該方法
             *  consumerTag 參數1:消費者標識
             *  envelope    參數2:可以獲取一些信息,如交換機,路由key...
             *  properties  參數3:配置信息
             *  body        參數4:讀取到的消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者獲取消息:" + new String(body));
            }
        };
        /*
         * queue    參數1:隊列名稱
         * autoAck  參數2:是否自動確認,true表示自動確認接收完消息以後會自動將消息從隊列移除。否則需要手動ack消息
         * callback 參數3:回調對象,在上面定義了
         */
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

        //注意,消費者這裏不建議關閉資源,讓程序一直處於讀取消息的狀態
    }
}

運行生產者的代碼,表示向隊列中發送消息。

查看 RabbitMQ 控制檯中的 Queues 內容

啓動消費者,消費 RabbitMQ 隊列中的消息。

在通過 RabbitMQ 控制檯查看 Queues 的內容;發現消息已經被消費

簡單模式的不足之處:該模式是一對一,一個生產者向一個隊列中發送消息,一個消費者從綁定的隊列中獲取消息,這樣耦合性過高,如果有多個消費者想消費隊列中信息就無法實現了。

參考

www.cnblogs.com/zwh0910/p/16056182.html#autoid-5-0-0 www.cnblogs.com/tanghaorong/p/14992330.html#_label0

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/8YbyOFOAgn9R9UfTMIZJOQ