feat(tools): add glob and grep native sandbox tools

Add file discovery and content search capabilities to the sandbox:
- glob: Find files by pattern (supports ** recursive matching)
- grep: Search file contents with regex patterns

Both tools respect skill package paths and include safety limits
(max 100 files for glob, max 200 matches for grep).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
huanghuoguoguo
2026-05-13 21:18:12 +08:00
parent 7145447bcb
commit 892556da2a
2 changed files with 191 additions and 3 deletions

View File

@@ -13,8 +13,13 @@ EXEC_TOOL_NAME = 'exec'
READ_TOOL_NAME = 'read'
WRITE_TOOL_NAME = 'write'
EDIT_TOOL_NAME = 'edit'
GLOB_TOOL_NAME = 'glob'
GREP_TOOL_NAME = 'grep'
_ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NAME}
_ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NAME, GLOB_TOOL_NAME, GREP_TOOL_NAME}
# Skip these dirs during grep walk to avoid noise
_SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'}
class NativeToolLoader(loader.ToolLoader):
@@ -31,6 +36,8 @@ class NativeToolLoader(loader.ToolLoader):
self._build_read_tool(),
self._build_write_tool(),
self._build_edit_tool(),
self._build_glob_tool(),
self._build_grep_tool(),
]
return list(self._tools)
@@ -51,6 +58,10 @@ class NativeToolLoader(loader.ToolLoader):
return await self._invoke_write(parameters, query)
if name == EDIT_TOOL_NAME:
return await self._invoke_edit(parameters, query)
if name == GLOB_TOOL_NAME:
return await self._invoke_glob(parameters, query)
if name == GREP_TOOL_NAME:
return await self._invoke_grep(parameters, query)
raise ValueError(f'未找到工具: {name}')
async def shutdown(self):
@@ -344,6 +355,183 @@ class NativeToolLoader(loader.ToolLoader):
func=lambda parameters: parameters,
)
def _build_glob_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=GLOB_TOOL_NAME,
human_desc='Find files matching a glob pattern',
description=(
'Find files matching a glob pattern under /workspace. '
'Supports ** for recursive matching (e.g. **/*.py). '
'Results are sorted by modification time (newest first). '
'Visible and activated skill packages can be searched through /workspace/.skills/<skill-name>/...'
),
parameters={
'type': 'object',
'properties': {
'pattern': {
'type': 'string',
'description': 'Glob pattern, e.g. **/*.py or src/**/*.ts',
},
'path': {
'type': 'string',
'description': 'Directory to search in (must be under /workspace, default: /workspace)',
'default': '/workspace',
},
},
'required': ['pattern'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)
def _build_grep_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=GREP_TOOL_NAME,
human_desc='Search file contents with regex',
description=(
'Search file contents with regex pattern under /workspace. '
'Returns matching lines with file path and line number. '
'Visible and activated skill packages can be searched through /workspace/.skills/<skill-name>/...'
),
parameters={
'type': 'object',
'properties': {
'pattern': {
'type': 'string',
'description': 'Regex pattern to search for',
},
'path': {
'type': 'string',
'description': 'File or directory to search (must be under /workspace, default: /workspace)',
'default': '/workspace',
},
'include': {
'type': 'string',
'description': 'Only search files matching this glob (e.g. *.py)',
},
},
'required': ['pattern'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)
async def _invoke_glob(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
path = str(parameters.get('path', '/workspace') or '/workspace')
self.ap.logger.info(f'glob tool invoked: query_id={query.query_id} pattern={pattern} path={path}')
host_path, _selected_skill = self._resolve_host_path(
query,
path,
include_visible=True,
include_activated=True,
)
if not os.path.isdir(host_path):
return {'ok': False, 'error': f'Path is not a directory: {path}'}
from pathlib import Path
base = Path(host_path)
hits = list(base.rglob(pattern))
# Filter out skipped directories
hits = [h for h in hits if not any(skip in h.parts for skip in _SKIP_DIRS)]
# Sort by mtime, newest first
hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)
total = len(hits)
shown = hits[:100]
# Convert back to sandbox paths
sandbox_paths = []
for h in shown:
rel = os.path.relpath(str(h), host_path)
sandbox_path = os.path.join(path, rel)
sandbox_paths.append(sandbox_path)
result_lines = sandbox_paths
result = '\n'.join(result_lines)
if total > 100:
result += f'\n... ({total} matches, showing first 100)'
return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100}
async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
path = str(parameters.get('path', '/workspace') or '/workspace')
include = parameters.get('include')
self.ap.logger.info(f'grep tool invoked: query_id={query.query_id} pattern={pattern} path={path}')
import re
from pathlib import Path
try:
regex = re.compile(pattern)
except re.error as e:
return {'ok': False, 'error': f'Invalid regex: {e}'}
host_path, _selected_skill = self._resolve_host_path(
query,
path,
include_visible=True,
include_activated=True,
)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'Path not found: {path}'}
base = Path(host_path)
if base.is_file():
files = [base]
else:
files = self._grep_walk(base, include)
matches = []
for fp in files:
try:
text = fp.read_text(errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
matches.append({
'file': sandbox_path,
'line': lineno,
'content': line.rstrip(),
})
if len(matches) >= 200:
break
if len(matches) >= 200:
break
return {
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': len(matches) >= 200,
}
@staticmethod
def _grep_walk(root, include: str | None) -> list:
"""Walk dir tree for grep, skipping junk dirs."""
from pathlib import Path
results = []
for item in root.rglob(include or '*'):
if any(skip in item.parts for skip in _SKIP_DIRS):
continue
if item.is_file():
results.append(item)
if len(results) >= 5000:
break
return results
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('command', '')).strip()

View File

@@ -47,10 +47,10 @@ class ToolManager:
# Log native (sandbox) tool availability once at startup
box_service = getattr(self.ap, 'box_service', None)
if box_service and getattr(box_service, 'available', False):
self.ap.logger.info('Native sandbox tools (exec/read/write/edit) are available.')
self.ap.logger.info('Native sandbox tools (exec/read/write/edit/glob/grep) are available.')
else:
self.ap.logger.warning(
'Native sandbox tools (exec/read/write/edit) are NOT available. '
'Native sandbox tools (exec/read/write/edit/glob/grep) are NOT available. '
'Box runtime is not connected — the LLM will not have access to code execution tools.'
)