From 6680f6d83a3283bd486da17e3e30af1a09e4ad60 Mon Sep 17 00:00:00 2001 From: Xyfacai Date: Tue, 28 Nov 2023 22:02:09 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E4=B8=AD=E6=96=AD=E8=AF=B7=E6=B1=82=EF=BC=8C=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E8=A1=A5=E5=85=A8=E9=98=BB=E5=A1=9E=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller/relay-openai.go | 65 +++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/controller/relay-openai.go b/controller/relay-openai.go index 29545d3..f77a5b0 100644 --- a/controller/relay-openai.go +++ b/controller/relay-openai.go @@ -9,10 +9,11 @@ import ( "net/http" "one-api/common" "strings" + "sync" ) func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*OpenAIErrorWithStatusCode, string) { - responseText := "" + var responseTextBuilder strings.Builder scanner := bufio.NewScanner(resp.Body) scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { @@ -26,9 +27,16 @@ func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*O } return 0, nil, nil }) - dataChan := make(chan string) - stopChan := make(chan bool) + dataChan := make(chan string, 5) + stopChan := make(chan bool, 2) + defer close(stopChan) + defer close(dataChan) + var wg sync.WaitGroup + go func() { + wg.Add(1) + defer wg.Done() + var streamItems []string for scanner.Scan() { data := scanner.Text() if len(data) < 6 { // ignore blank line or wrong format @@ -40,27 +48,33 @@ func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*O dataChan <- data data = data[6:] if !strings.HasPrefix(data, "[DONE]") { - switch relayMode { - case RelayModeChatCompletions: - var streamResponse ChatCompletionsStreamResponseSimple - err := json.Unmarshal(common.StringToByteSlice(data), &streamResponse) - if err != nil { - common.SysError("error unmarshalling stream response: " + err.Error()) - continue // just ignore the error - } - for _, choice := range streamResponse.Choices { - responseText += choice.Delta.Content - } - case RelayModeCompletions: - var streamResponse CompletionsStreamResponse - err := json.Unmarshal(common.StringToByteSlice(data), &streamResponse) - if err != nil { - common.SysError("error unmarshalling stream response: " + err.Error()) - continue - } - for _, choice := range streamResponse.Choices { - responseText += choice.Text - } + streamItems = append(streamItems, data) + } + } + streamResp := "[" + strings.Join(streamItems, ",") + "]" + switch relayMode { + case RelayModeChatCompletions: + var streamResponses []ChatCompletionsStreamResponseSimple + err := json.Unmarshal(common.StringToByteSlice(streamResp), &streamResponses) + if err != nil { + common.SysError("error unmarshalling stream response: " + err.Error()) + return // just ignore the error + } + for _, streamResponse := range streamResponses { + for _, choice := range streamResponse.Choices { + responseTextBuilder.WriteString(choice.Delta.Content) + } + } + case RelayModeCompletions: + var streamResponses []CompletionsStreamResponse + err := json.Unmarshal(common.StringToByteSlice(streamResp), &streamResponses) + if err != nil { + common.SysError("error unmarshalling stream response: " + err.Error()) + return // just ignore the error + } + for _, streamResponse := range streamResponses { + for _, choice := range streamResponse.Choices { + responseTextBuilder.WriteString(choice.Text) } } } @@ -85,7 +99,8 @@ func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*O if err != nil { return errorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), "" } - return nil, responseText + wg.Wait() + return nil, responseTextBuilder.String() } func openaiHandler(c *gin.Context, resp *http.Response, promptTokens int, model string) (*OpenAIErrorWithStatusCode, *Usage) { From ed22a202f7be44f85f84055c8451fe2d22d2389f Mon Sep 17 00:00:00 2001 From: Xyfacai Date: Thu, 30 Nov 2023 20:28:57 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E5=A6=82=E6=9E=9C=E8=BF=98=E6=9C=89?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=EF=BC=8C=E7=AD=89=E5=BE=85=E4=B8=80=E4=BC=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller/relay-openai.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/controller/relay-openai.go b/controller/relay-openai.go index f77a5b0..24a70d6 100644 --- a/controller/relay-openai.go +++ b/controller/relay-openai.go @@ -78,6 +78,10 @@ func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*O } } } + if len(dataChan) > 0 { + // wait data out + time.Sleep(2 * time.Second) + } stopChan <- true }() setEventStreamHeaders(c) From f712b73c184215a1afd7a2d84119b470bd7cd1ff Mon Sep 17 00:00:00 2001 From: Xyfacai Date: Thu, 30 Nov 2023 20:30:29 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20=E5=A6=82=E6=9E=9C=E8=BF=98=E6=9C=89?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=EF=BC=8C=E7=AD=89=E5=BE=85=E4=B8=80=E4=BC=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller/relay-openai.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controller/relay-openai.go b/controller/relay-openai.go index 24a70d6..c0d3df1 100644 --- a/controller/relay-openai.go +++ b/controller/relay-openai.go @@ -10,6 +10,7 @@ import ( "one-api/common" "strings" "sync" + "time" ) func openaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*OpenAIErrorWithStatusCode, string) {