從頭開發一個 RPC 是種怎樣的體驗?

【CSDN 編者按】對於開發人員來說,調用遠程服務就像是調用本地服務一樣便捷。尤其是在微服務盛行的今天,瞭解 RPC 的原理過程是十分有必要的。

作者 | Alex Ellis       譯者 | 彎月

出品 | CSDN(ID:CSDNnews)

計算機之間的通信方式多種多樣,其中最常用的一種方法是遠程過程調用(Remote Procedure Call,即 RPC)。該協議允許一臺計算機調用另一個計算機上的程序,就像調用本地程序一樣,並負責所有傳輸和通信。

假設我們需要在一臺計算機上編寫一些數學程序,並且有一個判斷數字是否爲質數的程序或函數。在使用這個函數的時候,我們只需傳遞數字進去,就可以獲得答案。這個函數保存在我們的計算機上。

很多時候,程序保存在本地非常方便調用,而且由於這些程序與我們其餘的代碼在一起,因此調用的時候幾乎不會產生延遲。

但是,在有些情況下,將這些程序保留在本地也不見得是好事。有時,我們需要在擁有大量核心和內存的計算機上運行這些程序,這樣它就可以檢查非常大的數字。但這也不是什麼難事,我們可以將主程序也放到大型計算機上運行,即使其餘的程序可能並沒有這種需求,質數查找函數也可以自由利用計算機上的資源。如果我們想讓其他程序重用質數查找函數,該怎麼辦?我們可以將其轉換成一個庫,然後在各個程序之間共享,但是每一臺運行質數查找庫的計算機,都需要大量的內存資源。

如果我們將質數查找函數單獨放在一臺計算機上,然後在需要檢查數字時與該計算機對話,怎麼樣呢?如此一來,我們就只需提高質數查找函數所在的計算機的性能,而且其他計算機上程序也可以共享這個函數。

這種方式的缺點是更加複雜。計算機可能會出現故障,網絡也有可能出問題,而且我們還需要擔心數據的來回傳遞。如果你只想編寫一個簡單的數學程序,那麼可能無需擔心網絡狀況,也不用考慮如何重新發送丟失的數據包,甚至不用擔心如何查找運行質數查找函數的計算機。如果你的工作是編寫最佳質數查找程序,那麼你可能並不關心如何監聽請求或檢查已關閉的套接字。

這時就可以考慮遠程過程調用。我們可以將計算機間通信的複雜性包裝起來,然後在通信的任意一側建立一個簡單的接口(stub)。對於編寫數學程序的人來說,看上去就像在調用同一臺計算機上的函數;而對於編寫質數查找程序的人來說,看上去就像是自己的函數被調用了。如果我們將中間部分抽象化,那麼兩側都可以專心做好自己的細節,同時仍然可以享受將計算拆分到多臺計算機的優勢。

RPC 調用的主要工作就是處理中間部分。它的一部分必須存在數學程序的計算機上,負責接受並打包參數,然後發送到另一臺計算機。此外,在收到響應後,還需要解析響應,並傳遞回去。而質數查找函數計算機則必須等待請求,解析參數,然後將其傳遞給函數,此外,還需要獲取結果,將其打包,然後再返回結果。這裏的關鍵之處是數學程序和質數查找程序間,以及它們的 stub 之間都有一個清晰的接口。

更多詳細信息,請參見 Andrew D. Birrell 和 Bruce Jay Nelson1 於 1981 年發表的論文《Implementing Remote Procedure Calls》。

從頭編寫 RPC

下面,我們來試試看能不能編寫一個 RPC。

首先,我們來編寫基本的數學程序。爲了簡單起見,我們編寫一個命令行工具,接受輸入,然後檢查是否爲質數。它有一個單獨的方法 is_prime,處理實際的檢查。

// basic_math_program.c
#include <stdio.h>
#include <stdbool.h>
// Basic prime checker. This uses the 6k+-1 optimization
// (see https://en.wikipedia.org/wiki/Primality_test)
bool is_prime(int number) {
  // Check first for 2 or 3
  if (number == 2 || number == 3) {
    return true;
  }
  // Check for 1 or easy modulos
  if (number == 1 || number % 2 == 0 || number % 3 == 0) {
    return false;
  }
  // Now check all the numbers up to sqrt(number)
  int i = 5;
  while (i * i <= number) {
    // If we've found something (or something + 2) that divides it evenly, it's not
    // prime.
    if (number % i == 0 || number % (i + 2) == 0) {
      return false;
    }
    i += 6;
  }
  return true;
}
int main(void) {
  // Prompt the user to enter a number.
  printf("Please enter a number: ");
  // Read the user's number. Assume they're entering a valid number.
  int input_number;
  scanf("%d", &input_number);
  // Check if it's prime
  if (is_prime(input_number)) {
    printf("%d is prime\n", input_number);
  } else {
    printf("%d is not prime\n", input_number);
  }
  return 0;
}

這段代碼有一些潛在的問題,我們沒有處理極端情況。但這裏只是爲了說明,無傷大雅。

目前一切順利。下面,我們將代碼拆分成多個文件,is_prime 可供同一臺計算機上的程序重用。首先,我們爲 is_prime 創建一個單獨的庫:

// is_prime.h
#ifndef IS_PRIME_H
#define IS_PRIME_H
#include <stdbool.h>
bool is_prime(int number);
#endif
// is_prime.c
#include "is_prime.h"
// Basic prime checker. This uses the 6k+-1 optimization
// (see https://en.wikipedia.org/wiki/Primality_test)
bool is_prime(int number) {
  // Check first for 2 or 3
  if (number == 2 || number == 3) {
    return true;
  }
  // Check for 1 or easy modulos
  if (number == 1 || number % 2 == 0 || number % 3 == 0) {
    return false;
  }
  // Now check all the numbers up to sqrt(number)
  int i = 5;
  while (i * i <= number) {
    // If we've found something (or something + 2) that divides it evenly, it's not
    // prime.
    if (number % i == 0 || number % (i + 2) == 0) {
      return false;
    }
    i += 6;
  }
  return true;
}

下面,從主程序中調用:

// basic_math_program_refactored.c
#include <stdio.h>
#include <stdbool.h>
#include "is_prime.h"
int main(void) {
  // Prompt the user to enter a number.
  printf("Please enter a number: ");
  // Read the user's number. Assume they're entering a valid number.
  int input_number;
  scanf("%d", &input_number);
  // Check if it's prime
  if (is_prime(input_number)) {
    printf("%d is prime\n", input_number);
  } else {
    printf("%d is not prime\n", input_number);
  }
  return 0;
}

再試試,運行正常!當然,你也可以加一些測試:

下面,我們需要將這個函數放到其他計算機上。我們需要編寫的功能包括:

我們的示例非常簡單,因爲我們只需要打包併發送一個 int 參數,然後接收一個字節的結果。對於調用程序的庫,我們需要打包數據、創建套接字、連接到主機(暫定 localhost)、發送數據、等待結果、解析,然後返回。調用程序庫的頭文件如下所示:

// client/is_prime_rpc_client.h
#ifndef IS_PRIME_RPC_CLIENT_H
#define IS_PRIME_RPC_CLIENT_H
#include <stdbool.h>
bool is_prime_rpc(int number);
#endif

可能有些讀者已經發現了,實際上這個接口與上面的函數庫一模一樣,但關鍵就在於此!因爲調用程序只需要關注業務邏輯,無需關心其他一切。但實現就稍複雜:

// client/is_prime_rpc_client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#define SERVERPORT "5005"  // The port the server will be listening on.
#define SERVER "localhost"  // Assume localhost for now
#include "is_prime_rpc_client.h"
// Packs an int. We need to convert it from host order to network order.
int pack(int input) {
  return htons(input);
}
// Gets the IPv4 or IPv6 sockaddr.
void *get_in_addr(struct sockaddr *sa) {
  if (sa->sa_family == AF_INET) {
    return &(((struct sockaddr_in*)sa)->sin_addr);
  } else {
    return &(((struct sockaddr_in6*)sa)->sin6_addr);
  }
}
// Gets a socket to connect with.
int get_socket() {
  int sockfd;
  struct addrinfo hints, *server_info, *p;
  int number_of_bytes;
  memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;  // We want to use TCP to ensure it gets there
  int return_value = getaddrinfo(SERVER, SERVERPORT, &hints, &server_info);
  if (return_value != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(return_value));
    exit(1);
  }
  // We end up with a linked-list of addresses, and we want to connect to the
  // first one we can
  for (p = server_info; p != NULL; p = p->ai_next) {
    // Try to make a socket with this one.
    if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
      // Something went wrong getting this socket, so we can try the next one.
      perror("client: socket");
      continue;
    }
    // Try to connect to that socket.
    if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
      // If something went wrong connecting to this socket, we can close it and
      // move on to the next one.
      close(sockfd);
      perror("client: connect");
      continue;
    }
    // If we've made it this far, we have a valid socket and can stop iterating
    // through.
    break;
  }
  // If we haven't gotten a valid sockaddr here, that means we can't connect.
  if (p == NULL) {
    fprintf(stderr, "client: failed to connect\n");
    exit(2);
  }
  // Otherwise, we're good.
  return sockfd;
}
// Client side library for the is_prime RPC.
bool is_prime_rpc(int number) {
  // First, we need to pack the data, ensuring that it's sent across the
  // network in the right format.
  int packed_number = pack(number);
  // Now, we can grab a socket we can use to connect see how we can connect
  int sockfd = get_socket();
  // Send just the packed number.
  if (send(sockfd, &packed_number, sizeof packed_number, 0) == -1) {
    perror("send");
    close(sockfd);
    exit(0);
  }
  // Now, wait to receive the answer.
  int buf[1];  // Just receiving a single byte back that represents a boolean.
  int bytes_received = recv(sockfd, &buf, 1, 0);
  if (bytes_received == -1) {
    perror("recv");
    exit(1);
  }
  // Since we just have the one byte, we don't really need to do anything while
  // unpacking it, since one byte in reverse order is still just a byte.
  bool result = buf[0];
  // All done! Close the socket and return the result.
  close(sockfd);
  return result;
}

如前所述,這段代碼需要打包參數、連接到服務器、發送數據、接收數據、解析,並返回結果。我們的示例相對很簡單,因爲我們只需要確保數字的字節順序符合網絡字節順序。

接下來,我們需要在服務器上運行被調用的庫。它需要調用我們前面編寫的 is_prime 庫:

// server/is_prime_rpc_server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include "is_prime.h"
#define SERVERPORT "5005"  // The port the server will be listening on.
// Gets the IPv4 or IPv6 sockaddr.
void *get_in_addr(struct sockaddr *sa) {
if (sa->sa_family == AF_INET) {
return &(((struct sockaddr_in*)sa)->sin_addr);
  } else {
return &(((struct sockaddr_in6*)sa)->sin6_addr);
  }
}
// Unpacks an int. We need to convert it from network order to our host order.
int unpack(int packed_input) {
return ntohs(packed_input);
}
// Gets a socket to listen with.
int get_and_bind_socket() {
int sockfd;
struct addrinfo hints, *server_info, *p;
int number_of_bytes;
memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;  // We want to use TCP to ensure it gets there
  hints.ai_flags = AI_PASSIVE;  // Just use the server's IP.
int return_value = getaddrinfo(NULL, SERVERPORT, &hints, &server_info);
if (return_value != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(return_value));
exit(1);
  }
// We end up with a linked-list of addresses, and we want to connect to the
// first one we can
for (p = server_info; p != NULL; p = p->ai_next) {
// Try to make a socket with this one.
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
// Something went wrong getting this socket, so we can try the next one.
      perror("server: socket");
continue;
    }
// We want to be able to reuse this, so we can set the socket option.
int yes = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
      perror("setsockopt");
exit(1);
    }
// Try to bind that socket.
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
// If something went wrong binding this socket, we can close it and
// move on to the next one.
      close(sockfd);
      perror("server: bind");
continue;
    }
// If we've made it this far, we have a valid socket and can stop iterating
// through.
break;
  }
// If we haven't gotten a valid sockaddr here, that means we can't connect.
if (p == NULL) {
fprintf(stderr, "server: failed to bind\n");
exit(2);
  }
// Otherwise, we're good.
return sockfd;
}
int main(void) {
int sockfd = get_and_bind_socket();
// We want to listen forever on this socket
if (listen(sockfd, /*backlog=*/1) == -1) {
    perror("listen");
exit(1);
  }
printf("Server waiting for connections.\n");
struct sockaddr their_addr;  // Address information of the client
socklen_t sin_size;
int new_fd;
while(1) {
    sin_size = sizeof their_addr;
    new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
if (new_fd == -1) {
      perror("accept");
continue;
    }
// Once we've accepted an incoming request, we can read from it into a buffer.
int buffer;
int bytes_received = recv(new_fd, &buffer, sizeof buffer, 0);
if (bytes_received == -1) {
      perror("recv");
continue;
    }
// We need to unpack the received data.
int number = unpack(buffer);
printf("Received a request: is %d prime?\n", number);
// Now, we can finally call the is_prime library!
bool number_is_prime = is_prime(number);
printf("Sending response: %s\n", number_is_prime ? "true" : "false");
// Note that we don't have to pack a single byte.
// We can now send it back.
if (send(new_fd, &number_is_prime, sizeof number_is_prime, 0) == -1) {
      perror("send");
    }
    close(new_fd);
  }
}

最後,我們更新一下我們的主函數,使用新的 RPC 庫調用:

// client/basic_math_program_distributed.c
#include <stdio.h>
#include <stdbool.h>
#include "is_prime_rpc_client.h"
int main(void) {
// Prompt the user to enter a number.
printf("Please enter a number: ");
// Read the user's number. Assume they're entering a valid number.
int input_number;
scanf("%d", &input_number);
// Check if it's prime, but now via the RPC library
if (is_prime_rpc(input_number)) {
printf("%d is prime\n", input_number);
  } else {
printf("%d is not prime\n", input_number);
  }
return 0;
}

這個 RPC 實際的運行情況如下:

現在運行服務器,就可以運行客戶端將質數檢查的工作分佈到其他計算機上運行!現在,程序調用 is_prime_rpc 時,所有網絡業務都在後臺進行。我們已經成功分發了計算,客戶端實際上是在遠程調用程序。

示例有待改進的方面

================

本文中的實現只是一個示例,雖然實現了一些功能,但只是一個玩具。真正的框架(例如 gRPC3)要複雜得多。我們的實現需要改進的方面包括:

計算機科學就是要站在巨人的肩膀上,很多庫已經爲我們完成了大量工作。

原文鏈接:https://alexanderell.is/posts/rpc-from-scratch/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Lj1S4aRBxvNFgpvfy09j-Q