我正在尝试使用通道递归列出目录树。
目前我得到一些文件列表然后它被卡在一个目录上。目录将发送给工作人员,但不会处理它。
如何在worker(if file.IsDir()
)中发送目录以便正确处理它并且还通知文件lister在递归完成后没有新文件要处理?
这是我目前的尝试:
package main
import (
"fmt"
"os"
"path/filepath"
"errors"
"log"
)
// Job for worker
type workerJob struct {
Root string
}
// Result of a worker
type workerResult struct {
Filename string
}
func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
for j := range jobs {
log.Printf(`Directory: %#v`, j.Root)
dir, err := os.Open(j.Root)
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
fInfo, err := dir.Readdir(-1)
dir.Close()
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
for _, file := range fInfo {
fpath := filepath.Join(dir.Name(), file.Name())
if file.Mode().IsRegular() {
// is file
fs := uint64(file.Size())
if fs == 0 {
// Skip zero sized
continue
}
r := workerResult{
Filename: fpath,
}
log.Printf(`sent result: %#v`, r.Filename)
results <- r
} else if file.IsDir() {
// Send directory to be processed by the worker
nj := workerJob{
Root: fpath,
}
log.Printf(`sent new dir job: %#v`, nj.Root)
jobs <- nj
}
}
done <- true
}
}
func main() {
dir := `/tmp`
workerCount := 1
jobs := make(chan workerJob, workerCount)
results := make(chan workerResult)
readDone := make(chan bool)
// start N workers
for i := 0; i < workerCount; i++ {
go worker(jobs, results, readDone)
}
jobs <- workerJob{
Root: dir,
}
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-readDone:
log.Printf(`got stop`)
break readloop
}
}
}
这导致:
2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
main.main()
+0x281
goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
+0x4e7
created by main.main
+0x109
Process finished with exit code 2
如何修复僵局?
你注意到jobs <- nj
永远挂了。这是因为操作阻塞直到一个工人在range
循环中接收,并且只要它在那里阻塞,它就不能到达range
循环。
为了解决这个问题,你会产生一个新的goroutine来做到这一点。
go func() {
jobs <- nj
}()
还有一个问题:你的readDone
频道。
目前,每当你的worker
完成一项工作时,这个频道就会被发出,这导致select
随机选择准备频道的可能性select
中的func main()
选择它然后关闭系统,这使得所有剩余的工作和结果都丢失了。
要解决这部分问题,你应该使用sync.WaitGroup
。每次添加新工作时,都会调用wg.Add(1)
,每次工作完成工作时,都会调用wg.Done()
。在func main()
中,你将产生一个使用wg.Wait()
等待所有工作完成然后使用readDone
关闭系统的goroutine。
// One initial job
wg.Add(1)
go func() {
jobs <- workerJob{
Root: dir,
}
}()
// When all jobs finished, shutdown the system.
go func() {
wg.Wait()
readDone <- true
}()
关于改进代码的初步评论
蒂姆的评论似乎没有触及要领。在main()
结束时关闭通道并不重要,并且你的select
声明有default
情况也不重要。如果频道上有消息,则频道读取案例将运行。
这可能被认为是一个问题,虽然没有消息,你将通过default
案例反复旋转循环,这将导致CPU使用率激增(“忙等待”),所以是的,可能只是删除默认情况。
您还可以添加一个“停止”通道的情况,该通道使用标签打破for
循环(这是必需的,否则break
只是从select语句中断而我们再次循环):
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-stopChan:
break readloop
}
最后,您还应该将f
中的变量worker()
重命名为dir
,因为它是一个目录而不是文件。只是让代码更容易阅读。对于熟练使用该语言的程序员来说,代码应该像自然语言一样阅读。就这样,这句话,
fpath := filepath.Join(f.Name(), file.Name())
变
fpath := filepath.Join(dir.Name(), file.Name())
......你的眼睛/大脑更容易扫描。
为什么你的代码被破坏了
你有一个频道死锁。您没有注意到因为default
案件意味着从技术上讲,一个goroutine总是能够“进步”。否则运行时会引起恐慌:
fatal error: all goroutines are asleep - deadlock!
这是因为worker()
具有以下结构:
receive from channel
...
...
foreach dir in root:
send to channel
...
...
但在普通频道上,发送和接收都是阻塞操作。发送/接收的goroutine在其合作伙伴出现之前不会取得进展。
您可以使用缓冲通道来避免这种情况,但事先无法知道目录中将找到多少目录,因此缓冲区可能太小。我建议产生一个goroutine,以便它可以阻止而不影响整个worker()
循环:
go func() {
for _, file := range fInfo {
...
}
}()