diff --git a/src/langbot/pkg/entity/persistence/bot.py b/src/langbot/pkg/entity/persistence/bot.py index 08eda478..c3fa295f 100644 --- a/src/langbot/pkg/entity/persistence/bot.py +++ b/src/langbot/pkg/entity/persistence/bot.py @@ -16,6 +16,7 @@ class Bot(Base): enable = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False) use_pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) use_pipeline_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + pipeline_routing_rules = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, server_default='[]') created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now()) updated_at = sqlalchemy.Column( sqlalchemy.DateTime, diff --git a/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py b/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py new file mode 100644 index 00000000..816bf286 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py @@ -0,0 +1,15 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(25) +class DBMigrateBotPipelineRoutingRules(migration.DBMigration): + """Add pipeline_routing_rules column to bots table""" + + async def upgrade(self): + sql_text = sqlalchemy.text("ALTER TABLE bots ADD COLUMN pipeline_routing_rules JSON NOT NULL DEFAULT '[]'") + await self.ap.persistence_mgr.execute_async(sql_text) + + async def downgrade(self): + sql_text = sqlalchemy.text('ALTER TABLE bots DROP COLUMN pipeline_routing_rules') + await self.ap.persistence_mgr.execute_async(sql_text) diff --git a/src/langbot/pkg/pipeline/aggregator.py b/src/langbot/pkg/pipeline/aggregator.py index 249557f1..1af85a65 100644 --- a/src/langbot/pkg/pipeline/aggregator.py +++ b/src/langbot/pkg/pipeline/aggregator.py @@ -37,6 +37,7 @@ class PendingMessage: message_chain: platform_message.MessageChain adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter pipeline_uuid: typing.Optional[str] + routed_by_rule: bool = False timestamp: float = field(default_factory=time.time) @@ -125,6 +126,7 @@ class MessageAggregator: message_chain: platform_message.MessageChain, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, pipeline_uuid: typing.Optional[str] = None, + routed_by_rule: bool = False, ) -> None: """Add a message to the aggregation buffer @@ -145,6 +147,7 @@ class MessageAggregator: message_chain=message_chain, adapter=adapter, pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) return @@ -159,6 +162,7 @@ class MessageAggregator: message_chain=message_chain, adapter=adapter, pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) force_flush = False @@ -217,6 +221,7 @@ class MessageAggregator: message_chain=msg.message_chain, adapter=msg.adapter, pipeline_uuid=msg.pipeline_uuid, + routed_by_rule=msg.routed_by_rule, ) return @@ -231,6 +236,7 @@ class MessageAggregator: message_chain=merged_msg.message_chain, adapter=merged_msg.adapter, pipeline_uuid=merged_msg.pipeline_uuid, + routed_by_rule=merged_msg.routed_by_rule, ) def _merge_messages(self, messages: list[PendingMessage]) -> PendingMessage: diff --git a/src/langbot/pkg/pipeline/controller.py b/src/langbot/pkg/pipeline/controller.py index 988306cf..09d18a58 100644 --- a/src/langbot/pkg/pipeline/controller.py +++ b/src/langbot/pkg/pipeline/controller.py @@ -63,6 +63,14 @@ class Controller: pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid) if pipeline: await pipeline.run(selected_query) + else: + self.ap.logger.warning( + f'Pipeline {pipeline_uuid} not found for query {selected_query.query_id}, query dropped' + ) + else: + self.ap.logger.warning( + f'No pipeline_uuid for query {selected_query.query_id}, query dropped' + ) async with self.ap.query_pool: (await self.ap.sess_mgr.get_session(selected_query))._semaphore.release() diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 5d0012d1..40f93cda 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -323,6 +323,9 @@ class RuntimePipeline: event_ctx = await self.ap.plugin_connector.emit_event(event_obj, bound_plugins) if event_ctx.is_prevented_default(): + self.ap.logger.debug( + f'MessageReceived event prevented default for query {query.query_id}, pipeline={pipeline_name}' + ) return self.ap.logger.debug(f'Processing query {query.query_id}') diff --git a/src/langbot/pkg/pipeline/pool.py b/src/langbot/pkg/pipeline/pool.py index eb7df66b..9ee8ab07 100644 --- a/src/langbot/pkg/pipeline/pool.py +++ b/src/langbot/pkg/pipeline/pool.py @@ -41,6 +41,7 @@ class QueryPool: message_chain: platform_message.MessageChain, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, pipeline_uuid: typing.Optional[str] = None, + routed_by_rule: bool = False, ) -> pipeline_query.Query: async with self.condition: query_id = self.query_id_counter @@ -52,7 +53,7 @@ class QueryPool: sender_id=sender_id, message_event=message_event, message_chain=message_chain, - variables={}, + variables={'_routed_by_rule': routed_by_rule}, resp_messages=[], resp_message_chain=[], adapter=adapter, diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 6f971333..203a3612 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -61,6 +61,9 @@ class ChatMessageHandler(handler.MessageHandler): yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) else: + self.ap.logger.debug( + f'NormalMessageReceived event prevented default for query {query.query_id} without reply' + ) yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query) else: if event_ctx.event.user_message_alter is not None: diff --git a/src/langbot/pkg/pipeline/resprule/resprule.py b/src/langbot/pkg/pipeline/resprule/resprule.py index 1a3560ff..6e7fc369 100644 --- a/src/langbot/pkg/pipeline/resprule/resprule.py +++ b/src/langbot/pkg/pipeline/resprule/resprule.py @@ -37,6 +37,10 @@ class GroupRespondRuleCheckStage(stage.PipelineStage): if query.launcher_type.value != 'group': # 只处理群消息 return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + # 通过路由规则明确指定的流水线,跳过群响应规则检查 + if query.variables and query.variables.get('_routed_by_rule', False): + return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + rules = query.pipeline_config['trigger']['group-respond-rules'] use_rule = rules diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 269e6713..6ae0f4c2 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import json +import re import traceback import sqlalchemy @@ -52,6 +54,148 @@ class RuntimeBot: self.task_context = taskmgr.TaskContext() self.logger = logger + @staticmethod + def _match_operator(actual: str, operator: str, expected: str) -> bool: + """Evaluate a single operator condition.""" + if operator == 'eq': + return actual == expected + elif operator == 'neq': + return actual != expected + elif operator == 'contains': + return expected in actual + elif operator == 'not_contains': + return expected not in actual + elif operator == 'starts_with': + return actual.startswith(expected) + elif operator == 'regex': + try: + return bool(re.search(expected, actual)) + except re.error: + return False + return False + + PIPELINE_DISCARD = '__discard__' + PIPELINE_DISCARD_DISPLAY_NAME = 'Discarded' + + def resolve_pipeline_uuid( + self, + launcher_type: str, + launcher_id: str, + message_text: str, + message_element_types: list[str] | None = None, + ) -> tuple[str | None, bool]: + """Resolve pipeline UUID based on routing rules. + + Rules are evaluated in order; first match wins. + Falls back to use_pipeline_uuid if no rule matches. + + Rule types: + - launcher_type: session type ("person" / "group") + - launcher_id: session / group id + - message_content: message text content + - message_has_element: message contains element of given type + (Image, Voice, File, Forward, Face, At, AtAll, Quote) + Operators: eq (has), neq (doesn't have) + + Operators: eq, neq, contains, not_contains, starts_with, regex + + When pipeline_uuid is ``__discard__``, the message should be + silently dropped by the caller. + + Returns: + tuple: (pipeline_uuid, routed_by_rule) - routed_by_rule is True + when a routing rule matched, False when falling back to default. + """ + rules = self.bot_entity.pipeline_routing_rules or [] + element_type_set = set(message_element_types or []) + + for rule in rules: + rule_type = rule.get('type') + operator = rule.get('operator', 'eq') + rule_value = rule.get('value', '') + target_uuid = rule.get('pipeline_uuid') + if not rule_type or not target_uuid: + continue + + if rule_type == 'launcher_type': + if self._match_operator(launcher_type, operator, rule_value): + return target_uuid, True + elif rule_type == 'launcher_id': + if self._match_operator(str(launcher_id), operator, str(rule_value)): + return target_uuid, True + elif rule_type == 'message_content': + if self._match_operator(message_text, operator, rule_value): + return target_uuid, True + elif rule_type == 'message_has_element': + has_element = rule_value in element_type_set + if operator == 'eq' and has_element: + return target_uuid, True + elif operator == 'neq' and not has_element: + return target_uuid, True + + return self.bot_entity.use_pipeline_uuid, False + + async def _record_discarded_message( + self, + launcher_type: provider_session.LauncherTypes, + launcher_id: str | int, + sender_id: str | int, + message_event: platform_events.MessageEvent, + message_chain: platform_message.MessageChain, + ) -> None: + """Record a discarded message in the monitoring system.""" + try: + if hasattr(message_chain, 'model_dump'): + message_content = json.dumps(message_chain.model_dump(), ensure_ascii=False) + else: + message_content = str(message_chain) + + sender_name = None + if hasattr(message_event, 'sender'): + if hasattr(message_event.sender, 'nickname'): + sender_name = message_event.sender.nickname + elif hasattr(message_event.sender, 'member_name'): + sender_name = message_event.sender.member_name + + # Use the same session_id format as monitoring_helper.py + session_id = f'{launcher_type}_{launcher_id}' + platform = launcher_type.value if hasattr(launcher_type, 'value') else str(launcher_type) + + await self.ap.monitoring_service.record_message( + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id=self.PIPELINE_DISCARD, + pipeline_name=self.PIPELINE_DISCARD_DISPLAY_NAME, + message_content=message_content, + session_id=session_id, + status='discarded', + level='info', + platform=platform, + user_id=str(sender_id), + user_name=sender_name, + ) + + # Ensure the session exists so the message appears in the session monitor. + # Don't overwrite pipeline info — a session may have messages from + # multiple pipelines; discarding shouldn't change the displayed pipeline. + session_updated = await self.ap.monitoring_service.update_session_activity( + session_id, + ) + if not session_updated: + # No session yet (first message for this launcher was discarded). + await self.ap.monitoring_service.record_session_start( + session_id=session_id, + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id=self.PIPELINE_DISCARD, + pipeline_name=self.PIPELINE_DISCARD_DISPLAY_NAME, + platform=platform, + user_id=str(sender_id), + user_name=sender_name, + ) + except Exception as e: + await self.logger.error(f'Failed to record discarded message: {e}') + async def initialize(self): async def on_friend_message( event: platform_events.FriendMessage, @@ -83,6 +227,23 @@ class RuntimeBot: if custom_launcher_id: launcher_id = custom_launcher_id + message_text = str(event.message_chain) + element_types = [comp.type for comp in event.message_chain] + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid( + 'person', launcher_id, message_text, element_types + ) + + if pipeline_uuid == self.PIPELINE_DISCARD: + await self.logger.info('Person message discarded by routing rule') + await self._record_discarded_message( + provider_session.LauncherTypes.PERSON, + launcher_id, + event.sender.id, + event, + event.message_chain, + ) + return + await self.ap.msg_aggregator.add_message( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.PERSON, @@ -91,7 +252,8 @@ class RuntimeBot: message_event=event, message_chain=event.message_chain, adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, + pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) else: await self.logger.info('Pipeline skipped for person message due to webhook response') @@ -126,6 +288,23 @@ class RuntimeBot: if custom_launcher_id: launcher_id = custom_launcher_id + message_text = str(event.message_chain) + element_types = [comp.type for comp in event.message_chain] + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid( + 'group', launcher_id, message_text, element_types + ) + + if pipeline_uuid == self.PIPELINE_DISCARD: + await self.logger.info('Group message discarded by routing rule') + await self._record_discarded_message( + provider_session.LauncherTypes.GROUP, + launcher_id, + event.sender.id, + event, + event.message_chain, + ) + return + await self.ap.msg_aggregator.add_message( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.GROUP, @@ -134,7 +313,8 @@ class RuntimeBot: message_event=event, message_chain=event.message_chain, adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, + pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) else: await self.logger.info('Pipeline skipped for group message due to webhook response') diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index ea0a682f..4fad9069 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 24 +required_database_version = 25 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/tests/unit_tests/platform/__init__.py b/tests/unit_tests/platform/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit_tests/platform/test_routing_rules.py b/tests/unit_tests/platform/test_routing_rules.py new file mode 100644 index 00000000..3928f6f1 --- /dev/null +++ b/tests/unit_tests/platform/test_routing_rules.py @@ -0,0 +1,280 @@ +""" +RuntimeBot.resolve_pipeline_uuid and _match_operator unit tests +""" + +from unittest.mock import Mock + + +class TestMatchOperator: + """Test the _match_operator static method.""" + + @staticmethod + def _get_class(): + from langbot.pkg.platform.botmgr import RuntimeBot + + return RuntimeBot + + def test_eq(self): + cls = self._get_class() + assert cls._match_operator('hello', 'eq', 'hello') is True + assert cls._match_operator('hello', 'eq', 'world') is False + + def test_neq(self): + cls = self._get_class() + assert cls._match_operator('hello', 'neq', 'world') is True + assert cls._match_operator('hello', 'neq', 'hello') is False + + def test_contains(self): + cls = self._get_class() + assert cls._match_operator('hello world', 'contains', 'world') is True + assert cls._match_operator('hello world', 'contains', 'xyz') is False + + def test_not_contains(self): + cls = self._get_class() + assert cls._match_operator('hello world', 'not_contains', 'xyz') is True + assert cls._match_operator('hello world', 'not_contains', 'world') is False + + def test_starts_with(self): + cls = self._get_class() + assert cls._match_operator('hello world', 'starts_with', 'hello') is True + assert cls._match_operator('hello world', 'starts_with', 'world') is False + + def test_regex(self): + cls = self._get_class() + assert cls._match_operator('hello123', 'regex', r'\d+') is True + assert cls._match_operator('hello', 'regex', r'\d+') is False + + def test_regex_invalid_pattern(self): + cls = self._get_class() + assert cls._match_operator('hello', 'regex', r'[invalid') is False + + def test_unknown_operator(self): + cls = self._get_class() + assert cls._match_operator('hello', 'unknown_op', 'hello') is False + + +class TestResolvePipelineUuid: + """Test the resolve_pipeline_uuid method.""" + + @staticmethod + def _make_bot(default_pipeline: str, rules: list): + from langbot.pkg.platform.botmgr import RuntimeBot + + bot_entity = Mock() + bot_entity.use_pipeline_uuid = default_pipeline + bot_entity.pipeline_routing_rules = rules + + bot = object.__new__(RuntimeBot) + bot.bot_entity = bot_entity + return bot + + def test_no_rules_returns_default(self): + bot = self._make_bot('default-uuid', []) + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'default-uuid' + assert routed is False + + def test_none_rules_returns_default(self): + bot = self._make_bot('default-uuid', None) + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'default-uuid' + assert routed is False + + def test_launcher_type_match(self): + rules = [ + { + 'type': 'launcher_type', + 'operator': 'eq', + 'value': 'group', + 'pipeline_uuid': 'group-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('group', '123', 'hi') + assert uuid == 'group-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'default-uuid' + assert routed is False + + def test_launcher_id_match(self): + rules = [ + { + 'type': 'launcher_id', + 'operator': 'eq', + 'value': '12345', + 'pipeline_uuid': 'vip-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '12345', 'hi') + assert uuid == 'vip-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '99999', 'hi') + assert uuid == 'default-uuid' + assert routed is False + + def test_message_content_contains(self): + rules = [ + { + 'type': 'message_content', + 'operator': 'contains', + 'value': '紧急', + 'pipeline_uuid': 'urgent-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', '这是紧急消息') + assert uuid == 'urgent-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', '普通消息') + assert uuid == 'default-uuid' + assert routed is False + + def test_message_content_regex(self): + rules = [ + { + 'type': 'message_content', + 'operator': 'regex', + 'value': r'^/admin\b', + 'pipeline_uuid': 'admin-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', '/admin help') + assert uuid == 'admin-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hello /admin') + assert uuid == 'default-uuid' + assert routed is False + + def test_message_has_element_eq(self): + rules = [ + { + 'type': 'message_has_element', + 'operator': 'eq', + 'value': 'Image', + 'pipeline_uuid': 'image-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi', ['Plain', 'Image']) + assert uuid == 'image-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi', ['Plain']) + assert uuid == 'default-uuid' + assert routed is False + + def test_message_has_element_neq(self): + rules = [ + { + 'type': 'message_has_element', + 'operator': 'neq', + 'value': 'Image', + 'pipeline_uuid': 'text-only-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi', ['Plain']) + assert uuid == 'text-only-pipeline' + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi', ['Plain', 'Image']) + assert uuid == 'default-uuid' + assert routed is False + + def test_message_has_element_no_types_provided(self): + """When element types are not provided, should not match.""" + rules = [ + { + 'type': 'message_has_element', + 'operator': 'eq', + 'value': 'Image', + 'pipeline_uuid': 'image-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'default-uuid' + assert routed is False + + def test_first_match_wins(self): + rules = [ + { + 'type': 'launcher_type', + 'operator': 'eq', + 'value': 'group', + 'pipeline_uuid': 'first-pipeline', + }, + { + 'type': 'launcher_type', + 'operator': 'eq', + 'value': 'group', + 'pipeline_uuid': 'second-pipeline', + }, + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('group', '123', 'hi') + assert uuid == 'first-pipeline' + assert routed is True + + def test_skip_invalid_rules(self): + rules = [ + {'type': '', 'operator': 'eq', 'value': 'x', 'pipeline_uuid': 'p1'}, + {'type': 'launcher_type', 'operator': 'eq', 'value': 'person', 'pipeline_uuid': ''}, + {'type': 'launcher_type', 'operator': 'eq', 'value': 'person', 'pipeline_uuid': 'valid'}, + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'valid' + assert routed is True + + def test_default_operator_is_eq(self): + rules = [ + { + 'type': 'launcher_type', + 'value': 'person', + 'pipeline_uuid': 'person-pipeline', + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'hi') + assert uuid == 'person-pipeline' + assert routed is True + + def test_discard_pipeline(self): + """When pipeline_uuid is __discard__, the message should be discarded.""" + from langbot.pkg.platform.botmgr import RuntimeBot + + rules = [ + { + 'type': 'message_content', + 'operator': 'contains', + 'value': 'spam', + 'pipeline_uuid': RuntimeBot.PIPELINE_DISCARD, + } + ] + bot = self._make_bot('default-uuid', rules) + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'this is spam') + assert uuid == RuntimeBot.PIPELINE_DISCARD + assert routed is True + + uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'normal message') + assert uuid == 'default-uuid' + assert routed is False diff --git a/web/src/app/home/bots/BotDetailContent.tsx b/web/src/app/home/bots/BotDetailContent.tsx index c59916b3..e0543baa 100644 --- a/web/src/app/home/bots/BotDetailContent.tsx +++ b/web/src/app/home/bots/BotDetailContent.tsx @@ -174,9 +174,11 @@ export default function BotDetailContent({ id }: { id: string }) { )} - - {t('common.save')} - + {activeTab === 'config' && ( + + {t('common.save')} + + )} {/* Horizontal Tabs */} diff --git a/web/src/app/home/bots/components/bot-form/BotForm.tsx b/web/src/app/home/bots/components/bot-form/BotForm.tsx index bc8e5e56..2c97e5be 100644 --- a/web/src/app/home/bots/components/bot-form/BotForm.tsx +++ b/web/src/app/home/bots/components/bot-form/BotForm.tsx @@ -16,6 +16,7 @@ import { httpClient } from '@/app/infra/http/HttpClient'; import { Bot } from '@/app/infra/entities/api'; import { getAdapterDocUrl } from '@/app/infra/entities/adapter-docs'; import { ExternalLink } from 'lucide-react'; +import RoutingRulesEditor from './RoutingRulesEditor'; import { zodResolver } from '@hookform/resolvers/zod'; import { useForm } from 'react-hook-form'; @@ -64,6 +65,28 @@ const getFormSchema = (t: (key: string) => string) => adapter_config: z.record(z.string(), z.any()), enable: z.boolean().optional(), use_pipeline_uuid: z.string().optional(), + pipeline_routing_rules: z + .array( + z.object({ + type: z.enum([ + 'launcher_type', + 'launcher_id', + 'message_content', + 'message_has_element', + ]), + operator: z.enum([ + 'eq', + 'neq', + 'contains', + 'not_contains', + 'starts_with', + 'regex', + ]), + value: z.string(), + pipeline_uuid: z.string(), + }), + ) + .optional(), }); export default function BotForm({ @@ -89,6 +112,7 @@ export default function BotForm({ adapter_config: {}, enable: true, use_pipeline_uuid: '', + pipeline_routing_rules: [], }, }); @@ -155,6 +179,7 @@ export default function BotForm({ adapter_config: val.adapter_config, enable: val.enable, use_pipeline_uuid: val.use_pipeline_uuid || '', + pipeline_routing_rules: val.pipeline_routing_rules || [], }); handleAdapterSelect(val.adapter); @@ -270,6 +295,7 @@ export default function BotForm({ adapter_config: bot.adapter_config, enable: bot.enable ?? true, use_pipeline_uuid: bot.use_pipeline_uuid ?? '', + pipeline_routing_rules: bot.pipeline_routing_rules ?? [], webhook_full_url: runtimeValues?.webhook_full_url as | string | undefined, @@ -314,6 +340,7 @@ export default function BotForm({ adapter_config: form.getValues().adapter_config, enable: form.getValues().enable, use_pipeline_uuid: form.getValues().use_pipeline_uuid, + pipeline_routing_rules: form.getValues().pipeline_routing_rules ?? [], }; httpClient .updateBot(initBotId, updateBot) @@ -464,6 +491,12 @@ export default function BotForm({ )} /> + + {/* Pipeline Routing Rules */} + )} diff --git a/web/src/app/home/bots/components/bot-form/RoutingRulesEditor.tsx b/web/src/app/home/bots/components/bot-form/RoutingRulesEditor.tsx new file mode 100644 index 00000000..42b86626 --- /dev/null +++ b/web/src/app/home/bots/components/bot-form/RoutingRulesEditor.tsx @@ -0,0 +1,480 @@ +'use client'; + +import { useTranslation } from 'react-i18next'; +import { UseFormReturn } from 'react-hook-form'; +import { + PipelineRoutingRule, + RoutingRuleOperator, +} from '@/app/infra/entities/api'; +import { Ban, GripVertical, Plus, Trash2 } from 'lucide-react'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { FormLabel } from '@/components/ui/form'; +import { + Select, + SelectContent, + SelectItem, + SelectSeparator, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { + DndContext, + DragOverlay, + closestCenter, + PointerSensor, + KeyboardSensor, + useSensor, + useSensors, + DragEndEvent, + DragStartEvent, +} from '@dnd-kit/core'; +import { + arrayMove, + SortableContext, + sortableKeyboardCoordinates, + useSortable, + verticalListSortingStrategy, +} from '@dnd-kit/sortable'; +import { CSS } from '@dnd-kit/utilities'; +import { useRef, useMemo, useState } from 'react'; + +export const PIPELINE_DISCARD = '__discard__'; + +interface PipelineOption { + value: string; + label: string; + emoji?: string; +} + +interface RoutingRulesEditorProps { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + form: UseFormReturn; + pipelineNameList: PipelineOption[]; +} + +const OPERATORS_BY_TYPE: Record< + PipelineRoutingRule['type'], + { value: RoutingRuleOperator; labelKey: string }[] +> = { + launcher_type: [ + { value: 'eq', labelKey: 'bots.operatorEq' }, + { value: 'neq', labelKey: 'bots.operatorNeq' }, + ], + launcher_id: [ + { value: 'eq', labelKey: 'bots.operatorEq' }, + { value: 'neq', labelKey: 'bots.operatorNeq' }, + { value: 'contains', labelKey: 'bots.operatorContains' }, + { value: 'not_contains', labelKey: 'bots.operatorNotContains' }, + { value: 'regex', labelKey: 'bots.operatorRegex' }, + ], + message_content: [ + { value: 'eq', labelKey: 'bots.operatorEq' }, + { value: 'neq', labelKey: 'bots.operatorNeq' }, + { value: 'contains', labelKey: 'bots.operatorContains' }, + { value: 'not_contains', labelKey: 'bots.operatorNotContains' }, + { value: 'starts_with', labelKey: 'bots.operatorStartsWith' }, + { value: 'regex', labelKey: 'bots.operatorRegex' }, + ], + message_has_element: [ + { value: 'eq', labelKey: 'bots.operatorHas' }, + { value: 'neq', labelKey: 'bots.operatorNotHas' }, + ], +}; + +function getValuePlaceholder( + t: (key: string) => string, + rule: PipelineRoutingRule, +): string { + if (rule.type === 'launcher_id') + return t('bots.ruleValueLauncherIdPlaceholder'); + if (rule.type === 'message_has_element') + return t('bots.ruleValueElementPlaceholder'); + if (rule.operator === 'regex') return t('bots.ruleValueRegexpPlaceholder'); + return t('bots.ruleValueMessagePlaceholder'); +} + +/* ── Static rule row (used in DragOverlay) ─────────────────────────── */ + +interface RuleRowContentProps { + rule: PipelineRoutingRule; + index: number; + pipelineNameList: PipelineOption[]; + updateRule: (index: number, patch: Partial) => void; + removeRule: (index: number) => void; + dragHandleProps?: Record; + isOverlay?: boolean; +} + +function RuleRowContent({ + rule, + index, + pipelineNameList, + updateRule, + removeRule, + dragHandleProps, + isOverlay, +}: RuleRowContentProps) { + const { t } = useTranslation(); + const operatorsForType = + OPERATORS_BY_TYPE[rule.type] || OPERATORS_BY_TYPE.message_content; + const isDiscard = rule.pipeline_uuid === PIPELINE_DISCARD; + + return ( + + {/* Drag handle */} + + + + + {/* Field selector */} + { + updateRule(index, { + type: val as PipelineRoutingRule['type'], + operator: 'eq', + value: '', + }); + }} + > + + + + + + {t('bots.ruleTypeLauncherType')} + + + {t('bots.ruleTypeLauncherId')} + + + {t('bots.ruleTypeMessageContent')} + + + {t('bots.ruleTypeMessageHasElement')} + + + + + {/* Operator selector */} + { + updateRule(index, { operator: val as RoutingRuleOperator }); + }} + > + + + + + {operatorsForType.map((op) => ( + + {t(op.labelKey)} + + ))} + + + + {/* Value input */} + {rule.type === 'launcher_type' ? ( + updateRule(index, { value: val })} + > + + + + + + {t('bots.sessionTypePerson')} + + {t('bots.sessionTypeGroup')} + + + ) : rule.type === 'message_has_element' ? ( + updateRule(index, { value: val })} + > + + + + + {t('bots.elementImage')} + {t('bots.elementVoice')} + {t('bots.elementFile')} + {t('bots.elementForward')} + {t('bots.elementFace')} + {t('bots.elementAt')} + {t('bots.elementAtAll')} + {t('bots.elementQuote')} + + + ) : ( + updateRule(index, { value: e.target.value })} + /> + )} + + → + + {/* Pipeline selector */} + updateRule(index, { pipeline_uuid: val })} + > + + {rule.pipeline_uuid ? ( + isDiscard ? ( + + + {t('bots.pipelineDiscard')} + + ) : ( + (() => { + const p = pipelineNameList.find( + (p) => p.value === rule.pipeline_uuid, + ); + return ( + + {p?.emoji && ( + {p.emoji} + )} + {p?.label ?? rule.pipeline_uuid} + + ); + })() + ) + ) : ( + + )} + + + + + + {t('bots.pipelineDiscard')} + + + + {pipelineNameList.map((item) => ( + + + {item.emoji && ( + {item.emoji} + )} + {item.label} + + + ))} + + + + removeRule(index)} + > + + + + ); +} + +/* ── Sortable rule row ─────────────────────────────────────────────── */ + +interface SortableRuleRowProps { + id: string; + rule: PipelineRoutingRule; + index: number; + pipelineNameList: PipelineOption[]; + updateRule: (index: number, patch: Partial) => void; + removeRule: (index: number) => void; +} + +function SortableRuleRow({ + id, + rule, + index, + pipelineNameList, + updateRule, + removeRule, +}: SortableRuleRowProps) { + const { attributes, listeners, setNodeRef, transform, isDragging } = + useSortable({ id }); + + const style = { + transform: CSS.Transform.toString(transform), + // No transition — items reorder visually during drag via transform; + // on drop the data updates and transform resets, so animating would + // cause a redundant "swap" flicker. + opacity: isDragging ? 0.3 : undefined, + }; + + return ( + + + + ); +} + +/* ── Main editor ───────────────────────────────────────────────────── */ + +export default function RoutingRulesEditor({ + form, + pipelineNameList, +}: RoutingRulesEditorProps) { + const { t } = useTranslation(); + const [activeId, setActiveId] = useState(null); + + const rules: PipelineRoutingRule[] = + form.watch('pipeline_routing_rules') || []; + + // Stable unique ids for sortable items. + // We keep a running counter so newly added rules always get fresh ids. + const nextId = useRef(0); + const idsRef = useRef([]); + + const sortableIds = useMemo(() => { + // Grow the id list to match rules length (newly added items get new ids). + while (idsRef.current.length < rules.length) { + idsRef.current.push(`rule-${nextId.current++}`); + } + // Shrink if rules were removed from the end. + if (idsRef.current.length > rules.length) { + idsRef.current = idsRef.current.slice(0, rules.length); + } + return idsRef.current; + }, [rules.length]); + + const updateRules = (newRules: PipelineRoutingRule[]) => { + form.setValue('pipeline_routing_rules', newRules, { shouldDirty: true }); + }; + + const addRule = () => { + updateRules([ + ...rules, + { + type: 'launcher_type', + operator: 'eq', + value: '', + pipeline_uuid: '', + }, + ]); + }; + + const updateRule = (index: number, patch: Partial) => { + const updated = [...rules]; + updated[index] = { ...updated[index], ...patch }; + updateRules(updated); + }; + + const removeRule = (index: number) => { + const updated = [...rules]; + updated.splice(index, 1); + // Also remove the corresponding sortable id so indices stay in sync. + idsRef.current.splice(index, 1); + updateRules(updated); + }; + + const sensors = useSensors( + useSensor(PointerSensor, { activationConstraint: { distance: 5 } }), + useSensor(KeyboardSensor, { + coordinateGetter: sortableKeyboardCoordinates, + }), + ); + + const handleDragStart = (event: DragStartEvent) => { + setActiveId(event.active.id as string); + }; + + const handleDragEnd = (event: DragEndEvent) => { + setActiveId(null); + const { active, over } = event; + if (!over || active.id === over.id) return; + + const oldIndex = sortableIds.indexOf(active.id as string); + const newIndex = sortableIds.indexOf(over.id as string); + if (oldIndex === -1 || newIndex === -1) return; + + idsRef.current = arrayMove(idsRef.current, oldIndex, newIndex); + updateRules(arrayMove(rules, oldIndex, newIndex)); + }; + + const activeIndex = activeId ? sortableIds.indexOf(activeId) : -1; + const activeRule = activeIndex >= 0 ? rules[activeIndex] : null; + + return ( + + + + {t('bots.routingRules')} + + {t('bots.routingRulesDescription')} + + + + + {t('bots.addRoutingRule')} + + + + + + {rules.map((rule, index) => ( + + ))} + + + {activeRule ? ( + + ) : null} + + + + ); +} diff --git a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx index f98d48e7..fbaf6fb3 100644 --- a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx +++ b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx @@ -10,7 +10,7 @@ import { useTranslation } from 'react-i18next'; import { httpClient } from '@/app/infra/http/HttpClient'; import { ScrollArea } from '@/components/ui/scroll-area'; import { cn } from '@/lib/utils'; -import { Copy, Check } from 'lucide-react'; +import { Ban, Bot, Copy, Check, Workflow } from 'lucide-react'; import { MessageChainComponent, Plain, @@ -19,6 +19,7 @@ import { Quote, Voice, } from '@/app/infra/entities/message'; +import { PIPELINE_DISCARD } from '@/app/home/bots/components/bot-form/RoutingRulesEditor'; interface SessionInfo { session_id: string; @@ -145,14 +146,18 @@ const BotSessionMonitor = forwardRef< }, [selectedSessionId, loadMessages]); useEffect(() => { - const container = messagesContainerRef.current; - if (container) { - const viewport = container.querySelector( - '[data-radix-scroll-area-viewport]', - ); - const scrollTarget = viewport || container; - scrollTarget.scrollTop = scrollTarget.scrollHeight; - } + if (messages.length === 0) return; + // Wait for DOM to render the new messages before scrolling + requestAnimationFrame(() => { + const container = messagesContainerRef.current; + if (container) { + const viewport = container.querySelector( + '[data-radix-scroll-area-viewport]', + ); + const scrollTarget = viewport || container; + scrollTarget.scrollTop = scrollTarget.scrollHeight; + } + }); }, [messages]); const parseMessageChain = (content: string): MessageChainComponent[] => { @@ -391,7 +396,6 @@ const BotSessionMonitor = forwardRef< )} - {session.pipeline_name} ); @@ -447,12 +451,6 @@ const BotSessionMonitor = forwardRef< > )} - {selectedSession?.pipeline_name && ( - <> - · - {selectedSession.pipeline_name} - > - )} {selectedSession?.is_active && ( <> · @@ -483,6 +481,9 @@ const BotSessionMonitor = forwardRef< ) : ( messages.map((msg) => { const isUser = isUserMessage(msg); + const isDiscarded = + msg.status === 'discarded' || + msg.pipeline_id === PIPELINE_DISCARD; return ( {renderMessageContent(msg)} - {/* Role label + timestamp */} + {/* Role label + pipeline + timestamp */} {formatTime(msg.timestamp)} + {isDiscarded ? ( + + + {t('bots.sessionMonitor.discarded', { + defaultValue: 'Discarded', + })} + + ) : msg.pipeline_name ? ( + + + {msg.pipeline_name} + + ) : null} {msg.status === 'error' && ( error )} {msg.runner_name && ( - + + {msg.runner_name} )} diff --git a/web/src/app/infra/entities/api/index.ts b/web/src/app/infra/entities/api/index.ts index 42285221..a801a9b7 100644 --- a/web/src/app/infra/entities/api/index.ts +++ b/web/src/app/infra/entities/api/index.ts @@ -140,11 +140,31 @@ export interface Bot { adapter_config: object; use_pipeline_name?: string; use_pipeline_uuid?: string; + pipeline_routing_rules?: PipelineRoutingRule[]; created_at?: string; updated_at?: string; adapter_runtime_values?: object; } +export type RoutingRuleOperator = + | 'eq' + | 'neq' + | 'contains' + | 'not_contains' + | 'starts_with' + | 'regex'; + +export interface PipelineRoutingRule { + type: + | 'launcher_type' + | 'launcher_id' + | 'message_content' + | 'message_has_element'; + operator: RoutingRuleOperator; + value: string; + pipeline_uuid: string; +} + export interface ApiRespKnowledgeBases { bases: KnowledgeBase[]; } diff --git a/web/src/i18n/locales/en-US.ts b/web/src/i18n/locales/en-US.ts index 93f3b9a3..192165f2 100644 --- a/web/src/i18n/locales/en-US.ts +++ b/web/src/i18n/locales/en-US.ts @@ -307,6 +307,39 @@ const enUS = { routingConnection: 'Routing & Connection', routingConnectionDescription: 'Bind the pipeline that processes messages for this bot', + routingRules: 'Conditional Routing Rules', + routingRulesDescription: + 'Rules are evaluated in order; first match routes to its pipeline. Fallback to the default pipeline above if none match.', + addRoutingRule: 'Add Rule', + ruleTypeLauncherType: 'Session Type', + ruleTypeLauncherId: 'Session ID', + ruleTypeMessageContent: 'Message Content', + operatorEq: 'Equals', + operatorNeq: 'Not Equals', + operatorContains: 'Contains', + operatorNotContains: 'Not Contains', + operatorStartsWith: 'Starts With', + operatorRegex: 'Regex', + operatorHas: 'Has', + operatorNotHas: 'Does not have', + ruleTypeMessageHasElement: 'Message Element', + ruleValueElementPlaceholder: 'Select element type', + elementImage: 'Image', + elementVoice: 'Voice', + elementFile: 'File', + elementForward: 'Forward', + elementFace: 'Emoji', + elementAt: '@Mention', + elementAtAll: '@All', + elementQuote: 'Quote', + ruleValuePlaceholder: 'Match value', + ruleValueLauncherIdPlaceholder: 'Group or user ID', + ruleValueMessagePlaceholder: 'Message text', + ruleValuePrefixPlaceholder: 'e.g. !draw', + ruleValueRegexpPlaceholder: 'e.g. ^/help', + pipelineDiscard: 'Discard Message', + sessionTypePerson: 'Private Chat', + sessionTypeGroup: 'Group Chat', adapterConfigDescription: 'Configure the selected platform adapter', dangerZone: 'Danger Zone', dangerZoneDescription: 'Irreversible and destructive actions', @@ -355,6 +388,9 @@ const enUS = { refresh: 'Refresh', active: 'Active', inactive: 'Inactive', + discarded: 'Discarded', + userMessage: 'User', + botMessage: 'Assistant', }, }, plugins: { diff --git a/web/src/i18n/locales/zh-Hans.ts b/web/src/i18n/locales/zh-Hans.ts index 78b7b8c8..7822bcfa 100644 --- a/web/src/i18n/locales/zh-Hans.ts +++ b/web/src/i18n/locales/zh-Hans.ts @@ -294,6 +294,39 @@ const zhHans = { basicInfoDescription: '设置机器人名称和描述', routingConnection: '路由与连接', routingConnectionDescription: '绑定处理此机器人消息的流水线', + routingRules: '条件路由规则', + routingRulesDescription: + '按顺序匹配,命中第一条规则后路由到对应流水线;都不匹配时使用上方默认流水线', + addRoutingRule: '添加规则', + ruleTypeLauncherType: '会话类型', + ruleTypeLauncherId: '会话 ID', + ruleTypeMessageContent: '消息内容', + operatorEq: '等于', + operatorNeq: '不等于', + operatorContains: '包含', + operatorNotContains: '不包含', + operatorStartsWith: '前缀匹配', + operatorRegex: '正则匹配', + operatorHas: '包含', + operatorNotHas: '不包含', + ruleTypeMessageHasElement: '消息元素类型', + ruleValueElementPlaceholder: '选择元素类型', + elementImage: '图片', + elementVoice: '语音', + elementFile: '文件', + elementForward: '转发', + elementFace: '表情', + elementAt: '@某人', + elementAtAll: '@全体', + elementQuote: '引用', + ruleValuePlaceholder: '匹配值', + ruleValueLauncherIdPlaceholder: '群号或用户 ID', + ruleValueMessagePlaceholder: '消息内容', + ruleValuePrefixPlaceholder: '如: !draw', + ruleValueRegexpPlaceholder: '如: ^/help', + pipelineDiscard: '丢弃消息', + sessionTypePerson: '私聊', + sessionTypeGroup: '群聊', adapterConfigDescription: '配置所选平台适配器', dangerZone: '危险区域', dangerZoneDescription: '不可逆的操作', @@ -340,6 +373,9 @@ const zhHans = { refresh: '刷新', active: '活跃', inactive: '不活跃', + discarded: '已丢弃', + userMessage: '用户', + botMessage: '助手', }, }, plugins: {
+ {t('bots.routingRulesDescription')} +