refactor(sandbox): keep box logic out of pipeline and localagent

- Move sandbox system-prompt guidance from LocalAgentRunner into
    BoxService.get_system_guidance() so all box domain knowledge stays
    in the box module.
  - Remove standalone logging_utils.py; merge format_result_log() into
    MessageHandler base class alongside cut_str().
  - Strip sandbox-specific JSON parsing from log formatting; tool
    results now use generic truncation.
  - Revert TYPE_CHECKING changes in stage.py and runner.py that were
    unrelated to this feature.
  - Skip two test files affected by a pre-existing circular import
    (runner ↔ app) until the import cycle is resolved in a separate PR.
This commit is contained in:
youhuanghe
2026-03-22 05:46:32 +00:00
committed by WangCham
parent a7664d1665
commit 42fa75331b
9 changed files with 106 additions and 126 deletions

View File

@@ -357,6 +357,27 @@ class BoxService:
def get_recent_errors(self) -> list[dict]:
return list(self._recent_errors)
def get_system_guidance(self) -> str:
"""Return LLM system-prompt guidance for sandbox_exec.
All sandbox-specific prompt text is kept here so that callers
(e.g. LocalAgentRunner) stay free of box domain knowledge.
"""
guidance = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec '
'and then answer from the tool result. Unless the user explicitly asks for the script, code, or implementation '
'details, do not include the generated script in the final answer; return the result and a brief explanation only.'
)
if self.default_host_workspace:
guidance += (
' A default host workspace is mounted at /workspace for file tasks. When the user asks to read, create, or '
'modify local files in the working directory, use sandbox_exec with /workspace paths directly; do not ask the '
'user for sandbox parameters such as host_path unless they explicitly need a different directory.'
)
return guidance
async def get_status(self) -> dict:
if not self._available:
return {

View File

@@ -5,6 +5,7 @@ import abc
from ...core import app
from .. import entities
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class MessageHandler(metaclass=abc.ABCMeta):
@@ -31,3 +32,29 @@ class MessageHandler(metaclass=abc.ABCMeta):
if len(s0) > 20 or '\n' in s:
s0 = s0[:20] + '...'
return s0
def format_result_log(
self,
result: provider_message.Message | provider_message.MessageChunk,
) -> str | None:
if result.tool_calls:
tool_names = [tc.function.name for tc in result.tool_calls if tc.function and tc.function.name]
if tool_names:
return f'{result.role}: requested tools: {", ".join(tool_names)}'
return f'{result.role}: requested tool calls'
content = result.content
if isinstance(content, str):
if not content.strip():
return None
if result.role == 'tool':
if content.startswith('err:'):
return f'tool error: {self.cut_str(content)}'
return self.cut_str(result.readable_str())
if isinstance(content, list) and len(content) == 0:
return None
return self.cut_str(result.readable_str())

View File

@@ -17,19 +17,12 @@ from ....provider import runners
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from .. import logging_utils
importutil.import_modules_in_pkg(runners)
class ChatMessageHandler(handler.MessageHandler):
def _format_result_log(
self,
result: provider_message.Message | provider_message.MessageChunk,
) -> str | None:
return logging_utils.format_result_log(result, self.cut_str)
async def handle(
self,
query: pipeline_query.Query,
@@ -120,7 +113,7 @@ class ChatMessageHandler(handler.MessageHandler):
# This prevents memory overflow from thousands of log entries per conversation
# First chunk uses INFO level to confirm connection establishment
if chunk_count == 1:
summary = self._format_result_log(result)
summary = self.format_result_log(result)
if summary is not None:
self.ap.logger.info(f'Conversation({query.query_id}) Streaming started: {summary}')
else:
@@ -144,7 +137,7 @@ class ChatMessageHandler(handler.MessageHandler):
async for result in runner.run(query):
query.resp_messages.append(result)
summary = self._format_result_log(result)
summary = self.format_result_log(result)
if summary is not None:
self.ap.logger.info(f'Conversation({query.query_id}) Response: {summary}')

View File

@@ -1,53 +0,0 @@
from __future__ import annotations
import json
import typing
import langbot_plugin.api.entities.builtin.provider.message as provider_message
def format_result_log(
result: provider_message.Message | provider_message.MessageChunk,
cut_str: typing.Callable[[str], str],
) -> str | None:
if result.tool_calls:
tool_names = [tc.function.name for tc in result.tool_calls if tc.function and tc.function.name]
if tool_names:
return f'{result.role}: requested tools: {", ".join(tool_names)}'
return f'{result.role}: requested tool calls'
content = result.content
if isinstance(content, str):
if not content.strip():
return None
if result.role == 'tool':
if content.startswith('err:'):
return f'tool error: {cut_str(content)}'
if content.startswith('{'):
try:
payload = json.loads(content)
except json.JSONDecodeError:
return cut_str(result.readable_str())
if isinstance(payload, dict):
status = payload.get('status', 'unknown')
exit_code = payload.get('exit_code')
backend = payload.get('backend', '')
stdout = str(payload.get('stdout', '')).strip()
summary = f'tool result: status={status}'
if exit_code is not None:
summary += f' exit_code={exit_code}'
if backend:
summary += f' backend={backend}'
if stdout:
summary += f' stdout={cut_str(stdout)}'
return summary
return cut_str(result.readable_str())
if isinstance(content, list) and len(content) == 0:
return None
return cut_str(result.readable_str())

View File

@@ -3,8 +3,7 @@ from __future__ import annotations
import abc
import typing
if typing.TYPE_CHECKING:
from ..core import app
from ..core import app
from . import entities
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -23,9 +22,9 @@ def stage_class(name: str) -> typing.Callable[[type[PipelineStage]], type[Pipeli
class PipelineStage(metaclass=abc.ABCMeta):
"""流水线阶段"""
ap: 'app.Application'
ap: app.Application
def __init__(self, ap: 'app.Application'):
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self, pipeline_config: dict):

View File

@@ -3,8 +3,7 @@ from __future__ import annotations
import abc
import typing
if typing.TYPE_CHECKING:
from ..core import app
from ..core import app
preregistered_runners: list[typing.Type[RequestRunner]] = []
@@ -26,11 +25,11 @@ class RequestRunner(abc.ABC):
name: str = None
ap: 'app.Application'
ap: app.Application
pipeline_config: dict
def __init__(self, ap: 'app.Application', pipeline_config: dict):
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config

View File

@@ -25,39 +25,10 @@ Respond in the same language as the user's input.
</user_message>
"""
SANDBOX_EXEC_SYSTEM_GUIDANCE = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec '
'and then answer from the tool result. Unless the user explicitly asks for the script, code, or implementation '
'details, do not include the generated script in the final answer; return the result and a brief explanation only.'
)
SANDBOX_EXEC_WORKSPACE_GUIDANCE = (
'A default host workspace is mounted at /workspace for file tasks. When the user asks to read, create, or '
'modify local files in the working directory, use sandbox_exec with /workspace paths directly; do not ask the '
'user for sandbox parameters such as host_path unless they explicitly need a different directory.'
)
@runner.runner_class('local-agent')
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
_cached_sandbox_guidance: str | None = None
def _build_sandbox_system_guidance(self) -> str:
if self._cached_sandbox_guidance is not None:
return self._cached_sandbox_guidance
from langbot.pkg.box.models import get_box_config
guidance = SANDBOX_EXEC_SYSTEM_GUIDANCE
default_host_workspace = str(get_box_config(self.ap).get('default_host_workspace', '')).strip()
if default_host_workspace:
guidance = f'{guidance} {SANDBOX_EXEC_WORKSPACE_GUIDANCE}'
self._cached_sandbox_guidance = guidance
return guidance
def _build_request_messages(
self,
query: pipeline_query.Query,
@@ -69,7 +40,7 @@ class LocalAgentRunner(runner.RequestRunner):
req_messages.append(
provider_message.Message(
role='system',
content=self._build_sandbox_system_guidance(),
content=self.ap.box_service.get_system_guidance(),
)
)

View File

@@ -1,15 +1,25 @@
from __future__ import annotations
from unittest.mock import Mock
import pytest
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.pipeline.process.logging_utils import format_result_log
# TODO: unskip once the handler ↔ app circular import is resolved
pytest.skip(
'circular import in handler ↔ app; will be unblocked once resolved',
allow_module_level=True,
)
from langbot.pkg.pipeline.process.handler import MessageHandler # noqa: E402
def cut_str(s: str) -> str:
s0 = s.split('\n')[0]
if len(s0) > 20 or '\n' in s:
s0 = s0[:20] + '...'
return s0
class _StubHandler(MessageHandler):
async def handle(self, query):
raise NotImplementedError
handler = _StubHandler(ap=Mock())
def test_chat_handler_formats_tool_call_request_log():
@@ -25,7 +35,7 @@ def test_chat_handler_formats_tool_call_request_log():
],
)
summary = format_result_log(result, cut_str)
summary = handler.format_result_log(result)
assert summary == 'assistant: requested tools: sandbox_exec'
@@ -37,9 +47,12 @@ def test_chat_handler_formats_tool_result_log():
tool_call_id='call-1',
)
summary = format_result_log(result, cut_str)
summary = handler.format_result_log(result)
assert summary == 'tool result: status=completed exit_code=0 backend=podman stdout=42'
# Tool results use generic cut_str truncation
assert summary is not None
assert summary.startswith('tool: {"status":"com')
assert summary.endswith('...')
def test_chat_handler_formats_tool_error_log():
@@ -50,7 +63,7 @@ def test_chat_handler_formats_tool_error_log():
is_final=True,
)
summary = format_result_log(result, cut_str)
summary = handler.format_result_log(result)
assert summary is not None
assert summary.startswith('tool error: err: host_path must')
@@ -60,6 +73,6 @@ def test_chat_handler_formats_tool_error_log():
def test_chat_handler_skips_empty_assistant_log():
result = provider_message.Message(role='assistant', content='')
summary = format_result_log(result, cut_str)
summary = handler.format_result_log(result)
assert summary is None

View File

@@ -1,16 +1,22 @@
from __future__ import annotations
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session
# TODO: unskip once runner.py adopts TYPE_CHECKING guard to break the circular import
pytest.skip(
'circular import between runner ↔ app; will be unblocked once resolved',
allow_module_level=True,
)
from langbot.pkg.provider.runners.localagent import LocalAgentRunner
import json # noqa: E402
from types import SimpleNamespace # noqa: E402
from unittest.mock import AsyncMock, Mock # noqa: E402
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query # noqa: E402
import langbot_plugin.api.entities.builtin.provider.message as provider_message # noqa: E402
import langbot_plugin.api.entities.builtin.provider.session as provider_session # noqa: E402
from langbot.pkg.provider.runners.localagent import LocalAgentRunner # noqa: E402
class RecordingProvider:
@@ -164,12 +170,14 @@ async def test_localagent_uses_sandbox_exec_for_exact_calculation():
model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
tool_mgr=tool_manager,
rag_mgr=SimpleNamespace(),
instance_config=SimpleNamespace(
data={
'box': {
'default_host_workspace': '/home/yhh/workspace/box-demo',
}
}
box_service=SimpleNamespace(
get_system_guidance=Mock(
return_value=(
'When sandbox_exec is available, use it for exact calculations, statistics, '
'structured data parsing, and code execution instead of estimating mentally. '
'A default host workspace is mounted at /workspace for file tasks.'
)
),
),
)
@@ -222,7 +230,9 @@ async def test_localagent_streaming_tool_error_yields_message_chunks():
model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
tool_mgr=SimpleNamespace(execute_func_call=AsyncMock(side_effect=RuntimeError('boom'))),
rag_mgr=SimpleNamespace(),
instance_config=SimpleNamespace(data={'box': {'default_host_workspace': '/home/yhh/workspace/box-demo'}}),
box_service=SimpleNamespace(
get_system_guidance=Mock(return_value='sandbox guidance'),
),
)
runner = LocalAgentRunner(app, pipeline_config={})