refactor(agent-runner): remove host context windowing

This commit is contained in:
huanghuoguoguo
2026-06-02 17:01:45 +08:00
parent afaf09ccc7
commit d0383e146e
26 changed files with 79 additions and 815 deletions

View File

@@ -114,6 +114,9 @@ class ConfigMigration:
if old_runner_name:
old_config = ai_config.get(old_runner_name, {})
if old_config:
old_config = dict(old_config)
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
old_config.pop('max-round', None)
return ConfigMigration.normalize_runner_config_for_migration(runner_id, old_config)
return {}

View File

@@ -296,8 +296,6 @@ class AgentRunContextBuilder:
adapter_context = {
'query_id': None,
'pipeline_uuid': binding.pipeline_uuid,
'max_round': binding.max_round, # For reference only
'adapter_messages': [],
'extra': {},
}
@@ -316,7 +314,7 @@ class AgentRunContextBuilder:
'state': state,
'runtime': runtime,
'config': binding.runner_config,
'bootstrap': None, # Optional - no messages inlined by default
'bootstrap': None,
'adapter': adapter_context,
'metadata': {}, # Additional metadata
}

View File

@@ -11,7 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from sqlalchemy.orm import sessionmaker
from ...entity.persistence.event_log import EventLog
from ...entity.persistence.transcript import Transcript
class EventLogStore:

View File

@@ -8,8 +8,6 @@ import typing
import pydantic
from langbot_plugin.api.entities.builtin.agent_runner.event import (
AgentEventContext,
ConversationContext,
ActorContext,
SubjectContext,
RawEventRef,
@@ -172,6 +170,3 @@ class AgentBinding(pydantic.BaseModel):
# Fields for Pipeline adapter
pipeline_uuid: str | None = None
"""Pipeline UUID (for Pipeline adapter)."""
max_round: int | None = None
"""max-round (for Pipeline adapter bootstrap, not Protocol v1)."""

View File

@@ -133,16 +133,6 @@ class AgentRunOrchestrator:
# Merge prompt into adapter.extra for Pipeline adapter consumers.
if 'prompt' in adapter_context:
context['adapter']['extra']['prompt'] = adapter_context['prompt']
# Merge bootstrap if provided
if adapter_context.get('bootstrap'):
context['bootstrap'] = adapter_context['bootstrap']
# Also expose the bootstrap window through adapter metadata.
bootstrap_messages = adapter_context['bootstrap'].get('messages')
if bootstrap_messages:
context['adapter']['adapter_messages'] = bootstrap_messages
# Merge runtime metadata if provided
if adapter_context.get('runtime_metadata'):
context['runtime']['metadata'].update(adapter_context['runtime_metadata'])
# Set query_id if provided
if adapter_context.get('query_id'):
context['runtime']['query_id'] = adapter_context['query_id']

View File

@@ -29,7 +29,6 @@ from .host_models import (
DeliveryPolicy,
)
from . import events as runner_events
from ...pipeline.msgtrun.round_policy import select_max_round_messages
class PipelineAdapter:
@@ -38,7 +37,6 @@ class PipelineAdapter:
This adapter is responsible for:
- Converting Query to AgentEventEnvelope
- Converting Pipeline config to temporary AgentBinding
- Handling max-round as bootstrap policy
- Putting Query-only fields into adapter context
"""
@@ -118,10 +116,6 @@ class PipelineAdapter:
runner_config = ai_config.get('runner_config', {}).get(runner_id, {})
pipeline_uuid = getattr(query, 'pipeline_uuid', None)
# Extract max_round for adapter (used in bootstrap, not Protocol v1)
# Note: config uses 'max-round' with hyphen, not 'max_round' with underscore
max_round = runner_config.get('max-round') or ai_config.get('max-round')
# Build scope
scope = BindingScope(
scope_type="pipeline",
@@ -158,45 +152,8 @@ class PipelineAdapter:
delivery_policy=delivery_policy,
enabled=True,
pipeline_uuid=pipeline_uuid,
max_round=max_round,
)
@classmethod
def build_bootstrap_context(
cls,
query: pipeline_query.Query,
binding: AgentBinding,
) -> tuple[dict[str, typing.Any] | None, dict[str, typing.Any]]:
"""Build bootstrap messages and runtime metadata for Pipeline max-round."""
max_round = binding.max_round
source_messages = query.messages or []
if not max_round or max_round <= 0 or not source_messages:
return None, {}
packaged_messages = select_max_round_messages(source_messages, max_round)
bootstrap_messages = [cls._dump_message(msg) for msg in packaged_messages]
bootstrap = {
"messages": bootstrap_messages,
"summary": None,
"artifacts": [],
"metadata": {},
}
runtime_metadata = {
'context_packaging': {
'policy': {
'mode': 'max_round',
'max_round': max_round,
},
'history': {
'source': 'query.messages',
'source_total_count': len(source_messages),
'delivered_count': len(packaged_messages),
'messages_complete': len(packaged_messages) == len(source_messages),
},
},
}
return bootstrap, runtime_metadata
@classmethod
def build_adapter_context(
cls,
@@ -204,13 +161,10 @@ class PipelineAdapter:
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build Query-derived fields for the Pipeline adapter entry."""
bootstrap, runtime_metadata = cls.build_bootstrap_context(query, binding)
return {
'params': cls.build_params(query),
'prompt': cls.build_prompt(query),
'bootstrap': bootstrap,
'query_id': getattr(query, 'query_id', None),
'runtime_metadata': runtime_metadata,
}
@classmethod

View File

@@ -19,7 +19,6 @@ default_stage_order = [
'BanSessionCheckStage', # 封禁会话检查
'PreContentFilterStage', # 内容过滤前置阶段
'PreProcessor', # 预处理器
'ConversationMessageTruncator', # 会话消息截断器
'RequireRateLimitOccupancy', # 请求速率限制占用
'MessageProcessor', # 处理器
'ReleaseRateLimitOccupancy', # 释放速率限制占用

View File

@@ -1,22 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('msg-truncator-cfg-migration', 9)
class MsgTruncatorConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'msg-truncate' not in self.ap.pipeline_cfg.data
async def run(self):
"""执行迁移"""
self.ap.pipeline_cfg.data['msg-truncate'] = {
'method': 'round',
'round': {'max-round': 10},
}
await self.ap.pipeline_cfg.dump_config()

View File

@@ -118,9 +118,6 @@ class DBMigrateV3Config(migration.DBMigration):
'runner': self.ap.provider_cfg.data['runner'],
}
pipeline_config['ai']['local-agent']['model'] = model_uuid
pipeline_config['ai']['local-agent']['max-round'] = self.ap.pipeline_cfg.data['msg-truncate']['round'][
'max-round'
]
pipeline_config['ai']['local-agent']['prompt'] = [
{

View File

@@ -1,39 +0,0 @@
from __future__ import annotations
from .. import stage, entities
from . import truncator
from ...utils import importutil
from ...agent.runner.config_migration import ConfigMigration
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from . import truncators
importutil.import_modules_in_pkg(truncators)
@stage.stage_class('ConversationMessageTruncator')
class ConversationMessageTruncator(stage.PipelineStage):
"""Conversation message truncator
Used to truncate the conversation message chain to adapt to the LLM message length limit.
"""
trun: truncator.Truncator
async def initialize(self, pipeline_config: dict):
use_method = 'round'
for trun in truncator.preregistered_truncators:
if trun.name == use_method:
self.trun = trun(self.ap)
break
else:
raise ValueError(f'Unknown truncator: {use_method}')
async def process(self, query: pipeline_query.Query, stage_inst_name: str) -> entities.StageProcessResult:
"""处理"""
if ConfigMigration.resolve_runner_id(query.pipeline_config):
return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
query = await self.trun.truncate(query)
return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)

View File

@@ -1,34 +0,0 @@
"""Shared max-round message window helpers for Pipeline behavior."""
from __future__ import annotations
import typing
DEFAULT_MAX_ROUND = 10
def get_max_round(config: dict[str, typing.Any]) -> typing.Any:
"""Return the configured Pipeline max-round value."""
return config.get('max-round', DEFAULT_MAX_ROUND)
def select_max_round_messages(
messages: list[typing.Any] | None,
max_round: typing.Any,
) -> list[typing.Any]:
"""Select a bounded recent message window by user-round count."""
if not messages:
return []
temp_messages: list[typing.Any] = []
current_round = 0
for msg in messages[::-1]:
if current_round < max_round:
temp_messages.append(msg)
if getattr(msg, 'role', None) == 'user':
current_round += 1
else:
break
return temp_messages[::-1]

View File

@@ -1,56 +0,0 @@
from __future__ import annotations
import typing
import abc
from ...core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
preregistered_truncators: list[typing.Type[Truncator]] = []
def truncator_class(
name: str,
) -> typing.Callable[[typing.Type[Truncator]], typing.Type[Truncator]]:
"""截断器类装饰器
Args:
name (str): 截断器名称
Returns:
typing.Callable[[typing.Type[Truncator]], typing.Type[Truncator]]: 装饰器
"""
def decorator(cls: typing.Type[Truncator]) -> typing.Type[Truncator]:
assert issubclass(cls, Truncator)
cls.name = name
preregistered_truncators.append(cls)
return cls
return decorator
class Truncator(abc.ABC):
"""消息截断器基类"""
name: str
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
pass
@abc.abstractmethod
async def truncate(self, query: pipeline_query.Query) -> pipeline_query.Query:
"""截断
一般只需要操作query.messages也可以扩展操作query.prompt, query.user_message。
请勿操作其他字段。
"""
pass

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import truncator
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from ....agent.runner.config_migration import ConfigMigration
from ..round_policy import (
get_max_round,
select_max_round_messages,
)
@truncator.truncator_class('round')
class RoundTruncator(truncator.Truncator):
"""Truncate the conversation message chain to adapt to the LLM message length limit."""
async def truncate(self, query: pipeline_query.Query) -> pipeline_query.Query:
"""截断"""
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
if runner_id:
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id)
else:
runner_config = query.pipeline_config.get('msg-truncate', {}).get('round', {})
query.messages = select_max_round_messages(
query.messages,
get_max_round(runner_config),
)
return query

View File

@@ -28,7 +28,6 @@ from . import (
wrapper,
preproc,
ratelimit,
msgtrun,
)
importutil.import_modules_in_pkgs(
@@ -42,7 +41,6 @@ importutil.import_modules_in_pkgs(
wrapper,
preproc,
ratelimit,
msgtrun,
]
)
@@ -438,6 +436,9 @@ class PipelineManager:
# initialize stage containers according to pipeline_entity.stages
stage_containers: list[StageInstContainer] = []
for stage_name in pipeline_entity.stages:
if stage_name not in self.stage_dict:
self.ap.logger.warning(f'Pipeline stage {stage_name} is not registered; skipping')
continue
stage_containers.append(StageInstContainer(inst_name=stage_name, inst=self.stage_dict[stage_name](self.ap)))
for stage_container in stage_containers:

View File

@@ -34,11 +34,5 @@
"limit": 60
}
}
},
"msg-truncate": {
"method": "round",
"round": {
"max-round": 10
}
}
}
}