同步 -> Go 中的异步处理和上下文

问题描述 投票:0回答:0

背景

我在服务器上工作,并决定通过使用 goroutines 和上下文来摆脱传统的异步处理长时间运行的请求(pub/sub 等)。我的想法是接受一个请求并启动一个带有新超时上下文的 goroutine,无论初始请求上下文是否被取消(即用户刷新),它将完成处理。

我以前在单个端点上做过这个,但这次我想做一个通用的可重用包装器,我可以给出一个超时并放入一个异步代码块(下面显示的是工作代码)。

type ContextSplitter struct {
    InitialCtx context.Context
    Timeout    time.Duration
}

func NewContextSplitter(timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: context.Background(),
        Timeout:    timeout,
    }
}

func (c *ContextSplitter) Do(worker func(ctx context.Context) error) error {
    var wg sync.WaitGroup
    errs := make(chan error, 1)
    newCtx, cancel := context.WithTimeout(context.Background(), c.Timeout)

    defer cancel()

    wg.Add(1)

    // run the worker
    go func(ctx context.Context) {
        defer wg.Done()
        defer func() {
            if r := recover(); r != nil {
                // if the worker panics, send the panic error to the errs channel
                errs <- fmt.Errorf("worker panic: %v", r)
            }
        }()

        // call the worker function and send any returned errors to the errs channel
        errs <- worker(ctx)
    }(newCtx)

    // create a sync.Once object to ensure that the done channel is only closed once
    doneOnce := sync.Once{}
    done := make(chan bool, 1)

    // run a routine to listen for when the worker finishes executing
    go func() {
        wg.Wait()
        done <- true
        doneOnce.Do(func() {
            close(errs)
            close(done)
        })
    }()

    select {
    case <-c.InitialCtx.Done():
        // initial context cancelled, continue processing in background
        return c.InitialCtx.Err()
    case <-done:
        // continue
    }

    // collect any errors that occurred during execution and return them
    var err error
    for e := range errs {
        if e != nil {
            err = multierr.Append(err, e)
        }
    }

    return err
}

这样使用

err := NewContextSplitter(time.Minute*5).Do(func(newCtx context.Context) error {
    // do some long-running tasks, including propagating the newCtx
    obj, err := doStuff(newCtx, stuff)
}

问题

我终于让它工作了,但我写这篇文章是因为我不完全确定它为什么工作,并且正在寻找对 golang、goroutines 和上下文的内部工作的一些见解。

主要修复最终是从

NewContextSplitter()
中删除初始(请求)上下文,即this

func NewContextSplitter(initialCtx context.Context, timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: initialCtx,
        Timeout:    timeout,
    }
}

到这个

func NewContextSplitter(timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: context.Background(),
        Timeout:    timeout,
    }
}

基本上,请求上下文将被取消,并且我的工作程序中采用新(超时)上下文的任何函数都将失败并显示

ErrContextCancelled
。我在想初始上下文取消已传播到我的
Do(worker)
并且
defer cancel()
将被调用,从而取消我的新超时上下文。奇怪的是,worker 不会通过 newCtx 取消从
defer
退出,而是继续使用 bunk context 运行并在
ErrContextCancelled
.

上出错

我的主要问题是:

  • 为什么将
    initialCtx
    传递给
    NewContextSplitter
    允许将取消该上下文传播到
    Do(worker)
    ,因为
    Do()
    不采用该上下文?
  • 为什么工人继续处理取消的上下文?
  • 我还遗漏了什么/任何建议吗?
  • 这是处理长时间运行任务的有效模式吗?

让我知道我是否可以提供更多背景信息 (ha)

go asynchronous server goroutine
© www.soinside.com 2019 - 2024. All rights reserved.