某产品遇到问题,随机的内存暴涨导致频繁OOM,甚至影响到服务器的正常运行。经查,原来是命令行执行外部程序,返回的数据太大,而这些数据都被读到了Go程序的内存。
原程序如下:1
2
3
4
5
6
7
8
9
10
11
12func Run(name string, args ...string) ([]byte, error) {
cmd := exec.Command(name, args...)
var buf bytes.Buffer
cmd.Stdout = &buf
if err := cmd.Start(); err != nil {
return nil, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
当然,以上代码也可以用exec.Command(name, args...).Output()
来实现,这样写是为了方便下文展开。
当然我们应该首先修改这个外部程序,但是有没有一种安全的方式让我们放心的读取命令行执行的结果呢?
利用LimitedReader
限制读取的字节数
在io包中有一个LimitedReader
结构体,它可以将一个io.Reader
改造为一个限制读取字节数的io.LimitedReader
,由于io.LimitedReader
实现了io.Reader
接口,因此可以等价替换。1
2
3
4
5
6
7
8func readLimit(r io.Reader, size uint64) ([]byte, error) {
var buf bytes.Buffer
n, err := buf.ReadFrom(&io.LimitedReader{R: r, N: int64(size + 1)})
if n > int64(size) {
return buf.Bytes()[:size], fmt.Errorf("data beyond limit: %v", size)
}
return buf.Bytes(), err
}
但是这样cmd.Stdout
是一个io.Writer
接口,我们如何获得命令行执行结果的io.Reader
接口呢?答案是cmd.StdoutPipe()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18func Run(name string, arg ...string) ([]byte, error) {
cmd := exec.Command(name, arg...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return v, nil
}
这样就实现了执行命令,并且最多读取返回结果的1024个字节的数据。
将标准输出与标准错误整合到一起
我们知道,每个UNIX进程都有3个数据流:标准输出(stdout),标准输入(stdin),标准错误(stderr)。如果我们只接收标准输出,那么外部程序的错误信息将丢失。所以我们需要将标准输出与标准错误整合到一起。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23func Run(name string, arg ...string) ([]byte, error) {
cmd := exec.Command(name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
if err := cmd.Wait(); err != nil {
return nil, err
}
return v, nil
}
借助Context实现超时处理
如果外部程序假死,我们的函数也将一直处于执行中的状态,因此超时处理十分必要。尽管利用time.Timer
也可以实现,但是Go更推荐使用context.WithTimeout()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32func Run(timeout time.Duration, name string, arg ...string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
errc := make(chan error, 1)
go func() {
errc <- cmd.Wait()
}()
select {
case <-ctx.Done():
return v, ctx.Err()
case err := <-errc:
return v, err
}
}
可能会有同学提问:为什么要select语句同时等待ctx.Done()
和cmd.Wait()
,而不是由exec包自动等待ctx.Done()
,并自动执行cmd.Process.Kill()
,这样cmd.Wait()
也能继续执行?原因有二:
- 如果这样做,
cmd.Wait()
将接收到signal: killed
这样的错误,而调用方无法区分这个外部程序是因超时被kill,还是被其他程序或系统kill - 查阅
cmd.Wait()
源码可知,有些情况下释放资源需要耗费一段时间,因此调用者可能在已经超时的情况下迟迟接收不到返回结果
利用函数式参数,我已经封装了一个非常方便的开源库,请参考:
https://github.com/elvinchan/util-collects/tree/master/command
——3.29更新:
在稳定运行一段时间后,服务器出现了僵尸进程的预警
原来是readLimit()
返回错误后,没有继续执行cmd.Wait()
导致的进程资源泄露。为了避免这种这种情况,已经可能出现的Panic。最后一段代码应改为:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44func Run(timeout time.Duration, name string, arg ...string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, name, arg...)
outReader, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
stdout := io.MultiReader(outReader, errReader)
if err := cmd.Start(); err != nil {
return nil, err
}
var hadWait uint32
defer func() {
// prevent two approach:
// panic when reading from pipe
// read from pipe with error
if atomic.CompareAndSwapUint32(&hadWait, 0, 1) {
_ = cmd.Wait()
}
}()
v, err := readLimit(stdout, 1024)
if err != nil {
return v, err
}
errc := make(chan error, 1)
go func() {
if atomic.CompareAndSwapUint32(&hadWait, 0, 1) {
errc <- cmd.Wait()
}
}()
select {
case <-ctx.Done():
return v, ctx.Err()
case err := <-errc:
return v, err
}
}