微服務間通信的最佳實踐
一個好的 API 架構對於有效處理微服務之間的通信很重要。不要害怕創建新的微服務,並儘可能地嘗試解耦功能。例如,與其創建一個通知服務,不如嘗試爲電子郵件通知、SMS 通知和移動推送通知創建單獨的微服務。
在這裏,我假設您有一個 API 網關來管理請求、處理到負載平衡服務器的路由並限制未經授權的訪問。
通訊類型
-
同步協議:HTTP 是一種同步協議。客戶端發送請求並等待服務的響應。這與客戶端代碼執行無關,它可以是同步的(線程被阻塞)或異步的(線程未被阻塞,並且響應最終會到達回調)。這裏的重點是協議(HTTP/HTTPS)是同步的,客戶端代碼只有在收到 HTTP 服務器響應後才能繼續其任務。
-
異步協議:其他協議如 AMQP(許多操作系統和雲環境支持的協議)使用異步消息。客戶端代碼或消息發送者通常不等待響應。它只是將消息發送到消息代理服務,例如 RabbitMQ 或 Kafka(如果我們使用的是事件驅動架構)。
爲什麼你應該避免同步協議
-
如果您不斷添加相互通信的新微服務,那麼在代碼中使用端點會造成混亂,尤其是當您必須在端點中傳遞額外信息時。例如,身份驗證令牌。
-
您必須等待耗時的調用才能獲得響應。
-
如果響應失敗並且您有重試策略,那麼它可能會造成瓶頸。
-
如果接收器服務關閉或無法處理請求,那麼我們要等到服務啓動。例如,在電子商務網站中,用戶下訂單並請求發送到發貨服務以發貨,但發貨服務關閉,我們丟失了訂單。一旦完成,如何將相同的訂單發送到運輸服務?
-
接收方可能無法一次處理大量請求,因此應該有一個地方讓請求必須等待,直到接收方準備好處理下一個請求。
爲了應對這些挑戰,我們可以使用一箇中間服務來處理兩個微服務之間的通信,也稱爲 “消息代理”。
RabbitMQ 被廣泛用作消息代理服務,如果您將 Azure 雲作爲託管服務提供商,您也可以使用 Azure 服務總線。
如何使用 RabbitMQ 來處理微服務之間的通信
可能存在發件人想要向多個服務發送消息的情況。讓我們看看 RabbitMQ 如何處理的下圖。
當發佈者發送消息時,它被 Exchange 接收,然後 Exchange 將其發送到目標隊列。消息保持在隊列中,直到接收方接收並處理它。
交換類型
-
直接交換根據消息路由鍵將消息傳遞到隊列。這是默認的交換類型。
-
扇出交換將消息傳遞到所有隊列。
-
Header Exchange 根據消息頭標識目標隊列。
-
主題交換類似於直接交換,但路由是根據路由模式完成的。它不使用固定的路由鍵,而是使用通配符。
例如,假設我們有以下路由模式。
-
order.logs.customer
-
order.logs.international
-
order.logs.customer.electronics
-
order.logs.international.electronics
“order...electronics” 的路由模式只匹配第一個詞是 “order”,第四個詞是“electronics” 的路由鍵。
“order.logs.customer.#”的路由模式匹配任何以 “order.logs.customer” 開頭的路由鍵。
實現 RabbitMQ
安裝
按照此鏈接在 Windows 上安裝 RabbitMQ。安裝後 RabbitMQ 服務將在 http://localhost:15672/ 上啓動並運行。在用戶名和密碼中輸入 “guest” 登錄,您將能夠看到所有靜態信息。
創建發件人服務
RabbitMQ 啓動並運行後,創建兩個控制檯應用程序
Sender:向 RabbitMQ 發送消息
Receiver:從 RabbitMQ 接收消息
向兩個應用程序添加包 “RabbitMQ.Client”。
using System;
using RabbitMQ.Client;
using System.Text;
class Send
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false,
autoDelete: false, arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello",
basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
上面的代碼將創建一個到 RabbitMQ 的連接,創建一個隊列 “hello” 並向隊列發佈一條消息。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Receive
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false,
exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
上面的代碼將創建一個到 RabbitMQ 的連接,創建一個隊列(如果它還沒有創建),並註冊一個將接收和處理消息的處理程序。
在運行發送方和接收方應用程序時,您將能夠看到在 RabbitMQ 門戶上創建的隊列,以及表示收到新消息的圖形上的尖峯。從門戶中,您將能夠看到哪個服務有待處理的消息,您可以添加該服務的另一個實例以進行負載平衡。
一開始你可以使用 rabbitMQ,事情會很順利。但是當複雜性增加並且您有很多端點調用其他服務時,它就會造成混亂。很快,您會發現自己圍繞驅動程序創建了一個包裝器,這樣您就可以減少需要編寫的代碼量。例如,每次您調用另一個服務的端點時,您都必須提供身份驗證令牌。然後你會發現自己需要處理 ack 與 nack,你將爲此創建一個簡單的 API。最終,您將需要處理有害消息——格式錯誤並導致異常的消息。
要處理所有這些工作流,您可以使用 NserviceBus。讓我們討論一個項目結構:
考慮到這種架構,ClientUI 端點將 PlaceOrder 命令發送到 Sales 端點。因此,Sales 端點將使用發佈 / 訂閱模式發佈 OrderPlaced 事件,該事件將由 Billing 端點接收。
NserviceBus 配置:
class Program
{
static async Task Main(string[] args)
{
await CreateHostBuilder(args).RunConsoleAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.UseNServiceBus(context =>
{
var endpointConfiguration = new EndpointConfiguration("Sales");
//configure transport - configure where your message will
//be published/saved
//you can configure it for RabbitMq, Azure Queue, Amazon
//SQS or any other cloud provider
endpointConfiguration.UseTransport<LearningTransport>();
endpointConfiguration.SendFailedMessagesTo("error");
//When a message fails processing
//it will be forwarded here.
endpointConfiguration.AuditProcessedMessagesTo("audit");
//All messages received by an endpoint
//will be forwarded to the audit queue.
return endpointConfiguration;
});
}
}
然後使用 IMessageSession 對象發送消息:
public class HomeController : Controller
{
static int messagesSent;
private readonly ILogger<HomeController> _log;
private readonly IMessageSession _messageSession;
public HomeController(IMessageSession messageSession, ILogger<HomeController> logger)
{
_messageSession = messageSession;
_log = logger;
}
[HttpPost]
public async Task<ActionResult> PlaceOrder()
{
string orderId = Guid.NewGuid().ToString().Substring(0, 8);
var command = new PlaceOrder { OrderId = orderId };
// Send the command
await _messageSession.Send(command)
.ConfigureAwait(false);
_log.LogInformation($"Sending PlaceOrder, OrderId = {orderId}");
dynamic model = new ExpandoObject();
model.OrderId = orderId;
model.MessagesSent = Interlocked.Increment(ref messagesSent);
return View(model);
}
}
最後,添加一個處理程序來接收和處理消息:
public class PlaceOrderHandler :
IHandleMessages<PlaceOrder>
{
static readonly ILog log = LogManager.GetLogger<PlaceOrderHandler>();
static readonly Random random = new Random();
public Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
log.Info($"Received PlaceOrder, OrderId = {message.OrderId}");
return Task.CompletedTask;
}
}
這是 NserviceBus 和 RabbitMQ 的基本實現。
概括
在服務之間通信時避免使用同步協議。使用 RabbitMQ 在服務之間進行通信並在消息從源傳送到目標之前臨時保存它們。使用 NserviceBus 解耦應用程序代碼和消息代理,並管理長時間運行的請求。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ZKPqZIFpogS1TAXlb4DR2w