one-api/common/requester/http_stream_reader.go
2024-01-25 11:56:31 +08:00

67 lines
1.2 KiB
Go

package requester
import (
"bufio"
"bytes"
"net/http"
)
var StreamClosed = []byte("stream_closed")
type HandlerPrefix[T streamable] func(rawLine *[]byte, dataChan chan T, errChan chan error)
type streamable interface {
// types.ChatCompletionStreamResponse | types.CompletionResponse
any
}
type StreamReaderInterface[T streamable] interface {
Recv() (<-chan T, <-chan error)
Close()
}
type streamReader[T streamable] struct {
reader *bufio.Reader
response *http.Response
handlerPrefix HandlerPrefix[T]
DataChan chan T
ErrChan chan error
}
func (stream *streamReader[T]) Recv() (<-chan T, <-chan error) {
go stream.processLines()
return stream.DataChan, stream.ErrChan
}
//nolint:gocognit
func (stream *streamReader[T]) processLines() {
for {
rawLine, readErr := stream.reader.ReadBytes('\n')
if readErr != nil {
stream.ErrChan <- readErr
return
}
noSpaceLine := bytes.TrimSpace(rawLine)
if len(noSpaceLine) == 0 {
continue
}
stream.handlerPrefix(&noSpaceLine, stream.DataChan, stream.ErrChan)
if noSpaceLine == nil {
continue
}
if bytes.Equal(noSpaceLine, StreamClosed) {
return
}
}
}
func (stream *streamReader[T]) Close() {
stream.response.Body.Close()
}