4 個維度搞懂 Nacos 註冊中心

大家好呀,我是樓仔。

現如今市面上註冊中心的輪子很多,我實際使用過的就有三款:Eureka、Gsched、Nacos,由於當前參與 Nacos 集羣的維護和開發工作,期間也參與了 Nacos 社區的一些開發和 Bug Fix 工作,過程中對 Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動態服務發現的原理。

不 BB,上文章目錄:

01 什麼是動態服務發現?

服務發現是指使用一個註冊中心來記錄分佈式系統中的全部服務的信息,以便其他服務能夠快速的找到這些已註冊的服務。

在單體應用中,DNS+Nginx 可以滿足服務發現的要求,此時服務的 IP 列表配置在 nginx 上。在微服務架構中,由於服務粒度變的更細,服務的上下線更加頻繁,我們需要一款註冊中心來動態感知服務的上下線,並且推送 IP 列表變化給服務消費者,架構如下圖。

02 Nacos 實現動態服務發現的原理

Nacos 實現動態服務發現的核心原理如下圖,我們接下來的內容將圍繞這個圖來進行。

2.1 通訊協議

整個服務註冊與發現過程,都離不開通訊協議,在 1.x 的 Nacos 版本中服務端只支持 http 協議,後來爲了提升性能在 2.x 版本引入了谷歌的 grpc,grpc 是一款長連接協議,極大的減少了 http 請求頻繁的連接創建和銷燬過程,能大幅度提升性能,節約資源。

據官方測試,Nacos 服務端 grpc 版本,相比 http 版本的性能提升了 9 倍以上。

2.2 Nacos 服務註冊

簡單來講,服務註冊的目的就是客戶端將自己的 ip 端口等信息上報給 Nacos 服務端,過程如下:

Nacos SDK 註冊失敗時的自動補償機制時序圖。

相關源碼如下:

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
        //添加redo日誌
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);

    doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
   //向服務端發起註冊
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    //標記註冊成功
    redoService.instanceRegistered(serviceName, groupName);
}

執行補償定時任務 RedoScheduledTask。

@Override
public void run() {
    if (!redoService.isConnected()) {
        LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
        return;
    }
    try {
        redoForInstances();
        redoForSubscribes();
    } catch (Exception e) {
        LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
    }
}
  private void redoForInstances() {
    for (InstanceRedoData each : redoService.findInstanceRedoData()) {
        try {
            redoForInstance(each);
        } catch (NacosException e) {
            LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
                    each.getGroupName(), each.getServiceName(), e);
        }
    }
}

2.3 Nacos 心跳機制

目前主流的註冊中心,比如 Consul、Eureka、Zk 包括我們公司自研的 Gsched,都是通過心跳機制來感知服務的下線。Nacos 也是通過心跳機制來實現的。

Nacos 目前 SDK 維護了兩個分支的版本(1.x、2.x),這兩個版本心跳機制的實現不一樣。其中 1.x 版本的 SDK 通過 http 協議來定時向服務端發送心跳維持自己的健康狀態,2.x 版本的 SDK 則通過 grpc 自身的心跳機制來保活,當 Nacos 服務端接受不到服務實例的心跳,會認爲實例下線。如下圖:

grpc 監測到連接斷開事件,發送 ClientDisconnectEvent。

public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
  //連接斷開,發送連接斷開事件
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}
}

移除客戶端註冊的服務實例

public class ClientServiceIndexesManager extends SmartSubscriber {

  @Override
    public void onEvent(Event event) {
    //接收失去連接事件
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
        Client client = event.getClient();
        for (Service each : client.getAllSubscribeService()) {
            removeSubscriberIndexes(each, client.getClientId());
        }
        //移除客戶端註冊的服務實例
        for (Service each : client.getAllPublishedService()) {
            removePublisherIndexes(each, client.getClientId());
        }
    }
    
    //移除客戶端註冊的服務實例
    private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
}

2.4 Nacos 服務訂閱

當一個服務發生上下線,Nacos 如何知道要推送給哪些客戶端?

Nacos SDK 提供了訂閱和取消訂閱方法,當客戶端向服務端發起訂閱請求,服務端會記錄發起調用的客戶端爲該服務的訂閱者,同時將服務的最新實例列表返回。當客戶端發起了取消訂閱,服務端就會從該服務的訂閱者列表中把當前客戶端移除。

當客戶端發起訂閱時,服務端除了會同步返回最新的服務實例列表,還會異步的通過 grpc 推送給該訂閱者最新的服務實例列表,這樣做的目的是爲了異步更新客戶端本地緩存的服務數據。

當客戶端訂閱的服務上下線,該服務所有的訂閱者會立刻收到最新的服務列表並且將服務最新的實例數據更新到內存。

我們也看一下相關源碼,服務端接收到訂閱數據,首先保存到內存中。

@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    //校驗長連接是否正常
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    //保存訂閱數據
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    //發送訂閱事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

    private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
    //處理訂閱操作
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

然後發佈訂閱事件。

private void addSubscriberIndexes(Service service, String clientId) {
    //保存訂閱數據
    subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
    // Fix #5404, Only first time add need notify event.
    if (subscriberIndexes.get(service).add(clientId)) {
    //發佈訂閱事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
}

服務端自己消費訂閱事件,並且推送給訂閱的客戶端最新的服務實例數據。

@Override
public void onEvent(Event event) {
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // If service is subscribed by one client, only push this client.
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

2.5 Nacos 推送

推送方式

前面說了服務的註冊和訂閱都會發生推送(服務端 -> 客戶端),那推送到底是如何實現的呢?

在早期的 Nacos 版本,當服務實例變化,服務端會通過 udp 協議將最新的數據發送給客戶端,後來發現 udp 推送有一定的丟包率,於是新版本的 Nacos 支持了 grpc 推送。Nacos 服務端會自動判斷客戶端的版本來選擇哪種方式來進行推送,如果你使用 1.4.2 以前的 SDK 進行註冊,那 Nacos 服務端會使用 udp 協議來進行推送,反之則使用 grpc。

推送失敗重試

當發送推送時,客戶端可能正在重啓,或者連接不穩定導致推送失敗,這個時候 Nacos 會進行重試。Nacos 將每個推送都封裝成一個任務對象,放入到隊列中,再開啓一個線程不停的從隊列取出任務執行,執行之前會先刪除該任務,如果執行失敗則將任務重新添加到隊列,該線程會記錄任務執行的時間,如果超過 1 秒,則會記錄到日誌。

推送部分源碼

添加推送任務到執行隊列中。

private static class PushDelayTaskProcessor implements NacosTaskProcessor {

    private final PushDelayTaskExecuteEngine executeEngine;

    public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
        this.executeEngine = executeEngine;
    }

    @Override
    public boolean process(NacosTask task) {
        PushDelayTask pushDelayTask = (PushDelayTask) task;
        Service service = pushDelayTask.getService();
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
        return true;
    }
}

推送任務 PushExecuteTask 的執行。

public class PushExecuteTask extends AbstractExecuteTask {

//..省略

@Override
public void run() {
    try {
        //封裝要推送的服務實例數據
        PushDataWrapper wrapper = generatePushData();
        ClientManager clientManager = delayTaskEngine.getClientManager();
        //如果是服務上下線導致的推送,獲取所有訂閱者
        //如果是訂閱導致的推送,獲取訂閱者
        for (String each : getTargetClientIds()) {
            Client client = clientManager.getClient(each);
            if (null == client) {
                // means this client has disconnect
                continue;
            }
            Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
            //推送給訂閱者
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
        }
    } catch (Exception e) {
        Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
        //當推送發生異常,重新將推送任務放入執行隊列
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}

  //如果是服務上下線導致的推送,獲取所有訂閱者
        //如果是訂閱導致的推送,獲取訂閱者
    private Collection<String> getTargetClientIds() {
    return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
            : delayTask.getTargetClients();
}

執行推送任務線程 InnerWorker 的執行。

/**
 * Inner execute worker.
 */
private class InnerWorker extends Thread {

    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }

    @Override
    public void run() {
        while (!closed.get()) {
            try {
            //從隊列中取出任務PushExecuteTask 
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                //執行PushExecuteTask 
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e.toString(), e);
            }
        }
    }
}

2.6 Nacos SDK 查詢服務實例

服務消費者首先需要調用 Nacos SDK 的接口來獲取最新的服務實例,然後才能從獲取到的實例列表中以加權輪詢的方式選擇出一個實例(包含 ip,port 等信息),最後再發起調用。

前面已經提到 Nacos 服務發生上下線、訂閱的時候都會推送最新的服務實例列表到當客戶端,客戶端再更新本地內存中的緩衝數據,所以調用 Nacos SDK 提供的查詢實例列表的接口時,不會直接請求服務端獲取數據,而是會優先使用內存中的服務數據,只有內存中查不到的情況下才會發起訂閱請求服務端數據。

Nacos SDK 內存中的數據除了接受來自服務端的推送更新之外,自己本地也會有一個定時任務定時去獲取服務端數據來進行兜底。Nacos SDK 在查詢的時候也了容災機制,即從磁盤獲取服務數據,而這個磁盤的數據其實也是來自於內存,有一個定時任務定時從內存緩存中獲取然後加載到磁盤。Nacos SDK 的容災機制默認關閉,可通過設置環境變量 failover-mode=true 來開啓。

架構圖

用戶查詢流程

查詢服務實例部分源碼

private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
 @Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    //這裏默認傳過來是true
    if (subscribe) {
    //從本地內存獲取服務數據,如果獲取不到則從磁盤獲取
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
      //如果從本地獲取不到數據,則調用訂閱方法
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
     //適用於不走訂閱,直接從服務端獲取數據的情況
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}
}
  //從本地內存獲取服務數據,如果開啓了故障轉移則直接從磁盤獲取,因爲當服務端掛了,本地啓動時內存中也沒有數據
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    //故障轉移則直接從磁盤獲取
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    //返回內存中數據
    return serviceInfoMap.get(key);
}
  1. 結語 =====

本篇文章向大家介紹 Nacos 服務發現的基本概念和核心能力以及實現的原理,旨在讓大家對 Nacos 的服務註冊與發現功能有更多的瞭解,做到心中有數。

這篇文章原作者是我好友,小米大佬胡俊,如果對 Nacos 開源感興趣的同學,也可以和我聯繫。

最後,把樓仔的座右銘送給你:我從清晨走過,也擁抱夜晚的星辰,人生沒有捷徑,你我皆平凡,你好,陌生人,一起共勉。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/w9HmJ9mrGh5vX15ODbYS6Q