func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
url := "https://api.openai.com/v1/chat/completions"
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response: %v\n", err)
}
}()
return dataChannel, nil
}
用超时的代码(带有错误“ http2:响应主体关闭”):
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
url := "https://api.openai.com/v1/chat/completions"
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Set up the context with timeout for HTTP request (10 seconds)
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req = req.WithContext(reqCtx) // Send the request with the timeout context
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
// Check if the error is a timeout and switch to Perplexity API
if reqCtx.Err() == context.DeadlineExceeded {
fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
return g.callPerplexityApi(messages, model, maxTokens, temperature)
}
return nil, fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("OpenAI API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
// Use scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
}
}()
return dataChannel, nil
}
问题:
电流工作代码:该代码在不超时检查的情况下进行流式传输效果很好。 timeout
:当我添加HTTP请求的上下文超时(设置为10秒)时,我会遇到错误“ HTTP2:响应主体关闭”,并且未从OpenAI API收到响应。在HTTP请求中处理超时的正确方法是使用httpclient设置这样:
package main
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type gptAdaptorClient struct {
http.Client
}
var httpClient = &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
func NewGptAdaptorClient() *gptAdaptorClient {
// Set up timeout for HTTP request (10 seconds)
return &gptAdaptorClient{
Client: http.Client{
Timeout: 10 * time.Second,
},
}
}
func (g *gptAdaptorClient) addCommonHeaders(req *http.Request) {}
func (g *gptAdaptorClient) callPerplexityApi(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
return nil, nil
}
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
url := "https://api.openai.com/v1/chat/completions"
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
// Check if the error is a timeout and switch to Perplexity API
if resp.StatusCode == http.StatusGatewayTimeout {
fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
return g.callPerplexityApi(messages, model, maxTokens, temperature)
}
return nil, fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("OpenAI API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
// Use scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
}
}()
return dataChannel, nil
}