在Go中实现限制读取命令行执行的结果长度

某产品遇到问题,随机的内存暴涨导致频繁OOM,甚至影响到服务器的正常运行。经查,原来是命令行执行外部程序,返回的数据太大,而这些数据都被读到了Go程序的内存。

原程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
func Run(name string, args ...string) ([]byte, error) {
cmd := exec.Command(name, args...)
var buf bytes.Buffer
cmd.Stdout = &buf
if err := cmd.Start(); err != nil {
return nil, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

当然,以上代码也可以用exec.Command(name, args...).Output()来实现,这样写是为了方便下文展开。

当然我们应该首先修改这个外部程序,但是有没有一种安全的方式让我们放心的读取命令行执行的结果呢?

利用LimitedReader限制读取的字节数

在io包中有一个LimitedReader结构体,它可以将一个io.Reader改造为一个限制读取字节数的io.LimitedReader,由于io.LimitedReader实现了io.Reader接口,因此可以等价替换。

1
2
3
4
5
6
7
8
func readLimit(r io.Reader, size uint64) ([]byte, error) {
var buf bytes.Buffer
n, err := buf.ReadFrom(&io.LimitedReader{R: r, N: int64(size + 1)})
if n > int64(size) {
return buf.Bytes()[:size], fmt.Errorf("data beyond limit: %v", size)
}
return buf.Bytes(), err
}

但是这样cmd.Stdout是一个io.Writer接口,我们如何获得命令行执行结果的io.Reader接口呢?答案是cmd.StdoutPipe()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func Run(name string, arg ...string) ([]byte, error) {
cmd := exec.Command(name, arg...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return v, nil
}

这样就实现了执行命令,并且最多读取返回结果的1024个字节的数据。

将标准输出与标准错误整合到一起

我们知道,每个UNIX进程都有3个数据流:标准输出(stdout),标准输入(stdin),标准错误(stderr)。如果我们只接收标准输出,那么外部程序的错误信息将丢失。所以我们需要将标准输出与标准错误整合到一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Run(name string, arg ...string) ([]byte, error) {
cmd := exec.Command(name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return v, nil
}

借助Context实现超时处理

如果外部程序假死,我们的函数也将一直处于执行中的状态,因此超时处理十分必要。尽管利用time.Timer也可以实现,但是Go更推荐使用context.WithTimeout()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func Run(timeout time.Duration, name string, arg ...string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
errc := make(chan error, 1)
go func() {
errc <- cmd.Wait()
}()

select {
case <-ctx.Done():
return v, ctx.Err()
case err := <-errc:
return v, err
}
}

可能会有同学提问:为什么要select语句同时等待ctx.Done()cmd.Wait(),而不是由exec包自动等待ctx.Done(),并自动执行cmd.Process.Kill(),这样cmd.Wait()也能继续执行?原因有二:

  • 如果这样做,cmd.Wait()将接收到signal: killed这样的错误,而调用方无法区分这个外部程序是因超时被kill,还是被其他程序或系统kill
  • 查阅cmd.Wait()源码可知,有些情况下释放资源需要耗费一段时间,因此调用者可能在已经超时的情况下迟迟接收不到返回结果

利用函数式参数,我已经封装了一个非常方便的开源库,请参考:
https://github.com/elvinchan/util-collects/tree/master/command


——3.29更新:
在稳定运行一段时间后,服务器出现了僵尸进程的预警
Zombie process
原来是readLimit()返回错误后,没有继续执行cmd.Wait()导致的进程资源泄露。为了避免这种这种情况,已经可能出现的Panic。最后一段代码应改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func Run(timeout time.Duration, name string, arg ...string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}

var hadWait uint32
defer func() {
// prevent two approach:
// panic when reading from pipe
// read from pipe with error
if atomic.CompareAndSwapUint32(&hadWait, 0, 1) {
_ = cmd.Wait()
}
}()
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
errc := make(chan error, 1)
go func() {
if atomic.CompareAndSwapUint32(&hadWait, 0, 1) {
errc <- cmd.Wait()
}
}()

select {
case <-ctx.Done():
return v, ctx.Err()
case err := <-errc:
return v, err
}
}