feat: pipeline routing fix - add routed_by_rule bypass and diagnostic logging

- Skip GroupRespondRuleCheckStage when message is routed by rule
- Add WARNING logs when queries are silently dropped
- Add pipeline routing rules support (bot entity, migration, web UI)
- Pass routed_by_rule flag through aggregator -> pool -> query variables
This commit is contained in:
Typer_Body
2026-04-02 01:20:26 +08:00
parent c3e2d5e055
commit ac337b31df
14 changed files with 505 additions and 8 deletions

View File

@@ -16,6 +16,7 @@ class Bot(Base):
enable = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=False)
use_pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
use_pipeline_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
pipeline_routing_rules = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, server_default='[]')
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now())
updated_at = sqlalchemy.Column(
sqlalchemy.DateTime,

View File

@@ -0,0 +1,15 @@
import sqlalchemy
from .. import migration
@migration.migration_class(25)
class DBMigrateBotPipelineRoutingRules(migration.DBMigration):
"""Add pipeline_routing_rules column to bots table"""
async def upgrade(self):
sql_text = sqlalchemy.text("ALTER TABLE bots ADD COLUMN pipeline_routing_rules JSON NOT NULL DEFAULT '[]'")
await self.ap.persistence_mgr.execute_async(sql_text)
async def downgrade(self):
sql_text = sqlalchemy.text('ALTER TABLE bots DROP COLUMN pipeline_routing_rules')
await self.ap.persistence_mgr.execute_async(sql_text)

View File

@@ -37,6 +37,7 @@ class PendingMessage:
message_chain: platform_message.MessageChain
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter
pipeline_uuid: typing.Optional[str]
routed_by_rule: bool = False
timestamp: float = field(default_factory=time.time)
@@ -125,6 +126,7 @@ class MessageAggregator:
message_chain: platform_message.MessageChain,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False,
) -> None:
"""Add a message to the aggregation buffer
@@ -145,6 +147,7 @@ class MessageAggregator:
message_chain=message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
return
@@ -159,6 +162,7 @@ class MessageAggregator:
message_chain=message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
force_flush = False
@@ -217,6 +221,7 @@ class MessageAggregator:
message_chain=msg.message_chain,
adapter=msg.adapter,
pipeline_uuid=msg.pipeline_uuid,
routed_by_rule=msg.routed_by_rule,
)
return
@@ -231,6 +236,7 @@ class MessageAggregator:
message_chain=merged_msg.message_chain,
adapter=merged_msg.adapter,
pipeline_uuid=merged_msg.pipeline_uuid,
routed_by_rule=merged_msg.routed_by_rule,
)
def _merge_messages(self, messages: list[PendingMessage]) -> PendingMessage:

View File

@@ -63,6 +63,14 @@ class Controller:
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
if pipeline:
await pipeline.run(selected_query)
else:
self.ap.logger.warning(
f'Pipeline {pipeline_uuid} not found for query {selected_query.query_id}, query dropped'
)
else:
self.ap.logger.warning(
f'No pipeline_uuid for query {selected_query.query_id}, query dropped'
)
async with self.ap.query_pool:
(await self.ap.sess_mgr.get_session(selected_query))._semaphore.release()

View File

@@ -247,7 +247,9 @@ class RuntimePipeline:
await self._check_output(query, result)
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
self.ap.logger.warning(
f'Stage {stage_container.inst_name} interrupted query {query.query_id}, message: {str(query.message_chain)[:100]}'
)
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
@@ -261,7 +263,9 @@ class RuntimePipeline:
await self._check_output(query, sub_result)
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
self.ap.logger.warning(
f'Stage {stage_container.inst_name} interrupted query {query.query_id}, message: {str(query.message_chain)[:100]}'
)
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
@@ -323,6 +327,9 @@ class RuntimePipeline:
event_ctx = await self.ap.plugin_connector.emit_event(event_obj, bound_plugins)
if event_ctx.is_prevented_default():
self.ap.logger.warning(
f'MessageReceived event prevented default for query {query.query_id}, pipeline={pipeline_name}, message: {str(query.message_chain)[:100]}'
)
return
self.ap.logger.debug(f'Processing query {query.query_id}')

View File

@@ -41,6 +41,7 @@ class QueryPool:
message_chain: platform_message.MessageChain,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False,
) -> pipeline_query.Query:
async with self.condition:
query_id = self.query_id_counter
@@ -52,7 +53,7 @@ class QueryPool:
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
variables={},
variables={'_routed_by_rule': routed_by_rule},
resp_messages=[],
resp_message_chain=[],
adapter=adapter,

View File

@@ -61,6 +61,9 @@ class ChatMessageHandler(handler.MessageHandler):
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
else:
self.ap.logger.warning(
f'NormalMessageReceived event prevented default for query {query.query_id} without reply, message: {str(query.message_chain)[:100]}'
)
yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query)
else:
if event_ctx.event.user_message_alter is not None:

View File

@@ -37,6 +37,10 @@ class GroupRespondRuleCheckStage(stage.PipelineStage):
if query.launcher_type.value != 'group': # 只处理群消息
return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# 通过路由规则明确指定的流水线,跳过群响应规则检查
if query.variables and query.variables.get('_routed_by_rule', False):
return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
rules = query.pipeline_config['trigger']['group-respond-rules']
use_rule = rules

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import re
import traceback
import sqlalchemy
@@ -52,6 +53,69 @@ class RuntimeBot:
self.task_context = taskmgr.TaskContext()
self.logger = logger
@staticmethod
def _match_operator(actual: str, operator: str, expected: str) -> bool:
"""Evaluate a single operator condition."""
if operator == 'eq':
return actual == expected
elif operator == 'neq':
return actual != expected
elif operator == 'contains':
return expected in actual
elif operator == 'not_contains':
return expected not in actual
elif operator == 'starts_with':
return actual.startswith(expected)
elif operator == 'regex':
try:
return bool(re.search(expected, actual))
except re.error:
return False
return False
def resolve_pipeline_uuid(
self,
launcher_type: str,
launcher_id: str,
message_text: str,
) -> tuple[str | None, bool]:
"""Resolve pipeline UUID based on routing rules.
Rules are evaluated in order; first match wins.
Falls back to use_pipeline_uuid if no rule matches.
Rule types:
- launcher_type: session type ("person" / "group")
- launcher_id: session / group id
- message_content: message text content
Operators: eq, neq, contains, not_contains, starts_with, regex
Returns:
tuple: (pipeline_uuid, routed_by_rule) - routed_by_rule is True
when a routing rule matched, False when falling back to default.
"""
rules = self.bot_entity.pipeline_routing_rules or []
for rule in rules:
rule_type = rule.get('type')
operator = rule.get('operator', 'eq')
rule_value = rule.get('value', '')
target_uuid = rule.get('pipeline_uuid')
if not rule_type or not target_uuid:
continue
if rule_type == 'launcher_type':
if self._match_operator(launcher_type, operator, rule_value):
return target_uuid, True
elif rule_type == 'launcher_id':
if self._match_operator(str(launcher_id), operator, str(rule_value)):
return target_uuid, True
elif rule_type == 'message_content':
if self._match_operator(message_text, operator, rule_value):
return target_uuid, True
return self.bot_entity.use_pipeline_uuid, False
async def initialize(self):
async def on_friend_message(
event: platform_events.FriendMessage,
@@ -83,6 +147,9 @@ class RuntimeBot:
if custom_launcher_id:
launcher_id = custom_launcher_id
message_text = str(event.message_chain)
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid('person', launcher_id, message_text)
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
@@ -91,7 +158,8 @@ class RuntimeBot:
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
else:
await self.logger.info('Pipeline skipped for person message due to webhook response')
@@ -126,6 +194,9 @@ class RuntimeBot:
if custom_launcher_id:
launcher_id = custom_launcher_id
message_text = str(event.message_chain)
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid('group', launcher_id, message_text)
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
@@ -134,7 +205,8 @@ class RuntimeBot:
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
else:
await self.logger.info('Pipeline skipped for group message due to webhook response')

View File

@@ -2,7 +2,7 @@ import langbot
semantic_version = f'v{langbot.__version__}'
required_database_version = 24
required_database_version = 25
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
debug_mode = False

View File

@@ -13,9 +13,9 @@ import { IDynamicFormItemSchema } from '@/app/infra/entities/form/dynamic';
import { UUID } from 'uuidjs';
import DynamicFormComponent from '@/app/home/components/dynamic-form/DynamicFormComponent';
import { httpClient } from '@/app/infra/http/HttpClient';
import { Bot } from '@/app/infra/entities/api';
import { Bot, PipelineRoutingRule, RoutingRuleOperator } from '@/app/infra/entities/api';
import { getAdapterDocUrl } from '@/app/infra/entities/adapter-docs';
import { ExternalLink } from 'lucide-react';
import { ExternalLink, Plus, Trash2 } from 'lucide-react';
import { zodResolver } from '@hookform/resolvers/zod';
import { useForm } from 'react-hook-form';
@@ -33,6 +33,7 @@ import {
FormMessage,
} from '@/components/ui/form';
import { Input } from '@/components/ui/input';
import { Button } from '@/components/ui/button';
import {
Select,
SelectContent,
@@ -64,6 +65,23 @@ const getFormSchema = (t: (key: string) => string) =>
adapter_config: z.record(z.string(), z.any()),
enable: z.boolean().optional(),
use_pipeline_uuid: z.string().optional(),
pipeline_routing_rules: z
.array(
z.object({
type: z.enum(['launcher_type', 'launcher_id', 'message_content']),
operator: z.enum([
'eq',
'neq',
'contains',
'not_contains',
'starts_with',
'regex',
]),
value: z.string(),
pipeline_uuid: z.string(),
}),
)
.optional(),
});
export default function BotForm({
@@ -89,6 +107,7 @@ export default function BotForm({
adapter_config: {},
enable: true,
use_pipeline_uuid: '',
pipeline_routing_rules: [],
},
});
@@ -155,6 +174,7 @@ export default function BotForm({
adapter_config: val.adapter_config,
enable: val.enable,
use_pipeline_uuid: val.use_pipeline_uuid || '',
pipeline_routing_rules: val.pipeline_routing_rules || [],
});
handleAdapterSelect(val.adapter);
@@ -270,6 +290,7 @@ export default function BotForm({
adapter_config: bot.adapter_config,
enable: bot.enable ?? true,
use_pipeline_uuid: bot.use_pipeline_uuid ?? '',
pipeline_routing_rules: bot.pipeline_routing_rules ?? [],
webhook_full_url: runtimeValues?.webhook_full_url as
| string
| undefined,
@@ -314,6 +335,7 @@ export default function BotForm({
adapter_config: form.getValues().adapter_config,
enable: form.getValues().enable,
use_pipeline_uuid: form.getValues().use_pipeline_uuid,
pipeline_routing_rules: form.getValues().pipeline_routing_rules ?? [],
};
httpClient
.updateBot(initBotId, updateBot)
@@ -464,6 +486,308 @@ export default function BotForm({
</FormItem>
)}
/>
{/* Pipeline Routing Rules */}
<div className="mt-6">
<div className="flex items-center justify-between mb-2">
<div>
<FormLabel>{t('bots.routingRules')}</FormLabel>
<p className="text-sm text-muted-foreground mt-1">
{t('bots.routingRulesDescription')}
</p>
</div>
<Button
type="button"
variant="outline"
size="sm"
onClick={() => {
const rules =
form.getValues('pipeline_routing_rules') || [];
form.setValue(
'pipeline_routing_rules',
[
...rules,
{
type: 'launcher_type' as const,
operator: 'eq' as const,
value: '',
pipeline_uuid: '',
},
],
{ shouldDirty: true },
);
}}
>
<Plus className="h-4 w-4 mr-1" />
{t('bots.addRoutingRule')}
</Button>
</div>
{(form.watch('pipeline_routing_rules') || []).map(
(rule, index) => {
// Determine which operators are available for the current type
const operatorsForType: {
value: RoutingRuleOperator;
labelKey: string;
}[] =
rule.type === 'launcher_type'
? [
{ value: 'eq', labelKey: 'bots.operatorEq' },
{ value: 'neq', labelKey: 'bots.operatorNeq' },
]
: rule.type === 'launcher_id'
? [
{ value: 'eq', labelKey: 'bots.operatorEq' },
{ value: 'neq', labelKey: 'bots.operatorNeq' },
{
value: 'contains',
labelKey: 'bots.operatorContains',
},
{
value: 'not_contains',
labelKey: 'bots.operatorNotContains',
},
{
value: 'regex',
labelKey: 'bots.operatorRegex',
},
]
: [
{ value: 'eq', labelKey: 'bots.operatorEq' },
{ value: 'neq', labelKey: 'bots.operatorNeq' },
{
value: 'contains',
labelKey: 'bots.operatorContains',
},
{
value: 'not_contains',
labelKey: 'bots.operatorNotContains',
},
{
value: 'starts_with',
labelKey: 'bots.operatorStartsWith',
},
{
value: 'regex',
labelKey: 'bots.operatorRegex',
},
];
return (
<div
key={index}
className="flex items-center gap-2 mt-2 p-3 border rounded-md bg-muted/30"
>
{/* Field selector */}
<Select
value={rule.type}
onValueChange={(val) => {
const rules = [
...(form.getValues('pipeline_routing_rules') ||
[]),
];
const newType =
val as PipelineRoutingRule['type'];
// Reset operator to 'eq' when switching type
rules[index] = {
...rules[index],
type: newType,
operator: 'eq',
value: '',
};
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
>
<SelectTrigger className="w-[130px]">
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="launcher_type">
{t('bots.ruleTypeLauncherType')}
</SelectItem>
<SelectItem value="launcher_id">
{t('bots.ruleTypeLauncherId')}
</SelectItem>
<SelectItem value="message_content">
{t('bots.ruleTypeMessageContent')}
</SelectItem>
</SelectContent>
</Select>
{/* Operator selector */}
<Select
value={rule.operator || 'eq'}
onValueChange={(val) => {
const rules = [
...(form.getValues('pipeline_routing_rules') ||
[]),
];
rules[index] = {
...rules[index],
operator: val as RoutingRuleOperator,
};
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
>
<SelectTrigger className="w-[120px]">
<SelectValue />
</SelectTrigger>
<SelectContent>
{operatorsForType.map((op) => (
<SelectItem key={op.value} value={op.value}>
{t(op.labelKey)}
</SelectItem>
))}
</SelectContent>
</Select>
{/* Value input */}
{rule.type === 'launcher_type' ? (
<Select
value={rule.value}
onValueChange={(val) => {
const rules = [
...(form.getValues(
'pipeline_routing_rules',
) || []),
];
rules[index] = { ...rules[index], value: val };
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
>
<SelectTrigger className="w-[100px]">
<SelectValue
placeholder={t('bots.ruleValuePlaceholder')}
/>
</SelectTrigger>
<SelectContent>
<SelectItem value="person">
{t('bots.sessionTypePerson')}
</SelectItem>
<SelectItem value="group">
{t('bots.sessionTypeGroup')}
</SelectItem>
</SelectContent>
</Select>
) : (
<Input
className="flex-1"
placeholder={
rule.type === 'launcher_id'
? t('bots.ruleValueLauncherIdPlaceholder')
: rule.operator === 'regex'
? t('bots.ruleValueRegexpPlaceholder')
: t('bots.ruleValueMessagePlaceholder')
}
value={rule.value}
onChange={(e) => {
const rules = [
...(form.getValues(
'pipeline_routing_rules',
) || []),
];
rules[index] = {
...rules[index],
value: e.target.value,
};
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
/>
)}
<span className="text-sm text-muted-foreground shrink-0">
</span>
{/* Pipeline selector */}
<Select
value={rule.pipeline_uuid}
onValueChange={(val) => {
const rules = [
...(form.getValues('pipeline_routing_rules') ||
[]),
];
rules[index] = {
...rules[index],
pipeline_uuid: val,
};
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
>
<SelectTrigger className="w-[200px]">
{rule.pipeline_uuid ? (
(() => {
const p = pipelineNameList.find(
(p) => p.value === rule.pipeline_uuid,
);
return (
<div className="flex items-center gap-2">
{p?.emoji && (
<span className="text-sm shrink-0">
{p.emoji}
</span>
)}
<span>
{p?.label ?? rule.pipeline_uuid}
</span>
</div>
);
})()
) : (
<SelectValue
placeholder={t('bots.selectPipeline')}
/>
)}
</SelectTrigger>
<SelectContent>
{pipelineNameList.map((item) => (
<SelectItem key={item.value} value={item.value}>
<div className="flex items-center gap-2">
{item.emoji && (
<span className="text-sm shrink-0">
{item.emoji}
</span>
)}
<span>{item.label}</span>
</div>
</SelectItem>
))}
</SelectContent>
</Select>
<Button
type="button"
variant="ghost"
size="icon"
className="shrink-0"
onClick={() => {
const rules = [
...(form.getValues('pipeline_routing_rules') ||
[]),
];
rules.splice(index, 1);
form.setValue('pipeline_routing_rules', rules, {
shouldDirty: true,
});
}}
>
<Trash2 className="h-4 w-4 text-destructive" />
</Button>
</div>
);
},
)}
</div>
</CardContent>
</Card>
)}

View File

@@ -140,11 +140,27 @@ export interface Bot {
adapter_config: object;
use_pipeline_name?: string;
use_pipeline_uuid?: string;
pipeline_routing_rules?: PipelineRoutingRule[];
created_at?: string;
updated_at?: string;
adapter_runtime_values?: object;
}
export type RoutingRuleOperator =
| 'eq'
| 'neq'
| 'contains'
| 'not_contains'
| 'starts_with'
| 'regex';
export interface PipelineRoutingRule {
type: 'launcher_type' | 'launcher_id' | 'message_content';
operator: RoutingRuleOperator;
value: string;
pipeline_uuid: string;
}
export interface ApiRespKnowledgeBases {
bases: KnowledgeBase[];
}

View File

@@ -307,6 +307,26 @@ const enUS = {
routingConnection: 'Routing & Connection',
routingConnectionDescription:
'Bind the pipeline that processes messages for this bot',
routingRules: 'Conditional Routing Rules',
routingRulesDescription:
'Rules are evaluated in order; first match routes to its pipeline. Fallback to the default pipeline above if none match.',
addRoutingRule: 'Add Rule',
ruleTypeLauncherType: 'Session Type',
ruleTypeLauncherId: 'Session ID',
ruleTypeMessageContent: 'Message Content',
operatorEq: 'Equals',
operatorNeq: 'Not Equals',
operatorContains: 'Contains',
operatorNotContains: 'Not Contains',
operatorStartsWith: 'Starts With',
operatorRegex: 'Regex',
ruleValuePlaceholder: 'Match value',
ruleValueLauncherIdPlaceholder: 'Group or user ID',
ruleValueMessagePlaceholder: 'Message text',
ruleValuePrefixPlaceholder: 'e.g. !draw',
ruleValueRegexpPlaceholder: 'e.g. ^/help',
sessionTypePerson: 'Private Chat',
sessionTypeGroup: 'Group Chat',
adapterConfigDescription: 'Configure the selected platform adapter',
dangerZone: 'Danger Zone',
dangerZoneDescription: 'Irreversible and destructive actions',

View File

@@ -294,6 +294,26 @@ const zhHans = {
basicInfoDescription: '设置机器人名称和描述',
routingConnection: '路由与连接',
routingConnectionDescription: '绑定处理此机器人消息的流水线',
routingRules: '条件路由规则',
routingRulesDescription:
'按顺序匹配,命中第一条规则后路由到对应流水线;都不匹配时使用上方默认流水线',
addRoutingRule: '添加规则',
ruleTypeLauncherType: '会话类型',
ruleTypeLauncherId: '会话 ID',
ruleTypeMessageContent: '消息内容',
operatorEq: '等于',
operatorNeq: '不等于',
operatorContains: '包含',
operatorNotContains: '不包含',
operatorStartsWith: '前缀匹配',
operatorRegex: '正则匹配',
ruleValuePlaceholder: '匹配值',
ruleValueLauncherIdPlaceholder: '群号或用户 ID',
ruleValueMessagePlaceholder: '消息内容',
ruleValuePrefixPlaceholder: '如: !draw',
ruleValueRegexpPlaceholder: '如: ^/help',
sessionTypePerson: '私聊',
sessionTypeGroup: '群聊',
adapterConfigDescription: '配置所选平台适配器',
dangerZone: '危险区域',
dangerZoneDescription: '不可逆的操作',