How do I handle an HTTP request timeout when using the OpenAI streaming API, and prevent the "http2: response body closed" error?

I am working on an application that uses the OpenAI API for streaming chat completions. Streaming works well, but I want to handle the case where the API does not respond within a certain period of time. Wh ...

Problem description

Current working code (streams fine, no timeout handling):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai.com/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to send request: %w", err)
    }

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)
        defer resp.Body.Close()

        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response: %v\n", err)
        }
    }()

    return dataChannel, nil
}
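For reference, a minimal, hypothetical caller for this function could look like the sketch below. The constructor name, model name, and messages are placeholders, and imports are omitted as in the snippets above; only CreateChatCompletionStream and the channel it returns come from the code itself.

func main() {
    client := NewGptAdaptorClient() // hypothetical constructor for gptAdaptorClient
    messages := []map[string]string{
        {"role": "user", "content": "Hello"},
    }

    stream, err := client.CreateChatCompletionStream(messages, "gpt-4o", 256, 0.7)
    if err != nil {
        log.Fatalf("failed to start stream: %v", err)
    }

    // Each value received is the raw JSON that followed "data: " on an SSE line.
    for chunk := range stream {
        fmt.Println(chunk)
    }
}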

Code with the timeout added (this produces the error "http2: response body closed"):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai.com/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Set up the context with timeout for HTTP request (10 seconds)
    reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    req = req.WithContext(reqCtx) // Attach the timeout context to the request

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        // Check if the error is a timeout and switch to Perplexity API
        if reqCtx.Err() == context.DeadlineExceeded {
            fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
            return g.callPerplexityApi(messages, model, maxTokens, temperature)
        }
        return nil, fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("OpenAI API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)

        // Use scanner to read lines from the response body
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
        }
    }()

    return dataChannel, nil
}

The problem:


  • Current working code: the code streams perfectly well without the timeout check.
  • Timeout: when I add a context timeout for the HTTP request (set to 10 seconds), I get the error "http2: response body closed" and no response is received from the OpenAI API.
  • What I expect: I want to make sure that if the OpenAI API does not respond within 10 seconds, I switch to an alternative API provider such as Perplexity.
  • Questions:
  • Why does adding a context timeout to the HTTP request cause the "http2: response body closed" error while streaming? (See the sketch after this list.)
  • How can I add a timeout for the HTTP request while avoiding this issue, and make sure the stream continues correctly when the request succeeds?
  • Is there a better way to handle an HTTP request timeout for a streaming API such as OpenAI's in Go, without closing the response body prematurely?
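For what it's worth on the first question: in the timeout version, defer cancel() and defer resp.Body.Close() run as soon as CreateChatCompletionStream returns, i.e. right after the channel is handed back to the caller, so the request context is cancelled and the body is closed while the goroutine is still reading it, which is what typically surfaces as "http2: response body closed". Moving those defers into the goroutine is not enough on its own, because a context.WithTimeout deadline would still abort the stream 10 seconds in, even mid-read. Below is a minimal, hypothetical helper sketch (the name doWithHeaderDeadline and its shape are mine, not from the original code; it assumes the standard context, fmt, net/http, and time packages) that bounds only the wait for the first response:

// doWithHeaderDeadline cancels the request if no response has arrived within
// `wait`, but never cuts off a stream that has already started. The returned
// cancel func must be called once the caller is finished with resp.Body
// (e.g. deferred inside the streaming goroutine).
func doWithHeaderDeadline(client *http.Client, req *http.Request, wait time.Duration) (*http.Response, context.CancelFunc, error) {
    // Cancellable context with no deadline, so an open stream is never aborted by a timer.
    ctx, cancel := context.WithCancel(req.Context())
    req = req.WithContext(ctx)

    // Watchdog: fires only if Do is still waiting for a response when the timer expires.
    watchdog := time.AfterFunc(wait, cancel)

    resp, err := client.Do(req)
    watchdog.Stop() // a response (or another error) arrived; disarm the watchdog
    if err != nil {
        cancel()
        if ctx.Err() != nil {
            // The watchdog cancelled the request: report it as a timeout.
            return nil, nil, fmt.Errorf("no response within %s: %w", wait, err)
        }
        return nil, nil, err
    }
    return resp, cancel, nil
}

CreateChatCompletionStream would then call this helper in place of g.Client.Do, and the streaming goroutine would defer both resp.Body.Close() and the returned cancel func instead of the outer function deferring them.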

The correct way to handle the timeout for the HTTP request is to configure it on the http.Client, like this:

package main

import (
    "bufio"
    "bytes"
    "crypto/tls"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "net"
    "net/http"
    "time"
)

type gptAdaptorClient struct {
    http.Client
}

// Shared client variant with an overall 30-second timeout.
// Note: InsecureSkipVerify disables TLS certificate verification.
var httpClient = &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
    },
}

func NewGptAdaptorClient() *gptAdaptorClient {
    // Set up timeout for HTTP requests (10 seconds)
    return &gptAdaptorClient{
        Client: http.Client{
            Timeout: 10 * time.Second,
        },
    }
}

func (g *gptAdaptorClient) addCommonHeaders(req *http.Request) {}

func (g *gptAdaptorClient) callPerplexityApi(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    return nil, nil
}

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai.com/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        // resp is nil on error, so inspect the error itself to detect a timeout
        // and switch to the Perplexity API
        var netErr net.Error
        if errors.As(err, &netErr) && netErr.Timeout() {
            fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
            return g.callPerplexityApi(messages, model, maxTokens, temperature)
        }
        return nil, fmt.Errorf("failed to send request: %w", err)
    }

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        return nil, fmt.Errorf("OpenAI API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine; the goroutine closes the body when the
    // stream ends, so it is not closed here while it is still being read
    go func() {
        defer close(dataChannel)
        defer resp.Body.Close()

        // Use scanner to read lines from the response body
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
        }
    }()

    return dataChannel, nil
}
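One caveat about this approach, as far as I understand http.Client: the Timeout field covers the entire exchange, including reading the response body, so a 10-second client timeout will also cut off a stream that is still delivering tokens after 10 seconds. If the goal is only to bound how long to wait for OpenAI to start responding, one alternative sketch is to leave Client.Timeout unset and use Transport.ResponseHeaderTimeout instead; the net.Error timeout check shown above still applies unchanged.

// Sketch: bound only the wait for response headers; reading the streamed body is unbounded.
func NewGptAdaptorClient() *gptAdaptorClient {
    return &gptAdaptorClient{
        Client: http.Client{
            // No Client.Timeout: it would also limit how long the body may be read.
            Transport: &http.Transport{
                // Fail Do() if no response headers have arrived within 10 seconds.
                ResponseHeaderTimeout: 10 * time.Second,
            },
        },
    }
}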
go http timeout openai-api http2