From 21f32605c814b7f188efa5b64a51d7b802b3b1ca Mon Sep 17 00:00:00 2001 From: CaIon <1808837298@qq.com> Date: Sun, 28 Apr 2024 16:17:16 +0800 Subject: [PATCH] feat: safe send channel --- common/go-channel.go | 17 ++++++++++++++++- relay/channel/openai/relay-openai.go | 4 ++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/common/go-channel.go b/common/go-channel.go index 4f00dff..8cd49ae 100644 --- a/common/go-channel.go +++ b/common/go-channel.go @@ -16,7 +16,22 @@ func SafeGoroutine(f func()) { }() } -func SafeSend(ch chan bool, value bool) (closed bool) { +func SafeSendBool(ch chan bool, value bool) (closed bool) { + defer func() { + // Recover from panic if one occured. A panic would mean the channel was closed. + if recover() != nil { + closed = true + } + }() + + // This will panic if the channel is closed. + ch <- value + + // If the code reaches here, then the channel was not closed. + return false +} + +func SafeSendString(ch chan string, value string) (closed bool) { defer func() { // Recover from panic if one occured. A panic would mean the channel was closed. if recover() != nil { diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index d627575..6825119 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -50,7 +50,7 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*d if data[:6] != "data: " && data[:6] != "[DONE]" { continue } - dataChan <- data + common.SafeSendString(dataChan, data) data = data[6:] if !strings.HasPrefix(data, "[DONE]") { streamItems = append(streamItems, data) @@ -123,7 +123,7 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*d // wait data out time.Sleep(2 * time.Second) } - common.SafeSend(stopChan, true) + common.SafeSendBool(stopChan, true) }() service.SetEventStreamHeaders(c) c.Stream(func(w io.Writer) bool {