架構雜談: TiKV 彈性設計及實現

TiKV 作爲首屈一指的用 Rust 語言實現的分佈式 KV 存儲引擎,獲得了業界極大的關注和商業上的成功。網上也有不少 TiKV 架構設計及源碼解析的文章,這篇文章主要探討 TiKV 如何運用 Rust 語言的抽象及併發來實現 TiKV 的彈性,以供後續深入運用 Rust 語言和 TiKV 備忘參考。

一、TiDB 存儲引擎 TiKV

分佈式存儲引擎 TiKV 基於單機存儲引擎 RocksDB,通過 Raft 分佈式共識算法保證數據強一致性,它具有高度分層,彈性擴展,穩定可靠的特性,扛起分佈式數據庫 TiDB 核心基礎責任。

TiKV 使用 Rust 語言來實現,其整體架構如下:

每一個 TiDB 節點實例作爲 TiKV Client 來與 TiKV 節點實例進行交互;

每一個 TiKV 節點實例包含一個存儲實例 Store,可包括多個 Raft Regions,與 PD 集羣節點進行交互;

PD 集羣作爲整個 TiDB/TiKV 數據服務的控制中心,以上帝視角來調度、監控 TiKV 各個節點實例的狀態,動態彈性觸發調整數據分區存儲、查詢、副本遷移等;

Raft Group Regions 中分區 Region,是 TiKV 達成分佈共識,彈性分片的核心;

一個 Raft Group Region 對應一個 Raft 共識分組,由多個分佈在不同節點中的 Region 組成,它只負責指定 Region 內的 K/V 存儲及查詢;

一個 Raft Group Region 中的 Region 誰是 Leader、Follower 由 PD 根據系統狀態動態來決定;

二、何爲 TiKV 彈性設計?

這裏講的 TiKV 彈性主要是指 TiKV 的動態伸縮能力,它可以在 PD 調度策略安排下,動態支持不同分區的數據寫入、讀取、遷移,以達到數據安全、讀寫流量均衡、高性能的目的。

分佈式存儲系統作爲一個複雜的系統,涉及到網絡、存儲、安全基礎設施,其動態伸縮的彈性設計其實是整個系統的核心,直接決定系統的優劣。

TiKV 彈性涉及的主要方面包括:

1、TiKV 節點實例啓動、退出的彈性支持

每一個 TiKV 節點首次啓動時,須與 PD 交互註冊分配到屬於自己的唯一 ID,並記錄到本地存儲中;

啓動成功後,不間斷向 PD 彙報當前節點的狀態,PD 根據收集到的所有 TiKV 節點狀態及整個系統的狀態和配置後,通知 TiKV 節點加入或退出或 Split 或 Merge 某些分區 Region;

2、TiKV 節點中多個不同分區 Region 的彈性支持

2.1. 多個 Raft 共識分組支持

TiKV 獨創性的實現了一個 TiKV 節點可支持多個 Raft 共識分組;

這樣極大增加了系統實現的複雜性,其目的在於可將寫入和讀取流量均衡的分配到不同 TiKV 節點,增加系統的彈性,這也成爲 TiKV 架構設計及實現的突出亮點;

這樣設計的原因,跟 Raft 共識算法保證強一致性的實現邏輯及性能需求相關,因爲 Raft 分佈式共識算法本身往往離不開 CAP 理論的約束;

在越來越多的大量高併發的數據讀取、寫入需求下,分區分片處理數據成爲一種必然的選擇,於是 TiKV 選擇一個節點支持多個 Raft 共識分組,一個共識分組的讀取、寫入流量由該共識分組的 Leader 節點來處理,其他 Follower 角色節點往往只負責達成共識、存儲數據副本、時刻準備成爲 Leader;

2.2. 動態參與不同 Region 的數據存儲

一個 Region 對應 Key 值在指定範圍 [a, b) 的數據存儲,一個 Region A 可分割成兩個 Region A1 [a, c)和 Region A2 [c, b),兩個 Region 可合併成一個新的 Region;

從 TiKV 節點的角度來看,它可以不同角色參與到不同 Region 的數據存儲,動態支持加入、退出、分割、合併 Region,以保證數據及流量的動態調整;

從 PD 的角度來看,由它來爲不同 Region 的分配 ID 標識、記錄狀態包括其 Range、觸發不同 TiKV 節點以不同角色加入、退出、分割、合併某個 Region;

TiKV Client 則依據 PD 提供的 Region 狀態,根據自身業務以 Region 爲主自主決定與不同 TiKV 節點進行交互;

  1. 不同查詢算子的彈性支持

由於 TiKV 節點離數據最近,TiKV 可高性能低流量的方式支持不同的查詢子算法,這樣 TiDB 可動態分配不同算子給不同 TiKV 節點,以無狀態聚合者的模式來實現 SQL 查詢需求,增加整個系統查詢需求的彈性支撐;

  1. 原生 KV 和事務型 KV 操作支持

TiKV 同時支持原生 KV 操作和基於 MVCC 算法的事務型 KV 操作,以支持多種使用場景的 KV 操作需求;

原生 KV 操作交互流程如下:

事務型 KV 操作交互流程如下:

三、TiKV 分層實現

TiKV 的實現可分爲從 API 接口層 Server/RaftClient、事務支撐層 Storage、共識 / 算法層 RaftBatchSystem/RaftGroup Regions、數據存儲層 RaftEngine/KvEngine,如下圖所示:

從 IO 及數據流轉角度看,API 接口層和數據存儲層,涉及大量 Socket IO 和 File IO,需要異步讀寫支持;

事務支撐層、共識 / 算法層,涉及大量併發並行的計算任務,需要異步併發任務池支撐;

四、TiKV 中應用 Rust 語言抽象

Rust 語言作爲新一代系統編程語言,提供強大的內存安全、高性能的框架支持;

一個好的系統架構設計往往會通過抽象分層來解藕聚合架構的各個部分,從而讓整個系統有機的整合在一起;

4.1.Trait 接口定義及類型實現

在 TiKV 內部實現中大量運用先定義 Trait 接口,再在不同泛型對象結構實現不同 T 的 rait 接口,比如:

ServerTransport 實現 Transport Trait 用來向不同 TiKV 實例發送 RaftMsg;

RaftRouter 實現 RaftStoreRouter 和 LocalReadRouter Trait 分別用來發送 StoreMsg/RaftMsg 修寫請求和讀請求對應的 Message;

RaftKv 實現 Engine Trait 用來實現高層次數據的異步修寫、獲取快照能力;

BatchSystem 中抽象出 Fsm、FsmScheduler、PollHandler Trait,

其中 ControlFsm、StoreFsm、PeerFsm、ApplyFsm 實現 Fsm Trait 來表示它們可接收相應類型的 Message 並進行處理;

RaftPoller 和 ApplyPoller 實現 PollHandler Trait 來批量處理 StoreMsg、RaftMsg 和 ApplyTask,以便不同 region 的寫操作一次性寫入;

NormalScheduler 實現 FsmScheduler Trait 來表示收到指定 Fsm 相關 Message 後,觸發 PollHandler 實現者來處理指定 Fsm 收到的 Msg;

4.1.1.KvEngine 和 RaftEngine 引擎 Trait 定義示例:

/// A TiKV key-value store
pub trait KvEngine:
    Peekable
    + SyncMutable
    + Iterable
    + WriteBatchExt
    + DBOptionsExt
    + CFNamesExt
    + CFOptionsExt
    + ImportExt
    + SstExt
    + TablePropertiesExt
    + CompactExt
    + RangePropertiesExt
    + MvccPropertiesExt
    + TtlPropertiesExt
    + PerfContextExt
    + MiscExt
    + Send
    + Sync
    + Clone
    + Debug
    + Unpin
    + 'static
{
    /// A consistent read-only snapshot of the database
    type Snapshot: Snapshot;
    /// Create a snapshot
    fn snapshot(&self) -> Self::Snapshot;
    /// Syncs any writes to disk
    fn sync(&self) -> Result<()>;
    /// Flush metrics to prometheus
    /// `instance` is the label of the metric to flush.
    fn flush_metrics(&self, _instance: &str) {}
    /// Reset internal statistics
    fn reset_statistics(&self) {}
    /// Cast to a concrete engine type
    /// This only exists as a temporary hack during refactor.
    /// It cannot be used forever.
    fn bad_downcast<T: 'static>(&self) -> &T;
}

KvEngine Trait 基於多個 super trait 來描述一個基礎 KvEngine 需要提供的能力,其 Snapshot 作爲關聯類型來支持引擎數據的讀取;

pub trait RaftEngine: Clone + Sync + Send + 'static {
    type LogBatch: RaftLogBatch;
    fn log_batch(&self, capacity: usize) -> Self::LogBatch;
    /// Synchronize the Raft engine.
    fn sync(&self) -> Result<()>;
    fn get_raft_state(&self, raft_group_id: u64)
        -> Result<Option<RaftLocalState>>;
    fn get_entry(&self, raft_group_id: u64, index: u64)
        -> Result<Option<Entry>>;
    /// Append some log entries and return written bytes.
    fn append(&self, raft_group_id: u64, entries: Vec<Entry>)
        -> Result<usize>;
    fn put_raft_state(&self, raft_group_id: u64
        , state: &RaftLocalState) -> Result<()>;
    // 省略部分其他方法
}
pub trait RaftLogBatch: Send {
    fn append(&mut self, raft_group_id: u64
        , entries: Vec<Entry>) -> Result<()>;
    /// Remove Raft logs in [`from`, `to`) 
    /// which will be overwritten later.
    fn cut_logs(&mut self, raft_group_id: u64
        , from: u64, to: u64);
    fn put_raft_state(&mut self, raft_group_id: u64
        , state: &RaftLocalState) -> Result<()>;
    fn is_empty(&self) -> bool;
}

RaftEngine trait 用來描述不同 raft 分組存儲 raft log 的引擎,其 LogBatch 作爲關聯類型來支持 RaftEngine;

#[derive(Clone, Debug)]
pub struct Engines<K, R> {
    pub kv: K,
    pub raft: R,
}
impl<K: KvEngine, R: RaftEngine> Engines<K, R> {
    pub fn new(kv_engine: K, raft_engine: R) -> Self {
        Engines {
            kv: kv_engine,
            raft: raft_engine,
        }
    }
    // 省略部分其他方法
}

由於每一次來自 Client 端的寫入都需要先以 Log 的方式寫入 RaftEngine,待可 apply 後才寫入 KvEngine,所以 TiKV 抽象出一個包含 kv 和 raft 的 Engines<K, R> 結構來支持數據的讀取與寫入;

4.1.2.KvEngine Trait 實現

//基於RocksDB來提供RocksEngine以實現KvEngine trait;
pub struct RocksEngine {
    db: Arc<DB>,
    shared_block_cache: bool,
}
impl KvEngine for RocksEngine {
    type Snapshot = RocksSnapshot;
    fn snapshot(&self) -> RocksSnapshot {
        RocksSnapshot::new(self.db.clone())
    }
    // 省略部分其他方法
}

4.1.3.RaftEngine Trait 實現

TiKV 支持兩種方式來存儲 RaftLog,其中一個基於 RocksDB 的 RocksEngine 來實現,另一個由額外的 raft-engine 提供的 RaftLogEngine 來實現;

由啓動 TiKV 時根據 config.raft_engine.enable 配置項來決定使用哪種實現;

RaftLogEngine 實現 RaftEngine Trait:

use raft_engine::{EntryExt, Error as RaftEngineError,
    LogBatch, RaftLogEngine as RawRaftEngine};
pub struct RaftLogEngine(RawRaftEngine<Entry, EntryExtTyped>);
impl RaftEngine for RaftLogEngine {
    type LogBatch = RaftLogBatch;
    fn log_batch(&self, _capacity: usize) -> Self::LogBatch {
        RaftLogBatch::default()
    }
    // 省略部分其他方法
}

RocksEngine 實現 RaftEngine Trait:

use crate::{RocksEngine, RocksWriteBatch};
use engine_traits::{Error, RaftEngine, RaftLogBatch, Result};
impl RaftEngine for RocksEngine {
    type LogBatch = RocksWriteBatch;
    fn log_batch(&self, capacity: usize) -> Self::LogBatch {
        RocksWriteBatch::with_capacity(self.as_inner().clone()
          , capacity)
    }
    fn sync(&self) -> Result<()> {
        self.sync_wal()
    }
    // 省略部分其他方法
}

4.2. 使用 ThreadPool 調度執行不同 Task

自定義實現了一個 ThreadPool/FuturePool 來並行調度執行 Task 或 Future,充分利用多核併發的能力,並與 Rust 語言提供的異步 Future/Channel 完美整合;

/// Scheduler provides interface to schedule task
/// to underlying workers.
pub struct Scheduler<T: Display + Send> {
    counter: Arc<AtomicUsize>,
    sender: UnboundedSender<Msg<T>>,
    pending_capacity: usize,
    metrics_pending_task_count: IntGauge,
}
enum Msg<T: Display + Send> {
    Task(T),
    Timeout,
}
pub trait Runnable: Send {
    type Task: Display + Send + 'static;
    /// Runs a task.
    fn run(&mut self, _: Self::Task) {
        unimplemented!()
    }
    fn on_tick(&mut self) {}
    fn shutdown(&mut self) {}
}
/// A worker that can schedule time consuming tasks.
#[derive(Clone)]
pub struct Worker {
    pool: Arc<Mutex<Option<ThreadPool<
      yatp::task::future::TaskCell>>>>,
    remote: Remote<yatp::task::future::TaskCell>,
    pending_capacity: usize,
    counter: Arc<AtomicUsize>,
    stop: Arc<AtomicBool>,
    thread_count: usize,
}
pub struct Builder<S: Into<String>> {
    name: S,
    thread_count: usize,
    pending_capacity: usize,
}
impl<S: Into<String>> Builder<S> {
    pub fn thread_count(mut self, thread_count: usize)->Self{
        self.thread_count = thread_count;
        self
    }
    pub fn create(self) -> Worker {
        let pool = YatpPoolBuilder::new(
           DefaultTicker::default())
           .name_prefix(self.name)
           .thread_count(self.thread_count, self.thread_count)
           .build_single_level_pool();
        let remote = pool.remote().clone();
        let pool = Arc::new(Mutex::new(Some(pool)));
        Worker {
            remote,
            stop: Arc::new(AtomicBool::new(false)),
            pool,
            counter: Arc::new(AtomicUsize::new(0)),
            pending_capacity: self.pending_capacity,
            thread_count: self.thread_count,
        }
    }
}
impl Worker {
    pub fn new<S: Into<String>>(name: S) -> Worker {
        Builder::new(name).create()
    }
    pub fn start<R: Runnable + 'static, S: Into<String>>(
        &self,
        name: S,
        runner: R,
    ) -> Scheduler<R::Task> {
        let (tx, rx) = unbounded();
        self.start_impl(runner, rx
            , metrics_pending_task_count.clone());
        Scheduler::new(
            tx,
            self.counter.clone(),
            self.pending_capacity,
            metrics_pending_task_count,
        )
    }
    fn start_impl<R: Runnable + 'static>(
        &self,
        runner: R,
        mut receiver: UnboundedReceiver<Msg<R::Task>>,
        metrics_pending_task_count: IntGauge,
    ) {
        let counter = self.counter.clone();
        self.remote.spawn(async move {
            let mut handle = RunnableWrapper {inner: runner};
            while let Some(msg) = receiver.next().await {
                match msg {
                    Msg::Task(task) => {
                      handle.inner.run(task);
                      counter.fetch_sub(1, Ordering::SeqCst);
                      metrics_pending_task_count.dec();
                    }
                    Msg::Timeout => (),
                }
            }
        });
    }
    // 省略部分其他方法
}
impl<T: Display + Send> Scheduler<T> {
    /// Schedules a task to run.
    ///
    /// If the worker is stopped or number pending tasks
    /// exceeds capacity, an error will return.
    pub fn schedule(&self, task: T) -> Result<()
        , ScheduleError<T>> {
        debug!("scheduling task {}", task);
        if self.counter.load(Ordering::Acquire) >= 
            self.pending_capacity {
            return Err(ScheduleError::Full(task));
        }
        self.counter.fetch_add(1, Ordering::SeqCst);
        self.metrics_pending_task_count.inc();
        if let Err(e) = self.sender.unbounded_send(
            Msg::Task(task)) {
            if let Msg::Task(t) = e.into_inner() {
                self.counter.fetch_sub(1, Ordering::SeqCst);
                self.metrics_pending_task_count.dec();
                return Err(ScheduleError::Stopped(t));
            }
        }
        Ok(())
    }
    // 省略部分其他方法
}

基於上面 Worker、Scheduler 自定義不同的 CleanupRunner、CleanupTask、CompactRunner、CompactTask、PdRunner、PdTask、RegionRunner、RegionTask、SplitCheckRunner、SplitCheckTask 等則可實現不同後臺任務的併發執行;

五、總結

上面簡要介紹了 TiKV 彈性設計涉及到的主要內容,並從其實現代碼中初步理解如何使用 Trait 抽象和實現來達到分層抽象,如何使用 ThreadPool、Channel、Worker、Task、Future 來實現子任務併發執行;

由於整個 TiKV 實現涉及到 Raft 算法、MVCC、BatchSystem、Region 處理等,其具體實現比較複雜,有興趣可自行閱讀代碼或相關文檔,期望該文能對理解 TiKV 及其實現有所幫助。

參考:

Deep Dive TiKV

https://tikv.org/deep-dive/key-value-engine/introduction/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VOCyGDcAz0iyvlbcbDHO-g