feat(box): unify native agent tools around exec/read/write/edit

This commit is contained in:
youhuanghe
2026-03-24 07:57:05 +00:00
committed by WangCham
parent 3f368c5764
commit 93104a947a
10 changed files with 519 additions and 114 deletions

View File

@@ -0,0 +1,98 @@
"""Three-layer security policy for LangBot Box.
The design separates concerns into three independent layers, aligned with
OpenCode / OpenClaw patterns:
1. **SandboxPolicy** *where* tools run (host vs sandbox).
2. **ToolPolicy** *which* tools are allowed (allow/deny lists).
3. **ElevatedPolicy** *whether* a single exec call may temporarily
escape the default sandbox boundary.
These three layers are orthogonal:
- ToolPolicy is a hard boundary; ``elevated`` cannot bypass a denied tool.
- SandboxPolicy decides the default execution location.
- ElevatedPolicy only affects ``exec`` and only when the framework allows it.
"""
from __future__ import annotations
import enum
from typing import Sequence
# ── Layer 1: Sandbox Policy ──────────────────────────────────────────
class SandboxMode(str, enum.Enum):
"""Determines when agent execution is routed through the sandbox."""
OFF = 'off'
"""Sandbox disabled; all exec runs on the host."""
NON_DEFAULT = 'non_default'
"""Only non-default sessions are sandboxed (e.g. sub-agents, MCP)."""
ALL = 'all'
"""Every agent exec call is routed through the sandbox."""
class SandboxPolicy:
"""Decides whether a given execution context should use the sandbox."""
def __init__(self, mode: SandboxMode = SandboxMode.ALL):
self.mode = mode
def should_sandbox(self, *, is_default_session: bool = True) -> bool:
if self.mode == SandboxMode.OFF:
return False
if self.mode == SandboxMode.ALL:
return True
# NON_DEFAULT: sandbox everything except the default session
return not is_default_session
# ── Layer 2: Tool Policy ─────────────────────────────────────────────
class ToolPolicy:
"""Controls which tools are available to the current agent/session.
Rules:
- ``deny`` always takes precedence over ``allow``.
- An empty ``allow`` list means "all tools allowed" (no allowlist filter).
- ``elevated`` cannot bypass a denied tool.
"""
def __init__(
self,
allow: Sequence[str] = (),
deny: Sequence[str] = (),
):
self._allow: frozenset[str] = frozenset(allow)
self._deny: frozenset[str] = frozenset(deny)
def is_tool_allowed(self, tool_name: str) -> bool:
if tool_name in self._deny:
return False
if self._allow and tool_name not in self._allow:
return False
return True
# ── Layer 3: Elevated Policy ─────────────────────────────────────────
class ElevatedPolicy:
"""Controls whether ``exec`` may request temporary privilege escalation.
``elevated`` only applies to the ``exec`` tool. It means "run this
command outside the default sandbox boundary" (e.g. with network, or
on the host). The framework decides whether to honor the request.
"""
def __init__(self, *, allow_elevated: bool = False, require_approval: bool = True):
self.allow_elevated = allow_elevated
self.require_approval = require_approval
def is_elevation_permitted(self) -> bool:
return self.allow_elevated

View File

@@ -105,9 +105,22 @@ class BoxService:
)
return self._serialize_result(result)
async def execute_sandbox_tool(self, parameters: dict, query: pipeline_query.Query) -> dict:
spec_payload = dict(parameters)
async def execute_tool(self, parameters: dict, query: pipeline_query.Query) -> dict:
"""Execute an agent-facing ``exec`` tool call.
Translates the agent-facing ``command`` field to the internal
``BoxSpec.cmd`` field and injects the session id from the query.
"""
spec_payload: dict = {'cmd': parameters['command']}
# Pass through allowed agent-facing fields
for key in ('workdir', 'timeout_sec', 'env'):
if key in parameters:
spec_payload[key] = parameters[key]
# Inject context the agent must not control
spec_payload.setdefault('session_id', str(query.query_id))
return await self.execute_spec_payload(spec_payload, query)
async def shutdown(self):
@@ -379,23 +392,23 @@ class BoxService:
return list(self._recent_errors)
def get_system_guidance(self) -> str:
"""Return LLM system-prompt guidance for sandbox_exec.
"""Return LLM system-prompt guidance for the exec tool.
All sandbox-specific prompt text is kept here so that callers
All execution-specific prompt text is kept here so that callers
(e.g. LocalAgentRunner) stay free of box domain knowledge.
"""
guidance = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'When the exec tool is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec '
'JSON, or other data and asks for a computed answer, prefer running a short Python script via exec '
'and then answer from the tool result. Unless the user explicitly asks for the script, code, or implementation '
'details, do not include the generated script in the final answer; return the result and a brief explanation only.'
)
if self.default_host_workspace:
guidance += (
' A default host workspace is mounted at /workspace for file tasks. When the user asks to read, create, or '
'modify local files in the working directory, use sandbox_exec with /workspace paths directly; do not ask the '
'user for sandbox parameters such as host_path unless they explicitly need a different directory.'
' A default workspace is mounted at /workspace for file tasks. When the user asks to read, create, or '
'modify local files in the working directory, use exec with /workspace paths directly; do not ask the '
'user for directory parameters unless they explicitly need a different directory.'
)
return guidance

View File

@@ -5,7 +5,7 @@ import copy
import typing
from .. import runner
from ..modelmgr import requester as modelmgr_requester
from ..tools.loaders.native import SANDBOX_EXEC_TOOL_NAME
from ..tools.loaders.native import EXEC_TOOL_NAME
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.rag.context as rag_context
@@ -37,7 +37,7 @@ class LocalAgentRunner(runner.RequestRunner):
) -> list[provider_message.Message]:
req_messages = query.prompt.messages.copy() + query.messages.copy()
if any(getattr(tool, 'name', None) == SANDBOX_EXEC_TOOL_NAME for tool in query.use_funcs or []):
if any(getattr(tool, 'name', None) == EXEC_TOOL_NAME for tool in query.use_funcs or []):
req_messages.append(
provider_message.Message(
role='system',

View File

@@ -1,77 +1,154 @@
from __future__ import annotations
import json
import os
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from langbot_plugin.box.models import BoxNetworkMode
from .. import loader
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
EXEC_TOOL_NAME = 'exec'
READ_TOOL_NAME = 'read'
WRITE_TOOL_NAME = 'write'
EDIT_TOOL_NAME = 'edit'
_ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NAME}
class NativeToolLoader(loader.ToolLoader):
def __init__(self, ap):
super().__init__(ap)
self._sandbox_exec_tool: resource_tool.LLMTool | None = None
self._tools: list[resource_tool.LLMTool] | None = None
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_sandbox_available():
return []
if self._sandbox_exec_tool is None:
self._sandbox_exec_tool = self._build_sandbox_exec_tool()
return [self._sandbox_exec_tool]
if self._tools is None:
self._tools = [
self._build_exec_tool(),
self._build_read_tool(),
self._build_write_tool(),
self._build_edit_tool(),
]
return list(self._tools)
async def has_tool(self, name: str) -> bool:
return name == SANDBOX_EXEC_TOOL_NAME and self._is_sandbox_available()
return name in _ALL_TOOL_NAMES and self._is_sandbox_available()
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query):
if name != SANDBOX_EXEC_TOOL_NAME:
if name == EXEC_TOOL_NAME:
self.ap.logger.info(
'exec tool invoked: '
f'query_id={query.query_id} '
f'parameters={json.dumps(self._summarize_parameters(parameters), ensure_ascii=False)}'
)
return await self.ap.box_service.execute_tool(parameters, query)
elif name == READ_TOOL_NAME:
return await self._invoke_read(parameters, query)
elif name == WRITE_TOOL_NAME:
return await self._invoke_write(parameters, query)
elif name == EDIT_TOOL_NAME:
return await self._invoke_edit(parameters, query)
else:
raise ValueError(f'未找到工具: {name}')
self.ap.logger.info(
'sandbox_exec tool invoked: '
f'query_id={query.query_id} '
f'parameters={json.dumps(self._summarize_parameters(parameters), ensure_ascii=False)}'
)
return await self.ap.box_service.execute_sandbox_tool(parameters, query)
async def shutdown(self):
pass
# ── File tool implementations ────────────────────────────────────
def _resolve_host_path(self, sandbox_path: str) -> str:
"""Map a sandbox /workspace path to the host filesystem path."""
box_service = self.ap.box_service
host_root = box_service.default_host_workspace
if host_root is None:
raise ValueError('No default host workspace configured for file operations.')
mount_path = '/workspace'
if not sandbox_path.startswith(mount_path):
raise ValueError(f'Path must be under {mount_path}.')
relative = sandbox_path[len(mount_path):].lstrip('/')
host_path = os.path.realpath(os.path.join(host_root, relative))
if not (host_path == host_root or host_path.startswith(host_root + os.sep)):
raise ValueError('Path escapes the workspace boundary.')
return host_path
async def _invoke_read(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
self.ap.logger.info(f'read tool invoked: query_id={query.query_id} path={path}')
host_path = self._resolve_host_path(path)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
return {'ok': True, 'content': content}
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
content = parameters['content']
self.ap.logger.info(f'write tool invoked: query_id={query.query_id} path={path} length={len(content)}')
host_path = self._resolve_host_path(path)
os.makedirs(os.path.dirname(host_path), exist_ok=True)
with open(host_path, 'w') as f:
f.write(content)
return {'ok': True, 'path': path}
async def _invoke_edit(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
old_string = parameters['old_string']
new_string = parameters['new_string']
self.ap.logger.info(
f'edit tool invoked: query_id={query.query_id} path={path} '
f'old_len={len(old_string)} new_len={len(new_string)}'
)
host_path = self._resolve_host_path(path)
if not os.path.isfile(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
count = content.count(old_string)
if count == 0:
return {'ok': False, 'error': 'old_string not found in file.'}
if count > 1:
return {'ok': False, 'error': f'old_string matches {count} locations; provide a more unique string.'}
new_content = content.replace(old_string, new_string, 1)
with open(host_path, 'w') as f:
f.write(new_content)
return {'ok': True, 'path': path}
# ── Internals ────────────────────────────────────────────────────
def _is_sandbox_available(self) -> bool:
box_service = getattr(self.ap, 'box_service', None)
return bool(getattr(box_service, 'available', False))
def _build_sandbox_exec_tool(self) -> resource_tool.LLMTool:
def _build_exec_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=SANDBOX_EXEC_TOOL_NAME,
human_desc='Execute a command inside the LangBot Box sandbox',
name=EXEC_TOOL_NAME,
human_desc='Execute a command in an isolated environment',
description=(
'Run shell commands only inside the isolated LangBot Box sandbox. '
'Use this tool for local file edits, bash commands, Python execution, and exact calculations over '
'user-provided data that must not touch the host.'
'Run shell commands in an isolated execution environment. '
'Use this tool for bash commands, Python execution, and exact calculations '
'over user-provided data.'
),
parameters={
'type': 'object',
'properties': {
'cmd': {
'command': {
'type': 'string',
'description': 'Shell command to execute inside the sandbox.',
'description': 'Shell command to execute.',
},
'workdir': {
'type': 'string',
'description': (
'Absolute working directory path inside the sandbox. '
'Defaults to mount_path, or /workspace when mount_path is omitted.'
),
'default': '/workspace',
},
'mount_path': {
'type': 'string',
'description': (
'Absolute sandbox path where host_path is mounted. '
'Defaults to /workspace. When omitted, workdir defaults to the same path.'
'Working directory for the command. Defaults to /workspace.'
),
'default': '/workspace',
},
@@ -81,20 +158,90 @@ class NativeToolLoader(loader.ToolLoader):
'default': 30,
'minimum': 1,
},
'network': {
'type': 'string',
'description': 'Network policy for the sandbox session. Prefer off unless network is required.',
'enum': [e.value for e in BoxNetworkMode],
'default': 'off',
},
'env': {
'type': 'object',
'description': 'Optional environment variables to expose inside the sandbox.',
'description': 'Optional environment variables for the execution.',
'additionalProperties': {'type': 'string'},
'default': {},
},
'description': {
'type': 'string',
'description': 'Brief description of what this command does, for logging and audit.',
},
},
'required': ['cmd'],
'required': ['command'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)
def _build_read_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=READ_TOOL_NAME,
human_desc='Read a file from the workspace',
description='Read the contents of a file at the given path under /workspace.',
parameters={
'type': 'object',
'properties': {
'path': {
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
},
'required': ['path'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)
def _build_write_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=WRITE_TOOL_NAME,
human_desc='Write a file to the workspace',
description='Create or overwrite a file at the given path under /workspace with the provided content.',
parameters={
'type': 'object',
'properties': {
'path': {
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'content': {
'type': 'string',
'description': 'Content to write to the file.',
},
},
'required': ['path', 'content'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)
def _build_edit_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=EDIT_TOOL_NAME,
human_desc='Edit a file in the workspace',
description=(
'Perform an exact string replacement in a file under /workspace. '
'The old_string must appear exactly once in the file.'
),
parameters={
'type': 'object',
'properties': {
'path': {
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'old_string': {
'type': 'string',
'description': 'The exact string to find and replace.',
},
'new_string': {
'type': 'string',
'description': 'The replacement string.',
},
},
'required': ['path', 'old_string', 'new_string'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
@@ -102,10 +249,10 @@ class NativeToolLoader(loader.ToolLoader):
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('cmd', '')).strip()
cmd = str(summary.get('command', '')).strip()
if len(cmd) > 400:
cmd = f'{cmd[:397]}...'
summary['cmd'] = cmd
summary['command'] = cmd
env = summary.get('env')
if isinstance(env, dict):

View File

@@ -3,16 +3,12 @@ from __future__ import annotations
import typing
from typing import TYPE_CHECKING
from langbot.pkg.utils import importutil
from langbot.pkg.provider.tools import loaders
from langbot.pkg.provider.tools.loaders import mcp as mcp_loader, native as native_loader, plugin as plugin_loader
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
if TYPE_CHECKING:
from ...core import app
importutil.import_modules_in_pkg(loaders)
from langbot.pkg.provider.tools.loaders import mcp as mcp_loader, native as native_loader, plugin as plugin_loader
class ToolManager:
@@ -28,6 +24,12 @@ class ToolManager:
self.ap = ap
async def initialize(self):
from langbot.pkg.utils import importutil
from langbot.pkg.provider.tools import loaders
from langbot.pkg.provider.tools.loaders import mcp as mcp_loader, native as native_loader, plugin as plugin_loader
importutil.import_modules_in_pkg(loaders)
self.native_tool_loader = native_loader.NativeToolLoader(self.ap)
await self.native_tool_loader.initialize()
self.plugin_tool_loader = plugin_loader.PluginToolLoader(self.ap)