diff --git a/common/go-channel.go b/common/go-channel.go index 4f00dff..8cd49ae 100644 --- a/common/go-channel.go +++ b/common/go-channel.go @@ -16,7 +16,22 @@ func SafeGoroutine(f func()) { }() } -func SafeSend(ch chan bool, value bool) (closed bool) { +func SafeSendBool(ch chan bool, value bool) (closed bool) { + defer func() { + // Recover from panic if one occured. A panic would mean the channel was closed. + if recover() != nil { + closed = true + } + }() + + // This will panic if the channel is closed. + ch <- value + + // If the code reaches here, then the channel was not closed. + return false +} + +func SafeSendString(ch chan string, value string) (closed bool) { defer func() { // Recover from panic if one occured. A panic would mean the channel was closed. if recover() != nil { diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index d627575..6825119 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -50,7 +50,7 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*d if data[:6] != "data: " && data[:6] != "[DONE]" { continue } - dataChan <- data + common.SafeSendString(dataChan, data) data = data[6:] if !strings.HasPrefix(data, "[DONE]") { streamItems = append(streamItems, data) @@ -123,7 +123,7 @@ func OpenaiStreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*d // wait data out time.Sleep(2 * time.Second) } - common.SafeSend(stopChan, true) + common.SafeSendBool(stopChan, true) }() service.SetEventStreamHeaders(c) c.Stream(func(w io.Writer) bool {