我正在研究一个问题,我需要处理列表中的一堆项目。现在我只需要在配置的时间内可以处理的项目的结果,其他结果需要被丢弃。我编写了这个程序,它的一个非常简单的版本可以工作,但是扩展逻辑,我陷入困境,因为总是执行 switch 情况“全部完成”,或者执行“超时”但仍然给出所有结果。
package main
import (
"fmt"
"sync"
"time"
)
func doSomething(x int) int {
time.Sleep(time.Second * 2)
return x
}
func main() {
timeoutCfg := 5
done := make(chan struct{}, 1)
somethings := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
results := make(chan int, 10) // buffered channel to hold partial results
var wg sync.WaitGroup
// Launch goroutines to process tasks
for _, key := range somethings {
wg.Add(1)
go func(k int) {
defer wg.Done()
results <- doSomething(k)
}(key)
}
// Start the timer
timer := time.After(time.Second * time.Duration(timeoutCfg))
// Wait for all goroutines to finish or for timeout
go func() {
wg.Wait()
close(done)
close(results)
}()
select {
case <-timer:
fmt.Println("timeout")
for x := range results {
fmt.Println(x)
}
case <-done:
fmt.Println("done")
for x := range results {
fmt.Println(x)
}
}
}
好吧,先写同步代码:
package main
import (
"context"
"errors"
"fmt"
"time"
)
func doSomething(ctx context.Context, x int) (int, error) {
timer := time.NewTimer(700 * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done(): // canceled
return 0, ctx.Err()
case <-timer.C:
}
return x, nil
}
func process(ctx context.Context, timeout time.Duration, somethings []int) ([]int, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var results []int
var err error
for _, key := range somethings {
var result int
result, err = doSomething(ctx, key)
if err != nil {
break
}
results = append(results, result)
}
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
return results, nil
}
func main() {
ctx := context.Background()
timeoutCfg := 5 * time.Second
somethings := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
results, err := process(ctx, timeoutCfg, somethings)
if err != nil {
fmt.Println(err)
} else {
for _, x := range results {
fmt.Println(x)
}
}
}
在 Go Playground 上尝试一下。这里我使用
context.Context
来取消并在第一个错误时中断循环。
context.DeadlineExceeded
时返回错误,否则返回(部分)结果。当您需要更多信息(结果是否完整)时,需要额外的布尔值。
太好了,到目前为止,让我们使
process
并发:
func process(ctx context.Context, timeout time.Duration, somethings []int) ([]int, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
var results resultCollector
for _, key := range somethings {
eg.Go(func() error {
result, err := doSomething(ctx, key)
if err == nil {
results.append(result)
}
return err
})
}
err := eg.Wait()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
return results.get(), nil
}
golang.org/x/sync/errgroup
,我们需要一个线程安全的append
,所以让我们写resultCollector
:
type resultCollector struct {
results []int
mu sync.Mutex
}
func (r *resultCollector) append(x int) {
r.mu.Lock()
defer r.mu.Unlock()
r.results = append(r.results, x)
}
func (r *resultCollector) get() []int {
r.mu.Lock()
defer r.mu.Unlock()
results := r.results
r.results = nil
return results
}
在 Go Playground 上尝试一下。诀窍是首选同步函数并使用
context.Context
进行取消。