diff --git a/src/langbot/pkg/entity/persistence/bot.py b/src/langbot/pkg/entity/persistence/bot.py index 08eda478..c3fa295f 100644 --- a/src/langbot/pkg/entity/persistence/bot.py +++ b/src/langbot/pkg/entity/persistence/bot.py @@ -16,6 +16,7 @@ class Bot(Base): enable = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False) use_pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) use_pipeline_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + pipeline_routing_rules = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, server_default='[]') created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now()) updated_at = sqlalchemy.Column( sqlalchemy.DateTime, diff --git a/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py b/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py new file mode 100644 index 00000000..816bf286 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm025_bot_pipeline_routing_rules.py @@ -0,0 +1,15 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(25) +class DBMigrateBotPipelineRoutingRules(migration.DBMigration): + """Add pipeline_routing_rules column to bots table""" + + async def upgrade(self): + sql_text = sqlalchemy.text("ALTER TABLE bots ADD COLUMN pipeline_routing_rules JSON NOT NULL DEFAULT '[]'") + await self.ap.persistence_mgr.execute_async(sql_text) + + async def downgrade(self): + sql_text = sqlalchemy.text('ALTER TABLE bots DROP COLUMN pipeline_routing_rules') + await self.ap.persistence_mgr.execute_async(sql_text) diff --git a/src/langbot/pkg/pipeline/aggregator.py b/src/langbot/pkg/pipeline/aggregator.py index 249557f1..1af85a65 100644 --- a/src/langbot/pkg/pipeline/aggregator.py +++ b/src/langbot/pkg/pipeline/aggregator.py @@ -37,6 +37,7 @@ class PendingMessage: message_chain: platform_message.MessageChain adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter pipeline_uuid: typing.Optional[str] + routed_by_rule: bool = False timestamp: float = field(default_factory=time.time) @@ -125,6 +126,7 @@ class MessageAggregator: message_chain: platform_message.MessageChain, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, pipeline_uuid: typing.Optional[str] = None, + routed_by_rule: bool = False, ) -> None: """Add a message to the aggregation buffer @@ -145,6 +147,7 @@ class MessageAggregator: message_chain=message_chain, adapter=adapter, pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) return @@ -159,6 +162,7 @@ class MessageAggregator: message_chain=message_chain, adapter=adapter, pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) force_flush = False @@ -217,6 +221,7 @@ class MessageAggregator: message_chain=msg.message_chain, adapter=msg.adapter, pipeline_uuid=msg.pipeline_uuid, + routed_by_rule=msg.routed_by_rule, ) return @@ -231,6 +236,7 @@ class MessageAggregator: message_chain=merged_msg.message_chain, adapter=merged_msg.adapter, pipeline_uuid=merged_msg.pipeline_uuid, + routed_by_rule=merged_msg.routed_by_rule, ) def _merge_messages(self, messages: list[PendingMessage]) -> PendingMessage: diff --git a/src/langbot/pkg/pipeline/controller.py b/src/langbot/pkg/pipeline/controller.py index 988306cf..09d18a58 100644 --- a/src/langbot/pkg/pipeline/controller.py +++ b/src/langbot/pkg/pipeline/controller.py @@ -63,6 +63,14 @@ class Controller: pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid) if pipeline: await pipeline.run(selected_query) + else: + self.ap.logger.warning( + f'Pipeline {pipeline_uuid} not found for query {selected_query.query_id}, query dropped' + ) + else: + self.ap.logger.warning( + f'No pipeline_uuid for query {selected_query.query_id}, query dropped' + ) async with self.ap.query_pool: (await self.ap.sess_mgr.get_session(selected_query))._semaphore.release() diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 5d0012d1..e59a5f15 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -247,7 +247,9 @@ class RuntimePipeline: await self._check_output(query, result) if result.result_type == pipeline_entities.ResultType.INTERRUPT: - self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') + self.ap.logger.warning( + f'Stage {stage_container.inst_name} interrupted query {query.query_id}, message: {str(query.message_chain)[:100]}' + ) break elif result.result_type == pipeline_entities.ResultType.CONTINUE: query = result.new_query @@ -261,7 +263,9 @@ class RuntimePipeline: await self._check_output(query, sub_result) if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: - self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') + self.ap.logger.warning( + f'Stage {stage_container.inst_name} interrupted query {query.query_id}, message: {str(query.message_chain)[:100]}' + ) break elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE: query = sub_result.new_query @@ -323,6 +327,9 @@ class RuntimePipeline: event_ctx = await self.ap.plugin_connector.emit_event(event_obj, bound_plugins) if event_ctx.is_prevented_default(): + self.ap.logger.warning( + f'MessageReceived event prevented default for query {query.query_id}, pipeline={pipeline_name}, message: {str(query.message_chain)[:100]}' + ) return self.ap.logger.debug(f'Processing query {query.query_id}') diff --git a/src/langbot/pkg/pipeline/pool.py b/src/langbot/pkg/pipeline/pool.py index eb7df66b..9ee8ab07 100644 --- a/src/langbot/pkg/pipeline/pool.py +++ b/src/langbot/pkg/pipeline/pool.py @@ -41,6 +41,7 @@ class QueryPool: message_chain: platform_message.MessageChain, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, pipeline_uuid: typing.Optional[str] = None, + routed_by_rule: bool = False, ) -> pipeline_query.Query: async with self.condition: query_id = self.query_id_counter @@ -52,7 +53,7 @@ class QueryPool: sender_id=sender_id, message_event=message_event, message_chain=message_chain, - variables={}, + variables={'_routed_by_rule': routed_by_rule}, resp_messages=[], resp_message_chain=[], adapter=adapter, diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 6f971333..0f2e6a90 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -61,6 +61,9 @@ class ChatMessageHandler(handler.MessageHandler): yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) else: + self.ap.logger.warning( + f'NormalMessageReceived event prevented default for query {query.query_id} without reply, message: {str(query.message_chain)[:100]}' + ) yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query) else: if event_ctx.event.user_message_alter is not None: diff --git a/src/langbot/pkg/pipeline/resprule/resprule.py b/src/langbot/pkg/pipeline/resprule/resprule.py index 1a3560ff..6e7fc369 100644 --- a/src/langbot/pkg/pipeline/resprule/resprule.py +++ b/src/langbot/pkg/pipeline/resprule/resprule.py @@ -37,6 +37,10 @@ class GroupRespondRuleCheckStage(stage.PipelineStage): if query.launcher_type.value != 'group': # 只处理群消息 return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + # 通过路由规则明确指定的流水线,跳过群响应规则检查 + if query.variables and query.variables.get('_routed_by_rule', False): + return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + rules = query.pipeline_config['trigger']['group-respond-rules'] use_rule = rules diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index f9907b84..aa03ec38 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import re import traceback import sqlalchemy @@ -52,6 +53,69 @@ class RuntimeBot: self.task_context = taskmgr.TaskContext() self.logger = logger + @staticmethod + def _match_operator(actual: str, operator: str, expected: str) -> bool: + """Evaluate a single operator condition.""" + if operator == 'eq': + return actual == expected + elif operator == 'neq': + return actual != expected + elif operator == 'contains': + return expected in actual + elif operator == 'not_contains': + return expected not in actual + elif operator == 'starts_with': + return actual.startswith(expected) + elif operator == 'regex': + try: + return bool(re.search(expected, actual)) + except re.error: + return False + return False + + def resolve_pipeline_uuid( + self, + launcher_type: str, + launcher_id: str, + message_text: str, + ) -> tuple[str | None, bool]: + """Resolve pipeline UUID based on routing rules. + + Rules are evaluated in order; first match wins. + Falls back to use_pipeline_uuid if no rule matches. + + Rule types: + - launcher_type: session type ("person" / "group") + - launcher_id: session / group id + - message_content: message text content + + Operators: eq, neq, contains, not_contains, starts_with, regex + + Returns: + tuple: (pipeline_uuid, routed_by_rule) - routed_by_rule is True + when a routing rule matched, False when falling back to default. + """ + rules = self.bot_entity.pipeline_routing_rules or [] + for rule in rules: + rule_type = rule.get('type') + operator = rule.get('operator', 'eq') + rule_value = rule.get('value', '') + target_uuid = rule.get('pipeline_uuid') + if not rule_type or not target_uuid: + continue + + if rule_type == 'launcher_type': + if self._match_operator(launcher_type, operator, rule_value): + return target_uuid, True + elif rule_type == 'launcher_id': + if self._match_operator(str(launcher_id), operator, str(rule_value)): + return target_uuid, True + elif rule_type == 'message_content': + if self._match_operator(message_text, operator, rule_value): + return target_uuid, True + + return self.bot_entity.use_pipeline_uuid, False + async def initialize(self): async def on_friend_message( event: platform_events.FriendMessage, @@ -83,6 +147,9 @@ class RuntimeBot: if custom_launcher_id: launcher_id = custom_launcher_id + message_text = str(event.message_chain) + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid('person', launcher_id, message_text) + await self.ap.msg_aggregator.add_message( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.PERSON, @@ -91,7 +158,8 @@ class RuntimeBot: message_event=event, message_chain=event.message_chain, adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, + pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) else: await self.logger.info('Pipeline skipped for person message due to webhook response') @@ -126,6 +194,9 @@ class RuntimeBot: if custom_launcher_id: launcher_id = custom_launcher_id + message_text = str(event.message_chain) + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid('group', launcher_id, message_text) + await self.ap.msg_aggregator.add_message( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.GROUP, @@ -134,7 +205,8 @@ class RuntimeBot: message_event=event, message_chain=event.message_chain, adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, + pipeline_uuid=pipeline_uuid, + routed_by_rule=routed_by_rule, ) else: await self.logger.info('Pipeline skipped for group message due to webhook response') diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index ea0a682f..4fad9069 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 24 +required_database_version = 25 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/web/src/app/home/bots/components/bot-form/BotForm.tsx b/web/src/app/home/bots/components/bot-form/BotForm.tsx index bc8e5e56..7d2d298a 100644 --- a/web/src/app/home/bots/components/bot-form/BotForm.tsx +++ b/web/src/app/home/bots/components/bot-form/BotForm.tsx @@ -13,9 +13,9 @@ import { IDynamicFormItemSchema } from '@/app/infra/entities/form/dynamic'; import { UUID } from 'uuidjs'; import DynamicFormComponent from '@/app/home/components/dynamic-form/DynamicFormComponent'; import { httpClient } from '@/app/infra/http/HttpClient'; -import { Bot } from '@/app/infra/entities/api'; +import { Bot, PipelineRoutingRule, RoutingRuleOperator } from '@/app/infra/entities/api'; import { getAdapterDocUrl } from '@/app/infra/entities/adapter-docs'; -import { ExternalLink } from 'lucide-react'; +import { ExternalLink, Plus, Trash2 } from 'lucide-react'; import { zodResolver } from '@hookform/resolvers/zod'; import { useForm } from 'react-hook-form'; @@ -33,6 +33,7 @@ import { FormMessage, } from '@/components/ui/form'; import { Input } from '@/components/ui/input'; +import { Button } from '@/components/ui/button'; import { Select, SelectContent, @@ -64,6 +65,23 @@ const getFormSchema = (t: (key: string) => string) => adapter_config: z.record(z.string(), z.any()), enable: z.boolean().optional(), use_pipeline_uuid: z.string().optional(), + pipeline_routing_rules: z + .array( + z.object({ + type: z.enum(['launcher_type', 'launcher_id', 'message_content']), + operator: z.enum([ + 'eq', + 'neq', + 'contains', + 'not_contains', + 'starts_with', + 'regex', + ]), + value: z.string(), + pipeline_uuid: z.string(), + }), + ) + .optional(), }); export default function BotForm({ @@ -89,6 +107,7 @@ export default function BotForm({ adapter_config: {}, enable: true, use_pipeline_uuid: '', + pipeline_routing_rules: [], }, }); @@ -155,6 +174,7 @@ export default function BotForm({ adapter_config: val.adapter_config, enable: val.enable, use_pipeline_uuid: val.use_pipeline_uuid || '', + pipeline_routing_rules: val.pipeline_routing_rules || [], }); handleAdapterSelect(val.adapter); @@ -270,6 +290,7 @@ export default function BotForm({ adapter_config: bot.adapter_config, enable: bot.enable ?? true, use_pipeline_uuid: bot.use_pipeline_uuid ?? '', + pipeline_routing_rules: bot.pipeline_routing_rules ?? [], webhook_full_url: runtimeValues?.webhook_full_url as | string | undefined, @@ -314,6 +335,7 @@ export default function BotForm({ adapter_config: form.getValues().adapter_config, enable: form.getValues().enable, use_pipeline_uuid: form.getValues().use_pipeline_uuid, + pipeline_routing_rules: form.getValues().pipeline_routing_rules ?? [], }; httpClient .updateBot(initBotId, updateBot) @@ -464,6 +486,308 @@ export default function BotForm({ )} /> + + {/* Pipeline Routing Rules */} +
+ {t('bots.routingRulesDescription')} +
+