深度解析 Lucene 輕量級全文索引實現原理

作者:vivo 互聯網服務器團隊 - Qian Yulun

一、Lucene 簡介

1.1 Lucene 是什麼?

1.2 Lucene 的使用場景

適用於需要數據索引量不大的場景,當索引量過大時需要使用 ES、Solr 等全文搜索服務器實現搜索功能。

1.3 通過本文你能瞭解到哪些內容?

本文旨在分享 Lucene 搜索引擎的源碼閱讀和功能開發中的經驗,Lucene 採用 7.3.1 版本。

二、Lucene 基礎工作流程

索引的生成分爲兩個部分:

  1. 創建階段:
  1. 搜索階段:

索引創建及搜索流程如下圖所示:

圖片

三、Lucene 索引構成

3.1 正向索引

Lucene 的基礎層次結構由索引、段、文檔、域、詞五個部分組成。正向索引的生成即爲基於 Lucene 的基礎層次結構一級一級處理文檔並分解域存儲詞的過程。

圖片

索引文件層級關係如圖 1 所示:

3.2 倒排索引

Lucene 全文索引的核心是基於倒排索引實現的快速索引機制。

倒排索引原理如圖 2 所示,倒排索引簡單來說就是基於分析器將文本內容進行分詞後,記錄每個詞出現在哪篇文章中,從而通過用戶輸入的搜索詞查詢出包含該詞的文章。

圖片

**問題:**上述倒排索引使用時每次都需要將索引詞加載到內存中,當文章數量較多,篇幅較長時,索引詞可能會佔用大量的存儲空間,加載到內存後內存損耗較大。

解決方案:從 Lucene4 開始,Lucene 採用了 FST 來減少索引詞帶來的空間消耗。

FST(Finite StateTransducers),中文名有限狀態機轉換器。其主要特點在於以下四點:

具體存儲方式如圖 3 所示:

圖片

倒排索引相關文件包含. tip、.tim 和. doc 這三個文件,其中:

3.3 索引查詢及文檔搜索過程

Lucene 利用倒排索引定位需要查詢的文檔號,通過文檔號搜索出文件後,再利用詞權重等信息對文檔排序後返回。

文件格式如圖 4 所示:

圖片

上文主要講解 Lucene 的工作原理,下文將闡述 Java 中 Lucene 執行索引、查詢等操作的相關代碼。

四、Lucene 的增刪改操作

Lucene 項目中文本的解析,存儲等操作均由 IndexWriter 類實現,IndexWriter 文件主要由 Directory 和 IndexWriterConfig 兩個類構成,其中:

Directory:用於指定存放索引文件的目錄類型。既然要對文本內容進行搜索,自然需要先將這些文本內容及索引信息寫入到目錄裏。Directory 是一個抽象類,針對索引的存儲允許有多種不同的實現。常見的存儲方式一般包括存儲有本地(FSDirectory),內存(RAMDirectory)等。

IndexWriterConfig:用於指定 IndexWriter 在文件內容寫入時的相關配置,包括 OpenMode 索引構建模式、Similarity 相關性算法等。

IndexWriter 具體是如何操作索引的呢?讓我們來簡單分析一下 IndexWriter 索引操作的相關源碼。

4.1. 文檔的新增

a. Lucene 會爲每個文檔創建 ThreadState 對象,對象持有 DocumentWriterPerThread 來執行文件的增刪改操作;

ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
  ThreadState threadState = null;
  synchronized (this) {
    if (freeList.isEmpty()) {
      // 如果不存在已創建的空閒ThreadState,則新創建一個
      return newThreadState();
    } else {
      // freeList後進先出,僅使用有限的ThreadState操作索引
      threadState = freeList.remove(freeList.size()-1);
      // 優先使用已經初始化過DocumentWriterPerThread的ThreadState,並將其與當前
      // ThreadState換位,將其移到隊尾優先使用
      if (threadState.dwpt == null) {
        for(int i=0;i<freeList.size();i++) {
          ThreadState ts = freeList.get(i);
          if (ts.dwpt != null) {
            freeList.set(i, threadState);
            threadState = ts;
            break;
          }
        }
      }
    }
  }
  threadState.lock();
  return threadState;
}

b. 索引文件的插入:DocumentWriterPerThread 調用 DefaultIndexChain 下的 processField 來處理文檔中的每個域,processField 方法是索引鏈的核心執行邏輯。通過用戶對每個域設置的不同的 FieldType 進行相應的索引、分詞、存儲等操作。FieldType 中比較重要的是 indexOptions:

// 構建倒排表
if (fieldType.indexOptions() != IndexOptions.NONE) {
    fp = getOrAddField(fieldName, fieldType, true);
    boolean first = fp.fieldGen != fieldGen;
    // field具體的索引、分詞操作
    fp.invert(field, first);
    if (first) {
      fields[fieldCount++] = fp;
      fp.fieldGen = fieldGen;
    }
} else {
  verifyUnIndexedFieldType(fieldName, fieldType);
}
// 存儲該field的storeField
if (fieldType.stored()) {
  if (fp == null) {
    fp = getOrAddField(fieldName, fieldType, false);
  }
  if (fieldType.stored()) {
    String value = field.stringValue();
    if (value != null && value.length() > IndexWriter.MAX_STORED_STRING_LENGTH) {
      throw new IllegalArgumentException("stored field \"" + field.name() + "\" is too large (" + value.length() + " characters) to store");
    }
    try {
      storedFieldsConsumer.writeField(fp.fieldInfo, field);
    } catch (Throwable th) {
      throw AbortingException.wrap(th);
    }
  }
}
// 建立DocValue(通過文檔查詢文檔下包含了哪些詞)
DocValuesType dvType = fieldType.docValuesType();
if (dvType == null) {
  throw new NullPointerException("docValuesType must not be null (field: \"" + fieldName + "\")");
}
if (dvType != DocValuesType.NONE) {
  if (fp == null) {
    fp = getOrAddField(fieldName, fieldType, false);
  }
  indexDocValue(fp, dvType, field);
}
if (fieldType.pointDimensionCount() != 0) {
  if (fp == null) {
    fp = getOrAddField(fieldName, fieldType, false);
  }
  indexPoint(fp, field);
}

c. 解析 Field 首先需要構造 TokenStream 類,用於產生和轉換 token 流,TokenStream 有兩個重要的派生類 Tokenizer 和 TokenFilter,其中 Tokenizer 用於通過 java.io.Reader 類讀取字符,產生 Token 流,然後通過任意數量的 TokenFilter 來處理這些輸入的 Token 流,具體源碼如下:

// invert:對Field進行分詞處理首先需要將Field轉化爲TokenStream
try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream))
// TokenStream在不同分詞器下實現不同,根據不同分詞器返回相應的TokenStream
if (tokenStream != null) {
  return tokenStream;
} else if (readerValue() != null) {
  return analyzer.tokenStream(name(), readerValue());
} else if (stringValue() != null) {
  return analyzer.tokenStream(name(), stringValue());
}
public final TokenStream tokenStream(final String fieldName, final Reader reader) {
  // 通過複用策略,如果TokenStreamComponents中已經存在Component則複用。
  TokenStreamComponents components = reuseStrategy.getReusableComponents(this, fieldName);
  final Reader r = initReader(fieldName, reader);
  // 如果Component不存在,則根據分詞器創建對應的Components。
  if (components == null) {
    components = createComponents(fieldName);
    reuseStrategy.setReusableComponents(this, fieldName, components);
  }
  // 將java.io.Reader輸入流傳入Component中。
  components.setReader(r);
  return components.getTokenStream();
}

d. 根據 IndexWriterConfig 中配置的分詞器,通過策略模式返回分詞器對應的分詞組件,針對不同的語言及不同的分詞需求,分詞組件存在很多不同的實現。

 以 StandardAnalyzer(標準分詞器)爲例:

// 標準分詞器創建Component過程,涵蓋了標準分詞處理器、Term轉化小寫、常用詞過濾三個功能
protected TokenStreamComponents createComponents(final String fieldName) {
  final StandardTokenizer src = new StandardTokenizer();
  src.setMaxTokenLength(maxTokenLength);
  TokenStream tok = new StandardFilter(src);
  tok = new LowerCaseFilter(tok);
  tok = new StopFilter(tok, stopwords);
  return new TokenStreamComponents(src, tok) {
    @Override
    protected void setReader(final Reader reader) {
      src.setMaxTokenLength(StandardAnalyzer.this.maxTokenLength);
      super.setReader(reader);
    }
  };
}

e. 在獲取 TokenStream 之後通過 TokenStream 中的 incrementToken 方法分析並獲取屬性,再通過 TermsHashPerField 下的 add 方法構建倒排表,最終將 Field 的相關數據存儲到類型爲 FreqProxPostingsArray 的 freqProxPostingsArray 中,以及 TermVectorsPostingsArray 的 termVectorsPostingsArray 中,構成倒排表;

// 以LowerCaseFilter爲例,通過其下的increamentToken將Token中的字符轉化爲小寫
public final boolean incrementToken() throws IOException {
  if (input.incrementToken()) {
    CharacterUtils.toLowerCase(termAtt.buffer(), 0, termAtt.length());
    return true;
  } else
    return false;
}
  try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) {
    // reset TokenStream
    stream.reset();
    invertState.setAttributeSource(stream);
    termsHashPerField.start(field, first);
    // 分析並獲取Token屬性
    while (stream.incrementToken()) {
      ……
      try {
        // 構建倒排表
        termsHashPerField.add();
      } catch (MaxBytesLengthExceededException e) {
        ……
      } catch (Throwable th) {
        throw AbortingException.wrap(th);
      }
    }
    ……
}

4.2 文檔的刪除

a. Lucene 下文檔的刪除,首先將要刪除的 Term 或 Query 添加到刪除隊列中;

synchronized long deleteTerms(final Term... terms) throws IOException {
  // TODO why is this synchronized?
  final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
  // 文檔刪除操作是將刪除的詞信息添加到刪除隊列中,根據flush策略進行刪除
  long seqNo = deleteQueue.addDelete(terms);
  flushControl.doOnDelete();
  lastSeqNo = Math.max(lastSeqNo, seqNo);
  if (applyAllDeletes(deleteQueue)) {
    seqNo = -seqNo;
  }
  return seqNo;
}

b. 根據 Flush 策略觸發刪除操作;

private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
  // 判斷是否滿足刪除條件 --> onDelete
  if (flushControl.getAndResetApplyAllDeletes()) {
    if (deleteQueue != null) {
      ticketQueue.addDeletes(deleteQueue);
    }
    // 指定執行刪除操作的event
    putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
    return true;
  }
  return false;
}
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
  // 判斷並設置是否滿足刪除條件
  if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) {
    control.setApplyAllDeletes();
    if (infoStream.isEnabled("FP")) {
      infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBufferMB=" + indexWriterConfig.getRAMBufferSizeMB());
    }
  }
}

4.3 文檔的更新

文檔的更新就是一個先刪除後插入的過程,本文就不再做更多贅述。

4.4 索引 Flush

文檔寫入到一定數量後,會由某一線程觸發 IndexWriter 的 Flush 操作,生成段並將內存中的 Document 信息寫到硬盤上。Flush 操作目前僅有一種策略:FlushByRamOrCountsPolicy。FlushByRamOrCountsPolicy 主要基於兩種策略自動執行 Flush 操作:

其中 activeBytes() 爲 dwpt 收集的索引所佔的內存量,deleteByteUsed 爲刪除的索引量。

@Override
public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
  // 根據文檔數進行Flush
  if (flushOnDocCount()
      && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
          .getMaxBufferedDocs()) {
    // Flush this state by num docs
    control.setFlushPending(state);
  // 根據內存使用量進行Flush
  } else if (flushOnRAM()) {// flush by RAM
    final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
    final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
    if (totalRam >= limit) {
      if (infoStream.isEnabled("FP")) {
        infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
      }
      markLargestWriterPending(control, state, totalRam);
    }
  }
}

將內存信息寫入索引庫。

圖片

索引的 Flush 分爲主動 Flush 和自動 Flush,根據策略觸發的 Flush 操作爲自動 Flush,主動 Flush 的執行與自動 Flush 有較大區別,關於主動 Flush 本文暫不多做贅述。需要了解的話可以跳轉鏈接

4.5 索引段 Merge

索引 Flush 時每個 dwpt 會單獨生成一個 segment,當 segment 過多時進行全文檢索可能會跨多個 segment,產生多次加載的情況,因此需要對過多的 segment 進行合併。

段合併的執行通過 MergeScheduler 進行管理。mergeScheduler 也包含了多種管理策略,包括 NoMergeScheduler、SerialMergeScheduler 和 ConcurrentMergeScheduler。

  1. merge 操作首先需要通過 updatePendingMerges 方法根據段的合併策略查詢需要合併的段。段合併策略分爲很多種,本文僅介紹兩種 Lucene 默認使用的段合併策略:TieredMergePolicy 和 LogMergePolicy。
private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
  throws IOException {
  final MergePolicy.MergeSpecification spec;
  // 查詢需要合併的段
  if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
    assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
    "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
    spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
    newMergesFound = spec != null;
    if (newMergesFound) {
      final int numMerges = spec.merges.size();
      for(int i=0;i<numMerges;i++) {
        final MergePolicy.OneMerge merge = spec.merges.get(i);
        merge.maxNumSegments = maxNumSegments;
      }
    }
  } else {
    spec = mergePolicy.findMerges(trigger, segmentInfos, this);
  }
  // 註冊所有需要合併的段
  newMergesFound = spec != null;
  if (newMergesFound) {
    final int numMerges = spec.merges.size();
    for(int i=0;i<numMerges;i++) {
      registerMerge(spec.merges.get(i));
    }
  }
  return newMergesFound;
}

2)通過 ConcurrentMergeScheduler 類中的 merge 方法創建用戶合併的線程 MergeThread 並啓動。

@Override
public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
  ……
  while (true) {
    ……
    // 取出註冊的後選段
    OneMerge merge = writer.getNextMerge();
    boolean success = false;
    try {
      // 構建用於合併的線程MergeThread 
      final MergeThread newMergeThread = getMergeThread(writer, merge);
      mergeThreads.add(newMergeThread);
      updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
      if (verbose()) {
        message("    launch new thread [" + newMergeThread.getName() + "]");
      }
      // 啓用線程
      newMergeThread.start();
      updateMergeThreads();
      success = true;
    } finally {
      if (!success) {
        writer.mergeFinish(merge);
      }
    }
  }
}

3)通過 doMerge 方法執行 merge 操作;

public void merge(MergePolicy.OneMerge merge) throws IOException {
  ……
      try {
        // 用於處理merge前緩存任務及新段相關信息生成
        mergeInit(merge);
        // 執行段之間的merge操作
        mergeMiddle(merge, mergePolicy);
        mergeSuccess(merge);
        success = true;
      } catch (Throwable t) {
        handleMergeException(t, merge);
      } finally {
        // merge完成後的收尾工作
        mergeFinish(merge)
      }
……
}

五、Lucene 搜索功能實現

5.1 加載索引庫

Lucene 想要執行搜索首先需要將索引段加載到內存中,由於加載索引庫的操作非常耗時,因此僅有當索引庫產生變化時需要重新加載索引庫。

圖片

加載索引庫分爲加載段信息和加載文檔信息兩個部分:

1)加載段信息:

2)加載文檔信息:

圖片

5.2 封裝

索引庫加載完成後需要 IndexReader 封裝進 IndexSearch,IndexSearch 通過用戶構造的 Query 語句和指定的 Similarity 文本相似度算法(默認 BM25)返回用戶需要的結果。通過 IndexSearch.search 方法實現搜索功能。

搜索:Query 包含多種實現,包括 BooleanQuery、PhraseQuery、TermQuery、PrefixQuery 等多種查詢方法,使用者可根據項目需求構造查詢語句

排序:IndexSearch 除了通過 Similarity 計算文檔相關性分值排序外,也提供了 BoostQuery 的方式讓用戶指定關鍵詞分值,定製排序。Similarity 相關性算法也包含很多種不同的相關性分值計算實現,此處暫不做贅述,讀者有需要可自行網上查閱。

六、總結

Lucene 作爲全文索引工具包,爲中小型項目提供了強大的全文檢索功能支持,但 Lucene 在使用的過程中存在諸多問題:

Lucene 使用時存在諸多限制,使用起來也不那麼方便,當數據量增大時還是儘量選擇 ElasticSearch 等分佈式搜索服務器作爲搜索功能的實現方案。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_oT38Ra9QXiDKNjG3r7mAQ