Merge branch 'feat/streaming' of github.com:fdc310/LangBot into streaming_feature

This commit is contained in:
fdc
2025-07-02 14:09:01 +08:00
47 changed files with 2657 additions and 849 deletions
+1 -1
View File
@@ -1,4 +1,4 @@
from __future__ import print_function
from __future__ import annotations
import traceback
import asyncio
+12 -6
View File
@@ -1,5 +1,7 @@
from __future__ import annotations
import asyncio
from .. import stage, app, note
from ...utils import importutil
@@ -20,11 +22,15 @@ class ShowNotesStage(stage.BootingStage):
try:
note_inst = note_cls(ap)
if await note_inst.need_show():
async for ret in note_inst.yield_note():
if not ret:
continue
msg, level = ret
if msg:
ap.logger.log(level, msg)
async def ayield_note(note_inst: note.LaunchNote):
async for ret in note_inst.yield_note():
if not ret:
continue
msg, level = ret
if msg:
ap.logger.log(level, msg)
asyncio.create_task(ayield_note(note_inst))
except Exception:
continue
View File
+9
View File
@@ -0,0 +1,9 @@
from __future__ import annotations
class AdapterNotFoundError(Exception):
def __init__(self, adapter_name: str):
self.adapter_name = adapter_name
def __str__(self):
return f'Adapter {self.adapter_name} not found'
+9
View File
@@ -0,0 +1,9 @@
from __future__ import annotations
class RequesterNotFoundError(Exception):
def __init__(self, requester_name: str):
self.requester_name = requester_name
def __str__(self):
return f'Requester {self.requester_name} not found'
+14 -2
View File
@@ -15,6 +15,8 @@ from ..discover import engine
from ..entity.persistence import bot as persistence_bot
from ..entity.errors import platform as platform_errors
from .logger import EventLogger
# 处理 3.4 移除了 YiriMirai 之后,插件的兼容性问题
@@ -118,8 +120,10 @@ class RuntimeBot:
if isinstance(e, asyncio.CancelledError):
self.task_context.set_current_action('Exited.')
return
traceback_str = traceback.format_exc()
self.task_context.set_current_action('Exited with error.')
await self.logger.error(f'平台适配器运行出错:\n{e}\n{traceback.format_exc()}')
await self.logger.error(f'平台适配器运行出错:\n{e}\n{traceback_str}')
self.task_wrapper = self.ap.task_mgr.create_task(
exception_wrapper(),
@@ -205,7 +209,12 @@ class PlatformManager:
for bot in bots:
# load all bots here, enable or disable will be handled in runtime
await self.load_bot(bot)
try:
await self.load_bot(bot)
except platform_errors.AdapterNotFoundError as e:
self.ap.logger.warning(f'Adapter {e.adapter_name} not found, skipping bot {bot.uuid}')
except Exception as e:
self.ap.logger.error(f'Failed to load bot {bot.uuid}: {e}\n{traceback.format_exc()}')
async def load_bot(
self,
@@ -219,6 +228,9 @@ class PlatformManager:
logger = EventLogger(name=f'platform-adapter-{bot_entity.name}', ap=self.ap)
if bot_entity.adapter not in self.adapter_dict:
raise platform_errors.AdapterNotFoundError(bot_entity.adapter)
adapter_inst = self.adapter_dict[bot_entity.adapter](
bot_entity.adapter_config,
self.ap,
+14 -14
View File
@@ -22,7 +22,7 @@ class DingTalkMessageConverter(adapter.MessageConverter):
at = True
if type(msg) is platform_message.Plain:
content += msg.text
return content,at
return content, at
@staticmethod
async def target2yiri(event: DingTalkEvent, bot_name: str):
@@ -116,15 +116,6 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
self.bot_account_id = self.config['robot_name']
self.bot = DingTalkClient(
client_id=config['client_id'],
client_secret=config['client_secret'],
robot_name=config['robot_name'],
robot_code=config['robot_code'],
markdown_card=config['markdown_card'],
logger=self.logger,
)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
@@ -136,8 +127,8 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
)
incoming_message = event.incoming_message
content,at = await DingTalkMessageConverter.yiri2target(message)
await self.bot.send_message(content, incoming_message,at)
content, at = await DingTalkMessageConverter.yiri2target(message)
await self.bot.send_message(content, incoming_message, at)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
content = await DingTalkMessageConverter.yiri2target(message)
@@ -157,8 +148,8 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
await self.event_converter.target2yiri(event, self.config['robot_name']),
self,
)
except Exception as e:
await self.logger.error(f"Error in dingtalk callback: {traceback.format_exc()}")
except Exception:
await self.logger.error(f'Error in dingtalk callback: {traceback.format_exc()}')
if event_type == platform_events.FriendMessage:
self.bot.on_message('FriendMessage')(on_message)
@@ -166,6 +157,15 @@ class DingTalkAdapter(adapter.MessagePlatformAdapter):
self.bot.on_message('GroupMessage')(on_message)
async def run_async(self):
config = self.config
self.bot = DingTalkClient(
client_id=config['client_id'],
client_secret=config['client_secret'],
robot_name=config['robot_name'],
robot_code=config['robot_code'],
markdown_card=config['markdown_card'],
logger=self.logger,
)
await self.bot.start()
async def kill(self) -> bool:
+90 -9
View File
@@ -8,6 +8,7 @@ import base64
import uuid
import os
import datetime
import io
import aiohttp
@@ -35,28 +36,88 @@ class DiscordMessageConverter(adapter.MessageConverter):
for ele in message_chain:
if isinstance(ele, platform_message.Image):
image_bytes = None
filename = f'{uuid.uuid4()}.png' # 默认文件名
if ele.base64:
image_bytes = base64.b64decode(ele.base64)
# 处理base64编码的图片
if ele.base64.startswith('data:'):
# 从data URL中提取文件类型
data_header = ele.base64.split(',')[0]
if 'jpeg' in data_header or 'jpg' in data_header:
filename = f'{uuid.uuid4()}.jpg'
elif 'gif' in data_header:
filename = f'{uuid.uuid4()}.gif'
elif 'webp' in data_header:
filename = f'{uuid.uuid4()}.webp'
# 去掉data:image/xxx;base64,前缀
base64_data = ele.base64.split(',')[1]
else:
base64_data = ele.base64
image_bytes = base64.b64decode(base64_data)
elif ele.url:
# 从URL下载图片
async with aiohttp.ClientSession() as session:
async with session.get(ele.url) as response:
image_bytes = await response.read()
# 从URL或Content-Type推断文件类型
content_type = response.headers.get('Content-Type', '')
if 'jpeg' in content_type or 'jpg' in content_type:
filename = f'{uuid.uuid4()}.jpg'
elif 'gif' in content_type:
filename = f'{uuid.uuid4()}.gif'
elif 'webp' in content_type:
filename = f'{uuid.uuid4()}.webp'
elif ele.url.lower().endswith(('.jpg', '.jpeg')):
filename = f'{uuid.uuid4()}.jpg'
elif ele.url.lower().endswith('.gif'):
filename = f'{uuid.uuid4()}.gif'
elif ele.url.lower().endswith('.webp'):
filename = f'{uuid.uuid4()}.webp'
elif ele.path:
with open(ele.path, 'rb') as f:
image_bytes = f.read()
# 从文件路径读取图片
# 确保路径没有空字节
clean_path = ele.path.replace('\x00', '')
clean_path = os.path.abspath(clean_path)
if not os.path.exists(clean_path):
continue # 跳过不存在的文件
try:
with open(clean_path, 'rb') as f:
image_bytes = f.read()
# 从文件路径获取文件名,保持原始扩展名
original_filename = os.path.basename(clean_path)
if original_filename and '.' in original_filename:
# 保持原始文件名的扩展名
ext = original_filename.split('.')[-1].lower()
filename = f'{uuid.uuid4()}.{ext}'
else:
# 如果没有扩展名,尝试从文件内容检测
if image_bytes.startswith(b'\xff\xd8\xff'):
filename = f'{uuid.uuid4()}.jpg'
elif image_bytes.startswith(b'GIF'):
filename = f'{uuid.uuid4()}.gif'
elif image_bytes.startswith(b'RIFF') and b'WEBP' in image_bytes[:20]:
filename = f'{uuid.uuid4()}.webp'
# 默认保持PNG
except Exception as e:
print(f"Error reading image file {clean_path}: {e}")
continue # 跳过读取失败的文件
image_files.append(discord.File(fp=image_bytes, filename=f'{uuid.uuid4()}.png'))
if image_bytes:
# 使用BytesIO创建文件对象,避免路径问题
import io
image_files.append(discord.File(fp=io.BytesIO(image_bytes), filename=filename))
elif isinstance(ele, platform_message.Plain):
text_string += ele.text
elif isinstance(ele, platform_message.Forward):
for node in ele.node_list:
(
text_string,
image_files,
node_text,
node_images,
) = await DiscordMessageConverter.yiri2target(node.message_chain)
text_string += text_string
image_files.extend(image_files)
text_string += node_text
image_files.extend(node_images)
return text_string, image_files
@@ -199,7 +260,27 @@ class DiscordAdapter(adapter.MessagePlatformAdapter):
self.bot = MyClient(intents=intents, **args)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
pass
msg_to_send, image_files = await self.message_converter.yiri2target(message)
try:
# 获取频道对象
channel = self.bot.get_channel(int(target_id))
if channel is None:
# 如果本地缓存中没有,尝试从API获取
channel = await self.bot.fetch_channel(int(target_id))
args = {
'content': msg_to_send,
}
if len(image_files) > 0:
args['files'] = image_files
await channel.send(**args)
except Exception as e:
await self.logger.error(f"Discord send_message failed: {e}")
raise e
async def reply_message(
self,
+108
View File
@@ -9,6 +9,7 @@ import re
import base64
import uuid
import json
import time
import datetime
import hashlib
from Crypto.Cipher import AES
@@ -320,6 +321,10 @@ class LarkEventConverter(adapter.EventConverter):
)
CARD_ID_CACHE_SIZE = 500
CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(adapter.MessagePlatformAdapter):
bot: lark_oapi.ws.Client
api_client: lark_oapi.Client
@@ -338,6 +343,8 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
config: dict
quart_app: quart.Quart
ap: app.Application
message_id_to_card_id: typing.Dict[str, typing.Tuple[str, int]]
def __init__(self, config: dict, ap: app.Application, logger: EventLogger):
self.config = config
@@ -345,6 +352,7 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
self.logger = logger
self.quart_app = quart.Quart(__name__)
self.listeners = {}
self.message_id_to_card_id = {}
@self.quart_app.route('/lark/callback', methods=['POST'])
async def lark_callback():
@@ -390,6 +398,19 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
return {'code': 500, 'message': 'error'}
async def on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
if self.config['enable-card-reply'] and event.event.message.message_id not in self.message_id_to_card_id:
self.ap.logger.debug('卡片回复模式开启')
# 开启卡片回复模式. 这里可以实现飞书一发消息,马上创建卡片进行回复"思考中..."
reply_message_id = await self.create_message_card(event.event.message.message_id)
self.message_id_to_card_id[event.event.message.message_id] = (reply_message_id, time.time())
if len(self.message_id_to_card_id) > CARD_ID_CACHE_SIZE:
self.message_id_to_card_id = {
k: v
for k, v in self.message_id_to_card_id.items()
if v[1] > time.time() - CARD_ID_CACHE_MAX_LIFETIME
}
lb_event = await self.event_converter.target2yiri(event, self.api_client)
await self.listeners[type(lb_event)](lb_event, self)
@@ -409,11 +430,93 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
pass
async def create_message_card(self, message_id: str) -> str:
"""
创建卡片消息。
使用卡片消息是因为普通消息更新次数有限制,而大模型流式返回结果可能很多而超过限制,而飞书卡片没有这个限制
"""
# TODO 目前只支持卡片模板方式,且卡片变量一定是content,未来这块要做成可配置
# 发消息马上就会回复显示初始化的content信息,即思考中
content = {
'type': 'template',
'data': {'template_id': self.config['card_template_id'], 'template_variable': {'content': 'Thinking...'}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_id)
.request_body(
ReplyMessageRequestBody.builder().content(json.dumps(content)).msg_type('interactive').build()
)
.build()
)
# 发起请求
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request)
# 处理失败返回
if not response.success():
raise Exception(
f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
return response.data.message_id
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
if self.config['enable-card-reply']:
await self.reply_card_message(message_source, message, quote_origin)
else:
await self.reply_normal_message(message_source, message, quote_origin)
async def reply_card_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
"""
回复消息变成更新卡片消息
"""
lark_message = await self.message_converter.yiri2target(message, self.api_client)
text_message = ''
for ele in lark_message[0]:
if ele['tag'] == 'text':
text_message += ele['text']
elif ele['tag'] == 'md':
text_message += ele['text']
content = {
'type': 'template',
'data': {'template_id': self.config['card_template_id'], 'template_variable': {'content': text_message}},
}
request: PatchMessageRequest = (
PatchMessageRequest.builder()
.message_id(self.message_id_to_card_id[message_source.message_chain.message_id][0])
.request_body(PatchMessageRequestBody.builder().content(json.dumps(content)).build())
.build()
)
# 发起请求
response: PatchMessageResponse = self.api_client.im.v1.message.patch(request)
# 处理失败返回
if not response.success():
raise Exception(
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
return
async def reply_normal_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
# 不再需要了,因为message_id已经被包含到message_chain中
# lark_event = await self.event_converter.yiri2target(message_source)
@@ -492,4 +595,9 @@ class LarkAdapter(adapter.MessagePlatformAdapter):
)
async def kill(self) -> bool:
# 需要断开连接,不然旧的连接会继续运行,导致飞书消息来时会随机选择一个连接
# 断开时lark.ws.Client的_receive_message_loop会打印error日志: receive message loop exit。然后进行重连,
# 所以要设置_auto_reconnect=False,让其不重连。
self.bot._auto_reconnect = False
await self.bot._disconnect()
return False
+17
View File
@@ -65,6 +65,23 @@ spec:
type: string
required: true
default: ""
- name: enable-card-reply
label:
en_US: Enable Card Reply Mode
zh_Hans: 启用飞书卡片回复模式
description:
en_US: If enabled, the bot will use the card of lark reply mode
zh_Hans: 如果启用,将使用飞书卡片方式来回复内容
type: boolean
required: true
default: false
- name: card_template_id
label:
en_US: card template id
zh_Hans: 卡片模板ID
type: string
required: true
default: "填写你的卡片template_id"
execution:
python:
path: ./lark.py
Binary file not shown.

After

Width:  |  Height:  |  Size: 466 KiB

+1
View File
@@ -8,6 +8,7 @@ metadata:
description:
en_US: WeChatPad Adapter
zh_CN: WeChatPad 适配器
icon: wechatpad.png
spec:
config:
- name: wechatpad_url
+11 -1
View File
@@ -1,12 +1,14 @@
from __future__ import annotations
import sqlalchemy
import traceback
from . import entities, requester
from ...core import app
from ...discover import engine
from . import token
from ...entity.persistence import model as persistence_model
from ...entity.errors import provider as provider_errors
FETCH_MODEL_LIST_URL = 'https://api.qchatgpt.rockchin.top/api/v2/fetch/model_list'
@@ -64,7 +66,12 @@ class ModelManager:
# load models
for llm_model in llm_models:
await self.load_llm_model(llm_model)
try:
await self.load_llm_model(llm_model)
except provider_errors.RequesterNotFoundError as e:
self.ap.logger.warning(f'Requester {e.requester_name} not found, skipping model {llm_model.uuid}')
except Exception as e:
self.ap.logger.error(f'Failed to load model {llm_model.uuid}: {e}\n{traceback.format_exc()}')
async def init_runtime_llm_model(
self,
@@ -76,6 +83,9 @@ class ModelManager:
elif isinstance(model_info, dict):
model_info = persistence_model.LLMModel(**model_info)
if model_info.requester not in self.requester_dict:
raise provider_errors.RequesterNotFoundError(model_info.requester)
requester_inst = self.requester_dict[model_info.requester](ap=self.ap, config=model_info.requester_config)
await requester_inst.initialize()
Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

@@ -0,0 +1,17 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class AI302ChatCompletions(chatcmpl.OpenAIChatCompletions):
"""302 AI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.302.ai/v1',
'timeout': 120,
}
@@ -0,0 +1,28 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: 302-ai-chat-completions
label:
en_US: 302 AI
zh_Hans: 302 AI
icon: 302ai.png
spec:
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: "https://api.302.ai/v1"
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
execution:
python:
path: ./302aichatcmpl.py
attr: AI302ChatCompletions
+37 -4
View File
@@ -108,7 +108,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
mode = 'basic' # 标记是基础编排还是工作流编排
basic_mode_pending_chunk = ''
stream_output_pending_chunk = ''
batch_pending_max_size = self.pipeline_config['ai']['dify-service-api'].get(
'output-batch-size', 0
) # 积累一定量的消息更新消息一次
batch_pending_index = 0
inputs = {}
@@ -126,6 +132,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
):
self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))
# 查询异常情况
if chunk['event'] == 'error':
yield llm_entities.Message(
role='assistant',
content=f"查询异常: [{chunk['code']}]. {chunk['message']}.\n请重试,如果还报错,请用 <font color='red'>**!reset**</font> 命令重置对话再尝试。",
)
if chunk['event'] == 'workflow_started':
mode = 'workflow'
@@ -136,15 +149,35 @@ class DifyServiceAPIRunner(runner.RequestRunner):
role='assistant',
content=self._try_convert_thinking(chunk['data']['outputs']['answer']),
)
elif chunk['event'] == 'message':
stream_output_pending_chunk += chunk['answer']
if self.pipeline_config['ai']['dify-service-api'].get('enable-streaming', False):
# 消息数超过量就输出,从而达到streaming的效果
batch_pending_index += 1
if batch_pending_index >= batch_pending_max_size:
yield llm_entities.Message(
role='assistant',
content=self._try_convert_thinking(stream_output_pending_chunk),
)
batch_pending_index = 0
elif mode == 'basic':
if chunk['event'] == 'message':
basic_mode_pending_chunk += chunk['answer']
stream_output_pending_chunk += chunk['answer']
if self.pipeline_config['ai']['dify-service-api'].get('enable-streaming', False):
# 消息数超过量就输出,从而达到streaming的效果
batch_pending_index += 1
if batch_pending_index >= batch_pending_max_size:
yield llm_entities.Message(
role='assistant',
content=self._try_convert_thinking(stream_output_pending_chunk),
)
batch_pending_index = 0
elif chunk['event'] == 'message_end':
yield llm_entities.Message(
role='assistant',
content=self._try_convert_thinking(basic_mode_pending_chunk),
content=self._try_convert_thinking(stream_output_pending_chunk),
)
basic_mode_pending_chunk = ''
stream_output_pending_chunk = ''
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')