我在服务器上工作,并决定通过使用 goroutines 和上下文来摆脱传统的异步处理长时间运行的请求(pub/sub 等)。我的想法是接受一个请求并启动一个带有新超时上下文的 goroutine,无论初始请求上下文是否被取消(即用户刷新),它将完成处理。
我以前在单个端点上做过这个,但这次我想做一个通用的可重用包装器,我可以给出一个超时并放入一个异步代码块(下面显示的是工作代码)。
type ContextSplitter struct {
InitialCtx context.Context
Timeout time.Duration
}
func NewContextSplitter(timeout time.Duration) *ContextSplitter {
return &ContextSplitter{
InitialCtx: context.Background(),
Timeout: timeout,
}
}
func (c *ContextSplitter) Do(worker func(ctx context.Context) error) error {
var wg sync.WaitGroup
errs := make(chan error, 1)
newCtx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
wg.Add(1)
// run the worker
go func(ctx context.Context) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
// if the worker panics, send the panic error to the errs channel
errs <- fmt.Errorf("worker panic: %v", r)
}
}()
// call the worker function and send any returned errors to the errs channel
errs <- worker(ctx)
}(newCtx)
// create a sync.Once object to ensure that the done channel is only closed once
doneOnce := sync.Once{}
done := make(chan bool, 1)
// run a routine to listen for when the worker finishes executing
go func() {
wg.Wait()
done <- true
doneOnce.Do(func() {
close(errs)
close(done)
})
}()
select {
case <-c.InitialCtx.Done():
// initial context cancelled, continue processing in background
return c.InitialCtx.Err()
case <-done:
// continue
}
// collect any errors that occurred during execution and return them
var err error
for e := range errs {
if e != nil {
err = multierr.Append(err, e)
}
}
return err
}
这样使用
err := NewContextSplitter(time.Minute*5).Do(func(newCtx context.Context) error {
// do some long-running tasks, including propagating the newCtx
obj, err := doStuff(newCtx, stuff)
}
我终于让它工作了,但我写这篇文章是因为我不完全确定它为什么工作,并且正在寻找对 golang、goroutines 和上下文的内部工作的一些见解。
主要修复最终是从
NewContextSplitter()
中删除初始(请求)上下文,即this
func NewContextSplitter(initialCtx context.Context, timeout time.Duration) *ContextSplitter {
return &ContextSplitter{
InitialCtx: initialCtx,
Timeout: timeout,
}
}
到这个
func NewContextSplitter(timeout time.Duration) *ContextSplitter {
return &ContextSplitter{
InitialCtx: context.Background(),
Timeout: timeout,
}
}
基本上,请求上下文将被取消,并且我的工作程序中采用新(超时)上下文的任何函数都将失败并显示
ErrContextCancelled
。我在想初始上下文取消已传播到我的 Do(worker)
并且 defer cancel()
将被调用,从而取消我的新超时上下文。奇怪的是,worker 不会通过 newCtx 取消从 defer
退出,而是继续使用 bunk context 运行并在 ErrContextCancelled
. 上出错
我的主要问题是:
initialCtx
传递给 NewContextSplitter
允许将取消该上下文传播到 Do(worker)
,因为 Do()
不采用该上下文?让我知道我是否可以提供更多背景信息 (ha)