feat(agent): add event orchestration surface

This commit is contained in:
Junyan Qin
2026-06-23 23:23:09 +08:00
parent d7b97741e3
commit ed3598f8ac
35 changed files with 2983 additions and 142 deletions
@@ -0,0 +1,40 @@
from __future__ import annotations
import quart
from .. import group
@group.group_class('agents', '/api/v1/agents')
class AgentsRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@self.route('', methods=['GET', 'POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def _() -> str:
if quart.request.method == 'GET':
sort_by = quart.request.args.get('sort_by', 'updated_at')
sort_order = quart.request.args.get('sort_order', 'DESC')
return self.success(data={'agents': await self.ap.agent_service.get_agents(sort_by, sort_order)})
json_data = await quart.request.json
created = await self.ap.agent_service.create_agent(json_data)
return self.success(data=created)
@self.route('/_/metadata', methods=['GET'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def _() -> str:
return self.success(data=await self.ap.agent_service.get_agent_metadata())
@self.route('/<agent_uuid>', methods=['GET', 'PUT', 'DELETE'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def _(agent_uuid: str) -> str:
if quart.request.method == 'GET':
agent = await self.ap.agent_service.get_agent(agent_uuid)
if agent is None:
return self.http_status(404, -1, 'agent not found')
return self.success(data={'agent': agent})
if quart.request.method == 'PUT':
json_data = await quart.request.json
await self.ap.agent_service.update_agent(agent_uuid, json_data)
return self.success()
await self.ap.agent_service.delete_agent(agent_uuid)
return self.success()
+220
View File
@@ -0,0 +1,220 @@
from __future__ import annotations
import datetime
import uuid
import typing
import sqlalchemy
from ....core import app
from ....entity.persistence import agent as persistence_agent
AGENT_KIND_AGENT = 'agent'
AGENT_KIND_PIPELINE = 'pipeline'
PIPELINE_EVENT_PATTERNS = ['message.*']
AGENT_DEFAULT_EVENT_PATTERNS = ['*']
class AgentService:
"""Unified product surface for Agent orchestration instances and Pipelines."""
ap: app.Application
def __init__(self, ap: app.Application) -> None:
self.ap = ap
async def get_agent_metadata(self) -> dict[str, typing.Any]:
"""Return metadata needed by Agent forms."""
pipeline_metadata = await self.ap.pipeline_service.get_pipeline_metadata()
ai_metadata = next((item for item in pipeline_metadata if item.get('name') == 'ai'), None)
return {
'runner_config': ai_metadata,
'kinds': [
{
'name': AGENT_KIND_AGENT,
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
'message_only': False,
},
{
'name': AGENT_KIND_PIPELINE,
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
},
],
}
async def get_agents(self, sort_by: str = 'updated_at', sort_order: str = 'DESC') -> list[dict]:
agents = await self._get_agent_rows()
pipelines = await self.ap.pipeline_service.get_pipelines(sort_by='updated_at', sort_order='DESC')
items = [self._agent_to_product_item(agent) for agent in agents]
items.extend(self._pipeline_to_product_item(pipeline) for pipeline in pipelines)
reverse = sort_order == 'DESC'
sort_key = sort_by if sort_by in {'created_at', 'updated_at'} else 'updated_at'
return sorted(items, key=lambda item: self._parse_sort_time(item.get(sort_key)), reverse=reverse)
async def get_agent(self, agent_uuid: str) -> dict | None:
agent = await self._get_agent_row(agent_uuid)
if agent is not None:
return self._agent_to_product_item(agent, include_config=True)
pipeline = await self.ap.pipeline_service.get_pipeline(agent_uuid)
if pipeline is not None:
return self._pipeline_to_product_item(pipeline, include_config=True)
return None
async def create_agent(self, agent_data: dict) -> dict[str, str]:
kind = agent_data.get('kind') or AGENT_KIND_AGENT
if kind == AGENT_KIND_PIPELINE:
pipeline_uuid = await self.ap.pipeline_service.create_pipeline(
{
'name': agent_data.get('name') or 'New Pipeline',
'description': agent_data.get('description') or '',
'emoji': agent_data.get('emoji') or '⚙️',
'config': {},
}
)
return {'uuid': pipeline_uuid, 'kind': AGENT_KIND_PIPELINE}
if kind != AGENT_KIND_AGENT:
raise ValueError(f'Unsupported agent kind: {kind}')
config = agent_data.get('config') or await self._get_default_agent_config()
runner_id = self._resolve_runner_id(config)
new_uuid = str(uuid.uuid4())
values = {
'uuid': new_uuid,
'name': agent_data.get('name') or 'New Agent',
'description': agent_data.get('description') or '',
'emoji': agent_data.get('emoji') or '🤖',
'kind': AGENT_KIND_AGENT,
'component_ref': agent_data.get('component_ref') or runner_id,
'config': config,
'enabled': agent_data.get('enabled', True),
'supported_event_patterns': agent_data.get('supported_event_patterns') or AGENT_DEFAULT_EVENT_PATTERNS,
}
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_agent.Agent).values(**values))
return {'uuid': new_uuid, 'kind': AGENT_KIND_AGENT}
async def update_agent(self, agent_uuid: str, agent_data: dict) -> None:
existing_agent = await self._get_agent_row(agent_uuid)
if existing_agent is None:
pipeline = await self.ap.pipeline_service.get_pipeline(agent_uuid)
if pipeline is None:
raise ValueError(f'Agent {agent_uuid} not found')
await self.ap.pipeline_service.update_pipeline(agent_uuid, agent_data)
return
update_data = agent_data.copy()
for protected_field in ('uuid', 'kind', 'created_at', 'updated_at', 'capability'):
update_data.pop(protected_field, None)
if 'config' in update_data:
update_data['component_ref'] = update_data.get('component_ref') or self._resolve_runner_id(
update_data['config']
)
if 'supported_event_patterns' in update_data and not update_data['supported_event_patterns']:
update_data['supported_event_patterns'] = AGENT_DEFAULT_EVENT_PATTERNS
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_agent.Agent)
.where(persistence_agent.Agent.uuid == agent_uuid)
.values(**update_data)
)
async def delete_agent(self, agent_uuid: str) -> None:
existing_agent = await self._get_agent_row(agent_uuid)
if existing_agent is not None:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.delete(persistence_agent.Agent).where(persistence_agent.Agent.uuid == agent_uuid)
)
return
pipeline = await self.ap.pipeline_service.get_pipeline(agent_uuid)
if pipeline is None:
raise ValueError(f'Agent {agent_uuid} not found')
await self.ap.pipeline_service.delete_pipeline(agent_uuid)
async def _get_agent_rows(self) -> list[persistence_agent.Agent]:
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_agent.Agent))
return list(result.all())
async def _get_agent_row(self, agent_uuid: str) -> persistence_agent.Agent | None:
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_agent.Agent).where(persistence_agent.Agent.uuid == agent_uuid)
)
return result.first()
async def _get_default_agent_config(self) -> dict[str, typing.Any]:
runners = []
if getattr(self.ap, 'agent_runner_registry', None) is not None:
try:
runners = await self.ap.agent_runner_registry.list_runners(bound_plugins=None)
except Exception as e:
if getattr(self.ap, 'logger', None):
self.ap.logger.warning(f'Failed to load plugin agent runners for default agent config: {e}')
if not runners:
return {'runner': {'id': '', 'expire-time': 0}, 'runner_config': {}}
selected_runner = runners[0]
return {
'runner': {'id': selected_runner.id, 'expire-time': 0},
'runner_config': {
selected_runner.id: self.ap.pipeline_service._get_default_values_from_schema(
selected_runner.config_schema
)
},
}
@staticmethod
def _resolve_runner_id(config: dict[str, typing.Any]) -> str | None:
runner = config.get('runner') if isinstance(config, dict) else None
if isinstance(runner, dict):
runner_id = runner.get('id')
if runner_id:
return runner_id
return None
def _agent_to_product_item(
self,
agent: persistence_agent.Agent,
include_config: bool = False,
) -> dict[str, typing.Any]:
item = self.ap.persistence_mgr.serialize_model(persistence_agent.Agent, agent)
item['kind'] = AGENT_KIND_AGENT
item['capability'] = {
'supported_event_patterns': item.get('supported_event_patterns') or AGENT_DEFAULT_EVENT_PATTERNS,
'message_only': False,
}
if not include_config:
item.pop('config', None)
return item
@staticmethod
def _pipeline_to_product_item(pipeline: dict, include_config: bool = False) -> dict[str, typing.Any]:
item = pipeline.copy()
item['kind'] = AGENT_KIND_PIPELINE
item['component_ref'] = 'pipeline'
item['enabled'] = True
item['supported_event_patterns'] = PIPELINE_EVENT_PATTERNS
item['capability'] = {
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
}
if not include_config:
item.pop('config', None)
return item
@staticmethod
def _parse_sort_time(value: typing.Any) -> datetime.datetime:
if isinstance(value, datetime.datetime):
return value
if isinstance(value, str):
try:
return datetime.datetime.fromisoformat(value)
except ValueError:
return datetime.datetime.min
return datetime.datetime.min
+82
View File
@@ -6,6 +6,7 @@ import typing
from ....core import app
from ....discover import engine
from ....entity.persistence import agent as persistence_agent
from ....entity.persistence import bot as persistence_bot
from ....entity.persistence import pipeline as persistence_pipeline
@@ -36,6 +37,84 @@ class BotService:
return True
return False
@staticmethod
def _is_message_event_pattern(event_pattern: str) -> bool:
return event_pattern == 'message.*' or event_pattern.startswith('message.')
@staticmethod
def _event_pattern_covers(supported_pattern: str, binding_pattern: str) -> bool:
if supported_pattern == '*':
return True
if supported_pattern == binding_pattern:
return True
if binding_pattern == '*':
return False
if supported_pattern.endswith('.*'):
namespace = supported_pattern[:-2]
return binding_pattern == f'{namespace}.*' or binding_pattern.startswith(f'{namespace}.')
return False
@classmethod
def _agent_supports_event_pattern(cls, supported_patterns: list[str] | None, event_pattern: str) -> bool:
patterns = supported_patterns or ['*']
return any(cls._event_pattern_covers(pattern, event_pattern) for pattern in patterns)
async def _normalize_event_bindings(self, bindings: list[dict] | None) -> list[dict]:
"""Validate and normalize Bot event bindings."""
if not bindings:
return []
normalized: list[dict] = []
for index, raw_binding in enumerate(bindings):
if not isinstance(raw_binding, dict):
continue
event_pattern = str(raw_binding.get('event_pattern') or '').strip()
target_type = str(raw_binding.get('target_type') or '').strip()
target_uuid = str(raw_binding.get('target_uuid') or '').strip()
if not event_pattern or not target_type:
continue
if target_type == 'pipeline':
if not self._is_message_event_pattern(event_pattern):
raise ValueError('Pipeline can only be bound to message events')
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_pipeline.LegacyPipeline.uuid).where(
persistence_pipeline.LegacyPipeline.uuid == target_uuid
)
)
if result.first() is None:
raise ValueError('Pipeline not found')
elif target_type == 'agent':
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_agent.Agent).where(persistence_agent.Agent.uuid == target_uuid)
)
agent = result.first()
if agent is None:
raise ValueError('Agent not found')
if not self._agent_supports_event_pattern(agent.supported_event_patterns, event_pattern):
raise ValueError('Agent does not support this event pattern')
elif target_type == 'discard':
target_uuid = ''
else:
raise ValueError(f'Unsupported event binding target type: {target_type}')
normalized.append(
{
'id': raw_binding.get('id') or str(uuid.uuid4()),
'event_pattern': event_pattern,
'target_type': target_type,
'target_uuid': target_uuid,
'filters': raw_binding.get('filters') or [],
'priority': int(raw_binding.get('priority') or 0),
'enabled': bool(raw_binding.get('enabled', True)),
'description': raw_binding.get('description') or '',
'order': index,
}
)
return normalized
async def get_bots(self, include_secret: bool = True) -> list[dict]:
"""获取所有机器人"""
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_bot.Bot))
@@ -137,6 +216,9 @@ class BotService:
if 'uuid' in update_data:
del update_data['uuid']
if 'event_bindings' in update_data:
update_data['event_bindings'] = await self._normalize_event_bindings(update_data.get('event_bindings'))
# set use_pipeline_name
if 'use_pipeline_uuid' in update_data:
result = await self.ap.persistence_mgr.execute_async(
+29 -1
View File
@@ -28,7 +28,7 @@ if typing.TYPE_CHECKING:
INSTRUCTIONS = """\
This MCP server manages a LangBot instance. LangBot is an LLM-native instant
messaging bot platform. Use these tools to inspect and manage bots, pipelines,
messaging bot platform. Use these tools to inspect and manage bots, agents, pipelines,
models, knowledge bases, MCP servers, and skills.
Authentication uses a LangBot API key (web-UI-created `lbk_...` key or the
@@ -141,6 +141,34 @@ class LangBotMCPServer:
await ap.pipeline_service.delete_pipeline(pipeline_uuid)
return _dump({'ok': True})
# ----- Agents -------------------------------------------------- #
@mcp.tool(description='List product-level Agents, including Agent orchestrations and Pipelines.')
async def list_agents() -> str:
return _dump(await ap.agent_service.get_agents())
@mcp.tool(description='Get a product-level Agent or Pipeline by UUID.')
async def get_agent(agent_uuid: str) -> str:
return _dump(await ap.agent_service.get_agent(agent_uuid))
@mcp.tool(
description=(
'Create an Agent orchestration or Pipeline. `agent_data` matches '
'POST /api/v1/agents; set kind to `agent` or `pipeline`. Returns the new UUID and kind.'
)
)
async def create_agent(agent_data: dict) -> str:
return _dump(await ap.agent_service.create_agent(agent_data))
@mcp.tool(description='Update an Agent orchestration or Pipeline by UUID.')
async def update_agent(agent_uuid: str, agent_data: dict) -> str:
await ap.agent_service.update_agent(agent_uuid, agent_data)
return _dump({'ok': True})
@mcp.tool(description='Delete an Agent orchestration or Pipeline by UUID.')
async def delete_agent(agent_uuid: str) -> str:
await ap.agent_service.delete_agent(agent_uuid)
return _dump({'ok': True})
# ----- Models -------------------------------------------------- #
@mcp.tool(description='List all configured LLM models. Secrets are redacted.')
async def list_llm_models() -> str:
+3
View File
@@ -27,6 +27,7 @@ from ..api.http.service import space as space_service
from ..api.http.service import model as model_service
from ..api.http.service import provider as provider_service
from ..api.http.service import pipeline as pipeline_service
from ..api.http.service import agent as agent_service
from ..api.http.service import bot as bot_service
from ..api.http.service import knowledge as knowledge_service
from ..api.http.service import mcp as mcp_service
@@ -147,6 +148,8 @@ class Application:
pipeline_service: pipeline_service.PipelineService = None
agent_service: agent_service.AgentService = None
bot_service: bot_service.BotService = None
knowledge_service: knowledge_service.KnowledgeService = None
+4
View File
@@ -23,6 +23,7 @@ from ...api.http.service import space as space_service
from ...api.http.service import model as model_service
from ...api.http.service import provider as provider_service
from ...api.http.service import pipeline as pipeline_service
from ...api.http.service import agent as agent_service
from ...api.http.service import bot as bot_service
from ...api.http.service import knowledge as knowledge_service
from ...api.http.service import mcp as mcp_service
@@ -75,6 +76,9 @@ class BuildAppStage(stage.BootingStage):
pipeline_service_inst = pipeline_service.PipelineService(ap)
ap.pipeline_service = pipeline_service_inst
agent_service_inst = agent_service.AgentService(ap)
ap.agent_service = agent_service_inst
bot_service_inst = bot_service.BotService(ap)
ap.bot_service = bot_service_inst
@@ -0,0 +1,26 @@
import sqlalchemy
from .base import Base
class Agent(Base):
"""Product-level Agent orchestration instance."""
__tablename__ = 'agents'
uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True, unique=True)
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
description = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, default='')
emoji = sqlalchemy.Column(sqlalchemy.String(10), nullable=True, default='🤖')
kind = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, default='agent')
component_ref = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
config = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default={})
enabled = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=True)
supported_event_patterns = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default=['*'])
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now())
updated_at = sqlalchemy.Column(
sqlalchemy.DateTime,
nullable=False,
server_default=sqlalchemy.func.now(),
onupdate=sqlalchemy.func.now(),
)
@@ -17,6 +17,7 @@ class Bot(Base):
use_pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
use_pipeline_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
pipeline_routing_rules = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, server_default='[]')
event_bindings = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, server_default='[]')
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now())
updated_at = sqlalchemy.Column(
sqlalchemy.DateTime,
@@ -16,6 +16,7 @@ from langbot.pkg.entity.persistence.base import Base
# Import all ORM models so they are registered with Base.metadata
# This is required for autogenerate to detect model changes
from langbot.pkg.entity.persistence import (
agent, # noqa: F401
agent_run, # noqa: F401
agent_runner_state, # noqa: F401
apikey, # noqa: F401
@@ -0,0 +1,20 @@
"""Merge agent runner and MCP migration heads.
Revision ID: 0007_merge_agent_mcp_heads
Revises: 8d3a1f2c4b6e, 0006_normalize_mcp_remote_mode
Create Date: 2026-06-23
"""
# revision identifiers, used by Alembic.
revision = '0007_merge_agent_mcp_heads'
down_revision = ('8d3a1f2c4b6e', '0006_normalize_mcp_remote_mode')
branch_labels = None
depends_on = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass
@@ -0,0 +1,66 @@
"""Add Agent product surface tables.
Revision ID: 0008_agent_product_surface
Revises: 0007_merge_agent_mcp_heads
Create Date: 2026-06-23
"""
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
revision = '0008_agent_product_surface'
down_revision = '0007_merge_agent_mcp_heads'
branch_labels = None
depends_on = None
def _table_exists(inspector: sa.Inspector, table_name: str) -> bool:
return table_name in inspector.get_table_names()
def _column_exists(inspector: sa.Inspector, table_name: str, column_name: str) -> bool:
if not _table_exists(inspector, table_name):
return False
return any(column['name'] == column_name for column in inspector.get_columns(table_name))
def upgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not _table_exists(inspector, 'agents'):
op.create_table(
'agents',
sa.Column('uuid', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('description', sa.String(length=255), nullable=False, server_default=''),
sa.Column('emoji', sa.String(length=10), nullable=True),
sa.Column('kind', sa.String(length=50), nullable=False, server_default='agent'),
sa.Column('component_ref', sa.String(length=255), nullable=True),
sa.Column('config', sa.JSON(), nullable=False, server_default='{}'),
sa.Column('enabled', sa.Boolean(), nullable=False, server_default=sa.true()),
sa.Column('supported_event_patterns', sa.JSON(), nullable=False, server_default='["*"]'),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now(), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.func.now(), nullable=False),
sa.PrimaryKeyConstraint('uuid'),
sa.UniqueConstraint('uuid'),
)
if _table_exists(inspector, 'bots') and not _column_exists(inspector, 'bots', 'event_bindings'):
with op.batch_alter_table('bots') as batch_op:
batch_op.add_column(sa.Column('event_bindings', sa.JSON(), nullable=False, server_default='[]'))
def downgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if _table_exists(inspector, 'bots') and _column_exists(inspector, 'bots', 'event_bindings'):
with op.batch_alter_table('bots') as batch_op:
batch_op.drop_column('event_bindings')
if _table_exists(inspector, 'agents'):
op.drop_table('agents')
+692 -118
View File
@@ -3,7 +3,10 @@ from __future__ import annotations
import asyncio
import json
import re
import time
import traceback
import typing
import uuid
import sqlalchemy
from ..core import app, entities as core_entities, taskmgr
@@ -14,14 +17,30 @@ from ..entity.persistence import bot as persistence_bot
from ..entity.persistence import pipeline as persistence_pipeline
from ..entity.errors import platform as platform_errors
from ..agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
ResourcePolicy,
StatePolicy,
)
from .logger import EventLogger
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.events as plugin_events
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot_plugin.api.entities.builtin.agent_runner.event import (
ActorContext,
SubjectContext,
RawEventRef,
)
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
class RuntimeBot:
@@ -77,6 +96,7 @@ class RuntimeBot:
PIPELINE_DISCARD = '__discard__'
PIPELINE_DISCARD_DISPLAY_NAME = 'Discarded'
EVENT_DATA_MAX_STRING_BYTES = 512
@staticmethod
def _eba_event_to_plugin_event(event: platform_events.EBAEvent) -> plugin_events.BaseEventModel | None:
@@ -103,6 +123,572 @@ class RuntimeBot:
return None
@staticmethod
def _match_event_pattern(event_type: str, pattern: str) -> bool:
if not event_type or not pattern:
return False
if pattern == '*':
return True
if pattern.endswith('.*'):
return event_type.startswith(f'{pattern[:-2]}.')
return event_type == pattern
@classmethod
def _is_message_event_type(cls, event_type: str) -> bool:
return cls._match_event_pattern(event_type, 'message.*')
@classmethod
def _agent_supports_event_type(
cls,
supported_patterns: list[str] | None,
event_type: str,
) -> bool:
return any(cls._match_event_pattern(event_type, pattern) for pattern in (supported_patterns or ['*']))
@staticmethod
def _get_nested_value(data: dict[str, typing.Any], path: str) -> typing.Any:
current: typing.Any = data
for key in path.split('.'):
if isinstance(current, dict):
current = current.get(key)
else:
current = getattr(current, key, None)
if current is None:
return None
return current
@classmethod
def _match_event_filter(
cls,
event_data: dict[str, typing.Any],
event_filter: dict[str, typing.Any],
) -> bool:
field = str(event_filter.get('field') or event_filter.get('path') or '').strip()
if not field:
return True
operator = str(event_filter.get('operator') or 'eq')
expected = event_filter.get('value')
actual = cls._get_nested_value(event_data, field)
if operator == 'eq':
return actual == expected
if operator == 'neq':
return actual != expected
if operator == 'contains':
if isinstance(actual, (list, tuple, set)):
return expected in actual
return str(expected) in str(actual or '')
if operator == 'not_contains':
if isinstance(actual, (list, tuple, set)):
return expected not in actual
return str(expected) not in str(actual or '')
if operator == 'starts_with':
return str(actual or '').startswith(str(expected))
if operator == 'regex':
try:
return bool(re.search(str(expected), str(actual or '')))
except re.error:
return False
return False
@classmethod
def _match_event_filters(
cls,
event: platform_events.EBAEvent,
filters: typing.Any,
) -> bool:
if not filters:
return True
if not isinstance(filters, list):
return False
event_data = cls._safe_model_dump(event)
return all(
cls._match_event_filter(event_data, event_filter)
for event_filter in filters
if isinstance(event_filter, dict)
)
def _resolve_eba_event_binding(
self,
event: platform_events.EBAEvent,
event_type: str,
) -> dict[str, typing.Any] | None:
"""Resolve the highest priority Bot event binding for a platform event."""
raw_bindings = self.bot_entity.event_bindings or []
if isinstance(raw_bindings, str):
try:
raw_bindings = json.loads(raw_bindings)
except json.JSONDecodeError:
raw_bindings = []
if not isinstance(raw_bindings, list):
return None
matched: list[tuple[int, int, dict[str, typing.Any]]] = []
for index, binding in enumerate(raw_bindings):
if not isinstance(binding, dict) or not binding.get('enabled', True):
continue
event_pattern = str(binding.get('event_pattern') or '')
if not self._match_event_pattern(event_type, event_pattern):
continue
if not self._match_event_filters(event, binding.get('filters')):
continue
priority = int(binding.get('priority') or 0)
order = int(binding.get('order', index))
matched.append((priority, -order, binding))
if not matched:
return None
matched.sort(key=lambda item: (item[0], item[1]), reverse=True)
return matched[0][2]
@staticmethod
def _safe_model_dump(model: typing.Any) -> dict[str, typing.Any]:
if model is None:
return {}
if hasattr(model, 'model_dump'):
try:
return model.model_dump(mode='json')
except TypeError:
try:
return model.model_dump()
except Exception:
return {}
except Exception:
return {}
if isinstance(model, dict):
return model
return {}
@classmethod
def _compact_event_data(cls, event: platform_events.EBAEvent) -> dict[str, typing.Any]:
raw_event_data = cls._safe_model_dump(event)
compact: dict[str, typing.Any] = {}
for key, value in raw_event_data.items():
if key == 'source_platform_object' or key.startswith('_'):
continue
if value is None or isinstance(value, (bool, int, float)):
compact[key] = value
continue
if isinstance(value, str):
if len(value.encode('utf-8')) <= cls.EVENT_DATA_MAX_STRING_BYTES:
compact[key] = value
continue
if isinstance(value, (list, dict)):
try:
encoded = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
continue
if len(encoded.encode('utf-8')) <= cls.EVENT_DATA_MAX_STRING_BYTES:
compact[key] = value
return compact
@staticmethod
def _get_entity_id(entity: typing.Any) -> str | None:
entity_id = getattr(entity, 'id', None)
if entity_id is None and isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id is None or entity_id == '':
return None
return str(entity_id)
@staticmethod
def _get_entity_name(entity: typing.Any) -> str | None:
if entity is None:
return None
if hasattr(entity, 'get_name'):
try:
name = entity.get_name()
if name:
return str(name)
except Exception:
pass
for attr in ('nickname', 'member_name', 'name', 'display_name'):
value = getattr(entity, attr, None)
if value:
return str(value)
if isinstance(entity, dict):
for attr in ('nickname', 'member_name', 'name', 'display_name'):
value = entity.get(attr)
if value:
return str(value)
return None
@classmethod
def _infer_actor_context(cls, event: platform_events.EBAEvent) -> ActorContext | None:
actor = getattr(event, 'sender', None) or getattr(event, 'member', None) or getattr(event, 'user', None)
actor_id = cls._get_entity_id(actor)
actor_name = cls._get_entity_name(actor)
if actor_id is None:
user_id = getattr(event, 'user_id', None)
if user_id:
actor_id = str(user_id)
if actor_id is None:
return None
return ActorContext(
actor_type='user',
actor_id=actor_id,
actor_name=actor_name,
metadata={},
)
@classmethod
def _infer_subject_context(cls, event: platform_events.EBAEvent) -> SubjectContext:
group = getattr(event, 'group', None)
if group is not None:
group_id = cls._get_entity_id(group)
return SubjectContext(
subject_type='group',
subject_id=group_id,
data={'group_name': cls._get_entity_name(group)},
)
message_id = getattr(event, 'message_id', None)
if message_id:
return SubjectContext(
subject_type='message',
subject_id=str(message_id),
data={},
)
feedback_id = getattr(event, 'feedback_id', None)
if feedback_id:
return SubjectContext(
subject_type='feedback',
subject_id=str(feedback_id),
data={'message_id': getattr(event, 'message_id', None)},
)
action = getattr(event, 'action', None)
if action:
return SubjectContext(
subject_type='platform_action',
subject_id=str(action),
data={},
)
return SubjectContext(
subject_type='event',
subject_id=getattr(event, 'type', None),
data={},
)
@staticmethod
def _session_to_reply_target(session_id: str | None) -> tuple[str | None, str | None]:
if not session_id or '_' not in session_id:
return None, None
target_type, target_id = session_id.split('_', 1)
if target_type == 'person':
target_type = 'person'
elif target_type == 'group':
target_type = 'group'
else:
return None, None
return target_type, target_id or None
@classmethod
def _infer_reply_target(
cls,
event: platform_events.EBAEvent,
) -> tuple[str | None, str | None, dict[str, typing.Any]]:
metadata: dict[str, typing.Any] = {}
group = getattr(event, 'group', None)
group_id = cls._get_entity_id(group)
if group_id:
metadata['group_id'] = group_id
return 'group', group_id, metadata
chat_id = getattr(event, 'chat_id', None)
chat_type = getattr(event, 'chat_type', None)
chat_type_value = getattr(chat_type, 'value', chat_type)
if chat_id:
metadata['chat_id'] = str(chat_id)
if chat_type_value == 'group':
return 'group', str(chat_id), metadata
return 'person', str(chat_id), metadata
session_target_type, session_target_id = cls._session_to_reply_target(getattr(event, 'session_id', None))
if session_target_type and session_target_id:
return session_target_type, session_target_id, metadata
raw_data = getattr(event, 'data', None)
if isinstance(raw_data, dict):
target_type = raw_data.get('target_type') or raw_data.get('chat_type')
target_id = (
raw_data.get('target_id')
or raw_data.get('chat_id')
or raw_data.get('group_id')
or raw_data.get('user_id')
)
if target_type and target_id:
return str(target_type), str(target_id), metadata
return None, None, metadata
@classmethod
def _build_agent_input(cls, event: platform_events.EBAEvent) -> AgentInput:
text = None
contents: list[dict[str, typing.Any]] = []
message_chain = getattr(event, 'message_chain', None)
if message_chain:
text_parts: list[str] = []
try:
for component in message_chain:
if isinstance(component, platform_message.Plain):
text_parts.append(component.text)
elif isinstance(component, platform_message.Image):
if component.url:
contents.append({'type': 'image_url', 'image_url': {'url': component.url}})
elif component.base64:
contents.append({'type': 'image_base64', 'image_base64': component.base64})
except TypeError:
text_parts.append(str(message_chain))
text = ''.join(text_parts) or str(message_chain)
if text is None:
feedback_content = getattr(event, 'feedback_content', None)
if feedback_content:
text = str(feedback_content)
elif getattr(event, 'action', None):
text = str(getattr(event, 'action'))
else:
text = str(getattr(event, 'type', 'event'))
if text:
contents.insert(0, {'type': 'text', 'text': text})
return AgentInput(
text=text,
contents=contents,
attachments=[],
)
def _eba_event_to_agent_envelope(
self,
event: platform_events.EBAEvent,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
) -> AgentEventEnvelope:
event_type = getattr(event, 'type', None) or event.__class__.__name__
event_time = getattr(event, 'timestamp', None) or time.time()
event_id = (
getattr(event, 'message_id', None) or getattr(event, 'feedback_id', None) or f'{event_type}:{uuid.uuid4()}'
)
target_type, target_id, target_metadata = self._infer_reply_target(event)
conversation_id = None
if target_type and target_id:
conversation_id = f'{target_type}_{target_id}'
elif getattr(event, 'session_id', None):
conversation_id = str(getattr(event, 'session_id'))
return AgentEventEnvelope(
event_id=f'platform:{self.bot_entity.uuid}:{event_id}',
event_type=event_type,
event_time=int(event_time) if isinstance(event_time, (int, float)) else None,
source='platform',
source_event_type=event_type,
bot_id=self.bot_entity.uuid,
workspace_id=None,
conversation_id=conversation_id,
thread_id=None,
actor=self._infer_actor_context(event),
subject=self._infer_subject_context(event),
input=self._build_agent_input(event),
delivery=DeliveryContext(
surface='platform',
reply_target={
'target_type': target_type,
'target_id': target_id,
'message_id': getattr(event, 'message_id', None),
**target_metadata,
},
supports_streaming=False,
supports_edit=False,
supports_reaction=False,
platform_capabilities={
'adapter': adapter.__class__.__name__,
'event_type': event_type,
},
),
raw_ref=RawEventRef(ref_id=str(event_id), storage_key=None),
data=self._compact_event_data(event),
)
@staticmethod
def _agent_product_to_binding(
agent: dict[str, typing.Any],
event_binding: dict[str, typing.Any],
event_type: str,
bot_uuid: str,
) -> AgentBinding | None:
config = agent.get('config') if isinstance(agent, dict) else None
if not isinstance(config, dict):
return None
runner = config.get('runner')
runner_id = None
if isinstance(runner, dict):
runner_id = runner.get('id')
runner_id = runner_id or agent.get('component_ref')
if not runner_id:
return None
runner_config_map = config.get('runner_config')
runner_config = {}
if isinstance(runner_config_map, dict):
runner_config = runner_config_map.get(runner_id) or {}
return AgentBinding(
binding_id=f'bot:{bot_uuid}:{event_binding.get("id") or uuid.uuid4()}',
scope=BindingScope(scope_type='bot', scope_id=bot_uuid),
event_types=[event_type],
runner_id=runner_id,
runner_config=runner_config,
resource_policy=ResourcePolicy(),
state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner']),
delivery_policy=DeliveryPolicy(enable_streaming=False, enable_reply=True),
enabled=True,
agent_id=agent.get('uuid'),
)
@staticmethod
def _provider_content_to_text(content: typing.Any) -> str:
if content is None:
return ''
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
item_data = item.model_dump(mode='json') if hasattr(item, 'model_dump') else item
if isinstance(item_data, dict):
if item_data.get('type') == 'text' and item_data.get('text') is not None:
parts.append(str(item_data.get('text')))
elif item_data.get('text') is not None:
parts.append(str(item_data.get('text')))
elif item_data is not None:
parts.append(str(item_data))
return ''.join(parts)
return str(content)
@classmethod
def _provider_output_to_text(cls, result: provider_message.Message | provider_message.MessageChunk) -> str:
if getattr(result, 'all_content', None):
return str(getattr(result, 'all_content'))
return cls._provider_content_to_text(getattr(result, 'content', None))
async def _deliver_agent_outputs(
self,
envelope: AgentEventEnvelope,
outputs: list[provider_message.Message | provider_message.MessageChunk],
) -> None:
if not outputs or not envelope.delivery.reply_target:
return
reply_target = envelope.delivery.reply_target
target_type = reply_target.get('target_type')
target_id = reply_target.get('target_id')
if not target_type or not target_id:
return
final_text = ''
for output in outputs:
output_text = self._provider_output_to_text(output)
if isinstance(output, provider_message.Message):
final_text = output_text or final_text
elif output_text:
final_text = output_text
if not final_text:
return
await self.adapter.send_message(
str(target_type),
str(target_id),
platform_message.MessageChain([platform_message.Plain(text=final_text)]),
)
async def _dispatch_eba_event_to_agent(
self,
event: platform_events.EBAEvent,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
) -> None:
event_type = getattr(event, 'type', None) or event.__class__.__name__
event_binding = self._resolve_eba_event_binding(event, event_type)
if event_binding is None:
if isinstance(event, platform_events.MessageReceivedEvent):
await self._dispatch_eba_message_to_pipeline(event, adapter)
return
target_type = event_binding.get('target_type')
if target_type == 'discard':
if isinstance(event, platform_events.MessageReceivedEvent):
await self._dispatch_eba_message_to_pipeline(
event,
adapter,
pipeline_uuid=self.PIPELINE_DISCARD,
routed_by_event_binding=True,
)
return
await self.logger.info(f'EBA event {event_type} discarded by event binding')
return
if target_type == 'pipeline':
if not self._is_message_event_type(event_type):
await self.logger.warning(f'EBA event {event_type} ignored Pipeline target for non-message event')
return
await self._dispatch_eba_message_to_pipeline(
event,
adapter,
pipeline_uuid=event_binding.get('target_uuid'),
routed_by_event_binding=True,
)
return
if target_type != 'agent':
await self.logger.warning(f'EBA event {event_type} ignored unsupported target type {target_type}')
return
target_uuid = event_binding.get('target_uuid')
agent = await self.ap.agent_service.get_agent(target_uuid)
if not agent or agent.get('kind') != 'agent':
await self.logger.warning(f'EBA event {event_type} target agent not found: {target_uuid}')
return
if not agent.get('enabled', True):
await self.logger.info(f'EBA event {event_type} target agent disabled: {target_uuid}')
return
if not self._agent_supports_event_type(agent.get('supported_event_patterns'), event_type):
await self.logger.info(f'EBA event {event_type} target agent does not support this event: {target_uuid}')
return
binding = self._agent_product_to_binding(agent, event_binding, event_type, self.bot_entity.uuid)
if binding is None:
await self.logger.warning(f'EBA event {event_type} target agent has no runner: {target_uuid}')
return
envelope = self._eba_event_to_agent_envelope(event, adapter)
outputs: list[provider_message.Message | provider_message.MessageChunk] = []
try:
async for output in self.ap.agent_run_orchestrator.run(envelope, binding):
outputs.append(output)
except Exception:
await self.logger.error(f'Failed to run Agent for EBA event {event_type}: {traceback.format_exc()}')
return
try:
await self._deliver_agent_outputs(envelope, outputs)
except Exception:
await self.logger.error(
f'Failed to deliver Agent output for EBA event {event_type}: {traceback.format_exc()}'
)
def resolve_pipeline_uuid(
self,
launcher_type: str,
@@ -222,128 +808,115 @@ class RuntimeBot:
except Exception as e:
await self.logger.error(f'Failed to record discarded message: {e}')
async def _handle_legacy_message_event(
self,
event: platform_events.FriendMessage | platform_events.GroupMessage,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid_override: str | None = None,
routed_by_event_binding: bool = False,
) -> None:
is_group_message = isinstance(event, platform_events.GroupMessage)
launcher_kind = 'group' if is_group_message else 'person'
launcher_type = (
provider_session.LauncherTypes.GROUP if is_group_message else provider_session.LauncherTypes.PERSON
)
launcher_id = event.group.id if is_group_message else event.sender.id
sender_id = event.sender.id
image_components = [
component for component in event.message_chain if isinstance(component, platform_message.Image)
]
await self.logger.info(
f'{event.message_chain}',
images=image_components,
message_session_id=f'{launcher_kind}_{launcher_id}',
)
skip_pipeline = False
if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher:
if is_group_message:
skip_pipeline = await self.ap.webhook_pusher.push_group_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
else:
skip_pipeline = await self.ap.webhook_pusher.push_person_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
if skip_pipeline:
await self.logger.info(f'Pipeline skipped for {launcher_kind} message due to webhook response')
return
if hasattr(adapter, 'get_launcher_id'):
custom_launcher_id = adapter.get_launcher_id(event)
if custom_launcher_id:
launcher_id = custom_launcher_id
if pipeline_uuid_override is None:
message_text = str(event.message_chain)
element_types = [comp.type for comp in event.message_chain]
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid(
launcher_kind, launcher_id, message_text, element_types
)
else:
pipeline_uuid = pipeline_uuid_override
routed_by_rule = routed_by_event_binding
if pipeline_uuid == self.PIPELINE_DISCARD:
await self.logger.info(f'{launcher_kind.title()} message discarded by routing rule')
await self._record_discarded_message(
launcher_type,
launcher_id,
sender_id,
event,
event.message_chain,
)
return
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
async def _dispatch_eba_message_to_pipeline(
self,
event: platform_events.EBAEvent,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: str | None = None,
routed_by_event_binding: bool = False,
) -> None:
if not isinstance(event, platform_events.MessageReceivedEvent):
event_type = getattr(event, 'type', None) or event.__class__.__name__
await self.logger.warning(f'EBA event {event_type} cannot be dispatched to legacy Pipeline')
return
await self._handle_legacy_message_event(
event.to_legacy_event(),
adapter,
pipeline_uuid_override=pipeline_uuid,
routed_by_event_binding=routed_by_event_binding,
)
async def initialize(self):
async def on_friend_message(
event: platform_events.FriendMessage,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
):
image_components = [
component for component in event.message_chain if isinstance(component, platform_message.Image)
]
await self.logger.info(
f'{event.message_chain}',
images=image_components,
message_session_id=f'person_{event.sender.id}',
)
# Push to webhooks and check if pipeline should be skipped
skip_pipeline = False
if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher:
skip_pipeline = await self.ap.webhook_pusher.push_person_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
# Only add to query pool if no webhook requested to skip pipeline
if not skip_pipeline:
launcher_id = event.sender.id
if hasattr(adapter, 'get_launcher_id'):
custom_launcher_id = adapter.get_launcher_id(event)
if custom_launcher_id:
launcher_id = custom_launcher_id
message_text = str(event.message_chain)
element_types = [comp.type for comp in event.message_chain]
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid(
'person', launcher_id, message_text, element_types
)
if pipeline_uuid == self.PIPELINE_DISCARD:
await self.logger.info('Person message discarded by routing rule')
await self._record_discarded_message(
provider_session.LauncherTypes.PERSON,
launcher_id,
event.sender.id,
event,
event.message_chain,
)
return
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=launcher_id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
else:
await self.logger.info('Pipeline skipped for person message due to webhook response')
await self._handle_legacy_message_event(event, adapter)
async def on_group_message(
event: platform_events.GroupMessage,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
):
image_components = [
component for component in event.message_chain if isinstance(component, platform_message.Image)
]
await self.logger.info(
f'{event.message_chain}',
images=image_components,
message_session_id=f'group_{event.group.id}',
)
# Push to webhooks and check if pipeline should be skipped
skip_pipeline = False
if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher:
skip_pipeline = await self.ap.webhook_pusher.push_group_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
# Only add to query pool if no webhook requested to skip pipeline
if not skip_pipeline:
launcher_id = event.group.id
if hasattr(adapter, 'get_launcher_id'):
custom_launcher_id = adapter.get_launcher_id(event)
if custom_launcher_id:
launcher_id = custom_launcher_id
message_text = str(event.message_chain)
element_types = [comp.type for comp in event.message_chain]
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid(
'group', launcher_id, message_text, element_types
)
if pipeline_uuid == self.PIPELINE_DISCARD:
await self.logger.info('Group message discarded by routing rule')
await self._record_discarded_message(
provider_session.LauncherTypes.GROUP,
launcher_id,
event.sender.id,
event,
event.message_chain,
)
return
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
launcher_id=launcher_id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
routed_by_rule=routed_by_rule,
)
else:
await self.logger.info('Pipeline skipped for group message due to webhook response')
await self._handle_legacy_message_event(event, adapter)
self.adapter.register_listener(platform_events.FriendMessage, on_friend_message)
self.adapter.register_listener(platform_events.GroupMessage, on_group_message)
@@ -398,13 +971,14 @@ class RuntimeBot:
):
event.bot_uuid = self.bot_entity.uuid
plugin_event = self._eba_event_to_plugin_event(event)
if plugin_event is None:
return
try:
await self.ap.plugin_connector.emit_event(plugin_event)
except Exception:
await self.logger.error(f'Failed to dispatch EBA event to plugins: {traceback.format_exc()}')
if plugin_event is not None:
try:
await self.ap.plugin_connector.emit_event(plugin_event)
except Exception:
await self.logger.error(f'Failed to dispatch EBA event to plugins: {traceback.format_exc()}')
await self._dispatch_eba_event_to_agent(event, adapter)
self.adapter.register_listener(platform_events.EBAEvent, on_eba_event)