Golang 中的 Pipe 使用
主要記錄一下 io.Pipe
和 os.Pipe
的使用
Golang 版本爲 1.13 ,實驗環境爲 MacOS
io.Pipe
基礎
func Pipe() (*PipeReader, *PipeWriter)
如果閱讀其源代碼,它內部有一個 pipe
對象,將其 Read
相關功能和 Write
相關功能拆分成了 PipeReader
和 PipeWriter
暴露出去。
PipeReader.Read()
方法寫入數據,PipeWriter.Write()
讀取數據。由於其實現了接口 io.ReadCloser
和 io.WriteCloser
。所以可以使用更高層的工具函數來操作它們。
對於 PipeReader
,可以使用 bytes.Buffer.ReadFrom
或者 bufio.NewScanner
。
對於 PipeWriter
可以使用 fmt.Fprintf()
或者 bufio.NewWriter
簡單例子
這裏寫一個簡單的例子,在 goroutine 中,每隔 1 秒向 PipeWriter
中寫入當前的時間,從 PipeReader
讀取寫入的數據。不使用任何工具函數
func main() {
r, w := io.Pipe()
go func() {
for {
time.Sleep(time.Second)
w.Write([]byte(time.Now().String()))
}
}()
for {
dataRead := make([]byte, 256)
n, _ := r.Read(dataRead)
fmt.Println(string(dataRead[:n]))
}
}
輸出如下
2020-07-05 17:44:21.674781 +0800 CST m=+1.005118043
2020-07-05 17:44:22.677565 +0800 CST m=+2.007942604
2020-07-05 17:44:23.680083 +0800 CST m=+3.010501799
...
使用高級函數操作
使用 bufio.NewWriter
封裝一層 PipeWriter
,使用 bufio.NewScanner
封裝一層 PipeReader
實現跟上述代碼相同的功能:
func main() {
r, w := io.Pipe()
go func() {
writer := bufio.NewWriter(w)
for {
time.Sleep(time.Second)
writer.WriteString(time.Now().String() + "\n")
}
}()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}
writer.Flush()
如果被註釋掉的話,會有很長一段時間無數據輸出。查看源碼發現,其默認的 buffer 大小爲 4096
只有當存入的數據大小大於 4096
比特時,它纔會調用 PipeWriter.Write()
寫入數據。解決方案就是把註釋的那行解除註釋,每次寫入時清空一下緩存就可以了。
其實上面的代碼還是有問題的
假設只希望輸出限定個數的時間呢?比如說 3 次。把上面的無限次循環改成優先次不就行了嘛。
func main() {
go func() {
writer := bufio.NewWriter(w)
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
writer.WriteString(time.Now().String() + "\n")
writer.Flush()
}
}()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
}
執行代碼,發現最後出現了死鎖的情況,定位到出現死鎖的位置
func (p *pipe) Read(b []byte) (n int, err error) {
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
應該是這個的問題<-p.wrCh
。這個 channel 用於接收從 pipe.Write()
中發送來的數據
func (p *pipe) Write(b []byte) (n int, err error) {
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
但是由於數據已經全部發送完畢,所以這裏的 Write
函數已經退出了。所以 case bw := <-p.wrCh
不可能得到數據了。那麼就只能希望從 case <-p.done
中獲得數據,那個就是調用 pipe.CloseWrite()
或者 pipe.CloseRead()
,也就是調用 PipeWriter.Close()
或者調用 PipeReader.Close()
。
把那段代碼中的 writer.Close()
解除註釋就可以了。
由於 pipe.CloseWrite()
和 pipe.CloseRead()
是一樣的,所以 PipeWriter
和 PipeReader
只需要調用一次即可。
os.Pipe
func Pipe() (r *File, w *File, err error)
這個是對操作系統文件句柄的封裝。現在使用這個函數實現和上面類似的功能
inR
和 inW
這一對 Pipe 用於向啓動的子進程中傳入數據
outR
和 outW
這一對 Pipe 用於子進程向外部輸出結果
func main() {
inR, inW, _ := os.Pipe()
outR, outW, _ := os.Pipe()
done := make(chan struct{})
process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
Files: []*os.File{inR, outW, outW}})
go func() {
writer := bufio.NewWriter(inW)
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
writer.WriteString("date\n")
writer.Flush()
}
inW.Close()
outW.Close()
}()
go func() {
scanner := bufio.NewScanner(outR)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
process.Signal(os.Kill)
done <- struct{}{}
fmt.Println("finish")
}()
process.Wait()
<-done
}
輸出類似於如下數據
Sun Jul 5 18:36:52 CST 2020
Sun Jul 5 18:36:53 CST 2020
Sun Jul 5 18:36:54 CST 2020
finish
另一個比較簡單的例子是調用 ls
查看當前目錄,這裏使用 bytes.Buffer.ReadFrom
讀取數據
func main() {
inR, inW, _ := os.Pipe()
outR, outW, _ := os.Pipe()
dir, _ := filepath.Abs(filepath.Dir(os.Args[0]))
process, _ := os.StartProcess("/bin/sh", nil, &os.ProcAttr{
Files: []*os.File{inR, outW, outW},
Dir: dir,
})
go func() {
writer := bufio.NewWriter(inW)
writer.WriteString("ls -a")
writer.Flush()
inW.Close()
outW.Close()
}()
process.Wait()
buffer := new(bytes.Buffer)
buffer.ReadFrom(outR)
fmt.Println(buffer.String())
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://blog.schwarzeni.com/2020/07/05/Golang-%E4%B8%AD%E7%9A%84-Pipe-%E4%BD%BF%E7%94%A8/