leveldb 高性能讀寫接口

leveldb 讀寫接口

leveldb 中分爲兩類接口,一類是 Mutation,一類是 ReadOnly。

其中 Write 接口用來批量 Put 或 Delete 操作。

1.WriteBatch

WriteBatch 用來批量 Put 或 Delete 操作,WriteBatch 的內部實現是通過 WriteBatchInternal,該類提供了一堆靜態函數。

class LEVELDB_EXPORT WriteBatch {
 public:
class LEVELDB_EXPORT Handler {
   public:
    virtual ~Handler();
    virtual void Put(const Slice& key, const Slice& value) = 0;
    virtual void Delete(const Slice& key) = 0;
  };

// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);

// If the database contains a mapping for "key", erase it.  Else do nothing.
void Delete(const Slice& key);

}

put 與 delete 會將 header 與 k、v 壓縮到std::string rep_;中,具體的格式如下:

其中 key 與 value 會先寫入各自的 length(varint32 編碼),再寫入 k 或 v 的實際內容。

  1. 高性能 Write

Put 與 Delet 接口底層實現仍舊是 Write,不同之處在於 Put 與 Delete 會構造對應類型的 WriteBatch,然後通過 Write 接口執行寫。

Write 在寫入時爲了保證 LSM-Tree 不被破壞,leveldb 保證寫入的 sequence number 有序,支持併發寫入。sync 參數默認是 false,提供了高性能寫的能力,即批量寫異步化。

如果併發寫入直接使用同步的方式,對性能影響比較大,在 leveldb 中採用了一種合併隊列方式,即同一時刻存在多個 writer,但只有一個 writer 在執行寫 (這個寫的數據是所有 writer 往隊列中放的 batch,會被合併到一塊)。簡單來說就是:每一輪選舉一個寫線程執行隊列中所有(相同 sync 類型) 的寫,隨後喚醒繼續。

具體來說:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

MutexLock l(&mutex_);
  writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
if (w.done) {
    return w.status;
  }

// May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

// Notify new head of write queue
if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

return status;
}

3.Readonly

readonly 接口包含了幾個:Get、GetSnapshot。

3.1 GetSnapshot

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

使用方法爲:

leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
leveldb::Iterator* iter = db->NewIterator(options);
delete iter;
db->ReleaseSnapshot(options.snapshot);

Snapshot 作爲一個基類是空的,其子類 SnapshotImpl 實現了一個雙向鏈表的快照 (SequenceNumber)。

class LEVELDB_EXPORT Snapshot {
 protected:
  virtual ~Snapshot();
};

所以說快照可以理解爲之前提到的 SequenceNumber。

class SnapshotImpl : public Snapshot {
 public:
  SnapshotImpl* prev_;
  SnapshotImpl* next_;

  const SequenceNumber sequence_number_;
};

SnapshotImpl 更像是一個雙向鏈表節點,包含了節點信息,那麼維護快照的結構就叫做 SnapshotList,實現了鏈表的維護,比較典型的接口爲 New、Delete。

而這個雙向鏈表的實現是通過 dummy 節點實現 (SnapshotImpl head_;)。

對於 New 來說,就是根據 sequence_number 創建一個快照節點,並加入到快照鏈表中。

  SnapshotImpl* New(SequenceNumber sequence_number) {
    assert(empty() || newest()->sequence_number_ <= sequence_number);

    SnapshotImpl* snapshot = new SnapshotImpl(sequence_number);
    snapshot->next_ = &head_;
    snapshot->prev_ = head_.prev_;
    snapshot->prev_->next_ = snapshot;
    snapshot->next_->prev_ = snapshot;
    return snapshot;
  }

Delete 就是從快照鏈表中刪除一個快照節點。

所以回到開頭 GetSnapshot 其實就是通過根據 VersionSet 得到最近的 sequence 構造一個 snapshot 節點放入快照鏈中。

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

ReleaseSnapshot 就是刪除。

3.2 Get

Get 邏輯比較簡單,下面是細節:

Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
MutexLock l(&mutex_);
  SequenceNumber snapshot;
if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
if (imm != nullptr) imm->Ref();
  current->Ref();

bool have_stat_update = false;
  Version::GetStats stats;

// Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } elseif (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
if (imm != nullptr) imm->Unref();
  current->Unref();
return s;
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/5xVkFoScKMXWfiirT3ojBQ