fix: stash

This commit is contained in:
Junyan Qin
2025-08-07 21:56:40 +08:00
parent e744e9c4ef
commit 051fffd41e
2 changed files with 26 additions and 16 deletions
@@ -48,8 +48,7 @@ class WebChatDebugRouterGroup(group.RouterGroup):
} }
return quart.Response(stream_generator(generator), mimetype='text/event-stream',headers=headers) return quart.Response(stream_generator(generator), mimetype='text/event-stream',headers=headers)
else: else: # non-stream
# result = await webchat_adapter.send_webchat_message(pipeline_uuid, session_type, message_chain_obj)
result = None result = None
async for message in webchat_adapter.send_webchat_message( async for message in webchat_adapter.send_webchat_message(
pipeline_uuid, session_type, message_chain_obj pipeline_uuid, session_type, message_chain_obj
+25 -14
View File
@@ -26,7 +26,7 @@ class WebChatSession:
id: str id: str
message_lists: dict[str, list[WebChatMessage]] = {} message_lists: dict[str, list[WebChatMessage]] = {}
resp_waiters: dict[int, asyncio.Future[WebChatMessage]] resp_waiters: dict[int, asyncio.Future[WebChatMessage]]
resp_queues = dict[int, asyncio.Queue[WebChatMessage]] resp_queues: dict[int, asyncio.Queue[WebChatMessage]]
def __init__(self, id: str): def __init__(self, id: str):
self.id = id self.id = id
@@ -109,9 +109,9 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
# notify waiter # notify waiter
if isinstance(message_source, platform_events.FriendMessage): if isinstance(message_source, platform_events.FriendMessage):
self.webchat_person_session.resp_waiters[message_source.message_chain.message_id].set_result(message_data) self.webchat_person_session.resp_queues[message_source.message_chain.message_id].put(message_data)
elif isinstance(message_source, platform_events.GroupMessage): elif isinstance(message_source, platform_events.GroupMessage):
self.webchat_group_session.resp_waiters[message_source.message_chain.message_id].set_result(message_data) self.webchat_group_session.resp_queues[message_source.message_chain.message_id].put(message_data)
return message_data.model_dump() return message_data.model_dump()
@@ -205,9 +205,8 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
message_id = len(use_session.get_message_list(pipeline_uuid)) + 1 message_id = len(use_session.get_message_list(pipeline_uuid)) + 1
if is_stream: use_session.resp_queues[message_id] = asyncio.Queue()
use_session.resp_queues[message_id] = asyncio.Queue() logger.debug(f'Initialized queue for message_id: {message_id}')
logger.debug(f'Initialized queue for message_id: {message_id}')
use_session.get_message_list(pipeline_uuid).append( use_session.get_message_list(pipeline_uuid).append(
WebChatMessage( WebChatMessage(
@@ -242,6 +241,7 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
self.ap.platform_mgr.webchat_proxy_bot.bot_entity.use_pipeline_uuid = pipeline_uuid self.ap.platform_mgr.webchat_proxy_bot.bot_entity.use_pipeline_uuid = pipeline_uuid
# trigger pipeline
if event.__class__ in self.listeners: if event.__class__ in self.listeners:
await self.listeners[event.__class__](event, self) await self.listeners[event.__class__](event, self)
@@ -257,20 +257,31 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
yield resp_message.model_dump() yield resp_message.model_dump()
break break
yield resp_message.model_dump() yield resp_message.model_dump()
use_session.resp_queues.pop(message_id)
else: else: # non-stream
# set waiter # set waiter
waiter = asyncio.Future[WebChatMessage]() # waiter = asyncio.Future[WebChatMessage]()
use_session.resp_waiters[message_id] = waiter # use_session.resp_waiters[message_id] = waiter
waiter.add_done_callback(lambda future: use_session.resp_waiters.pop(message_id)) # # waiter.add_done_callback(lambda future: use_session.resp_waiters.pop(message_id))
resp_message = await waiter # resp_message = await waiter
resp_message.id = len(use_session.get_message_list(pipeline_uuid)) + 1 # resp_message.id = len(use_session.get_message_list(pipeline_uuid)) + 1
use_session.get_message_list(pipeline_uuid).append(resp_message) # use_session.get_message_list(pipeline_uuid).append(resp_message)
yield resp_message.model_dump() # yield resp_message.model_dump()
queue = use_session.resp_queues[message_id]
while True:
resp_message = await queue.get()
resp_message.id = msg_id
if resp_message.is_final:
resp_message.id = msg_id
use_session.get_message_list(pipeline_uuid).append(resp_message)
yield resp_message.model_dump()
break
yield resp_message.model_dump()
def get_webchat_messages(self, pipeline_uuid: str, session_type: str) -> list[dict]: def get_webchat_messages(self, pipeline_uuid: str, session_type: str) -> list[dict]:
"""获取调试消息历史""" """获取调试消息历史"""