leveldb 高性能讀寫接口
leveldb 讀寫接口
leveldb 中分爲兩類接口,一類是 Mutation,一類是 ReadOnly。
-
Mutation 接口有
-
Put、Delete、Write
-
ReadOnly 接口有
-
Get
其中 Write 接口用來批量 Put 或 Delete 操作。
1.WriteBatch
WriteBatch 用來批量 Put 或 Delete 操作,WriteBatch 的內部實現是通過 WriteBatchInternal,該類提供了一堆靜態函數。
class LEVELDB_EXPORT WriteBatch {
public:
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};
// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(const Slice& key);
}
put 與 delete 會將 header 與 k、v 壓縮到std::string rep_;
中,具體的格式如下:
其中 key 與 value 會先寫入各自的 length(varint32 編碼),再寫入 k 或 v 的實際內容。
- 高性能 Write
Put 與 Delet 接口底層實現仍舊是 Write,不同之處在於 Put 與 Delete 會構造對應類型的 WriteBatch,然後通過 Write 接口執行寫。
Write 在寫入時爲了保證 LSM-Tree 不被破壞,leveldb 保證寫入的 sequence number 有序,支持併發寫入。sync 參數默認是 false,提供了高性能寫的能力,即批量寫異步化。
如果併發寫入直接使用同步的方式,對性能影響比較大,在 leveldb 中採用了一種合併隊列方式,即同一時刻存在多個 writer,但只有一個 writer 在執行寫 (這個寫的數據是所有 writer 往隊列中放的 batch,會被合併到一塊)。簡單來說就是:每一輪選舉一個寫線程執行隊列中所有(相同 sync 類型) 的寫,隨後喚醒繼續。
具體來說:
-
每次調用 Write 會構造一個 WriteBatch 包含 batch,是否同步寫入 sync,寫入是否完成 done。
-
在 leveledb 中,同時只有一個 write 線程在寫數據,其他線程只會將自己的 write 放入
writers_
雙端隊列中並阻塞等待,直到這個 write 被別的線程消費 (執行完了, done 了),或者輪到自己消費了 (是剛纔的 wirter),那麼開始執行真正的寫入。 -
接着查看是否有足夠空間寫入,例如
mem_
是否寫滿,是否必須觸發 minor compaction 等 -
BuildBatchGroup 方法會將
writers_
隊列中所有 sync 設置相同的寫請求組成一個 WriteBatch 進行寫入,合併它們的 WriteBatch 到tmp_batch_
中,允許攢批次聚合寫入。因爲同一時刻只有單一寫者,因此可以在這裏安全地複用同一個tmp_batch_
。如果 writers_ 隊列中只有一個 Writer,那麼 BuildBatchGroup 方法就不會多做拷貝,直接返回此 Writer 內的 WriteBatch。last_writer
會記錄下隊列中最後一個 batch。 -
然後通過 InsertInto 寫入到 memtable。
-
前面會把 [current_writer..... 到......last_writer] 合併成一個 WriteBatch 寫入到 memtable,相當於把別的線程的 writer 給處理了,那麼最後需要判斷一下不是當前的 writer 就設置 done 結束並喚醒對應的線程(通知被我處理了)。如果到了 last_writer 那麼表示本輪處理完畢。
-
最後,再次喚醒隊列中的頭部 writer 進入下一輪。
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
3.Readonly
readonly 接口包含了幾個:Get、GetSnapshot。
3.1 GetSnapshot
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}
使用方法爲:
leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
leveldb::Iterator* iter = db->NewIterator(options);
delete iter;
db->ReleaseSnapshot(options.snapshot);
Snapshot 作爲一個基類是空的,其子類 SnapshotImpl 實現了一個雙向鏈表的快照 (SequenceNumber)。
class LEVELDB_EXPORT Snapshot {
protected:
virtual ~Snapshot();
};
所以說快照可以理解爲之前提到的 SequenceNumber。
class SnapshotImpl : public Snapshot {
public:
SnapshotImpl* prev_;
SnapshotImpl* next_;
const SequenceNumber sequence_number_;
};
SnapshotImpl 更像是一個雙向鏈表節點,包含了節點信息,那麼維護快照的結構就叫做 SnapshotList,實現了鏈表的維護,比較典型的接口爲 New、Delete。
而這個雙向鏈表的實現是通過 dummy 節點實現 (SnapshotImpl head_;
)。
對於 New 來說,就是根據 sequence_number 創建一個快照節點,並加入到快照鏈表中。
SnapshotImpl* New(SequenceNumber sequence_number) {
assert(empty() || newest()->sequence_number_ <= sequence_number);
SnapshotImpl* snapshot = new SnapshotImpl(sequence_number);
snapshot->next_ = &head_;
snapshot->prev_ = head_.prev_;
snapshot->prev_->next_ = snapshot;
snapshot->next_->prev_ = snapshot;
return snapshot;
}
Delete 就是從快照鏈表中刪除一個快照節點。
所以回到開頭 GetSnapshot 其實就是通過根據 VersionSet 得到最近的 sequence 構造一個 snapshot 節點放入快照鏈中。
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}
ReleaseSnapshot 就是刪除。
3.2 Get
Get 邏輯比較簡單,下面是細節:
-
拿到 snapshot,或者說 sequence number。
-
根據 key 與 snapshot 構造出 lookup key
-
先從 memtable 中查詢,查詢不到再從 immtable 中查詢,最後通過 Version 在 sst 文件中查詢。
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} elseif (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/5xVkFoScKMXWfiirT3ojBQ