Go 如何正确关闭通道
阅读原文时间:2023年08月29日阅读:5

Go 在通道这一块,没有内置函数判断通道是否已经关闭,也没有可以直接获取当前通道数量的方法。所以对于通道,Go 显示的不是那么优雅。另外,如果对通道进行了错误的使用,将会直接引发系统 panic,这是一件很危险的事情。

虽然没有判断通道是否关闭的内置函数,但是官方为我们提供了一种语法来判断通道是否关闭:

v, ok := <-ch
// 如果ok为true则代表通道已经关闭

利用这个语法,我们可以编写这样的代码判断通道是否关闭:

func TestChanClosed(t *testing.T) {
    var ch = make(chan int)

    // send
    go func() {
        for {
            ch <- 1
        }
    }()

    // receive
    go func() {
        for {
            if v, ok := <-ch; ok {
                t.Log(v)
            } else {
                t.Log("通道关闭")
                return
            }
        }
    }()

    time.Sleep(1 * time.Second)
}

也可以用 for range 简化语法,通道关闭后会主动退出 for 循环:

func TestChanClosed(t *testing.T) {
    var ch = make(chan int)

    // send
    go func() {
        for {
            ch <- 1
        }
    }()

    // receive
    go func() {
        for v := range ch {
            t.Log(v)
        }
        t.Log("通道关闭")
        return
    }()

    time.Sleep(1 * time.Second)
}

有三种情况会引发 panic:

// 会引发channel panic的情况一:发送数据到已经关闭的channel
// panic: send on closed channel
func TestChannelPanic1(t *testing.T) {
    var ch = make(chan int)
    close(ch)
    time.Sleep(10 * time.Millisecond)
    go func() {
        ch <- 1
    }()
    t.Log(<-ch)
}

// 会引发channel panic的情况一的另外一种:发送数据时关闭channel
// panic: send on closed channel
func TestChannelPanic11(t *testing.T) {
    var ch = make(chan int)
    go func() {
        go func() {
            // 没有接收数据的地方,此处会一直阻塞
            ch <- 1
        }()
    }()

    time.Sleep(20 * time.Millisecond)
    close(ch)
}

// 会引发channel panic的情况二:重复关闭channel
// panic: close of closed channel
func TestChannelPanic2(t *testing.T) {
    var ch = make(chan int)
    close(ch)
    close(ch)
}

// 会引发channel panic的情况三:未初始化关闭
// panic: close of nil channel
func TestChannelPanic3(t *testing.T) {
    var ch chan int
    close(ch)
}

我们在实际的业务中应该避免这三种不同的 panic,未初始化就关闭的情况较为少见,也不容易犯错误,重要的是要防止关闭后发送数据和重复关闭通道。

在 go 中有一条原则:Channel Closing Principle,它是指不要从接收端关闭 channel,也不要关闭有多个并发发送者的 channel。只要我们严格遵守这个原则,就可以有效的避免panic。其实这个原则就是让我们规避关闭后发送重复关闭这两种情况。

为了应对关闭后发送数据这种情况,我们很容易想到Channel Closing Principle的第一句:不要从接收端关闭 channel。所以我们应该从发送端关闭 channel:

func TestSendClose(t *testing.T) {
    var (
        ch = make(chan int)
        wg = sync.WaitGroup{}
        // 10毫秒后通知发送端停止发送数据
        after = time.After(10 * time.Millisecond)
    )
    wg.Add(2)

    // send
    go func() {
        for {
            select {
            case <-after:
                close(ch)
                wg.Done()
                return
            default:
                ch <- 1
            }
        }
    }()

    // receive
    go func() {
        defer wg.Done()
        for v := range ch {
            t.Log(v)
        }
        return
    }()

    wg.Wait()
}

这种方式可以应对单发送者的情况,如果我们的程序有多个发送者,那么就要考虑Channel Closing Principle的第二句话:不要关闭有多个并发发送者的 channel。那么这种情况下,我们应该如何正确的回收通道呢?这个时候我们可以考虑引入一个额外的通道,当接收端不想再接收数据时,就发送数据到这个额外的通道中,来通知所有的发送端退出:

func TestManySendAndOneReceive(t *testing.T) {
    var (
        sender = 3
        wg     = sync.WaitGroup{}
        numCh  = make(chan int)
        stopCh = make(chan struct{})
        // 10毫秒后通知发送端停止发送数据
        after = time.After(10 * time.Millisecond)
    )
    wg.Add(1)

    // send
    for i := 0; i < sender; i++ {
        go func() {
            for {
                select {
                case <-stopCh:
                    fmt.Println("收到退出信号")
                    return
                case numCh <- 1:
                    //fmt.Println("发送成功", value)
                }
            }
        }()
    }

    // receive
    go func() {
        for {
            select {
            case v := <-numCh:
                fmt.Println("接收到数据", v)
            case <-after:
                close(stopCh)
                wg.Done()
                return
            }
        }
    }()

    wg.Wait()
}

看完这段代码,我们发现 numCh 这个通道是没有关闭语句的,那么这段代码会引发内存泄漏吗?答案是不会,因为我们正确退出了发送端和接收端的所有协程,等到这个通道没有任何代码使用后,Go 的垃圾回收会回收此通道。

那如果此时我们的程序变得更为复杂:有多个接收者和多个发送者,这个时候怎么办呢?我们可以引入另外一个中间者,当任意协程想关闭的时候,都通知这个中间者,所有协程也同时监听这个中间者,收到中间者的退出信号时,退出当前协程:

func TestManySendAndManyReceive(t *testing.T) {
    var (
        maxRandomNumber = 5000
        receiver        = 10
        sender          = 10
        wg              = sync.WaitGroup{}
        numCh           = make(chan int)
        stopCh          = make(chan struct{})
        toStop          = make(chan string, 1)
        stoppedBy       string
    )
    wg.Add(receiver)

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < sender; i++ {
        go func(id string) {
            for {
                value := rand.Intn(maxRandomNumber)
                if value == 0 {
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // 提前关闭goroutine
                select {
                case <-stopCh:
                    return
                default:
                }

                select {
                case <-stopCh:
                    return
                case numCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < receiver; i++ {
        go func(id string) {
            defer wg.Done()
            for {
                // 提前关闭goroutine
                select {
                case <-stopCh:
                    return
                default:
                }

                select {
                case <-stopCh:
                    return
                case value := <-numCh:
                    if value == maxRandomNumber-1 {
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    t.Log(value)
                }
            }
        }(strconv.Itoa(i))
    }

    wg.Wait()
    t.Log("stopped by", stoppedBy)
}

可以使用 sync.once 语法来避免重复关闭通道:

type MyChannel struct {
    C    chan interface{}
    once sync.Once
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan interface{})}
}

func (mc *MyChannel) SafeClose() {
    mc.once.Do(func(){
        close(mc.C)
    })
}

也可以使用 sync.Mutex 语法避免重复关闭通道:

type MyChannel struct {
    C      chan interface{}
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan interface{})}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
    mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

如何正确关闭 gotoutine 和 channel 防止内存泄漏是一个重要的课题,如果在编码过程中,遇到了需要打破Channel Closing Principle原则的情况,一定要思考自己的代码设计是否合理。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章