IO Pipeline 讀 Minio 源碼

IO Pipeline 不算什麼新鮮事兒,通過 io.Reader io.Writer 等接口,把多個流處理連接一起,只需返回 Reader, 直到調用 Read 函數時纔讀數據,高效節約內存。類比 Spark 流處理,transformation 時只是傳遞 RDD, 只有 Action 時纔會觸發數據計算

JSON Decoder 例子

舉一個從 http 讀取 json 數據的例子:

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  request := new(Person)
  decoder := json.NewDecoder(r.Body)
  err := decoder.Decode(&request)
  if err != nil {
     http.Error(w, err)
  }
  ......
})

我們不需要 ioutil.ReadAll 全部 body 再調用 Unmarshal, decoder 內置 buffer 流式解析即可。但是這個例子不完美,有很多問題

func decodeJSONBody(w http.ResponseWriter, r *http.Request, dst interface{}) error {
    if r.Header.Get("Content-Type") != "" {
        value, _ := header.ParseValueAndParams(r.Header, "Content-Type")
        if value != "application/json" {
            msg := "Content-Type header is not application/json"
            return &malformedRequest{status: http.StatusUnsupportedMediaType, msg: msg}
        }
    }

    r.Body = http.MaxBytesReader(w, r.Body, 1048576)

    dec := json.NewDecoder(r.Body)
    dec.DisallowUnknownFields()

    err := dec.Decode(&dst)
    if err != nil {
        var syntaxError *json.SyntaxError
        var unmarshalTypeError *json.UnmarshalTypeError

        switch {
        case errors.As(err, &syntaxError):
            msg := fmt.Sprintf("Request body contains badly-formed JSON (at position %d)", syntaxError.Offset)
            return &malformedRequest{status: http.StatusBadRequest, msg: msg}
            ......
        }
    }

 err = dec.Decode(&struct{}{})
 if err != io.EOF {
        msg := "Request body must only contain a single JSON object"
        return &malformedRequest{status: http.StatusBadRequest, msg: msg}
    }
}

上面是改進後的版本,看着舒服多了,這還只是一個 reader 的實現。在 minio 中,經常有 N 多個 io.Reader 或者 io.Writer 組合在一起,實現 io pipeline, 稍複雜一些

Minio 下載數據

略去錯誤處理,只看 getObjectHandler 主幹代碼

func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
  ......
 gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
  ......
 httpWriter := xioutil.WriteOnClose(w)
 if rs != nil || opts.PartNumber > 0 {
  statusCodeWritten = true
  w.WriteHeader(http.StatusPartialContent)
 }

 // Write object content to response body
 if _, err = xioutil.Copy(httpWriter, gr); err != nil {
    ......
 }
  ......
}

getObjectNInfo 調用後端具體實現,返回 GetObjectReader gr, 從 gr 中讀取數據寫回 http Writer ...

gr 實現有很多種,minio 支持 NAS,FS, EC 多種模式,可以從文件系統中讀數據,可以從 remote http 中讀取

1. FS 本地文件系統下載數據

GetObjectNInfo 定義在 fs-v1.go, 原理比較簡單, 根據 header 獲取要讀取文件的 offset, length 組裝後返回 objReaderFn

func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
 ......

 objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
 if err != nil {
  ......
  return nil, err
 }

 // Read the object, doesn't exist returns an s3 compatible error.
 fsObjPath := pathJoin(fs.fsPath, bucket, object)
 readCloser, size, err := fsOpenFile(ctx, fsObjPath, off)
 if err != nil {
  ......
  return nil, err
 }

 closeFn := func() {
  readCloser.Close()
 }
 reader := io.LimitReader(readCloser, length)

 // Check if range is valid
 if off > size || off+length > size {
  ......
  return nil, err
 }

 return objReaderFn(reader, h, closeFn, rwPoolUnlocker, nsUnlocker)
}

NewGetObjectReader 代碼會處理壓縮或者加密的場景,內部還會構建 reader. fsOpenFile 打開文件後,還要封裝一層 io.LimitReader 獲取指定長度的數據

func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
 fn ObjReaderFn, off, length int64, err error,
) {
 ......
 // Calculate range to read (different for encrypted/compressed objects)
 switch {
 case isCompressed:
  ......

 case isEncrypted:
  ......

  // We define a closure that performs decryption given
  // a reader that returns the desired range of
  // encrypted bytes. The header parameter is used to
  // provide encryption parameters.
  fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) {
   copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""

   // Attach decrypter on inputReader
   var decReader io.Reader
   decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
   if err != nil {
    // Call the cleanup funcs
    for i := len(cFns) - 1; i >= 0; i-- {
     cFns[i]()
    }
    return nil, err
   }

   oi.ETag = getDecryptedETag(h, oi, false)

   // Apply the skipLen and limit on the
   // decrypted stream
   decReader = io.LimitReader(ioutil.NewSkipReader(decReader, skipLen), decRangeLength)

   // Assemble the GetObjectReader
   r = &GetObjectReader{
    ObjInfo:    oi,
    Reader:     decReader,
    cleanUpFns: cFns,
    opts:       opts,
   }
   return r, nil
  }

 default:
  off, length, err = rs.GetOffsetLength(oi.Size)
  if err != nil {
   return nil, 0, 0, err
  }
  fn = func(inputReader io.Reader, _ http.Header, cFns ...func()) (r *GetObjectReader, err error) {
   r = &GetObjectReader{
    ObjInfo:    oi,
    Reader:     inputReader,
    cleanUpFns: cFns,
    opts:       opts,
   }
   return r, nil
  }
 }
 return fn, off, length, nil
}

switch 分支會處理 isCompressed, isEncrypted, default 三種場景,區別是需要重新計算文件的 offset, length 然後再封裝對應的 io.Reader ...

2. EC 多機糾刪碼下載數據

func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
  ......
 fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
 if err != nil {
  return nil, toObjectErr(err, bucket, object)
 }

 if !fi.DataShardFixed() {
  diskMTime := pickValidDiskTimeWithQuorum(metaArr, fi.Erasure.DataBlocks)
  if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
   for index := range onlineDisks {
    if onlineDisks[index] == OfflineDisk {
     continue
    }
    if !metaArr[index].IsValid() {
     continue
    }
    if !metaArr[index].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
     // If disk mTime mismatches it is considered outdated
     // https://github.com/minio/minio/pull/13803
     //
     // This check only is active if we could find maximally
     // occurring disk mtimes that are somewhat same across
     // the quorum. Allowing to skip those shards which we
     // might think are wrong.
     onlineDisks[index] = OfflineDisk
    }
   }
  }
 }
  ......
 fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
 if err != nil {
  return nil, err
 }
 unlockOnDefer = false

 pr, pw := xioutil.WaitPipe()
 go func() {
  pw.CloseWithError(er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
 }()

 // Cleanup function to cause the go routine above to exit, in
 // case of incomplete read.
 pipeCloser := func() {
  pr.CloseWithError(nil)
 }

 return fn(pr, h, pipeCloser, nsUnlocker)
}

與 fs 本地文件系統的區別在於,需要從多個 onlineDisks 中讀取數據,並且可能是 remote 網絡請求

這裏用到了 xioutil.WaitPipe 底層是對 io.Pipe 的封裝,getObjectWithFileInfo 把數據寫入 pw 管道,上層調用 Read 從 pr 管道中讀取數據

func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
 // Reorder online disks based on erasure distribution order.
 // Reorder parts metadata based on erasure distribution order.
 onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)

 ......
 var totalBytesRead int64
 erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
 if err != nil {
  return toObjectErr(err, bucket, object)
 }

 var healOnce sync.Once

 // once we have obtained a common FileInfo i.e latest, we should stick
 // to single dataDir to read the content to avoid reading from some other
 // dataDir that has stale FileInfo{} to ensure that we fail appropriately
 // during reads and expect the same dataDir everywhere.
 dataDir := fi.DataDir
 for ; partIndex <= lastPartIndex; partIndex++ {
  if length == totalBytesRead {
   break
  }

  partNumber := fi.Parts[partIndex].Number

  // Save the current part name and size.
  partSize := fi.Parts[partIndex].Size

  partLength := partSize - partOffset
  // partLength should be adjusted so that we don't write more data than what was requested.
  if partLength > (length - totalBytesRead) {
   partLength = length - totalBytesRead
  }

  tillOffset := erasure.ShardFileOffset(partOffset, partLength, partSize)
  // Get the checksums of the current part.
  readers := make([]io.ReaderAt, len(onlineDisks))
  prefer := make([]bool, len(onlineDisks))
  for index, disk := range onlineDisks {
   if disk == OfflineDisk {
    continue
   }
   if !metaArr[index].IsValid() {
    continue
   }
   checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
   partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
   readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
    checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())

   // Prefer local disks
   prefer[index] = disk.Hostname() == ""
  }

  written, err := erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer)
  // Note: we should not be defer'ing the following closeBitrotReaders() call as
  // we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
  // we return from this function.
  closeBitrotReaders(readers)
  if err != nil {
   // If we have successfully written all the content that was asked
   // by the client, but we still see an error - this would mean
   // that we have some parts or data blocks missing or corrupted
   // - attempt a heal to successfully heal them for future calls.
   if written == partLength {
    var scan madmin.HealScanMode
    switch {
    case errors.Is(err, errFileNotFound):
     scan = madmin.HealNormalScan
    case errors.Is(err, errFileCorrupt):
     scan = madmin.HealDeepScan
    }
    switch scan {
    case madmin.HealNormalScan, madmin.HealDeepScan:
     healOnce.Do(func() {
      if _, healing := er.getOnlineDisksWithHealing(); !healing {
       go healObject(bucket, object, fi.VersionID, scan)
      }
     })
     // Healing is triggered and we have written
     // successfully the content to client for
     // the specific part, we should `nil` this error
     // and proceed forward, instead of throwing errors.
     err = nil
    }
   }
   if err != nil {
    return toObjectErr(err, bucket, object)
   }
  }
  for i, r := range readers {
   if r == nil {
    onlineDisks[i] = OfflineDisk
   }
  }
  // Track total bytes read from disk and written to the client.
  totalBytesRead += partLength
  // partOffset will be valid only for the first part, hence reset it to 0 for
  // the remaining parts.
  partOffset = 0
 } // End of read all parts loop.
 // Return success.
 return nil
}

newBitrotReader 封裝多個 reader, NewErasure 從 reader 中讀數據,調用 Decode 解碼讀取的數據,如果出現錯誤,那麼需要調用 healObject 嘗試修復,理論上 K+M 中至多可以損壞 M 份數據

如上圖所示,8 臺機器,每臺 16 塊硬盤,每塊硬盤 8T, 總大小 1PB. 如果 strip 條帶 K+M=16, 其中 M=4 的情況下,可用空間爲 768T,利用率 75%

至多可以損壞 32 塊硬盤,或者 2 臺機器宕機

小結

上面分析讀取,對於上傳對象邏輯也同理。Minio 代碼整體 20w 行, 涉及到了大部分對象存儲的知識,適合入門,值得一讀

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/b_FBTSMZtAw0KqE_3mnMew