diff --git a/pkg/api/http/controller/groups/pipelines/webchat.py b/pkg/api/http/controller/groups/pipelines/webchat.py
index a3bf8585..bebac818 100644
--- a/pkg/api/http/controller/groups/pipelines/webchat.py
+++ b/pkg/api/http/controller/groups/pipelines/webchat.py
@@ -15,7 +15,7 @@ class WebChatDebugRouterGroup(group.RouterGroup):
async def stream_generator(generator):
async for message in generator:
yield rf"data:{json.dumps({'message': message})}\n\n"
- yield "data:{'type': 'end'}\n\n''"
+ yield "data:{type: end}\n\n''"
try:
data = await quart.request.get_json()
session_type = data.get('session_type', 'person')
diff --git a/pkg/platform/sources/webchat.py b/pkg/platform/sources/webchat.py
index 2e8b7b99..274c5657 100644
--- a/pkg/platform/sources/webchat.py
+++ b/pkg/platform/sources/webchat.py
@@ -19,6 +19,7 @@ class WebChatMessage(BaseModel):
content: str
message_chain: list[dict]
timestamp: str
+ is_final: bool = False
class WebChatSession:
@@ -117,10 +118,10 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
- message_id: str,
+ message_id: int,
message: platform_message.MessageChain,
quote_origin: bool = False,
- is_fianl: bool = False,
+ is_final: bool = False,
) -> dict:
"""回复消息"""
message_data = WebChatMessage(
@@ -132,14 +133,21 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
)
# notify waiter
- if isinstance(message_source, platform_events.FriendMessage):
- queue = self.webchat_person_session.resp_queues[message_source.message_chain.message_id]
- elif isinstance(message_source, platform_events.GroupMessage):
- queue = self.webchat_group_session.resp_queues[message_source.message_chain.message_id]
+ session = (self.webchat_group_session if isinstance(message_source, platform_events.GroupMessage) else self.webchat_person_session)
+ if message_source.message_chain.message_id not in session.resp_waiters:
+ # session.resp_waiters[message_source.message_chain.message_id] = asyncio.Queue()
+ queue = session.resp_queues[message_source.message_chain.message_id]
+
+ # if isinstance(message_source, platform_events.FriendMessage):
+ # queue = self.webchat_person_session.resp_queues[message_source.message_chain.message_id]
+ # elif isinstance(message_source, platform_events.GroupMessage):
+ # queue = self.webchat_group_session.resp_queues[message_source.message_chain.message_id]
+ if is_final:
+ message_data.is_final = True
+ # print(message_data)
+ await queue.put(message_data)
+
- queue.put(message_data)
- if is_fianl:
- queue.put(None)
return message_data.model_dump()
@@ -192,6 +200,10 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
message_id = len(use_session.get_message_list(pipeline_uuid)) + 1
+ if is_stream:
+ use_session.resp_queues[message_id] = asyncio.Queue()
+ logger.debug(f"Initialized queue for message_id: {message_id}")
+
use_session.get_message_list(pipeline_uuid).append(
WebChatMessage(
id=message_id,
@@ -232,9 +244,11 @@ class WebChatAdapter(msadapter.MessagePlatformAdapter):
queue = use_session.resp_queues[message_id]
while True:
resp_message = await queue.get()
- if resp_message is None:
+ print(resp_message)
+ if resp_message.is_final:
resp_message.id = len(use_session.get_message_list(pipeline_uuid)) + 1
use_session.get_message_list(pipeline_uuid).append(resp_message)
+ yield resp_message.model_dump()
break
yield resp_message.model_dump()
diff --git a/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx b/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx
index 45bf8b38..2c051b00 100644
--- a/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx
+++ b/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx
@@ -46,17 +46,20 @@ export default function DebugDialog({
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
- const loadMessages = useCallback(async (pipelineId: string) => {
- try {
- const response = await httpClient.getWebChatHistoryMessages(
- pipelineId,
- sessionType,
- );
- setMessages(response.messages);
- } catch (error) {
- console.error('Failed to load messages:', error);
- }
- }, [sessionType]);
+ const loadMessages = useCallback(
+ async (pipelineId: string) => {
+ try {
+ const response = await httpClient.getWebChatHistoryMessages(
+ pipelineId,
+ sessionType,
+ );
+ setMessages(response.messages);
+ } catch (error) {
+ console.error('Failed to load messages:', error);
+ }
+ },
+ [sessionType],
+ );
useEffect(() => {
scrollToBottom();
@@ -242,7 +245,6 @@ export default function DebugDialog({
}
}
} else {
-
setMessages((prevMessages) => [...prevMessages, userMessage]);
setInputValue('');
setHasAt(false);
@@ -388,10 +390,7 @@ export default function DebugDialog({
{t('pipelines.debugDialog.streaming')}
-