SeaweedFS 分佈式文件系統源碼分析

本文基於 seaweedfs 3.46[1]

SeaweedFS 的架構包括 Master Server、Volume Server 和 Filer Server 。

啓動 Master Server

啓動一個 Master Server 可以使用以下命令:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0

啓動入口以及所有的參數定義在 weed/command/master.go ,默認情況 http 監聽端口使用 9333 ,grpc 監聽端口則在 http 端口的基礎上加 10000 (所有組件的默認規則)即 19333 :

if *masterOption.portGrpc == 0 {
 *masterOption.portGrpc = 10000 + *masterOption.port
}

Master Server 支持多節點(奇數)部署。使用 Raft 一致性算法來選舉 Leader 節點,這樣可以保證在 Leader 節點宕機的情況下,其他節點可以重新選舉出新的 Leader 節點,從而保證系統的高可用性。

如下,啓動一個由三個 Master Server 節點所組成的集羣:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"

當 Master Server 啓動時,它會嘗試加入集羣並參與 Leader 選舉。一旦選舉完成,Leader 節點將負責管理整個集羣以及 Volume Server 。

首先會創建一個 Master Server 包裝的 weed_server.RaftServer 對象:

raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
 glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}

weed_server.NewRaftServer() 方法中會創建好 Raft 節點所需的各種參數和對象,然後調用 github.com/seaweedfs/raft[2] 庫創建 RaftServer 對象並啓動 Raft 節點:

type RaftServer struct {
 // 存儲初始節點信息
 peers map[string]pb.ServerAddress
 // Raft 節點
 raftServer raft.Server
 // HashiCorp Raft 節點
 RaftHashicorp *hashicorpRaft.Raft
 // 用於管理 Raft 節點之間的通信
 TransportManager *transport.Manager
 // Raft 節點的數據目錄
 dataDir string
 // Raft 節點的地址
 serverAddr pb.ServerAddress
 // Raft 集羣的拓撲結構
 topo *topology.Topology
 // Raft 節點的 gRPC 服務
 *raft.GrpcServer
}

func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
 // 通過 option 創建一個 RaftServer 對象 s
 s := &RaftServer{
  peers:      option.Peers,
  serverAddr: option.ServerAddr,
  dataDir:    option.DataDir,
  topo:       option.Topo,
 }

 //...

 // 調用 github.com/seaweedfs/raft 庫,創建 RaftServer 對象
 s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")

 //...

 // 啓動 Raft 節點
 if err := s.raftServer.Start(); err != nil {
  return nil, err
 }

 // 將節點加入到 Raft 集羣中
 for name, peer := range s.peers {
  if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
   return nil, err
  }
 }

 //...

 glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())

 return s, nil
}

最後,會打印出當前的 Leader 節點,如果對 Raft 選舉算法的處理細節感興趣,可以繼續深入 s.raftServer.Start() 的實現。

Raft 節點啓動成功後,Master Server 會註冊一些集羣相關的接口,方便查看集羣狀態:

r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET""HEAD")
if *masterOption.raftHashicorp {
 r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}

請求如下:

$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}

啓動 Volume Server

啓動一個 Volume Server 可以使用以下命令:

weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0

啓動入口以及所有的參數定義在 weed/command/volume.go ,默認情況 http 監聽端口使用 8080 ,grpc 監聽端口使用 18080 。

其中,-mserver 爲 Master Server 連接地址,當需要連接的 Master Server 爲集羣時,可以將多個 Master Server 的連接地址用逗號分隔; -dir 則用來指定 Volume Server 存儲數據文件的目錄。

和 Master Server 不同,Volume Server 支持橫向擴展,其節點數量規模可以隨着數據量和性能需求的變化而隨時動態調整。

一旦 Volume Server 啓動後,就會與 Master Server 保持通信,彙報自身的狀態,並根據 Master Server 的指示執行創建、刪除、修復等操作。

核心邏輯在 weed/server/volume_grpc_client_to_master.goVolumeServer.doHeartbeat 方法。

首先會創建一個 Master Server 的 gRPC 連接客戶端,並使用該客戶端調用 SendHeartbeat 方法:

// 創建 Master Server 的 gRPC 連接客戶端
client := master_pb.NewSeaweedClient(grpcConnection)
// 調用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
 glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
 return "", err
}

SendHeartbeat 方法是一個雙向流式 RPC ,允許在一次調用中發送多個請求和響應,其 ProtoBuf 定義如下:

rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}

接着創建一個 goroutine 用來處理從 Master Server 發送過來的 Heartbeat 請求:

go func() {
 for {
  // 從輸入流中讀取 Heartbeat 請求
  in, err := stream.Recv()
  if err != nil {
   doneChan <- err
   return
  }
  // ...

  // 如果 Heartbeat 請求中包含了卷大小限制,並且該限制和當前 Volume Server 中保存的限制不同
  if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
   // 將 Volume Server 中保存的限制更新爲 Heartbeat 請求中的限制
   vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
   // 調用 vs.store.MaybeAdjustVolumeMax() 方法重新計算卷的最大容量
   if vs.store.MaybeAdjustVolumeMax() {
    // 如果計算結果發生了變化,則使用 stream.Send() 方法向 Master Server 發送 Heartbeat 響應
    if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
     glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
     return
    }
   }
  }
  // 如果 Heartbeat 請求中包含了新的 Master Server 地址,並且該地址和當前地址不同
  if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
   // 通知主函數切換新的 Master Server 地址作爲 Leader
   glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
   newLeader = pb.ServerAddress(in.GetLeader())
   doneChan <- nil
   return
  }
 }
}()

最後使用一個 for select 來監聽來自 Volume Server 存儲層的四個通道:NewVolumesChanNewEcShardsChanDeletedVolumesChanDeletedEcShardsChan。每當有新的卷或 EC 分片被創建或刪除時,會生成一個 Heartbeat 消息,並使用 stream.Send() 方法將其發送到 Master Server ,同時也會定期發送心跳消息給 Master Server :

for {
 select {
 // 有新的卷被創建
 case volumeMessage := <-vs.store.NewVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有新的 EC 分片被創建
 case ecShardMessage := <-vs.store.NewEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有卷被刪除
 case volumeMessage := <-vs.store.DeletedVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有 EC 分片被刪除
 case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 發送卷信息的心跳消息
 case <-volumeTickChan.C:
  glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  vs.store.MaybeAdjustVolumeMax()
  if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // 發送 EC 分片信息的心跳消息
 case <-ecShardTickChan.C:
  glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // Volume Server 停止,退出監聽
 case err = <-doneChan:
  return
 // 用於在 Volume Server 停止時發送最終的心跳消息
 case <-vs.stopChan:
  // ...
  glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  if err = stream.Send(emptyBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
  return
 }
}

啓動 Filer Server

啓動一個 Filer Server 可以使用以下命令:

weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0

啓動入口以及所有的參數定義在 weed/command/filer.go ,默認情況 http 監聽端口使用 8888 ,grpc 監聽端口使用 18888 。

在這裏,-master 爲 Master Server 連接地址,同樣地,當需要連接的 Master Server 爲集羣時,可以將多個 Master Server 的連接地址用逗號分隔; -s3 則代表要啓動 S3 網關功能,默認監聽 8333 端口。

Filer Server 可以理解爲一個文件管理器,通過向下對接 Volume Server 與 Master Server,對外提供豐富的功能與特性,除了自身提供的 API 接口,還支持擴展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。

Filer Server 通過外部數據庫存儲文件的元數據信息。默認情況下,使用的是 leveldb ,支持替換爲其它流行的數據庫,例如 Sqlite、MySql、Etcd 等,具體可以參考 wiki/Filer-Stores[3] 。

作爲一個 API Server ,Filer Server 在架構上就是一個服務端 + 數據庫模型,其節點的數量和規模可以根據不同的工作負載和使用情況進行優化和調整。

上傳文件

首先分析 Filer Server 自身提供的 API 接口,上傳文件可以直接調用 :

$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}

文件上傳的接口定義在 weed/server/filer_server_handlers_write.goPostHandler 方法:

func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 // 解析請求的目標路徑
 // ...
 // 解析請求的查詢參數,用於確定文件的存儲位置和屬性
 // ...
 if query.Has("mv.from") {
  // 若查詢參數中出現 mv.from ,則進行文件移動操作
  fs.move(ctx, w, r, so)
 } else {
  // 文件上傳操作,自動分塊
  fs.autoChunk(ctx, w, r, contentLength, so)
 }

 util.CloseRequest(r)
}

跟蹤到 fs.autoChunk 方法:

func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 //...

 if r.Method == "POST" {
  // 上傳文件
  if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
   reply, err = fs.mkdir(ctx, w, r)
  } else {
   // 自動分塊上傳
   reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
  }
 } else {
  // 創建目錄
  reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 }

 //...
}

繼續來到 fs.doPostAutoChunk 方法:

func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {

 // 讀取上傳的文件內容
 multipartReader, multipartReaderErr := r.MultipartReader()
 if multipartReaderErr != nil {
  return nil, nil, multipartReaderErr
 }

 // 讀取第一個分塊,在這裏,我們只需要讀取第一個分塊,即上傳文件的內容的分塊
 part1, part1Err := multipartReader.NextPart()
 if part1Err != nil {
  return nil, nil, part1Err
 }

 // 獲取文件名和 Content-Type
 fileName := part1.FileName()
 if fileName != "" {
  fileName = path.Base(fileName)
 }
 contentType := part1.Header.Get("Content-Type")
 if contentType == "application/octet-stream" {
  contentType = ""
 }

 // 核心邏輯
 // 將上傳的文件內容轉換爲文件分塊,並返回文件分塊的相關信息
 fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 if err != nil {
  return nil, nil, err
 }

 // 計算文件內容的 MD5 值
 md5bytes = md5Hash.Sum(nil)
 // 保存文件元數據信息
 filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
 if replyerr != nil {
  fs.filer.DeleteChunks(fileChunks)
 }

 return
}

這些都比較好讀,繼續跟蹤到核心邏輯處 fs.uploadReaderToChunks ,方法內首先會進行一些正確性校驗和必要變量的初始化,然後開啓一個循環,不斷讀取數據並將其轉換爲一個或多個 Chunk :

func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 // ...一系列操作
 // 進行一些正確性校驗和必要變量的初始化

 for {

  // 使用對象池機制限制 bytes.Buffer 對象的數量,優化內存佔用
  bytesBufferLimitCond.L.Lock()
  for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
   glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
   bytesBufferLimitCond.Wait()
  }
  atomic.AddInt64(&bytesBufferCounter, 1)
  bytesBufferLimitCond.L.Unlock()

  bytesBuffer := bufPool.Get().(*bytes.Buffer)
  glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))

  // 【關鍵】分塊操作,每個塊就是一個 bytes.Buffer
  // 根據 chunkSize 從 partReader 中讀取數據,並將讀取的數據保存到 bytes.Buffer 對象中
  limitedReader := io.LimitReader(partReader, int64(chunkSize))

  bytesBuffer.Reset()

  dataSize, err := bytesBuffer.ReadFrom(limitedReader)

  // 處理讀取數據時可能出現的錯誤,以及在讀取完整個文件時的處理
  // ...

  wg.Add(1)
  // 開啓 goroutine 處理
  go func(offset int64) {
   defer func() {
    // 將 bytes.Buffer 對象歸還對象池
    bufPool.Put(bytesBuffer)
    atomic.AddInt64(&bytesBufferCounter, -1)
    // 通知其他 goroutine 可以使用更多的 bytes.Buffer 對象
    bytesBufferLimitCond.Signal()
    wg.Done()
   }()

   // 【關鍵】上傳數據塊
   chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)

   if toChunkErr != nil {
    // 記錄上傳錯誤
    uploadErrLock.Lock()
    if uploadErr == nil {
     uploadErr = toChunkErr
    }
    uploadErrLock.Unlock()
   }
   if chunks != nil {
    fileChunksLock.Lock()
    fileChunksSize := len(fileChunks) + len(chunks)
    for _, chunk := range chunks {
     // 【關鍵】將當前上傳的數據塊添加到 fileChunks 列表中
     fileChunks = append(fileChunks, chunk)
     glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
    }
    fileChunksLock.Unlock()
   }
  }(chunkOffset)

  // 更新已經讀取的數據塊的大小
  chunkOffset = chunkOffset + dataSize

  if dataSize < int64(chunkSize) {
   // 已經讀取完整個文件
   break
  }
 }

 wg.Wait()

 if uploadErr != nil {
  // 上傳出錯,刪除 fileChunks
  fs.filer.DeleteChunks(fileChunks)
  return nil, md5Hash, 0, uploadErr, nil
 }
 // 【關鍵】對已經上傳的數據塊,即 fileChunks 進行排序,以便後續可以正確地進行數據合併
 slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
  return a.Offset < b.Offset
 })
 // 返回 fileChunks 給調用方保存
 return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

文件的分塊操作都是在 Filer Server 完成的。而其中上傳數據塊的 fs.dataToChunk 方法會與 Master Server 進行交互。

該方法首先會調用 fs.assignNewFileInfo 向 Master Server 請求分配一個新的文件 ID(fid)以及上傳 URL :

fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
 // ...
 return uploadErr
}

然後使用分配的 fid 調用上傳 URL 上傳數據塊:

uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
 // ...
 return uploadErr
}

這個由 Master Server 所分配的上傳 URL ,實際就是 Volume Server 的上傳地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其實這個文件 ID 更準確地說應該是代表一個數據塊的文件 ID。

SeaweedFS 會根據 maxMB 參數,來把文件拆分成多個塊存儲,默認大小是 4MB 。即一個 100MB 大小的文件,上傳到 SeaweedFS 後會被分成 25 個塊存儲,也就是申請分配了 25 個文件 ID 。

f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")

到這裏,總算捋清流程了。

那還有一個 S3 接口的文件上傳呢?

不用擔心,SeaweedFS S3 只是做了一個 API 的代理轉發,依舊轉發到 Filer Server 自身提供的 API 接口,邏輯依舊和上面一致,代碼位置在 weed/s3api/s3api_object_handlers.go

// 這裏的 uploadUrl 實際就是 Filer Server 的地址
// 例如在名稱爲 test 的 S3 Bucket 中上傳 test.txt 文件
// 則 uploadUrl 爲: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")

下載文件

和上傳文件一樣,SeaweedFS S3 爲文件下載做了一個代理轉發,轉發到 Filer Server 自身提供的 API 接口:

// 這裏的 destUrl 實際就是 Filer Server 的地址
// 例如要下載 test Bucket 中的 test.txt 文件
// 則 destUrl 爲: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)

所以,當下載一個文件時:

$ curl http://127.0.0.1:8888/test.txt
hello test.txt

直接來看 weed/server/filer_server_handlers_read.goGetOrHeadHandler 接口:

func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {

 // ...
 // 從 URL 中獲取文件或文件夾路徑

 // 根據文件或文件夾的完整路徑從元數據數據庫中查找出 Entry 記錄(即文件的元數據信息)

 // 若是文件夾,則列出文件夾下的文件
 // ...

 // 如果指定了 metadata=true 參數,則直接返回文件或文件夾的元數據信息
 if query.Get("metadata") == "true" {
  // ...
  return
 }

 // 減少服務器帶寬
 // 通過 Etag 資源標識對比資源是否發生變化
 etag := filer.ETagEntry(entry)
 if checkPreconditions(w, r, entry) {
  // 如果資源未發生改變,則返回 304 Not Modified 響應,不返回具體的資源
  // 客戶端可以直接讀取緩存中的數據
  return
 }

 // 設置 ETag 標識到響應頭
 setEtag(w, etag)

 // ...

 // 這裏是用來處理獲取圖片文件的邏輯
 if rangeReq := r.Header.Get("Range"); rangeReq == "" {
  // ...
 }

 // 獲取普通文件核心邏輯
 processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
  // 偏移量從請求頭中獲取,例 Range: bytes=100-199
  // 若無指定偏移量,默認爲 0
  // 判斷請求的範圍是否在文件的內容大小範圍內
  if offset+size <= int64(len(entry.Content)) {
   // ...
   return err
  }
  // 從元數據數據庫獲取到的chunks信息
  chunks := entry.GetChunks()
  // 判斷文件是否只存在於遠程存儲中,例如 AWS S3 、Google Cloud Storage 等
  if entry.IsInRemoteOnly() {
   // 將遠程對象緩存到本地集羣,並更新新的chunks
   // ...
  }

  // 【核心】開始讀取文件並寫入 HTTP 響應
  // MasterClient :Master 節點的客戶端
  // chunks :要讀取的文件數據塊列表
  // offset :請求的文件內容的起始位置
  // size :請求的文件內容的大小
  // DownloadMaxBytesPs :下載速率的限制,單位是字節/秒
  err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  if err != nil {
   stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
   glog.Errorf("failed to stream content %s: %v", r.URL, err)
  }
  return err
 })
}

根據代碼,我們可以直接通過 metadata=true 查詢參數查看文件的元數據信息:

$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}

其中最重要的就是 chunks 信息,裏面定義了該文件的所有數據塊信息,只要把所有數據塊拼湊一起,就可以還原出整個文件。文件大小的原因,這裏剛好只有一個塊,其文件 ID 爲 14,1f343c431d

繼續解讀文件下載的核心方法 filer.StreamContentWithThrottler ,首先獲取所有文件 ID 所對應的 URL 列表:

// 將 chunks 轉換爲視圖列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

fileId2Url := make(map[string][]string)

// 通過 chunkViews.Front() 獲取 chunkViews 列表的頭部元素,然後在每次迭代中將 x 移動到下一個元素,直到遍歷完整個列表
for x := chunkViews.Front(); x != nil; x = x.Next {
 // 從 x.Value 中獲取 chunkView 對象
 chunkView := x.Value
 var urlStrings []string
 var err error
 // 獲取 chunkView 對應的文件 ID 的 URL 列表,並將 URL 列表存儲在 urlStrings 變量中
 // 在分佈式系統中,網絡故障和其他因素可能導致某些請求失敗,因此需要多次嘗試獲取 URL 列表,以提高獲取成功的概率
 for _, backoff := range getLookupFileIdBackoffSchedule {
  urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  if err == nil && len(urlStrings) > 0 {
   break
  }
  glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
  time.Sleep(backoff)
 }
 // 錯誤處理
 // ...
 fileId2Url[chunkView.FileId] = urlStrings
}

然後,通過獲取到的 URL 列表下載文件的所有 chunk :

// 下載速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通過遍歷 chunkViews 列表來下載每個 chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
 chunkView := x.Value
 // 檢查文件偏移量
 if offset < chunkView.ViewOffset {
  // ...
 }
 urlStrings := fileId2Url[chunkView.FileId]
 start := time.Now()
 // 【核心】從 URL 列表中讀取 chunkView 的數據,並將數據寫入到 writer 中給到客戶端
 err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 // 更新文件偏移量
 offset += int64(chunkView.ViewSize)
 // 更新剩餘數據大小
 remaining -= int64(chunkView.ViewSize)
 // ...
}
// 檢查文件的所有數據是否都已經成功下載
if remaining > 0 {
 glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
  return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
 }
}

可以總結出,下載文件本質也是和 Master Server 交互,通過文件 ID 獲取到對應 Volume Server 的數據塊下載地址列表,按照列表順序請求下載數據塊,最後重新整合成了一個完整的文件返回給客戶端。

最後,附上文件下載的流程:

參考資料

[1]

seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46

[2]

github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0

[3]

wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/apkJ1fG7jwdlC6D4KH0Dkw