这是 @Jimt 编写的 Go 中工作人员和控制器模式的一个很好的示例,用于回答 “是否有一些优雅的方法来暂停和恢复 golang 中的任何其他 goroutine?”
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
for i := range workers {
workers[i] <- Running
}
// Pause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Paused
}
// Unpause workers.
<-time.After(1e9)
for i := range workers {
workers[i] <- Running
}
// Shutdown workers.
<-time.After(1e9)
for i := range workers {
close(workers[i])
}
}
但是这段代码也有一个问题:如果你想在
workers
退出时删除 worker()
中的工作通道,则会发生死锁。
如果你
close(workers[i])
,下次控制器写入它会导致恐慌,因为 go 无法写入关闭的通道。如果你使用一些互斥锁来保护它,那么它将被卡在workers[i] <- Running
上,因为worker
没有从通道读取任何内容,写入将被阻塞,并且互斥锁将导致死锁。您还可以为通道提供更大的缓冲区作为解决方法,但这还不够好。
所以我认为解决这个问题的最好方法是
worker()
退出时关闭通道,如果控制器发现通道关闭,它将跳过它并且不执行任何操作。但在这种情况下我找不到如何检查通道是否已关闭。如果我尝试读取控制器中的通道,控制器可能会被阻止。所以我现在很困惑。
PS:我尝试过恢复引发的恐慌,但它会关闭引发恐慌的 goroutine。在这种情况下,它将是控制器,因此没有用。
不过,我认为 Go 团队在 Go 的下一个版本中实现这个功能是有用的。
可以用一种 hacky 的方式来通过恢复引发的恐慌来尝试写入的通道。但是如果不读取读取通道,就无法检查它是否已关闭。
要么你会
v <- c
)v, ok <- c
)v, ok <- c
) (示例)v <- c
)从技术上讲,只有最后一个不从通道中读取,但这没什么用处。
没有办法编写一个安全的应用程序,您需要在不与其交互的情况下知道通道是否打开。
做你想做的事情的最好方法是使用两个渠道——一个用于工作,另一个表示改变状态的愿望(以及完成状态改变,如果这很重要)。
渠道便宜。 复杂的设计重载语义不是。
[还]
<-time.After(1e9)
这是一种非常令人困惑且不明显的写作方式
time.Sleep(time.Second)
让事情变得简单,每个人(包括你)都能理解。
我知道这个答案太晚了,我已经写了这个解决方案,Hacking Go run-time,这不安全,它可能会崩溃:
import (
"unsafe"
"reflect"
)
func isChanClosed(ch interface{}) bool {
if reflect.TypeOf(ch).Kind() != reflect.Chan {
panic("only channels!")
}
// get interface value pointer, from cgo_export
// typedef struct { void *t; void *v; } GoInterface;
// then get channel real pointer
cptr := *(*uintptr)(unsafe.Pointer(
unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
))
// this function will return true if chan.closed > 0
// see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go
// type hchan struct {
// qcount uint // total data in the queue
// dataqsiz uint // size of the circular queue
// buf unsafe.Pointer // points to an array of dataqsiz elements
// elemsize uint16
// closed uint32
// **
cptr += unsafe.Sizeof(uint(0))*2
cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
cptr += unsafe.Sizeof(uint16(0))
return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
嗯,你可以用
default
分支来检测它,因为关闭的通道会被选择,例如:下面的代码会选择default
,channel
,channel
,第一个选择不会被阻塞。
func main() {
ch := make(chan int)
go func() {
select {
case <-ch:
log.Printf("1.channel")
default:
log.Printf("1.default")
}
select {
case <-ch:
log.Printf("2.channel")
}
close(ch)
select {
case <-ch:
log.Printf("3.channel")
default:
log.Printf("3.default")
}
}()
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}
打印
2018/05/24 08:00:00 1.default
2018/05/24 08:00:01 2.channel
2018/05/24 08:00:01 3.channel
注意,请参考@Angad在此答案下的评论:
如果您使用缓冲通道并且它包含 未读数据
除了关闭频道之外,您还可以将频道设置为零。这样你就可以检查它是否为零。
操场上的例子: https://play.golang.org/p/v0f3d4DisCz
编辑: 这实际上是一个糟糕的解决方案,如下一个示例所示, 因为在函数中将通道设置为 nil 会破坏它: https://play.golang.org/p/YVE2-LV9TOp
我在多个并发 goroutine 中经常遇到这个问题。
这可能是也可能不是一个好的模式,但我为我的工作人员定义了一个结构体,其中包含工作人员状态的退出通道和字段:
type Worker struct {
data chan struct
quit chan bool
stopped bool
}
然后你可以让控制器为worker调用停止函数:
func (w *Worker) Stop() {
w.quit <- true
w.stopped = true
}
func (w *Worker) eventloop() {
for {
if w.Stopped {
return
}
select {
case d := <-w.data:
//DO something
if w.Stopped {
return
}
case <-w.quit:
return
}
}
}
这为您提供了一种很好的方法来让您的工作人员完全停止,而不会出现任何挂起或生成错误,这在容器中运行时尤其好。
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
for i:=0; i<10; i++{
ch1 <- i
}
close(ch1)
}()
go func(){
for i:=10; i<15; i++{
ch2 <- i
}
close(ch2)
}()
ok1, ok2 := false, false
v := 0
for{
ok1, ok2 = true, true
select{
case v,ok1 = <-ch1:
if ok1 {fmt.Println(v)}
default:
}
select{
case v,ok2 = <-ch2:
if ok2 {fmt.Println(v)}
default:
}
if !ok1 && !ok2{return}
}
}
嗯,你可以做的是使用以下结构:
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
defer mc.mutex.Unlock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
如果从通道发送或接收的所有 goroutine 都使用 SafeClose 方法,则可以避免不必要的恐慌。
来自文档:
通道可以通过内置函数 close 来关闭。接收操作符的多值分配形式报告在通道关闭之前是否发送了接收到的值。
https://golang.org/ref/spec#Receive_operator
Golang in Action 的示例显示了这种情况:
// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
court := make(chan int)
// Add a count of two, one for each goroutine.
wg.Add(2)
// Launch two players.
go player("Nadal", court)
go player("Djokovic", court)
// Start the set.
court <- 1
// Wait for the game to finish.
wg.Wait()
}
// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
// Wait for the ball to be hit back to us.
ball, ok := <-court
fmt.Printf("ok %t\n", ok)
if !ok {
// If the channel was closed we won.
fmt.Printf("Player %s Won\n", name)
return
}
// Pick a random number and see if we miss the ball.
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// Close the channel to signal we lost.
close(court)
return
}
// Display and then increment the hit count by one.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// Hit the ball back to the opposing player.
court <- ball
}
}
首先检查通道是否有元素会更容易,这将确保通道处于活动状态。
func isChanClosed(ch chan interface{}) bool {
if len(ch) == 0 {
select {
case _, ok := <-ch:
return !ok
}
}
return false
}
如果您收听此频道,您总是可以发现该频道已关闭。
case state, opened := <-ws:
if !opened {
// channel was closed
// return or made some final work
}
switch state {
case Stopped:
但请记住,您不能关闭一个通道两次。这会引起恐慌。