RabbitMQ 概念和應用詳解

RabbitMQ 概述

RabbitMQ 可以做什麼?

RabbitMQ 是實現 AMQP(高級消息隊列協議) 的消息中間件的一種,可用於在分佈式系統中存儲轉發消息,主要有以下的技術亮點:

RabbitMQ 主要用於系統間的雙向解耦,當生產者(productor)產生大量的數據時,消費者(consumer)無法快速的消費信息,那麼就需要一個類似於中間件的代理服務器,用來處理和保存這些數據,RabbitMQ 就扮演了這個角色。

如何使用 RabbitMQ

基本概念

1.Broker

用來處理數據的消息隊列服務器實體

2. 虛擬主機 (vhost)

由 RabbitMQ 服務器創建的虛擬消息主機,擁有自己的權限機制,一個 broker 裏可以開設多個 vhost,用於不同用戶的權限隔離,vhost 之間是也完全隔離的。

3. 生產者 (productor)

產生用於消息通信的數據

4. 信道 (channel)

消息通道,在 AMQP 中可以建立多個 channel,每個 channel 代表一個會話任務。

5. 交換機 (exchange)

(1) 接受消息,轉發消息到綁定的隊列,總共有四種類型的交換器:direct,fanout,topic,headers。

(2). 交換器在 RabbitMQ 中是一個存在的實體,不能改變,如有需要只能刪除重建。

(3).topic 類型的交換器利用匹配規則分析消息的 routing-key 屬性。

(4). 屬性

6. 隊列 (queue)

(1). 隊列是 RabbitMQ 的內部對象,存儲消息

(2). 可以動態的增加消費者,隊列將接受到的消息以輪詢 (round-robin) 的方式均勻的分配給多個消費者

(3). 隊列的屬性

7. 兩個 key

channel.queue_publish(exchange=exchange_name,
                     routing-key="rabbitmq",
                     body="openstack")
channel.queue_bind(exchange=exchange_name,
                  queue=queue_name,
                  routing-key="rabbitmq")

8. 綁定 (binding)

表示交換機和隊列之間的關係,在進行綁定時,帶有一個額外的參數 binding-key,來和 routing-key 相匹配。

9. 消費者 (consumer)

監聽消息隊列來進行消息數據的讀取

10. 高可用性 (HA)

(1). 在 consumer 處理完消息後,會發送消息 ACK,通知通知 RabbitMQ 消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送 ACK,則 RabbitMQ 會將消息重新發送給其他監聽在隊列的下一個消費者。

channel.basicConsume(queuename, noAck=false, consumer);

(2). 消息和隊列的持久化

(3). 鏡像隊列,實現不同節點之間的元數據和消息同步

RabbitMQ 在 OpenStack 中的應用

RPC 之 neutron 專題

基於 RabbitMQ 的 RPC 消息通信是 neutron 中跨模塊進行方法調用的很重要的一種方式,根據上面的描述,要組成一個完整的 RPC 通信結構,需要信息的生產者和消費者。

1.neutron-agent 中的 RPC

在 dhcp_agent、l3_agent、metadata_agent,metering_agent 的 main 函數中都存在一段創建一個 rpc 服務端的代碼,下面以 dhcp_agent 爲例。

def main():
    register_options(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-dhcp-agent',
        topic=topics.DHCP_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
    service.launch(cfg.CONF, server).wait()

最核心的,也是跟 rpc 相關的部分包括兩部分,首先是創建 rpc 服務端。

server = neutron_service.Service.create(
    binary='neutron-dhcp-agent',
    topic=topics.DHCP_AGENT,
    report_interval=cfg.CONF.AGENT.report_interval,
    manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')

該代碼實際上創建了一個 rpc 服務端,監聽指定的 topic 並運行 manager 上的 tasks。

create() 方法返回一個 neutron.service.Service 對象,neutron.service.Service 繼承自 neutron.common.rpc.Service 類。

首先看 neutron.common.rpc.Service 類,該類定義了 start 方法,該方法主要完成兩件事情:一件事情是將 manager 添加到 endpoints 中;一件是創建 rpc 的 consumer,分別監聽 topic 的隊列消息。

而在 neutron.service.Service 類中,初始化中生成了一個 manager 實例(即 neutron.agent.dhcp_agent.DhcpAgentWithStateReport);併爲 start 方法添加了週期性執行 report_state 方法和 periodic_tasks 方法。report_state 方法沒有具體實現,periodic_tasks 方法則調用 manager 的 periodic_tasks 方法。

manager 實例(即 neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時候首先創建一個 rpc 的 client 端,通過代碼

 self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

該 client 端實際上定義了 report_state 方法,可以狀態以 rpc 消息的方式發送給 plugin。

manager 在初始化後,還會指定週期性運行_report_state 方法,實際上就是調用 client 端的 report_state 方法。

至此,對 rpc 服務端的創建算是完成了,之後執行代碼。

service.launch(server).wait()

service.launch(server) 方法首先會將 server 放到協程組中,並調用 server 的 start 方法來啓動 server。

2.neutron-plugin 中的 RPC

主要對 ML2Plugin 進行分析,包括兩個類:RpcCallbacks 和 AgentNotifierApi。

def start_rpc_listeners(self):
    """RpcCallbacks中實現的方法:Start the RPC loop to let the plugin communicate with agents."""
    self._setup_rpc()
    self.topic = topics.PLUGIN
    self.conn = n_rpc.create_connection(new=True)
    self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
    return self.conn.consume_in_threads()

創建一個通知 rpc 的客戶端,用於向 OVS 的 agent 發出通知。所有 plugin 都需要有這樣一個發出通知消息的客戶端,創建了一個 OVS agent 的通知 rpc 客戶端。之後,創建兩個跟 service agent 相關的 consumer,分別監聽 topics.PLUGIN

ovs_neutron_agent 也會創建 RPC 的 consumer,用來監聽 topics.UPDATE、topics.DELETE 等操作。

def setup_rpc(self):
        self.agent_id = 'ovs-agent-%s' % self.conf.host
        self.topic = topics.AGENT
        self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        # RPC network init
        self.context = context.get_admin_context_without_session()
        # Handle updates from service
        self.endpoints = [self]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.PORT, topics.DELETE],
                     [constants.TUNNEL, topics.UPDATE],
                     [constants.TUNNEL, topics.DELETE],
                     [topics.SECURITY_GROUP, topics.UPDATE],
                     [topics.DVR, topics.UPDATE],
                     [topics.NETWORK, topics.UPDATE]]

3.neutron-server 中的 RPC

這個 rpc 服務端主要通過 neutron.server 中主函數中代碼執行

neutron_rpc = service.serve_rpc()

方法的實現代碼 (目錄:neutron/neutron/service.py) 如下

def serve_rpc():
    plugin = manager.NeutronManager.get_plugin()
    service_plugins = (
        manager.NeutronManager.get_service_plugins().values())
    if cfg.CONF.rpc_workers < 1:
        cfg.CONF.set_override('rpc_workers', 1)
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn't implement start_rpc_listeners")
        if 0 < cfg.CONF.rpc_workers:
            LOG.error(_LE("'rpc_workers = %d' ignored because "
                          "start_rpc_listeners is not implemented."),
                      cfg.CONF.rpc_workers)
        raise NotImplementedError()
    try:
        rpc = RpcWorker(service_plugins)
        LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
        session.dispose()
        launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
        launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
        if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
            rpc_state_rep = RpcReportsWorker([plugin])
            LOG.debug('using launcher for state reports rpc, workers=%s',
                      cfg.CONF.rpc_state_report_workers)
            launcher.launch_service(
                rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
        return launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log for '
                              'details.'))

其中,RpcWorker(plugin) 主要通過調用 plugin 的方法來創建 rpc 服務端, 最重要的工作是調用 plugin 的 start_rpc_listeners 來監聽消息隊列:

self._servers = self._plugin.start_rpc_listeners()

該方法在大多數 plugin 中並未被實現,目前 ml2 支持該方法。

在 neutron.plugin.ml2.plugin.ML2Plugin 類中,該方法創建了一個 topic 爲 topics.PLUGIN 的消費 rpc。

def start_rpc_listeners(self):
        self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
                          agents_db.AgentExtRpcCallback()]
        self.topic = topics.PLUGIN
        self.conn = n_rpc.create_connection(new=True)
        self.conn.create_consumer(self.topic, self.endpoints,
                                  fanout=False)
        return self.conn.consume_in_threads()

RPC 之 nova 專題

在 Openstack 中,每一個 Nova 服務初始化時會創建兩個隊列,一個名爲 “NODE-TYPE.NODE-ID”,另一個名爲 “NODE-TYPE”,NODE-TYPE 是指服務的類型,NODE-ID 指節點名稱。

1.nova 中實現 exchange 的種類

2.nova 中調用 RPC 的方式

3.nova 中模塊的邏輯功能

4.nova 中的 exchange domain

以 nova 啓動虛擬機的過程爲例,詳細介紹 RPC 通信過程。

RPC.CAST 缺少了系統消息響應流程。一個 Topic 消息生產者發送系統請求消息到 Topic 交換器,Topic 交換器根據消息的 Routing Key 將消息轉發至共享消息隊列,與共享消息隊列相連的所有 Topic 消費者接收該系統請求消息,並把它傳遞給響應的 Worker 進行處理,其調用流程如圖所示:

source:  //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kka7oWl4BweTixYpDtE0gg