go channel 概述 - 管道
阅读原文时间:2023年07月11日阅读:1

概述

unix/linux OS 的一个进程的输出可以是另一个进程的输入,这些进程使用stdin与stdout设备作为通道,在进程之间传递数据。

同样的,GO中有io.Reader与io.Writer两个接口,如果要对一个设备进行读/写,调用实现了这两个接口的方法即可。

GO对管道的支持

func t1() {
cmd0 := exec.Command("echo","-n","how are you")
fmt.Println(cmd0)
}

exec.Command返回的对象的几个主要方法:

// Stdin specifies the process's standard input.  
//  
// If Stdin is nil, the process reads from the null device (os.DevNull).  
//  
// If Stdin is an \*os.File, the process's standard input is connected  
// directly to that file.  
//  
// Otherwise, during the execution of the command a separate  
// goroutine reads from Stdin and delivers that data to the command  
// over a pipe. In this case, Wait does not complete until the goroutine  
// stops copying, either because it has reached the end of Stdin  
// (EOF or a read error) or because writing to the pipe returned an error.  
Stdin io.Reader

// Stdout and Stderr specify the process's standard output and error.  
//  
// If either is nil, Run connects the corresponding file descriptor  
// to the null device (os.DevNull).  
//  
// If either is an \*os.File, the corresponding output from the process  
// is connected directly to that file.  
//  
// Otherwise, during the execution of the command a separate goroutine  
// reads from the process over a pipe and delivers that data to the  
// corresponding Writer. In this case, Wait does not complete until the  
// goroutine reaches EOF or encounters an error.  
//  
// If Stdout and Stderr are the same writer, and have a type that can  
// be compared with ==, at most one goroutine at a time will call Write.  
Stdout io.Writer  
Stderr io.Writer

示例

/*
参数传入的是命令组
通过buffer一次次读取返回结果,不怕返回的数据量大
如果命令是shell那样有|管道符,则使用Pip方法即可
*/
func PipCmd(cmd []string,useBufferIO… bool) string {
useBufferedIO := false

ll := len(useBufferIO)  
if ll > 0 {  
    useBufferedIO = useBufferIO\[0\]  
}  
cmd0 := Command(cmd)  
stdout0, err := cmd0.StdoutPipe()  
if err != nil {  
    fmt.Printf("Error: Couldn't obtain the stdout pipe for command : %s\\n", err)  
    return ""  
}  
defer stdout0.Close()  
if err := cmd0.Start(); err != nil {  
    fmt.Printf("Error: The command  can not be startup: %s\\n", err)  
    return ""  
}  
if !useBufferedIO {  
    var outputBuf0 bytes.Buffer  
    for {  
        tempOutput := make(\[\]byte, 1024)  
        n, err := stdout0.Read(tempOutput)  
        if err != nil {  
            if err == io.EOF {  
                break  
            } else {  
                fmt.Printf("Error: Couldn't read data from the pipe: %s\\n", err)  
                return ""  
            }  
        }  
        if n > 0 {  
            outputBuf0.Write(tempOutput\[:n\])  
        }  
    }  
    //fmt.Printf("%s\\n", outputBuf0.String())  
    res := fmt.Sprintf("%v",outputBuf0.String())  
    return res  
} else {  
    outputBuf0 := bufio.NewReader(stdout0)  
    var resBuffer bytes.Buffer

    for{  
        //outputBuf0 默认带一个4K的缓冲区,第二个参数为false表示读取完所有的行  
        output0, \_, err := outputBuf0.ReadLine()

        if err != nil {  
            if err == io.EOF{  
                break  
            }  
            fmt.Printf("Error: Couldn't read data from the pipe: %s\\n", err)  
            return ""  
        }  
        output0 = append(output0,'\\n')  
        resBuffer.Write(output0)  
    }  
    res := fmt.Sprintf("%v",resBuffer.String())  
    return res  
}  

}

func Command(cmds []string) *exec.Cmd {
name:= cmds[0]
cmd := &exec.Cmd{
Path: name,
Args: cmds,
}

if filepath.Base(name) == name {  
    if lp, err := exec.LookPath(name); err != nil {  
        //cmd.lookPathErr = err  
        ErrorHandle(err)  
    } else {  
        cmd.Path = lp  
    }  
}  
return cmd  

}

func a11() {
//命令行参数不能包含空格,比如-ht 是错的,-ht是对的
cmd := []string{"/opt/wks/go/dbm_go/src/dbm/consistency/consis_0.6.7", "-cht", "192.168.177.67", "-cpt", "3316", "-ht","114.67.105.113,192.168.177.67", "-pt","3306,3316", "-slot", "-json", "-db", "vodb", "-timeStart", "2020-09-11 14:09:27", "-pl", "2"}
res := tools.PipCmd(cmd,true)
fmt.Println(res)
}

管道的返回值

stdout0, err := cmd0.StdoutPipe()

// StdoutPipe returns a pipe that will be connected to the command's
// standard output when the command starts.
//
// Wait will close the pipe after seeing the command exit, so most callers
// need not close the pipe themselves. It is thus incorrect to call Wait
// before all reads from the pipe have completed.
// For the same reason, it is incorrect to call Run when using StdoutPipe.
// See the example for idiomatic usage.
func (c *Cmd) StdoutPipe() (io.ReadCloser, error) {
if c.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
if c.Process != nil {
return nil, errors.New("exec: StdoutPipe after process started")
}
pr, pw, err := os.Pipe()
if err != nil {
return nil, err
}
c.Stdout = pw
c.closeAfterStart = append(c.closeAfterStart, pw)
c.closeAfterWait = append(c.closeAfterWait, pr)
return pr, nil
}

管道返回一个

io.ReadCloser

// ReadCloser is the interface that groups the basic Read and Close methods.
type ReadCloser interface {
Reader
Closer
}

不仅实现了读接口,还可以进行关闭操作

管道的内部实现

pr, pw, err := os.Pipe()

系统创建了一个管道Pipe,内部加了读锁syscall.RorkLock.RLock(),然后返回一个读文件句柄pr,和一个写文件句柄pw

// Pipe returns a connected pair of Files; reads from r return bytes written to w.
// It returns the files and an error, if any.
func Pipe() (r *File, w *File, err error) {
var p [2]int

e := syscall.Pipe2(p\[0:\], syscall.O\_CLOEXEC)  
// pipe2 was added in 2.6.27 and our minimum requirement is 2.6.23, so it  
// might not be implemented.  
if e == syscall.ENOSYS {  
    // See ../syscall/exec.go for description of lock.  
    syscall.ForkLock.RLock()  
    e = syscall.Pipe(p\[0:\])  
    if e != nil {  
        syscall.ForkLock.RUnlock()  
        return nil, nil, NewSyscallError("pipe", e)  
    }  
    syscall.CloseOnExec(p\[0\])  
    syscall.CloseOnExec(p\[1\])  
    syscall.ForkLock.RUnlock()  
} else if e != nil {  
    return nil, nil, NewSyscallError("pipe2", e)  
}

return newFile(uintptr(p\[0\]), "|0", kindPipe), newFile(uintptr(p\[1\]), "|1", kindPipe), nil  

}

读方法

// Reader is the interface that wraps the basic Read method.
//
// Read reads up to len(p) bytes into p. It returns the number of bytes
// read (0 <= n <= len(p)) and any error encountered. Even if Read // returns n < len(p), it may use all of p as scratch space during the call. // If some data is available but not len(p) bytes, Read conventionally // returns what is available instead of waiting for more. // // When Read encounters an error or end-of-file condition after // successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// Callers should always process the n > 0 bytes returned before
// considering the error err. Doing so correctly handles I/O errors
// that happen after reading some bytes and also both of the
// allowed EOF behaviors.
//
// Implementations of Read are discouraged from returning a
// zero byte count with a nil error, except when len(p) == 0.
// Callers should treat a return of 0 and nil as indicating that
// nothing happened; in particular it does not indicate EOF.
//
// Implementations must not retain p.
type Reader interface {
Read(p []byte) (n int, err error)
}

实现读接口的对象,有一个Read方法,可以将字节读到字节数组中; 与stdin设备对应,从该设备读取字节到内存。

exec.Cmd这是一个外部命令,它可以是一个静止的命令,或者一个正在运行的命令;提供启动/停止/管道/输入输入定向支持;其中管道返回一个读文件句柄和一个写文件句柄

exec.Cmd输入输出重定向

func runCmdWithPipe() {
fmt.Println("Run command `ps -ef | grep apipe`: ")
cmd1 := exec.Command("ps", "-ef")
cmd2 := exec.Command("grep", "apipe")
var outputBuf1 bytes.Buffer
cmd1.Stdout = &outputBuf1
if err := cmd1.Start(); err != nil {
fmt.Printf("Error: The first command can not be startup %s\n", err)
return
}
if err := cmd1.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the first command: %s\n", err)
return
}
cmd2.Stdin = &outputBuf1
var outputBuf2 bytes.Buffer
cmd2.Stdout = &outputBuf2
if err := cmd2.Start(); err != nil {
fmt.Printf("Error: The second command can not be startup: %s\n", err)
return
}
if err := cmd2.Wait(); err != nil {
fmt.Printf("Error: Couldn't wait for the second command: %s\n", err)
return
}
fmt.Printf("%s\n", outputBuf2.Bytes())
}