one-api/common/requester/ws_reader.go
2024-01-25 11:56:31 +08:00

45 lines
717 B
Go

package requester
import (
"bytes"
"github.com/gorilla/websocket"
)
type wsReader[T streamable] struct {
reader *websocket.Conn
handlerPrefix HandlerPrefix[T]
DataChan chan T
ErrChan chan error
}
func (stream *wsReader[T]) Recv() (<-chan T, <-chan error) {
go stream.processLines()
return stream.DataChan, stream.ErrChan
}
func (stream *wsReader[T]) processLines() {
for {
_, msg, err := stream.reader.ReadMessage()
if err != nil {
stream.ErrChan <- err
return
}
stream.handlerPrefix(&msg, stream.DataChan, stream.ErrChan)
if msg == nil {
continue
}
if bytes.Equal(msg, StreamClosed) {
return
}
}
}
func (stream *wsReader[T]) Close() {
stream.reader.Close()
}