RabbitMQ 概念和應用詳解
RabbitMQ 概述
RabbitMQ 可以做什麼?
RabbitMQ 是實現 AMQP(高級消息隊列協議) 的消息中間件的一種,可用於在分佈式系統中存儲轉發消息,主要有以下的技術亮點:
-
可靠性
-
靈活的路由
-
集羣部署
-
高可用的隊列消息
-
可視化的管理工具
RabbitMQ 主要用於系統間的雙向解耦,當生產者(productor)產生大量的數據時,消費者(consumer)無法快速的消費信息,那麼就需要一個類似於中間件的代理服務器,用來處理和保存這些數據,RabbitMQ 就扮演了這個角色。
如何使用 RabbitMQ
-
Erlang 語言包
-
RabbitMQ 安裝包
基本概念
1.Broker
用來處理數據的消息隊列服務器實體
2. 虛擬主機 (vhost)
由 RabbitMQ 服務器創建的虛擬消息主機,擁有自己的權限機制,一個 broker 裏可以開設多個 vhost,用於不同用戶的權限隔離,vhost 之間是也完全隔離的。
3. 生產者 (productor)
產生用於消息通信的數據
4. 信道 (channel)
消息通道,在 AMQP 中可以建立多個 channel,每個 channel 代表一個會話任務。
5. 交換機 (exchange)
(1) 接受消息,轉發消息到綁定的隊列,總共有四種類型的交換器:direct,fanout,topic,headers。
- **direct:**轉發消息到 routing-key 指定的隊列
-
**fanout:**轉發消息到所有綁定的隊列,類似於一種廣播發送的方式。
-
**topic:**按照規則轉發消息,這種規則多爲模式匹配,也顯得更加靈活
(2). 交換器在 RabbitMQ 中是一個存在的實體,不能改變,如有需要只能刪除重建。
(3).topic 類型的交換器利用匹配規則分析消息的 routing-key 屬性。
(4). 屬性
-
**持久性:**聲明時 durable 屬性爲 true
-
**自動刪除:**綁定的 queue 刪除也跟着刪除
-
**惰性:**不會自動創建
6. 隊列 (queue)
(1). 隊列是 RabbitMQ 的內部對象,存儲消息
(2). 可以動態的增加消費者,隊列將接受到的消息以輪詢 (round-robin) 的方式均勻的分配給多個消費者
(3). 隊列的屬性
-
**持久性:**如果啓用,隊列將會在 server 重啓之前有效
-
**自動刪除:**消費者停止使用之後就會自動刪除
-
**惰性:**不會自動創建
-
**排他性:**如果啓用,隊列只能被聲明它的消費者使用。
7. 兩個 key
- routing-key: 消息不能直接發到 queues,需要先發送到 exchanges,routing-key 指定 queues 名稱,exchanges 通過 routing-key 來識別與之綁定的 queues
channel.queue_publish(exchange=exchange_name,
routing-key="rabbitmq",
body="openstack")
- binding-key: 主要是用來表示 exchanges 和 queues 之間的關係,爲了區別 queue_publish 的 routing-key,就稱作 binding-key。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing-key="rabbitmq")
8. 綁定 (binding)
表示交換機和隊列之間的關係,在進行綁定時,帶有一個額外的參數 binding-key,來和 routing-key 相匹配。
9. 消費者 (consumer)
監聽消息隊列來進行消息數據的讀取
10. 高可用性 (HA)
(1). 在 consumer 處理完消息後,會發送消息 ACK,通知通知 RabbitMQ 消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送 ACK,則 RabbitMQ 會將消息重新發送給其他監聽在隊列的下一個消費者。
channel.basicConsume(queuename, noAck=false, consumer);
(2). 消息和隊列的持久化
(3). 鏡像隊列,實現不同節點之間的元數據和消息同步
RabbitMQ 在 OpenStack 中的應用
RPC 之 neutron 專題
基於 RabbitMQ 的 RPC 消息通信是 neutron 中跨模塊進行方法調用的很重要的一種方式,根據上面的描述,要組成一個完整的 RPC 通信結構,需要信息的生產者和消費者。
-
**client 端:**用於產生 rpc 消息。
-
**server 端:**用於監聽消息數據並進行相應的處理。
1.neutron-agent 中的 RPC
在 dhcp_agent、l3_agent、metadata_agent,metering_agent 的 main 函數中都存在一段創建一個 rpc 服務端的代碼,下面以 dhcp_agent 爲例。
def main():
register_options(cfg.CONF)
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
service.launch(cfg.CONF, server).wait()
最核心的,也是跟 rpc 相關的部分包括兩部分,首先是創建 rpc 服務端。
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
該代碼實際上創建了一個 rpc 服務端,監聽指定的 topic 並運行 manager 上的 tasks。
create() 方法返回一個 neutron.service.Service 對象,neutron.service.Service 繼承自 neutron.common.rpc.Service 類。
首先看 neutron.common.rpc.Service 類,該類定義了 start 方法,該方法主要完成兩件事情:一件事情是將 manager 添加到 endpoints 中;一件是創建 rpc 的 consumer,分別監聽 topic 的隊列消息。
而在 neutron.service.Service 類中,初始化中生成了一個 manager 實例(即 neutron.agent.dhcp_agent.DhcpAgentWithStateReport);併爲 start 方法添加了週期性執行 report_state 方法和 periodic_tasks 方法。report_state 方法沒有具體實現,periodic_tasks 方法則調用 manager 的 periodic_tasks 方法。
manager 實例(即 neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的時候首先創建一個 rpc 的 client 端,通過代碼
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
該 client 端實際上定義了 report_state 方法,可以狀態以 rpc 消息的方式發送給 plugin。
manager 在初始化後,還會指定週期性運行_report_state 方法,實際上就是調用 client 端的 report_state 方法。
至此,對 rpc 服務端的創建算是完成了,之後執行代碼。
service.launch(server).wait()
service.launch(server) 方法首先會將 server 放到協程組中,並調用 server 的 start 方法來啓動 server。
2.neutron-plugin 中的 RPC
主要對 ML2Plugin 進行分析,包括兩個類:RpcCallbacks 和 AgentNotifierApi。
-
**RpcCallbacks:**負責當 agent 往 plugin 發出 rpc 請求時候,plugin 實現請求的相關動作,除了繼承自父類(dhcp rpc、dvr rpc、sg_db rpc 和 tunnel rpc)中的方法,還包括 get_port_from_device、get_device_details、get_devices_details_list、update_device_down、update_device_up、get_dvr_mac_address_by_host、get_compute_ports_on_host_by_subnet、get_subnet_for_dvr 等方法。
-
**AgentNotifierApi:**負責當 plugin 往 agent 發出 rpc 請求(plugin 通知 agent)的時候,plugin 端的方法。
def start_rpc_listeners(self):
"""RpcCallbacks中實現的方法:Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
創建一個通知 rpc 的客戶端,用於向 OVS 的 agent 發出通知。所有 plugin 都需要有這樣一個發出通知消息的客戶端,創建了一個 OVS agent 的通知 rpc 客戶端。之後,創建兩個跟 service agent 相關的 consumer,分別監聽 topics.PLUGIN
ovs_neutron_agent 也會創建 RPC 的 consumer,用來監聽 topics.UPDATE、topics.DELETE 等操作。
def setup_rpc(self):
self.agent_id = 'ovs-agent-%s' % self.conf.host
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
3.neutron-server 中的 RPC
這個 rpc 服務端主要通過 neutron.server 中主函數中代碼執行
neutron_rpc = service.serve_rpc()
方法的實現代碼 (目錄:neutron/neutron/service.py) 如下
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers:
LOG.error(_LE("'rpc_workers = %d' ignored because "
"start_rpc_listeners is not implemented."),
cfg.CONF.rpc_workers)
raise NotImplementedError()
try:
rpc = RpcWorker(service_plugins)
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
其中,RpcWorker(plugin) 主要通過調用 plugin 的方法來創建 rpc 服務端, 最重要的工作是調用 plugin 的 start_rpc_listeners 來監聽消息隊列:
self._servers = self._plugin.start_rpc_listeners()
該方法在大多數 plugin 中並未被實現,目前 ml2 支持該方法。
在 neutron.plugin.ml2.plugin.ML2Plugin 類中,該方法創建了一個 topic 爲 topics.PLUGIN 的消費 rpc。
def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
RPC 之 nova 專題
在 Openstack 中,每一個 Nova 服務初始化時會創建兩個隊列,一個名爲 “NODE-TYPE.NODE-ID”,另一個名爲 “NODE-TYPE”,NODE-TYPE 是指服務的類型,NODE-ID 指節點名稱。
1.nova 中實現 exchange 的種類
-
direct: 初始化中,各個模塊對每一條系統消息自動生成多個隊列放入 RabbitMQ 服務器中,隊列中綁定的 binding-key 要與 routing-key 匹配
-
topic: 各個模塊也會自動生成兩個隊列放入 RabbitMQ 服務器中。
2.nova 中調用 RPC 的方式
-
RPC.CALL: 用於請求和響應方式
-
RPC.CAST: 只是提供單向請求
3.nova 中模塊的邏輯功能
-
Invoker: 向消息隊列中發送系統請求信息,如 Nova-API 和 Nova-Scheduler,通過 RPC.CALL 和 RPC.CAST 兩個進程發送系統請求消息。
-
Worker: 從消息隊列中獲取 Invoker 模塊發送的系統請求消息以及向 Invoker 模塊回覆系統響應消息,如 Nova-Compute、Nova-Volume 和 Nova-Network,對 RPC.CALL 做出響應。
4.nova 中的 exchange domain
-
direct exchange domain: Topic 消息生產者(Nova-API 或者 Nova-Scheduler)與 Topic 交換器生成邏輯連接,通過 PRC.CALL 或者 RPC.CAST 進程將系統請求消息發往 Topic 交換器。交換器根據不同的 routing-key 將系統請求消息轉發到不同的類型的消息隊列。Topic 消息消費者探測到新消息已進入響應隊列,立即從隊列中接收消息並調用執行系統消息所請求的應用程序。
-
點到點消息隊列:Topic 消息消費者應用程序接收 RPC.CALL 的遠程調用請求,並在執行相關計算任務之後將結果以系統響應消息的方式通過 Direct 交換器反饋給 Direct 消息消費者。
-
共享消息隊列:Topic 消息消費者應用程序只是接收 RPC.CAST 的遠程調用請求來執行相關的計算任務,並沒有響應消息反饋。
-
topic exchange domain: Direct 交換域並不是獨立運作,而是受限於 Topic 交換域中 RPC.CALL 的遠程調用流程與結果,每一個 RPC.CALL 激活一次 Direct 消息交換的運作。
以 nova 啓動虛擬機的過程爲例,詳細介紹 RPC 通信過程。
RPC.CAST 缺少了系統消息響應流程。一個 Topic 消息生產者發送系統請求消息到 Topic 交換器,Topic 交換器根據消息的 Routing Key 將消息轉發至共享消息隊列,與共享消息隊列相連的所有 Topic 消費者接收該系統請求消息,並把它傳遞給響應的 Worker 進行處理,其調用流程如圖所示:
source: //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kka7oWl4BweTixYpDtE0gg