From fc6ae6bf34e3c70613903f598fa0dcb16db8be44 Mon Sep 17 00:00:00 2001 From: CalciumIon <1808837298@qq.com> Date: Thu, 27 Jun 2024 17:17:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E6=B5=81=E6=A8=A1=E5=BC=8F=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 ++ common/go-channel.go | 19 +++++++++++++++++++ constant/system-setting.go | 4 ++++ relay/channel/openai/relay-openai.go | 7 ++++++- 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 963555e..873126c 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,8 @@ ``` 可以实现400错误转为500错误,从而重试 +## 比原版One API多出的配置 +- `STREAMING_TIMEOUT`:设置流式一次回复的超时时间,默认为 30 秒 ## 部署 ### 部署要求 diff --git a/common/go-channel.go b/common/go-channel.go index 8cd49ae..3fc8653 100644 --- a/common/go-channel.go +++ b/common/go-channel.go @@ -3,6 +3,7 @@ package common import ( "fmt" "runtime/debug" + "time" ) func SafeGoroutine(f func()) { @@ -45,3 +46,21 @@ func SafeSendString(ch chan string, value string) (closed bool) { // If the code reaches here, then the channel was not closed. return false } + +// SafeSendStringTimeout send, return true, else return false +func SafeSendStringTimeout(ch chan string, value string, timeout int) (closed bool) { + defer func() { + // Recover from panic if one occured. A panic would mean the channel was closed. + if recover() != nil { + closed = false + } + }() + + // This will panic if the channel is closed. + select { + case ch <- value: + return true + case <-time.After(time.Duration(timeout) * time.Second): + return false + } +} diff --git a/constant/system-setting.go b/constant/system-setting.go index b2976e4..de20fef 100644 --- a/constant/system-setting.go +++ b/constant/system-setting.go @@ -1,9 +1,13 @@ package constant +import "one-api/common" + var ServerAddress = "http://localhost:3000" var WorkerUrl = "" var WorkerValidKey = "" +var StreamingTimeout = common.GetOrDefault("STREAMING_TIMEOUT", 30) + func EnableWorker() bool { return WorkerUrl != "" } diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index 8733cce..5146a4f 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "one-api/common" + "one-api/constant" "one-api/dto" relaycommon "one-api/relay/common" relayconstant "one-api/relay/constant" @@ -51,7 +52,11 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon. if data[:6] != "data: " && data[:6] != "[DONE]" { continue } - common.SafeSendString(dataChan, data) + if !common.SafeSendStringTimeout(dataChan, data, constant.StreamingTimeout) { + // send data timeout, stop the stream + common.LogInfo(c, "send data timeout, stop the stream") + break + } data = data[6:] if !strings.HasPrefix(data, "[DONE]") { streamItems = append(streamItems, data)