Golang 中的 Pipe 使用

主要記錄一下 io.Pipeos.Pipe 的使用

Golang 版本爲 1.13 ,實驗環境爲 MacOS

io.Pipe

基礎

func Pipe() (*PipeReader, *PipeWriter)

如果閱讀其源代碼,它內部有一個 pipe 對象,將其 Read 相關功能和 Write 相關功能拆分成了 PipeReaderPipeWriter 暴露出去。

PipeReader.Read() 方法寫入數據,PipeWriter.Write() 讀取數據。由於其實現了接口 io.ReadCloserio.WriteCloser 。所以可以使用更高層的工具函數來操作它們。

對於 PipeReader ,可以使用 bytes.Buffer.ReadFrom 或者 bufio.NewScanner

對於 PipeWriter 可以使用 fmt.Fprintf() 或者 bufio.NewWriter


簡單例子

這裏寫一個簡單的例子,在 goroutine 中,每隔 1 秒向 PipeWriter 中寫入當前的時間,從 PipeReader 讀取寫入的數據。不使用任何工具函數

func main() {
  r, w := io.Pipe()

  go func() {
    for {
      time.Sleep(time.Second)
      w.Write([]byte(time.Now().String()))
    }
  }()
  for {
    dataRead := make([]byte, 256)
    n, _ := r.Read(dataRead)
    fmt.Println(string(dataRead[:n]))
  }
}

輸出如下

2020-07-05 17:44:21.674781 +0800 CST m=+1.005118043
2020-07-05 17:44:22.677565 +0800 CST m=+2.007942604
2020-07-05 17:44:23.680083 +0800 CST m=+3.010501799
...

使用高級函數操作

使用 bufio.NewWriter 封裝一層 PipeWriter ,使用 bufio.NewScanner 封裝一層 PipeReader 實現跟上述代碼相同的功能:

func main() {
  r, w := io.Pipe()

  go func() {
    writer := bufio.NewWriter(w)
    for {
      time.Sleep(time.Second)
      writer.WriteString(time.Now().String() + "\n")
      
    }
  }()
  scanner := bufio.NewScanner(r)
  for scanner.Scan() {
    fmt.Println(scanner.Text())
  }
}

writer.Flush() 如果被註釋掉的話,會有很長一段時間無數據輸出。查看源碼發現,其默認的 buffer 大小爲 4096 只有當存入的數據大小大於 4096 比特時,它纔會調用 PipeWriter.Write() 寫入數據。解決方案就是把註釋的那行解除註釋,每次寫入時清空一下緩存就可以了。

其實上面的代碼還是有問題的

假設只希望輸出限定個數的時間呢?比如說 3 次。把上面的無限次循環改成優先次不就行了嘛。

func main() {
  go func() {
    writer := bufio.NewWriter(w)
    for i := 0; i < 3; i++ {
      time.Sleep(time.Second)
      writer.WriteString(time.Now().String() + "\n")
      writer.Flush()
    }
    
  }()
  scanner := bufio.NewScanner(r)
  for scanner.Scan() {
    fmt.Println(scanner.Text())
  }
}

執行代碼,發現最後出現了死鎖的情況,定位到出現死鎖的位置

func (p *pipe) Read(b []byte) (n int, err error) {


  select {
  case bw := <-p.wrCh: 
    nr := copy(b, bw)
    p.rdCh <- nr
    return nr, nil
  case <-p.done:
    return 0, p.readCloseError()
  }
}

應該是這個的問題<-p.wrCh 。這個 channel 用於接收從 pipe.Write() 中發送來的數據

func (p *pipe) Write(b []byte) (n int, err error) {

  for once := true; once || len(b) > 0; once = false {
    select {

    case p.wrCh <- b:
      nw := <-p.rdCh
      b = b[nw:]
      n += nw
    case <-p.done:
      return n, p.writeCloseError()
    }
  }
  return n, nil
}

但是由於數據已經全部發送完畢,所以這裏的 Write 函數已經退出了。所以 case bw := <-p.wrCh 不可能得到數據了。那麼就只能希望從 case <-p.done 中獲得數據,那個就是調用 pipe.CloseWrite() 或者 pipe.CloseRead() ,也就是調用 PipeWriter.Close() 或者調用 PipeReader.Close()

把那段代碼中的 writer.Close() 解除註釋就可以了。

由於 pipe.CloseWrite()pipe.CloseRead() 是一樣的,所以 PipeWriterPipeReader 只需要調用一次即可。


os.Pipe

func Pipe() (r *File, w *File, err error)

這個是對操作系統文件句柄的封裝。現在使用這個函數實現和上面類似的功能

inRinW 這一對 Pipe 用於向啓動的子進程中傳入數據

outRoutW 這一對 Pipe 用於子進程向外部輸出結果

func main() {
  inR, inW, _ := os.Pipe()
  outR, outW, _ := os.Pipe()
  done := make(chan struct{})

  process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
    Files: []*os.File{inR, outW, outW}})

  go func() {
    writer := bufio.NewWriter(inW)
    for i := 0; i < 3; i++ {
      time.Sleep(time.Second)
      writer.WriteString("date\n")
      writer.Flush()
    }
    inW.Close()
    outW.Close()
  }()
  go func() {
    scanner := bufio.NewScanner(outR)
    for scanner.Scan() {
      fmt.Println(scanner.Text())
    }
    process.Signal(os.Kill)
    done <- struct{}{}
    fmt.Println("finish")
  }()

  process.Wait()
  <-done

}

輸出類似於如下數據

Sun Jul  5 18:36:52 CST 2020
Sun Jul  5 18:36:53 CST 2020
Sun Jul  5 18:36:54 CST 2020
finish

另一個比較簡單的例子是調用 ls 查看當前目錄,這裏使用 bytes.Buffer.ReadFrom 讀取數據

func main() {
  inR, inW, _ := os.Pipe()
  outR, outW, _ := os.Pipe()
  dir, _ := filepath.Abs(filepath.Dir(os.Args[0]))

  process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
    Files: []*os.File{inR, outW, outW},
    Dir:   dir,
  })

  go func() {
    writer := bufio.NewWriter(inW)
    writer.WriteString("ls -a")
    writer.Flush()
    inW.Close()
    outW.Close()
  }()

  process.Wait()
  buffer := new(bytes.Buffer)
  buffer.ReadFrom(outR)
  fmt.Println(buffer.String())
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://blog.schwarzeni.com/2020/07/05/Golang-%E4%B8%AD%E7%9A%84-Pipe-%E4%BD%BF%E7%94%A8/