mirror of
https://github.com/songquanpeng/one-api.git
synced 2025-09-29 14:46:38 +08:00
80 lines
1.6 KiB
Go
80 lines
1.6 KiB
Go
package requester
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"io"
|
||
"net/http"
|
||
)
|
||
|
||
// 流处理函数,判断依据如下:
|
||
// 1.如果有错误信息,则直接返回错误信息
|
||
// 2.如果isFinished=true,则返回io.EOF,并且如果response不为空,还将返回response
|
||
// 3.如果rawLine=nil 或者 response长度为0,则直接跳过
|
||
// 4.如果以上条件都不满足,则返回response
|
||
type HandlerPrefix[T streamable] func(rawLine *[]byte, isFinished *bool, response *[]T) error
|
||
|
||
type streamable interface {
|
||
// types.ChatCompletionStreamResponse | types.CompletionResponse
|
||
any
|
||
}
|
||
|
||
type StreamReaderInterface[T streamable] interface {
|
||
Recv() (*[]T, error)
|
||
Close()
|
||
}
|
||
|
||
type streamReader[T streamable] struct {
|
||
isFinished bool
|
||
|
||
reader *bufio.Reader
|
||
response *http.Response
|
||
|
||
handlerPrefix HandlerPrefix[T]
|
||
}
|
||
|
||
func (stream *streamReader[T]) Recv() (response *[]T, err error) {
|
||
if stream.isFinished {
|
||
err = io.EOF
|
||
return
|
||
}
|
||
response, err = stream.processLines()
|
||
return
|
||
}
|
||
|
||
//nolint:gocognit
|
||
func (stream *streamReader[T]) processLines() (*[]T, error) {
|
||
for {
|
||
rawLine, readErr := stream.reader.ReadBytes('\n')
|
||
if readErr != nil {
|
||
return nil, readErr
|
||
}
|
||
|
||
noSpaceLine := bytes.TrimSpace(rawLine)
|
||
|
||
var response []T
|
||
err := stream.handlerPrefix(&noSpaceLine, &stream.isFinished, &response)
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if stream.isFinished {
|
||
if len(response) > 0 {
|
||
return &response, io.EOF
|
||
}
|
||
return nil, io.EOF
|
||
}
|
||
|
||
if noSpaceLine == nil || len(response) == 0 {
|
||
continue
|
||
}
|
||
|
||
return &response, nil
|
||
}
|
||
}
|
||
|
||
func (stream *streamReader[T]) Close() {
|
||
stream.response.Body.Close()
|
||
}
|