From b9662250a62299c4560a979d0c8ed8bcaff82f3f Mon Sep 17 00:00:00 2001 From: Bruce Date: Fri, 1 May 2026 18:13:55 +0800 Subject: [PATCH] add conversation expire config & user query text to dingtalk card (#2147) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add conversation expire config * add user query text to card * fix(pipeline): move session limit to AI config * test(pipeline): cover AI session limit config * refactor(pipeline): merge session expire-time into AI runner stage Move the session validity duration field out of the standalone session-limit stage into the runner stage so it actually renders in the AI tab (the tab only shows the runner stage and the stage matching the selected runner — any other stage is filtered out). Read path, default config, metadata description, and tests updated accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(pipeline): expire conversations from last update time * fix(n8n): sync generated conversation id into payload --------- Co-authored-by: RockChinQ Co-authored-by: Claude Opus 4.7 (1M context) --- src/langbot/libs/dingtalk_api/api.py | 6 + src/langbot/pkg/pipeline/preproc/preproc.py | 21 ++++ src/langbot/pkg/provider/runners/n8nsvapi.py | 6 + .../templates/default-pipeline-config.json | 3 +- .../templates/metadata/pipeline/ai.yaml | 22 +++- .../templates/metadata/pipeline/safety.yaml | 2 +- .../pipeline/test_chat_session_limit.py | 106 ++++++++++++++++++ 7 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 tests/unit_tests/pipeline/test_chat_session_limit.py diff --git a/src/langbot/libs/dingtalk_api/api.py b/src/langbot/libs/dingtalk_api/api.py index 723ee9e8..f453d0bf 100644 --- a/src/langbot/libs/dingtalk_api/api.py +++ b/src/langbot/libs/dingtalk_api/api.py @@ -481,6 +481,12 @@ class DingTalkClient: card_data['config'] = json.dumps({'autoLayout': card_auto_layout}) card_data['content'] = '' + # 将用户的消息内容作为卡片的查询参数,方便后续处理 + if incoming_message.message_type == 'text': + card_data['query'] = incoming_message.get_text_list()[0] + else: + card_data['query'] = '...' + card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message) # print(card_instance) # 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards diff --git a/src/langbot/pkg/pipeline/preproc/preproc.py b/src/langbot/pkg/pipeline/preproc/preproc.py index 0f865044..83ddce89 100644 --- a/src/langbot/pkg/pipeline/preproc/preproc.py +++ b/src/langbot/pkg/pipeline/preproc/preproc.py @@ -75,6 +75,27 @@ class PreProcessor(stage.PipelineStage): query.bot_uuid, ) + # Expire externally managed conversation ids after the conversation has + # been idle for longer than the configured conversation expire time. + # The idle window is measured from the last preprocess/update time, not + # from the conversation creation time. + conversation_expire_time = query.pipeline_config.get('ai', {}).get('runner', {}).get('expire-time', None) + now = datetime.datetime.now() + if conversation_expire_time is not None and conversation_expire_time > 0: + last_update_time = getattr(conversation, 'update_time', None) or getattr(conversation, 'create_time', None) + if last_update_time is not None: + conversation_idle_time = now.timestamp() - last_update_time.timestamp() + if conversation_idle_time > conversation_expire_time: + self.ap.logger.info( + f'Conversation({query.query_id}) is expired (idle: {conversation_idle_time}s), create new conversation' + ) + conversation.uuid = None + + # Treat every preprocess pass as a conversation activity update. This + # makes future expiry checks use the latest incoming message/preprocess + # time instead of the first message/creation time. + conversation.update_time = now + # 设置query query.session = session query.prompt = conversation.prompt.copy() diff --git a/src/langbot/pkg/provider/runners/n8nsvapi.py b/src/langbot/pkg/provider/runners/n8nsvapi.py index f8238eb6..543fd7ef 100644 --- a/src/langbot/pkg/provider/runners/n8nsvapi.py +++ b/src/langbot/pkg/provider/runners/n8nsvapi.py @@ -187,6 +187,12 @@ class N8nServiceAPIRunner(runner.RequestRunner): if not query.session.using_conversation.uuid: query.session.using_conversation.uuid = str(uuid.uuid4()) + # Keep query variables in sync with the generated/new conversation id. + # query.variables is later merged into payload and would otherwise + # overwrite the generated conversation_id with the stale preprocessor + # value (usually None for a new conversation). + query.variables['conversation_id'] = query.session.using_conversation.uuid + # 预处理用户消息 plain_text = await self._preprocess_user_message(query) diff --git a/src/langbot/templates/default-pipeline-config.json b/src/langbot/templates/default-pipeline-config.json index e40d3914..fe6e2842 100644 --- a/src/langbot/templates/default-pipeline-config.json +++ b/src/langbot/templates/default-pipeline-config.json @@ -38,7 +38,8 @@ }, "ai": { "runner": { - "runner": "local-agent" + "runner": "local-agent", + "expire-time": 0 }, "local-agent": { "model": { diff --git a/src/langbot/templates/metadata/pipeline/ai.yaml b/src/langbot/templates/metadata/pipeline/ai.yaml index a944c217..fd68fb47 100644 --- a/src/langbot/templates/metadata/pipeline/ai.yaml +++ b/src/langbot/templates/metadata/pipeline/ai.yaml @@ -47,6 +47,26 @@ stages: label: en_US: Langflow API zh_Hans: Langflow API + - name: expire-time + label: + en_US: Conversation expire time (seconds) + zh_Hans: 单个对话过期时间(秒) + description: + en_US: >- + Maximum idle time of a single conversation, measured from the last + message/preprocess update time. When a new message arrives and the + current conversation has been idle for longer than this value, the + existing external conversation id is discarded and a new one is + started automatically — the user does not need to trigger a reset + manually. Set to 0 to disable expiry (conversations live until the + user or another mechanism resets them). + zh_Hans: >- + 单个对话的最长空闲时间,从最后一条消息/preprocess 更新时间开始计算。 + 当新消息到达且当前对话空闲时间已超过该值时,旧的外部对话 ID 会被自动丢弃并开启新对话, + 用户无需手动重置。设置为 0 表示不限制过期时间(对话会一直保留,直到用户或其他机制主动重置)。 + type: integer + required: true + default: 0 - name: local-agent label: en_US: Local Agent @@ -539,4 +559,4 @@ stages: zh_Hans: 可选的流程调整参数 type: json required: false - default: '{}' \ No newline at end of file + default: '{}' diff --git a/src/langbot/templates/metadata/pipeline/safety.yaml b/src/langbot/templates/metadata/pipeline/safety.yaml index 32edf9f0..12cb5604 100644 --- a/src/langbot/templates/metadata/pipeline/safety.yaml +++ b/src/langbot/templates/metadata/pipeline/safety.yaml @@ -72,4 +72,4 @@ stages: - name: wait label: en_US: Wait - zh_Hans: 等待 \ No newline at end of file + zh_Hans: 等待 diff --git a/tests/unit_tests/pipeline/test_chat_session_limit.py b/tests/unit_tests/pipeline/test_chat_session_limit.py new file mode 100644 index 00000000..15cfd10b --- /dev/null +++ b/tests/unit_tests/pipeline/test_chat_session_limit.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from importlib import import_module +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest +import yaml + + +def _preproc_module(): + # Import pipelinemgr first so pipeline stages are registered without tripping + # the stage <-> core.app circular import during isolated test collection. + import_module('langbot.pkg.pipeline.pipelinemgr') + return import_module('langbot.pkg.pipeline.preproc.preproc') + + +def _entities_module(): + return import_module('langbot.pkg.pipeline.entities') + + +def _conversation(created_at: datetime, updated_at: datetime | None = None): + prompt = Mock() + prompt.messages = [] + prompt.copy = Mock(return_value=Mock(messages=[])) + + return SimpleNamespace( + uuid='existing-conversation-uuid', + create_time=created_at, + update_time=updated_at, + prompt=prompt, + messages=[], + ) + + +def _prompt_preprocessing_context(default_prompt=None, prompt=None): + ctx = Mock() + ctx.event.default_prompt = default_prompt or [] + ctx.event.prompt = prompt or [] + return ctx + + +async def _run_preprocessor(mock_app, sample_query, conversation): + session = SimpleNamespace(launcher_type=sample_query.launcher_type, launcher_id=sample_query.launcher_id) + mock_app.sess_mgr.get_session = AsyncMock(return_value=session) + mock_app.sess_mgr.get_conversation = AsyncMock(return_value=conversation) + mock_app.plugin_connector.emit_event = AsyncMock(return_value=_prompt_preprocessing_context()) + + sample_query.pipeline_config = { + 'ai': { + 'runner': {'runner': 'local-agent', 'expire-time': 60}, + 'local-agent': {'model': {'primary': '', 'fallbacks': []}, 'prompt': []}, + }, + 'trigger': {'misc': {'combine-quote-message': False}}, + 'output': {'misc': {'exception-handling': 'show-hint'}}, + } + + return await _preproc_module().PreProcessor(mock_app).process(sample_query, 'PreProcessor') + + +@pytest.mark.asyncio +async def test_preprocessor_expires_conversation_from_last_update_time(mock_app, sample_query): + conversation = _conversation( + created_at=datetime.now() - timedelta(seconds=10), + updated_at=datetime.now() - timedelta(seconds=120), + ) + + result = await _run_preprocessor(mock_app, sample_query, conversation) + + assert result.result_type == _entities_module().ResultType.CONTINUE + assert conversation.uuid is None + assert conversation.update_time > datetime.now() - timedelta(seconds=5) + assert result.new_query.variables['conversation_id'] is None + + +@pytest.mark.asyncio +async def test_preprocessor_keeps_conversation_when_last_update_is_not_expired(mock_app, sample_query): + conversation = _conversation( + created_at=datetime.now() - timedelta(seconds=120), + updated_at=datetime.now() - timedelta(seconds=30), + ) + + result = await _run_preprocessor(mock_app, sample_query, conversation) + + assert result.result_type == _entities_module().ResultType.CONTINUE + assert conversation.uuid == 'existing-conversation-uuid' + assert conversation.update_time > datetime.now() - timedelta(seconds=5) + assert result.new_query.variables['conversation_id'] == 'existing-conversation-uuid' + + +def test_expire_time_metadata_lives_under_ai_runner_not_safety(): + metadata_dir = Path('src/langbot/templates/metadata/pipeline') + + ai_meta = yaml.safe_load((metadata_dir / 'ai.yaml').read_text()) + safety_meta = yaml.safe_load((metadata_dir / 'safety.yaml').read_text()) + + ai_stage_names = [stage['name'] for stage in ai_meta['stages']] + assert 'session-limit' not in ai_stage_names + assert 'session-limit' not in [stage['name'] for stage in safety_meta['stages']] + + runner_stage = next(stage for stage in ai_meta['stages'] if stage['name'] == 'runner') + expire_time = next(item for item in runner_stage['config'] if item['name'] == 'expire-time') + assert 'Conversation expire time' in expire_time['label']['en_US'] + assert 'Session validity' not in expire_time['label']['en_US']