mirror of
https://github.com/yangjian102621/geekai.git
synced 2025-09-18 01:06:39 +08:00
187 lines
5.6 KiB
Go
187 lines
5.6 KiB
Go
package chatimpl
|
||
|
||
// * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
||
// * Copyright 2023 The Geek-AI Authors. All rights reserved.
|
||
// * Use of this source code is governed by a Apache-2.0 license
|
||
// * that can be found in the LICENSE file.
|
||
// * @Author yangjian102621@163.com
|
||
// * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
||
|
||
import (
|
||
"bufio"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"geekai/core/types"
|
||
"geekai/store/model"
|
||
"geekai/store/vo"
|
||
"geekai/utils"
|
||
req2 "github.com/imroc/req/v3"
|
||
"io"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
// OPenAI 消息发送实现
|
||
func (h *ChatHandler) sendOpenAiMessage(
|
||
chatCtx []types.Message,
|
||
req types.ApiRequest,
|
||
userVo vo.User,
|
||
ctx context.Context,
|
||
session *types.ChatSession,
|
||
role model.ChatRole,
|
||
prompt string,
|
||
ws *types.WsClient) error {
|
||
promptCreatedAt := time.Now() // 记录提问时间
|
||
start := time.Now()
|
||
var apiKey = model.ApiKey{}
|
||
response, err := h.doRequest(ctx, req, session, &apiKey)
|
||
logger.Info("HTTP请求完成,耗时:", time.Now().Sub(start))
|
||
if err != nil {
|
||
if strings.Contains(err.Error(), "context canceled") {
|
||
return fmt.Errorf("用户取消了请求:%s", prompt)
|
||
} else if strings.Contains(err.Error(), "no available key") {
|
||
return errors.New("抱歉😔😔😔,系统已经没有可用的 API KEY,请联系管理员!")
|
||
}
|
||
return err
|
||
} else {
|
||
defer response.Body.Close()
|
||
}
|
||
|
||
contentType := response.Header.Get("Content-Type")
|
||
if strings.Contains(contentType, "text/event-stream") {
|
||
replyCreatedAt := time.Now() // 记录回复时间
|
||
// 循环读取 Chunk 消息
|
||
var message = types.Message{}
|
||
var contents = make([]string, 0)
|
||
var function model.Function
|
||
var toolCall = false
|
||
var arguments = make([]string, 0)
|
||
scanner := bufio.NewScanner(response.Body)
|
||
var isNew = true
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
if !strings.Contains(line, "data:") || len(line) < 30 {
|
||
continue
|
||
}
|
||
|
||
var responseBody = types.ApiResponse{}
|
||
err = json.Unmarshal([]byte(line[6:]), &responseBody)
|
||
if err != nil { // 数据解析出错
|
||
return errors.New(line)
|
||
}
|
||
if len(responseBody.Choices) == 0 { // Fixed: 兼容 Azure API 第一个输出空行
|
||
continue
|
||
}
|
||
|
||
if responseBody.Choices[0].FinishReason == "stop" && len(contents) == 0 {
|
||
utils.ReplyMessage(ws, "抱歉😔😔😔,AI助手由于未知原因已经停止输出内容。")
|
||
break
|
||
}
|
||
|
||
var tool types.ToolCall
|
||
if len(responseBody.Choices[0].Delta.ToolCalls) > 0 {
|
||
tool = responseBody.Choices[0].Delta.ToolCalls[0]
|
||
if toolCall && tool.Function.Name == "" {
|
||
arguments = append(arguments, tool.Function.Arguments)
|
||
continue
|
||
}
|
||
}
|
||
|
||
// 兼容 Function Call
|
||
fun := responseBody.Choices[0].Delta.FunctionCall
|
||
if fun.Name != "" {
|
||
tool = *new(types.ToolCall)
|
||
tool.Function.Name = fun.Name
|
||
} else if toolCall {
|
||
arguments = append(arguments, fun.Arguments)
|
||
continue
|
||
}
|
||
|
||
if !utils.IsEmptyValue(tool) {
|
||
res := h.DB.Where("name = ?", tool.Function.Name).First(&function)
|
||
if res.Error == nil {
|
||
toolCall = true
|
||
callMsg := fmt.Sprintf("正在调用工具 `%s` 作答 ...\n\n", function.Label)
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{Type: types.WsStart})
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{Type: types.WsMiddle, Content: callMsg})
|
||
contents = append(contents, callMsg)
|
||
}
|
||
continue
|
||
}
|
||
|
||
if responseBody.Choices[0].FinishReason == "tool_calls" ||
|
||
responseBody.Choices[0].FinishReason == "function_call" { // 函数调用完毕
|
||
break
|
||
}
|
||
|
||
// output stopped
|
||
if responseBody.Choices[0].FinishReason != "" {
|
||
break // 输出完成或者输出中断了
|
||
} else {
|
||
content := responseBody.Choices[0].Delta.Content
|
||
contents = append(contents, utils.InterfaceToString(content))
|
||
if isNew {
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{Type: types.WsStart})
|
||
isNew = false
|
||
}
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{
|
||
Type: types.WsMiddle,
|
||
Content: utils.InterfaceToString(responseBody.Choices[0].Delta.Content),
|
||
})
|
||
}
|
||
} // end for
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
if strings.Contains(err.Error(), "context canceled") {
|
||
logger.Info("用户取消了请求:", prompt)
|
||
} else {
|
||
logger.Error("信息读取出错:", err)
|
||
}
|
||
}
|
||
|
||
if toolCall { // 调用函数完成任务
|
||
var params map[string]interface{}
|
||
_ = utils.JsonDecode(strings.Join(arguments, ""), ¶ms)
|
||
logger.Debugf("函数名称: %s, 函数参数:%s", function.Name, params)
|
||
params["user_id"] = userVo.Id
|
||
var apiRes types.BizVo
|
||
r, err := req2.C().R().SetHeader("Content-Type", "application/json").
|
||
SetHeader("Authorization", function.Token).
|
||
SetBody(params).
|
||
SetSuccessResult(&apiRes).Post(function.Action)
|
||
errMsg := ""
|
||
if err != nil {
|
||
errMsg = err.Error()
|
||
} else if r.IsErrorState() {
|
||
errMsg = r.Status
|
||
}
|
||
if errMsg != "" || apiRes.Code != types.Success {
|
||
msg := "调用函数工具出错:" + apiRes.Message + errMsg
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{
|
||
Type: types.WsMiddle,
|
||
Content: msg,
|
||
})
|
||
contents = append(contents, msg)
|
||
} else {
|
||
utils.ReplyChunkMessage(ws, types.WsMessage{
|
||
Type: types.WsMiddle,
|
||
Content: apiRes.Data,
|
||
})
|
||
contents = append(contents, utils.InterfaceToString(apiRes.Data))
|
||
}
|
||
}
|
||
|
||
// 消息发送成功
|
||
if len(contents) > 0 {
|
||
h.saveChatHistory(req, prompt, contents, message, chatCtx, session, role, userVo, promptCreatedAt, replyCreatedAt)
|
||
}
|
||
} else {
|
||
body, _ := io.ReadAll(response.Body)
|
||
return fmt.Errorf("请求 OpenAI API 失败:%s", body)
|
||
}
|
||
|
||
return nil
|
||
}
|