Elasticsearch 刪除重複文檔實現方式,你知道幾個?

之前文章有講到藉助:fingerprint filter 插件實現去重

近期又有社羣小夥伴問到同樣的問題,這裏一併梳理回覆如下。

1、python 腳本實現文檔去重

這裏講的實現,藉助 python 腳本實現。

由於涉及 8.X 版本 Elasticsearch 以安全方式的連接,這裏需要 python 升級到 3.10+ 版本纔可以。

1.1 實現前提

標定文檔重複標記——一般文檔中幾個字段或者全部字段重複,才認爲文檔是一樣的。

業務層面自己指定就可用 md5 值實現。

對於新聞類類線上業務的文檔舉例:

https://3g.163.com/news/article/H5APDMGH00019UD6.html

https://news.sina.com.cn/sx/2022-04-19/detail-imcwiwst2726757.shtml

如果拿文章標題(title) + 正文內容(content)內容組合取 md5,然後對比的話,兩者發佈網站不同,但內容可以認爲是重複的。

1.2 實現原理

2、實現代碼

#!/usr/local/bin/python3
from elasticsearch import Elasticsearch, helpers
import hashlib
import ssl

# 全局變量
ES_HOST = 'https://192.168.1.10:9200'
ES_USER = 'elastic'
ES_PASSWORD = '9Z=T2wOWIXXXXXXXX'
CERT_FINGERPRINT = "a4d0fe1eb7e1fd9874XXXXXXXXXX"

# 全局詞典
dict_of_duplicate_docs = {}

# https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/config.html
# 要求python 版本3.10及以上
# fingerprint 生成方式,方式一:Elasticsearch 首次啓動的時候自動生成。
# 方式二:藉助命令行再次生成(結果同方式一)
# bash-4.2$ openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt
# SHA256 Fingerprint=A4:D0:FE:1E:B7:E1:FD:98:74:A4:10:6F:E1:XXXXXXXX
def es_init():
    es = Elasticsearch( ES_HOST,
    ssl_assert_fingerprint=CERT_FINGERPRINT,
    basic_auth=(ES_USER, ES_PASSWORD),
    verify_certs=False )
    return es
    
# 對每個文檔生成唯一id(自定義生成)
def populate_dict_of_duplicate_docs(hit):
    combined_key = ""
    
    # 三個字段決定索引是否重複,自動是根據業務指定的
    keys_to_include_in_hash = ["CAC""FTSE""SMI"]
    for mykey in keys_to_include_in_hash:
      combined_key += str(hit['_source'][mykey])
      
      _id = hit["_id"]
      
      # 基於三個字段的組合,生成md5值,便於後續去重用。
      hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
      
      # 生成鍵值對詞典,key使用md5值,value 爲數組類型。
      # 相同的 key 值也就是相同的文檔,value 是文檔id列表
      dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
      print(dict_of_duplicate_docs)
      
  
  # 待去重索引遍歷
  def scroll_over_all_docs():
      es = es_init()
      for hit in helpers.scan(es, index='stocks'):
        populate_dict_of_duplicate_docs(hit)
        
    # 去重處理函數
    def loop_over_hashes_and_remove_duplicates():
      es = es_init()
      for hashval, array_of_ids in dict_of_duplicate_docs.items():
      # 對id列表長度大於1的做去重處理操作
      if len(array_of_ids) > 1:
      print(" Duplicate docs hash=%s ****" % hashval)
      # 獲取相同的文檔
      matching_docs = es.mget(index="stocks"ids= array_of_ids[0:len(array_of_ids)-1])
      for doc in matching_docs['docs']:
        print("doc=%s\n" % doc)
        es.delete(index="stocks"id = doc['_id'])
        
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
    
main()

代碼的核心:

使用了 8.X 版本的 Elasticsearch 訪問方式。藉助:fingerprint 訪問實現。

fingerprint 兩種獲取方式:

方式一:Elasticsearch 啓動的時候已經包含。

方式二:可以藉助命令行再生成。

openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt

3、小結

文章給出 8.X 版本實現文檔去重的完整思路和 Python 代碼實現,加上之前講解的 logstash fingerprint filter 插件實現去重實現,共 2 種方案解決文檔重複問題。

你的項目實戰環節有沒有遇到文檔去重問題、刪除重複文檔問題?如何解決的?歡迎留言交流。

參考

https://github.com/deric/es-dedupe/blob/master/esdedupe/esdedupe.py

https://github.com/alexander-marquardt/deduplicate-elasticsearch

https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Mz_Go5ZH5MBJNp4tlRPteA