mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
add conversation expire config & user query text to dingtalk card (#2147)
* add conversation expire config * add user query text to card * fix(pipeline): move session limit to AI config * test(pipeline): cover AI session limit config * refactor(pipeline): merge session expire-time into AI runner stage Move the session validity duration field out of the standalone session-limit stage into the runner stage so it actually renders in the AI tab (the tab only shows the runner stage and the stage matching the selected runner — any other stage is filtered out). Read path, default config, metadata description, and tests updated accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(pipeline): expire conversations from last update time * fix(n8n): sync generated conversation id into payload --------- Co-authored-by: RockChinQ <rockchinq@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -481,6 +481,12 @@ class DingTalkClient:
|
||||
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
|
||||
card_data['content'] = ''
|
||||
|
||||
# 将用户的消息内容作为卡片的查询参数,方便后续处理
|
||||
if incoming_message.message_type == 'text':
|
||||
card_data['query'] = incoming_message.get_text_list()[0]
|
||||
else:
|
||||
card_data['query'] = '...'
|
||||
|
||||
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message)
|
||||
# print(card_instance)
|
||||
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
|
||||
|
||||
@@ -75,6 +75,27 @@ class PreProcessor(stage.PipelineStage):
|
||||
query.bot_uuid,
|
||||
)
|
||||
|
||||
# Expire externally managed conversation ids after the conversation has
|
||||
# been idle for longer than the configured conversation expire time.
|
||||
# The idle window is measured from the last preprocess/update time, not
|
||||
# from the conversation creation time.
|
||||
conversation_expire_time = query.pipeline_config.get('ai', {}).get('runner', {}).get('expire-time', None)
|
||||
now = datetime.datetime.now()
|
||||
if conversation_expire_time is not None and conversation_expire_time > 0:
|
||||
last_update_time = getattr(conversation, 'update_time', None) or getattr(conversation, 'create_time', None)
|
||||
if last_update_time is not None:
|
||||
conversation_idle_time = now.timestamp() - last_update_time.timestamp()
|
||||
if conversation_idle_time > conversation_expire_time:
|
||||
self.ap.logger.info(
|
||||
f'Conversation({query.query_id}) is expired (idle: {conversation_idle_time}s), create new conversation'
|
||||
)
|
||||
conversation.uuid = None
|
||||
|
||||
# Treat every preprocess pass as a conversation activity update. This
|
||||
# makes future expiry checks use the latest incoming message/preprocess
|
||||
# time instead of the first message/creation time.
|
||||
conversation.update_time = now
|
||||
|
||||
# 设置query
|
||||
query.session = session
|
||||
query.prompt = conversation.prompt.copy()
|
||||
|
||||
@@ -187,6 +187,12 @@ class N8nServiceAPIRunner(runner.RequestRunner):
|
||||
if not query.session.using_conversation.uuid:
|
||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||
|
||||
# Keep query variables in sync with the generated/new conversation id.
|
||||
# query.variables is later merged into payload and would otherwise
|
||||
# overwrite the generated conversation_id with the stale preprocessor
|
||||
# value (usually None for a new conversation).
|
||||
query.variables['conversation_id'] = query.session.using_conversation.uuid
|
||||
|
||||
# 预处理用户消息
|
||||
plain_text = await self._preprocess_user_message(query)
|
||||
|
||||
|
||||
@@ -38,7 +38,8 @@
|
||||
},
|
||||
"ai": {
|
||||
"runner": {
|
||||
"runner": "local-agent"
|
||||
"runner": "local-agent",
|
||||
"expire-time": 0
|
||||
},
|
||||
"local-agent": {
|
||||
"model": {
|
||||
|
||||
@@ -47,6 +47,26 @@ stages:
|
||||
label:
|
||||
en_US: Langflow API
|
||||
zh_Hans: Langflow API
|
||||
- name: expire-time
|
||||
label:
|
||||
en_US: Conversation expire time (seconds)
|
||||
zh_Hans: 单个对话过期时间(秒)
|
||||
description:
|
||||
en_US: >-
|
||||
Maximum idle time of a single conversation, measured from the last
|
||||
message/preprocess update time. When a new message arrives and the
|
||||
current conversation has been idle for longer than this value, the
|
||||
existing external conversation id is discarded and a new one is
|
||||
started automatically — the user does not need to trigger a reset
|
||||
manually. Set to 0 to disable expiry (conversations live until the
|
||||
user or another mechanism resets them).
|
||||
zh_Hans: >-
|
||||
单个对话的最长空闲时间,从最后一条消息/preprocess 更新时间开始计算。
|
||||
当新消息到达且当前对话空闲时间已超过该值时,旧的外部对话 ID 会被自动丢弃并开启新对话,
|
||||
用户无需手动重置。设置为 0 表示不限制过期时间(对话会一直保留,直到用户或其他机制主动重置)。
|
||||
type: integer
|
||||
required: true
|
||||
default: 0
|
||||
- name: local-agent
|
||||
label:
|
||||
en_US: Local Agent
|
||||
|
||||
106
tests/unit_tests/pipeline/test_chat_session_limit.py
Normal file
106
tests/unit_tests/pipeline/test_chat_session_limit.py
Normal file
@@ -0,0 +1,106 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from importlib import import_module
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
|
||||
def _preproc_module():
|
||||
# Import pipelinemgr first so pipeline stages are registered without tripping
|
||||
# the stage <-> core.app circular import during isolated test collection.
|
||||
import_module('langbot.pkg.pipeline.pipelinemgr')
|
||||
return import_module('langbot.pkg.pipeline.preproc.preproc')
|
||||
|
||||
|
||||
def _entities_module():
|
||||
return import_module('langbot.pkg.pipeline.entities')
|
||||
|
||||
|
||||
def _conversation(created_at: datetime, updated_at: datetime | None = None):
|
||||
prompt = Mock()
|
||||
prompt.messages = []
|
||||
prompt.copy = Mock(return_value=Mock(messages=[]))
|
||||
|
||||
return SimpleNamespace(
|
||||
uuid='existing-conversation-uuid',
|
||||
create_time=created_at,
|
||||
update_time=updated_at,
|
||||
prompt=prompt,
|
||||
messages=[],
|
||||
)
|
||||
|
||||
|
||||
def _prompt_preprocessing_context(default_prompt=None, prompt=None):
|
||||
ctx = Mock()
|
||||
ctx.event.default_prompt = default_prompt or []
|
||||
ctx.event.prompt = prompt or []
|
||||
return ctx
|
||||
|
||||
|
||||
async def _run_preprocessor(mock_app, sample_query, conversation):
|
||||
session = SimpleNamespace(launcher_type=sample_query.launcher_type, launcher_id=sample_query.launcher_id)
|
||||
mock_app.sess_mgr.get_session = AsyncMock(return_value=session)
|
||||
mock_app.sess_mgr.get_conversation = AsyncMock(return_value=conversation)
|
||||
mock_app.plugin_connector.emit_event = AsyncMock(return_value=_prompt_preprocessing_context())
|
||||
|
||||
sample_query.pipeline_config = {
|
||||
'ai': {
|
||||
'runner': {'runner': 'local-agent', 'expire-time': 60},
|
||||
'local-agent': {'model': {'primary': '', 'fallbacks': []}, 'prompt': []},
|
||||
},
|
||||
'trigger': {'misc': {'combine-quote-message': False}},
|
||||
'output': {'misc': {'exception-handling': 'show-hint'}},
|
||||
}
|
||||
|
||||
return await _preproc_module().PreProcessor(mock_app).process(sample_query, 'PreProcessor')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preprocessor_expires_conversation_from_last_update_time(mock_app, sample_query):
|
||||
conversation = _conversation(
|
||||
created_at=datetime.now() - timedelta(seconds=10),
|
||||
updated_at=datetime.now() - timedelta(seconds=120),
|
||||
)
|
||||
|
||||
result = await _run_preprocessor(mock_app, sample_query, conversation)
|
||||
|
||||
assert result.result_type == _entities_module().ResultType.CONTINUE
|
||||
assert conversation.uuid is None
|
||||
assert conversation.update_time > datetime.now() - timedelta(seconds=5)
|
||||
assert result.new_query.variables['conversation_id'] is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preprocessor_keeps_conversation_when_last_update_is_not_expired(mock_app, sample_query):
|
||||
conversation = _conversation(
|
||||
created_at=datetime.now() - timedelta(seconds=120),
|
||||
updated_at=datetime.now() - timedelta(seconds=30),
|
||||
)
|
||||
|
||||
result = await _run_preprocessor(mock_app, sample_query, conversation)
|
||||
|
||||
assert result.result_type == _entities_module().ResultType.CONTINUE
|
||||
assert conversation.uuid == 'existing-conversation-uuid'
|
||||
assert conversation.update_time > datetime.now() - timedelta(seconds=5)
|
||||
assert result.new_query.variables['conversation_id'] == 'existing-conversation-uuid'
|
||||
|
||||
|
||||
def test_expire_time_metadata_lives_under_ai_runner_not_safety():
|
||||
metadata_dir = Path('src/langbot/templates/metadata/pipeline')
|
||||
|
||||
ai_meta = yaml.safe_load((metadata_dir / 'ai.yaml').read_text())
|
||||
safety_meta = yaml.safe_load((metadata_dir / 'safety.yaml').read_text())
|
||||
|
||||
ai_stage_names = [stage['name'] for stage in ai_meta['stages']]
|
||||
assert 'session-limit' not in ai_stage_names
|
||||
assert 'session-limit' not in [stage['name'] for stage in safety_meta['stages']]
|
||||
|
||||
runner_stage = next(stage for stage in ai_meta['stages'] if stage['name'] == 'runner')
|
||||
expire_time = next(item for item in runner_stage['config'] if item['name'] == 'expire-time')
|
||||
assert 'Conversation expire time' in expire_time['label']['en_US']
|
||||
assert 'Session validity' not in expire_time['label']['en_US']
|
||||
Reference in New Issue
Block a user