Compare commits

..

5 Commits

Author SHA1 Message Date
huanghuoguoguo
282c2d7f54 test(tools): cover runtime hardening edge cases 2026-06-15 10:00:52 +08:00
huanghuoguoguo
9fa3251f3d fix(tools): decouple runtime from agent runner 2026-06-14 21:15:21 +08:00
huanghuoguoguo
64b7e9c509 fix(tools): clear stale Python workspace env locks 2026-06-14 11:32:10 +08:00
huanghuoguoguo
7b67dcc302 fix(tools): bootstrap Python workspaces with available interpreter 2026-06-14 11:32:10 +08:00
huanghuoguoguo
a60827f221 fix(tools): harden agent runner tool runtimes 2026-06-14 11:32:10 +08:00
12 changed files with 801 additions and 205 deletions

View File

@@ -146,13 +146,19 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_PIP_CACHE_DIR="{mount_path}/.cache/pip"
mkdir -p "$_LB_META_DIR" "$_LB_TMP_DIR" "$_LB_PIP_CACHE_DIR"
_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"
if [ -z "$_LB_SYSTEM_PYTHON" ]; then
echo "python3 or python is required to prepare the workspace Python environment" >&2
exit 127
fi
export TMPDIR="$_LB_TMP_DIR"
export TEMP="$_LB_TMP_DIR"
export TMP="$_LB_TMP_DIR"
export PIP_CACHE_DIR="$_LB_PIP_CACHE_DIR"
_lb_python_meta() {{
python - <<'PY'
"$_LB_SYSTEM_PYTHON" - <<'PY'
import hashlib
import json
import os
@@ -201,15 +207,26 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_LOCK_WAIT=0
while ! mkdir "$_LB_LOCK_DIR" 2>/dev/null; do
if [ "$_LB_LOCK_WAIT" -ge 120 ]; then
_LB_LOCK_OWNER="$(cat "$_LB_LOCK_DIR/pid" 2>/dev/null || true)"
if [ -n "$_LB_LOCK_OWNER" ] && kill -0 "$_LB_LOCK_OWNER" 2>/dev/null; then
echo "Timed out waiting for active Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
echo "Timed out waiting for Python environment lock, clearing stale lock: $_LB_LOCK_DIR" >&2
rm -rf "$_LB_LOCK_DIR" 2>/dev/null || true
if mkdir "$_LB_LOCK_DIR" 2>/dev/null; then
break
fi
echo "Timed out waiting for Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
sleep 1
_LB_LOCK_WAIT=$((_LB_LOCK_WAIT + 1))
done
printf '%s\\n' "$$" > "$_LB_LOCK_DIR/pid" 2>/dev/null || true
_lb_cleanup_lock() {{
rmdir "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
rm -rf "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
}}
trap _lb_cleanup_lock EXIT INT TERM
@@ -225,7 +242,7 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
rm -rf "$_LB_VENV_DIR"
python -m venv "$_LB_VENV_DIR"
"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"
. "$_LB_VENV_DIR/bin/activate"
python -m pip install --upgrade pip setuptools wheel
if [ -f "{mount_path}/requirements.txt" ]; then

View File

@@ -197,11 +197,10 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
self,
file_bytes: bytes,
task_context: taskmgr.TaskContext | None,
) -> tuple[str | None, str | None, str | None]:
) -> tuple[str | None, str | None]:
"""Extract plugin identity and dependency metadata from a plugin package."""
plugin_author = None
plugin_name = None
plugin_version = None
try:
with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
@@ -210,7 +209,6 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
metadata = manifest.get('metadata', {})
plugin_author = metadata.get('author')
plugin_name = metadata.get('name')
plugin_version = metadata.get('version')
except Exception:
pass
@@ -229,7 +227,7 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
except Exception:
pass
return plugin_author, plugin_name, plugin_version
return plugin_author, plugin_name
async def _install_mcp_from_marketplace(
self,
@@ -371,7 +369,6 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
):
plugin_author = install_info.get('plugin_author')
plugin_name = install_info.get('plugin_name')
plugin_file_transferred = False
if install_source == PluginInstallSource.MARKETPLACE:
# Handle marketplace plugin/mcp/skill installation
@@ -466,18 +463,9 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
)
file_bytes = download_resp.content
plugin_author, plugin_name, plugin_version = self._inspect_plugin_package(
file_bytes,
task_context,
)
if task_context is not None and plugin_author and plugin_name:
task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
if task_context is not None and plugin_version:
task_context.metadata['plugin_version'] = plugin_version
self._inspect_plugin_package(file_bytes, task_context)
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
install_source = PluginInstallSource.LOCAL
plugin_file_transferred = True
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
# Continue to install via runtime
else:
@@ -493,14 +481,12 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
mcp_resp.raise_for_status()
raise Exception(f'Failed to get MCP {plugin_author}/{plugin_name}')
if install_source == PluginInstallSource.LOCAL and not plugin_file_transferred:
if install_source == PluginInstallSource.LOCAL:
# transfer file before install
file_bytes = install_info['plugin_file']
plugin_author, plugin_name, plugin_version = self._inspect_plugin_package(file_bytes, task_context)
plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context)
if task_context is not None and plugin_author and plugin_name:
task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
if task_context is not None and plugin_version:
task_context.metadata['plugin_version'] = plugin_version
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
del install_info['plugin_file']
@@ -537,11 +523,9 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0
file_bytes = b''.join(chunks)
plugin_author, plugin_name, plugin_version = self._inspect_plugin_package(file_bytes, task_context)
plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context)
if task_context is not None and plugin_author and plugin_name:
task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
if task_context is not None and plugin_version:
task_context.metadata['plugin_version'] = plugin_version
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from typing import Any
async def is_box_backend_available(ap: Any) -> bool:
"""Return whether the configured Box backend is ready for tool execution."""
box_service = getattr(ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return bool(backend_info.get('available', False))
except Exception:
return False

View File

@@ -5,6 +5,8 @@ import asyncio
import os
import shutil
import shlex
import threading
from contextlib import suppress
from typing import TYPE_CHECKING, Any
import pydantic
@@ -18,12 +20,26 @@ from ....box.workspace import (
rewrite_mounted_path,
rewrite_venv_command,
unwrap_venv_path,
wrap_python_command_with_env,
)
if TYPE_CHECKING:
from .mcp import RuntimeMCPSession
_WORKSPACE_COPY_LOCKS: dict[str, threading.Lock] = {}
_WORKSPACE_COPY_LOCKS_GUARD = threading.Lock()
def _workspace_copy_lock(path: str) -> threading.Lock:
with _WORKSPACE_COPY_LOCKS_GUARD:
lock = _WORKSPACE_COPY_LOCKS.get(path)
if lock is None:
lock = threading.Lock()
_WORKSPACE_COPY_LOCKS[path] = lock
return lock
class MCPSessionErrorPhase(enum.Enum):
"""Which phase of the MCP lifecycle failed."""
@@ -49,7 +65,7 @@ class MCPServerBoxConfig(pydantic.BaseModel):
host_path: str | None = None
host_path_mode: str = 'ro' # MCP servers default to read-write mount only when explicitly requested
env: dict[str, str] = pydantic.Field(default_factory=dict)
startup_timeout_sec: int = 120 # Longer default to allow dependency bootstrap
startup_timeout_sec: int = 300 # First Docker bootstrap may need to build a venv and install MCP deps.
cpus: float | None = None
memory_mb: int | None = None
pids_limit: int | None = None
@@ -128,6 +144,7 @@ class BoxStdioSessionRuntime:
workspace = self._build_workspace(host_path=None)
host_path = self.resolve_host_path()
process_cwd = '/workspace'
install_cmd: str | None = None
try:
await workspace.create_session()
@@ -168,6 +185,8 @@ class BoxStdioSessionRuntime:
env=self.server_config.get('env', {}),
cwd=process_cwd,
)
if install_cmd:
payload = self._wrap_process_payload_with_python_env(payload, process_cwd)
payload['process_id'] = self.process_id
await workspace.box_service.start_managed_process(workspace.session_id, payload)
except Exception:
@@ -253,14 +272,42 @@ class BoxStdioSessionRuntime:
@staticmethod
def _copy_workspace_tree(source_path: str, process_host_root: str, process_host_workspace: str) -> None:
shutil.rmtree(process_host_root, ignore_errors=True)
os.makedirs(process_host_root, exist_ok=True)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '.pytest_cache', '.mypy_cache', '.ruff_cache'),
)
# Docker-backed bootstrap writes root-owned runtime directories such as
# .venv/.tmp into the staged workspace. The host process may not be able
# to delete them, so refresh source files in place and preserve runtime
# directories instead of rmtree'ing the whole staging root.
with _workspace_copy_lock(process_host_root):
preserved_names = {'.venv', 'venv', 'env', '.cache', '.tmp', '.langbot'}
os.makedirs(process_host_workspace, exist_ok=True)
for name in os.listdir(process_host_workspace):
if name in preserved_names:
continue
path = os.path.join(process_host_workspace, name)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, ignore_errors=True)
else:
# The entry may disappear between listdir and unlink if cleanup races us.
with suppress(FileNotFoundError):
os.unlink(path)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(
'.git',
'__pycache__',
'.pytest_cache',
'.mypy_cache',
'.ruff_cache',
'.venv',
'venv',
'env',
'.cache',
'.tmp',
'.langbot',
),
)
async def _cleanup_staged_workspace(self) -> None:
if not self.resolve_host_path():
@@ -343,23 +390,25 @@ class BoxStdioSessionRuntime:
@staticmethod
def detect_install_command(host_path: str, workspace_path: str = '/workspace') -> str | None:
workspace_kind = classify_python_workspace(host_path)
quoted_workspace_path = shlex.quote(workspace_path)
if workspace_kind == 'package':
return (
'mkdir -p /opt/_lb_src'
f' && tar -C {quoted_workspace_path}'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_lb_src -xf -'
' && pip install --no-cache-dir /opt/_lb_src'
' && rm -rf /opt/_lb_src'
)
if workspace_kind == 'requirements':
return f'pip install --no-cache-dir -r {quoted_workspace_path}/requirements.txt'
if workspace_kind in {'package', 'requirements'}:
return wrap_python_command_with_env('python -c "pass"', mount_path=workspace_path).rstrip()
return None
@staticmethod
def _wrap_process_payload_with_python_env(payload: dict[str, Any], workspace_path: str) -> dict[str, Any]:
"""Start a prepared Python workspace without writing bootstrap output to MCP stdio."""
workspace_root = workspace_path.rstrip('/') or '/workspace'
venv_dir = f'{workspace_root}/.venv'
venv_bin = f'{venv_dir}/bin'
command = ' '.join([shlex.quote(payload['command']), *[shlex.quote(arg) for arg in payload.get('args', [])]])
wrapped = dict(payload)
wrapped['command'] = 'sh'
wrapped['args'] = [
'-lc',
(f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; export PATH={shlex.quote(venv_bin)}:$PATH; exec {command}'),
]
return wrapped
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:
workspace = self._build_workspace()
workspace.session_id = session_id

View File

@@ -8,6 +8,7 @@ from langbot_plugin.api.entities.events import pipeline_query
from .. import loader
from ..errors import ToolNotFoundError
from .availability import is_box_backend_available
from . import skill as skill_loader
EXEC_TOOL_NAME = 'exec'
@@ -22,6 +23,15 @@ _ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NA
# Skip these dirs during grep walk to avoid noise
_SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'}
_DEFAULT_READ_MAX_LINES = 2000
_MAX_READ_MAX_LINES = 10000
_DEFAULT_TOOL_RESULT_MAX_BYTES = 50 * 1024
_BOX_FILE_SCRIPT_MAX_BYTES = 2048
_GLOB_MAX_MATCHES = 100
_GREP_MAX_MATCHES = 200
_GREP_MAX_FILES = 5000
_GREP_MAX_LINE_CHARS = 500
class NativeToolLoader(loader.ToolLoader):
def __init__(self, ap):
@@ -43,18 +53,7 @@ class NativeToolLoader(loader.ToolLoader):
async def _check_backend_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
return await is_box_backend_available(self.ap)
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_sandbox_available():
@@ -139,6 +138,7 @@ class NativeToolLoader(loader.ToolLoader):
# via execute_tool. Skills are mounted at /workspace/.skills/{name}/
# via extra_mounts built by BoxService.
result = await self.ap.box_service.execute_tool(parameters, query)
result = self._normalize_exec_result(result)
if selected_skill is not None:
self._refresh_skill_from_disk(selected_skill)
@@ -227,19 +227,65 @@ class NativeToolLoader(loader.ToolLoader):
except Exception:
return {'ok': False, 'error': stdout or 'Box file operation returned no result'}
async def _read_workspace_via_box(self, path: str, query: pipeline_query.Query) -> dict:
async def _read_workspace_via_box(self, path: str, parameters: dict, query: pipeline_query.Query) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
# Box file fallback returns through exec stdout, which is already capped
# by BoxService. Keep this payload small enough to remain valid JSON.
max_bytes = min(
self._positive_int(parameters.get('max_bytes'), default=_DEFAULT_TOOL_RESULT_MAX_BYTES),
_BOX_FILE_SCRIPT_MAX_BYTES,
)
script = f"""
import json, os
path = {json.dumps(path)}
offset = {offset}
max_lines = {max_lines}
max_bytes = {max_bytes}
if not path.startswith('/workspace'):
print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}}))
elif not os.path.exists(path):
print(json.dumps({{'ok': False, 'error': f'File not found: {{path}}'}}))
elif os.path.isdir(path):
print(json.dumps({{'ok': True, 'content': '\\n'.join(sorted(os.listdir(path))), 'is_directory': True}}))
entries = sorted(os.listdir(path))
content = '\\n'.join(entries)
print(json.dumps({{'ok': True, 'content': content, 'is_directory': True, 'total': len(entries), 'truncated': False}}))
else:
lines = []
output_bytes = 0
end_line = offset - 1
truncated = False
next_offset = None
with open(path, 'r', encoding='utf-8', errors='replace') as f:
print(json.dumps({{'ok': True, 'content': f.read()}}))
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
next_offset = line_number
break
lines.append(line.rstrip('\\n'))
output_bytes += line_bytes
end_line = line_number
print(json.dumps({{
'ok': True,
'content': '\\n'.join(lines),
'truncated': truncated,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -307,12 +353,27 @@ else:
if not any(part in skip_dirs for part in item.parts)
]
hits.sort(key=lambda item: item.stat().st_mtime if item.exists() else 0, reverse=True)
shown = hits[:100]
shown = hits[:{_GLOB_MAX_MATCHES}]
matches = []
output_bytes = 0
truncated_by_bytes = False
for item in shown:
rel = os.path.relpath(str(item), path)
matches.append(os.path.join(path, rel).replace(os.sep, '/'))
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(hits), 'truncated': len(hits) > 100}}))
sandbox_path = os.path.join(path, rel).replace(os.sep, '/')
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if matches else 0)
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by_bytes = True
break
matches.append(sandbox_path)
output_bytes += entry_bytes
print(json.dumps({{
'ok': True,
'matches': matches,
'preview': '\\n'.join(matches),
'total': len(hits),
'truncated': len(hits) > len(matches) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if len(hits) > len(matches) else None),
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -350,29 +411,54 @@ else:
continue
if item.is_file():
files.append(item)
if len(files) >= 5000:
if len(files) >= {_GREP_MAX_FILES}:
break
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
matches.append({{'file': file_path, 'line': lineno, 'content': line.rstrip()}})
if len(matches) >= 200:
break
if len(matches) >= 200:
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
content = line.rstrip()
line_truncated = False
if len(content) > {_GREP_MAX_LINE_CHARS}:
content = content[:{_GREP_MAX_LINE_CHARS}] + '... [truncated]'
line_truncated = True
entry = {{'file': file_path, 'line': lineno, 'content': content}}
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= {_GREP_MAX_MATCHES}:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(matches), 'truncated': len(matches) >= 200}}))
print(json.dumps({{
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -387,14 +473,20 @@ else:
)
if skill_request is not None and hasattr(self.ap.box_service, 'read_skill_file'):
selected_skill, relative = skill_request
host_path = self._resolve_skill_host_path(selected_skill, relative)
if host_path and os.path.exists(host_path):
if os.path.isdir(host_path):
return self._build_directory_result(os.listdir(host_path))
return self._read_text_file_preview(host_path, parameters)
try:
result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative)
return {'ok': True, 'content': result.get('content', '')}
return self._build_read_result_from_text(str(result.get('content', '')), parameters)
except Exception:
try:
result = await self.ap.box_service.list_skill_files(selected_skill['name'], relative)
entries = [entry['name'] for entry in result.get('entries', [])]
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
return self._build_directory_result(entries)
except Exception as exc:
return {'ok': False, 'error': str(exc)}
@@ -405,15 +497,13 @@ else:
include_activated=True,
)
if self._should_use_box_workspace_files(selected_skill):
return await self._read_workspace_via_box(path, query)
return await self._read_workspace_via_box(path, parameters, query)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
return {'ok': True, 'content': content}
return self._build_directory_result(entries)
return self._read_text_file_preview(host_path, parameters)
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
@@ -584,6 +674,28 @@ else:
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'offset': {
'type': 'integer',
'description': '1-indexed line number to start reading from. Defaults to 1.',
'default': 1,
'minimum': 1,
},
'limit': {
'type': 'integer',
'description': f'Maximum number of lines to return. Defaults to {_DEFAULT_READ_MAX_LINES}.',
'default': _DEFAULT_READ_MAX_LINES,
'minimum': 1,
'maximum': _MAX_READ_MAX_LINES,
},
'max_bytes': {
'type': 'integer',
'description': (
f'Maximum bytes of file content to return. Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
),
'default': _DEFAULT_TOOL_RESULT_MAX_BYTES,
'minimum': 1,
'maximum': _DEFAULT_TOOL_RESULT_MAX_BYTES,
},
},
'required': ['path'],
'additionalProperties': False,
@@ -740,22 +852,30 @@ else:
hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)
total = len(hits)
shown = hits[:100]
shown = hits[:_GLOB_MAX_MATCHES]
# Convert back to sandbox paths
sandbox_paths = []
output_bytes = 0
truncated_by_bytes = False
for h in shown:
rel = os.path.relpath(str(h), host_path)
sandbox_path = os.path.join(path, rel)
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if sandbox_paths else 0)
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by_bytes = True
break
sandbox_paths.append(sandbox_path)
output_bytes += entry_bytes
result_lines = sandbox_paths
result = '\n'.join(result_lines)
if total > 100:
result += f'\n... ({total} matches, showing first 100)'
return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100}
return {
'ok': True,
'matches': sandbox_paths,
'preview': '\n'.join(sandbox_paths),
'total': total,
'truncated': total > len(sandbox_paths) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if total > len(sandbox_paths) else None),
}
async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
@@ -791,32 +911,46 @@ else:
files = self._grep_walk(base, include)
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
matches.append(
{
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
content, line_truncated = self._truncate_grep_line(line.rstrip())
entry = {
'file': sandbox_path,
'line': lineno,
'content': line.rstrip(),
'content': content,
}
)
if len(matches) >= 200:
break
if len(matches) >= 200:
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= _GREP_MAX_MATCHES:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
return {
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': len(matches) >= 200,
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}
@staticmethod
@@ -828,10 +962,207 @@ else:
continue
if item.is_file():
results.append(item)
if len(results) >= 5000:
if len(results) >= _GREP_MAX_FILES:
break
return results
@staticmethod
def _resolve_skill_host_path(selected_skill: dict, relative: str) -> str | None:
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
return None
host_root = os.path.realpath(package_root)
host_path = os.path.realpath(os.path.join(host_root, relative))
if not (host_path == host_root or host_path.startswith(host_root + os.sep)):
raise ValueError('Path escapes the skill package boundary.')
return host_path
def _normalize_exec_result(self, result: dict) -> dict:
normalized = dict(result)
stdout = str(normalized.get('stdout') or '')
stderr = str(normalized.get('stderr') or '')
stdout, stdout_capped = self._truncate_text_to_bytes_with_flag(stdout, _DEFAULT_TOOL_RESULT_MAX_BYTES)
stderr, stderr_capped = self._truncate_text_to_bytes_with_flag(stderr, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['stdout'] = stdout
normalized['stderr'] = stderr
normalized['stdout_truncated'] = bool(normalized.get('stdout_truncated') or stdout_capped)
normalized['stderr_truncated'] = bool(normalized.get('stderr_truncated') or stderr_capped)
if stdout and stderr:
preview_raw = f'stdout:\n{stdout}\n\nstderr:\n{stderr}'
else:
preview_raw = stdout or stderr
preview, preview_capped = self._truncate_text_to_bytes_with_flag(preview_raw, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['preview'] = preview
normalized['truncated'] = bool(
normalized['stdout_truncated'] or normalized['stderr_truncated'] or preview_capped
)
if preview_capped and not normalized.get('truncated_by'):
normalized['truncated_by'] = 'bytes'
return normalized
def _build_directory_result(self, entries: list[str]) -> dict:
sorted_entries = sorted(str(entry) for entry in entries)
content = '\n'.join(sorted_entries)
preview = self._truncate_text_to_bytes(content, _DEFAULT_TOOL_RESULT_MAX_BYTES)
truncated = preview != content
return {
'ok': True,
'content': preview,
'is_directory': True,
'total': len(sorted_entries),
'truncated': truncated,
'truncated_by': 'bytes' if truncated else None,
}
def _read_text_file_preview(self, host_path: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
lines: list[str] = []
output_bytes = 0
end_line = offset - 1
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
with open(host_path, 'r', encoding='utf-8', errors='replace') as f:
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = line_number
break
lines.append(line.rstrip('\n'))
output_bytes += line_bytes
end_line = line_number
if not lines and truncated_by == 'bytes':
content = (
f'[Line {next_offset or offset} exceeds the {self._format_size(max_bytes)} read limit. '
'Use exec with a byte-range command for this line, or read a different offset.]'
)
else:
content = '\n'.join(lines)
return {
'ok': True,
'content': content,
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
def _build_read_result_from_text(self, content: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
all_lines = content.splitlines()
start_index = offset - 1
if start_index >= len(all_lines) and all_lines:
return {'ok': False, 'error': f'Offset {offset} is beyond end of file ({len(all_lines)} lines total)'}
output_lines: list[str] = []
output_bytes = 0
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
for index, line in enumerate(all_lines[start_index:], start_index + 1):
if len(output_lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = index
break
line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = index
break
output_lines.append(line)
output_bytes += line_bytes
end_line = offset + len(output_lines) - 1
return {
'ok': True,
'content': '\n'.join(output_lines),
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
@staticmethod
def _positive_int(value, *, default: int, max_value: int | None = None) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = default
if parsed <= 0:
parsed = default
if max_value is not None:
parsed = min(parsed, max_value)
return parsed
@staticmethod
def _truncate_grep_line(line: str) -> tuple[str, bool]:
if len(line) <= _GREP_MAX_LINE_CHARS:
return line, False
return f'{line[:_GREP_MAX_LINE_CHARS]}... [truncated]', True
@staticmethod
def _truncate_text_to_bytes(text: str, max_bytes: int) -> str:
return NativeToolLoader._truncate_text_to_bytes_with_flag(text, max_bytes)[0]
@staticmethod
def _truncate_text_to_bytes_with_flag(text: str, max_bytes: int) -> tuple[str, bool]:
data = text.encode('utf-8')
if len(data) <= max_bytes:
return text, False
truncated = data[:max_bytes]
while truncated and (truncated[-1] & 0xC0) == 0x80:
truncated = truncated[:-1]
return truncated.decode('utf-8', errors='ignore'), True
@staticmethod
def _format_size(bytes_count: int) -> str:
if bytes_count < 1024:
return f'{bytes_count}B'
return f'{bytes_count / 1024:.1f}KB'
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('command', '')).strip()

View File

@@ -72,6 +72,45 @@ def register_activated_skill(query: pipeline_query.Query, skill_data: dict) -> N
activated[skill_name] = skill_data
def normalize_skill_names(value: typing.Any) -> list[str]:
"""Return a de-duplicated list of non-empty skill names."""
if not isinstance(value, list):
return []
names: list[str] = []
for item in value:
skill_name = str(item or '').strip()
if skill_name and skill_name not in names:
names.append(skill_name)
return names
def get_activated_skill_names(query: pipeline_query.Query) -> list[str]:
"""Return activated skill names for callers that own persistence policy."""
return normalize_skill_names(list(get_activated_skills(query).keys()))
def restore_activated_skills(
ap: app.Application,
query: pipeline_query.Query,
skill_names: typing.Any,
) -> list[str]:
"""Restore caller-provided activated skill names into Query variables.
Persistence and state scope ownership belong to higher-level flows. This
helper only rebuilds current Query state from pipeline-visible skills, so
removed or unbound skills stay unavailable to native exec/write/edit.
"""
restored: list[str] = []
for skill_name in normalize_skill_names(skill_names):
skill_data = get_visible_skill(ap, query, skill_name)
if skill_data is None:
continue
register_activated_skill(query, skill_data)
restored.append(skill_name)
return restored
def parse_skill_mount_path(sandbox_path: str) -> tuple[str | None, str]:
normalized_path = str(sandbox_path or '/workspace').strip() or '/workspace'
if normalized_path == SKILL_MOUNT_PREFIX:

View File

@@ -6,6 +6,7 @@ import typing
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from .. import loader
from .availability import is_box_backend_available
# Align with Claude Code's Skill tool design:
# - activate: Activate a skill via Tool Call, returns SKILL.md content
@@ -45,18 +46,7 @@ class SkillToolLoader(loader.ToolLoader):
async def _check_sandbox_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
return await is_box_backend_available(self.ap)
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_available():
@@ -92,16 +82,15 @@ class SkillToolLoader(loader.ToolLoader):
if not skill_name:
raise ValueError('skill_name is required')
skill_mgr = self.ap.skill_mgr
skill_data = skill_mgr.get_skill_by_name(skill_name)
from . import skill as skill_loader
skill_data = skill_loader.get_visible_skill(self.ap, query, skill_name)
if skill_data is None:
visible_skills = getattr(skill_mgr, 'skills', {})
visible_skills = skill_loader.get_visible_skills(self.ap, query)
available_names = ', '.join(sorted(visible_skills.keys())) or 'none'
raise ValueError(f'Skill "{skill_name}" not found. Available skills: {available_names}')
# Register activated skill for sandbox mount path resolution
from . import skill as skill_loader
skill_loader.register_activated_skill(query, skill_data)
# Return SKILL.md content as Tool Result (injects into context)
@@ -127,6 +116,7 @@ class SkillToolLoader(loader.ToolLoader):
'activated': True,
'skill_name': skill_name,
'mount_path': mount_path,
'activated_skill_names': skill_loader.get_activated_skill_names(query),
'content': result_content,
}
@@ -201,13 +191,13 @@ class SkillToolLoader(loader.ToolLoader):
return resource_tool.LLMTool(
name=ACTIVATE_SKILL_TOOL_NAME,
human_desc='Activate a skill',
description=self._build_activate_tool_description(),
description='Activate a pipeline-visible skill by name and return its instructions as a tool result.',
parameters={
'type': 'object',
'properties': {
'skill_name': {
'type': 'string',
'description': 'The skill name to activate (no arguments). E.g., "pdf" or "data-analysis"',
'description': 'The skill name to activate.',
},
},
'required': ['skill_name'],
@@ -255,50 +245,3 @@ class SkillToolLoader(loader.ToolLoader):
},
func=lambda parameters: parameters,
)
def _build_activate_tool_description(self) -> str:
"""Build tool description with embedded available_skills list."""
skill_mgr = getattr(self.ap, 'skill_mgr', None)
if skill_mgr is None:
return 'Activate a skill. No skills are currently available.'
skills = getattr(skill_mgr, 'skills', {})
if not skills:
return 'Activate a skill. No skills are currently available.'
# Build <available_skills> section
available_skills_lines = ['<available_skills>']
for skill_name, skill_data in sorted(skills.items()):
description = skill_data.get('description', '')
available_skills_lines.append('<skill>')
available_skills_lines.append(f'<name>{skill_name}</name>')
available_skills_lines.append(f'<description>{description}</description>')
available_skills_lines.append('</skill>')
available_skills_lines.append('</available_skills>')
available_skills_block = '\n'.join(available_skills_lines)
return f"""Activate a skill within the main conversation.
<skills_instructions>
When users ask you to perform tasks, check if any of the available skills
below can help complete the task more effectively. Skills provide specialized
capabilities and domain knowledge.
How to use skills:
- Invoke skills using this tool with the skill name only (no arguments)
- When you invoke a skill, you will see <command-message>
The skill is activated
</command-message>
- The skill's instructions will be provided in the tool result
- Examples:
- skill_name: "pdf" - invoke the pdf skill
- skill_name: "data-analysis" - invoke the data-analysis skill
Important:
- Only use skills listed in <available_skills> below
- Do not invoke a skill that is already running
- To create a new skill: prepare it in /workspace, then use register_skill tool
</skills_instructions>
{available_skills_block}"""

View File

@@ -54,7 +54,9 @@ def test_classify_python_workspace_detects_package_and_requirements():
def test_wrap_python_command_with_env_contains_bootstrap_and_command():
command = wrap_python_command_with_env('python script.py')
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'kill -0 "$_LB_LOCK_OWNER"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python script.py')

View File

@@ -49,30 +49,6 @@ class TestExtractDepsMetadata:
assert 'flask' in task_context.metadata['deps_list']
assert 'numpy' in task_context.metadata['deps_list']
def test_extract_plugin_identity_includes_version(self):
"""Extract plugin identity and version from manifest.yaml."""
connector = self._create_connector()
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
zf.writestr(
'manifest.yaml',
'\n'.join(
[
'metadata:',
' author: langbot-team',
' name: LangRAG',
' version: 0.1.8',
]
),
)
assert connector._inspect_plugin_package(zip_buffer.getvalue(), None) == (
'langbot-team',
'LangRAG',
'0.1.8',
)
def test_extract_deps_empty_requirements(self):
"""Handle empty requirements.txt."""
connector = self._create_connector()

View File

@@ -180,7 +180,7 @@ class TestMCPServerBoxConfig:
assert cfg.host_path is None
assert cfg.host_path_mode == 'ro'
assert cfg.env == {}
assert cfg.startup_timeout_sec == 120
assert cfg.startup_timeout_sec == 300
assert cfg.cpus is None
assert cfg.memory_mb is None
assert cfg.pids_limit is None
@@ -494,6 +494,84 @@ class TestBuildBoxProcessPayload:
assert payload['args'] == ['/opt/other/server.py', '--flag']
# ── Python Workspace Preparation ────────────────────────────────────
class TestPythonWorkspacePreparation:
def test_requirements_workspace_uses_venv_bootstrap(self, mcp_module, tmp_path):
host_path = tmp_path / 'mcp-source'
host_path.mkdir()
(host_path / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
command = mcp_module.BoxStdioSessionRuntime.detect_install_command(
str(host_path),
'/workspace/.mcp/u1/workspace',
)
assert command is not None
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'python -m pip install -r "/workspace/.mcp/u1/workspace/requirements.txt"' in command
assert 'pip install --no-cache-dir -r' not in command
def test_staging_refresh_removes_stale_source_files_but_preserves_runtime_dirs(self, mcp_module, tmp_path):
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
(source / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
(source / '.env').write_text('TOKEN=new\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
(workspace / '.venv' / 'bin').mkdir(parents=True)
(workspace / '.venv' / 'bin' / 'python').write_text('', encoding='utf-8')
(workspace / '.langbot').mkdir()
(workspace / '.langbot' / 'python-env.lock').mkdir()
(workspace / '.env').write_text('TOKEN=old\n', encoding='utf-8')
(workspace / 'server.py').write_text('print("old")\n', encoding='utf-8')
(workspace / 'removed.py').write_text('stale\n', encoding='utf-8')
(workspace / 'removed_dir').mkdir()
(workspace / 'removed_dir' / 'old.txt').write_text('stale\n', encoding='utf-8')
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
assert (workspace / 'requirements.txt').read_text(encoding='utf-8') == 'mcp==1.26.0\n'
assert (workspace / '.env').read_text(encoding='utf-8') == 'TOKEN=new\n'
assert not (workspace / 'removed.py').exists()
assert not (workspace / 'removed_dir').exists()
assert (workspace / '.venv' / 'bin' / 'python').exists()
assert (workspace / '.langbot' / 'python-env.lock').is_dir()
def test_staging_refresh_ignores_unlink_race(self, mcp_module, tmp_path, monkeypatch):
mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio']
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
workspace.mkdir(parents=True)
stale_file = workspace / 'removed.py'
stale_file.write_text('stale\n', encoding='utf-8')
real_unlink = os.unlink
def unlink_with_race(path):
if os.fspath(path) == str(stale_file):
real_unlink(path)
raise FileNotFoundError(path)
real_unlink(path)
monkeypatch.setattr(mcp_stdio_module.os, 'unlink', unlink_with_race)
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert not stale_file.exists()
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
# ── get_runtime_info_dict ───────────────────────────────────────────

View File

@@ -193,6 +193,29 @@ class TestSkillPathHelpers:
assert list(result.keys()) == ['visible']
def test_restore_activated_skills_uses_caller_provided_names_and_visibility(self):
from langbot.pkg.provider.tools.loaders.skill import (
ACTIVATED_SKILLS_KEY,
PIPELINE_BOUND_SKILLS_KEY,
get_activated_skill_names,
restore_activated_skills,
)
ap = _make_ap()
ap.skill_mgr = SimpleNamespace(
skills={
'visible': _make_skill_data(name='visible'),
'hidden': _make_skill_data(name='hidden'),
}
)
query = SimpleNamespace(variables={PIPELINE_BOUND_SKILLS_KEY: ['visible']})
restored = restore_activated_skills(ap, query, ['visible', 'hidden', 'visible', ''])
assert restored == ['visible']
assert list(query.variables[ACTIVATED_SKILLS_KEY].keys()) == ['visible']
assert get_activated_skill_names(query) == ['visible']
def test_resolve_virtual_skill_path_allows_visible_skill_reads(self):
from langbot.pkg.provider.tools.loaders.skill import (
PIPELINE_BOUND_SKILLS_KEY,
@@ -245,7 +268,8 @@ class TestSkillPathHelpers:
command = wrap_skill_command_with_python_env('python scripts/run.py')
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python scripts/run.py')
@@ -281,6 +305,7 @@ class TestSkillToolLoader:
assert result['activated'] is True
assert result['skill_name'] == 'demo'
assert result['mount_path'] == '/workspace/.skills/demo'
assert result['activated_skill_names'] == ['demo']
assert 'Step 1' in result['content']
assert set(query.variables[ACTIVATED_SKILLS_KEY].keys()) == {'demo'}
@@ -456,7 +481,9 @@ class TestNativeToolLoaderSkillPaths:
SimpleNamespace(query_id='q1', variables={PIPELINE_BOUND_SKILLS_KEY: ['demo']}),
)
assert result == {'ok': True, 'content': 'demo instructions'}
assert result['ok'] is True
assert result['content'] == 'demo instructions'
assert result['truncated'] is False
@pytest.mark.asyncio
async def test_exec_in_activated_skill_mount_rewrites_command_and_refreshes(self):
@@ -485,7 +512,7 @@ class TestNativeToolLoaderSkillPaths:
query,
)
assert result == {'ok': True}
assert result['ok'] is True
tool_parameters = ap.box_service.execute_tool.await_args.args[0]
assert tool_parameters['command'] == 'python /workspace/.skills/demo/scripts/run.py'
assert tool_parameters['workdir'] == '/workspace/.skills/demo'

View File

@@ -248,3 +248,135 @@ async def test_path_escape_blocked():
with pytest.raises(ValueError, match='escapes'):
await loader.invoke_tool('read', {'path': '/workspace/../../etc/passwd'}, _make_query())
@pytest.mark.asyncio
async def test_box_availability_helper_handles_unavailable_and_errors():
from langbot.pkg.provider.tools.loaders.availability import is_box_backend_available
assert await is_box_backend_available(SimpleNamespace()) is False
assert await is_box_backend_available(SimpleNamespace(box_service=SimpleNamespace(available=False))) is False
unavailable_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(return_value={'backend': {'available': False}}),
)
assert await is_box_backend_available(SimpleNamespace(box_service=unavailable_backend)) is False
failing_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(side_effect=RuntimeError('box unavailable')),
)
assert await is_box_backend_available(SimpleNamespace(box_service=failing_backend)) is False
@pytest.mark.asyncio
async def test_read_file_supports_offset_limit_and_truncation_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'lines.txt'), 'w', encoding='utf-8') as f:
f.write('one\ntwo\nthree\nfour\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/lines.txt', 'offset': 2, 'limit': 2},
_make_query(),
)
assert result == {
'ok': True,
'content': 'two\nthree',
'truncated': True,
'truncated_by': 'lines',
'start_line': 2,
'end_line': 3,
'next_offset': 4,
'max_lines': 2,
'max_bytes': 50 * 1024,
}
@pytest.mark.asyncio
async def test_read_file_handles_line_larger_than_byte_limit():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'long-line.txt'), 'w', encoding='utf-8') as f:
f.write('abcdef\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/long-line.txt', 'max_bytes': 3},
_make_query(),
)
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
assert result['next_offset'] == 1
assert 'exceeds the 3B read limit' in result['content']
@pytest.mark.asyncio
async def test_exec_result_is_capped_and_exposes_preview_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
box_service = SimpleNamespace(
available=True,
default_workspace=tmpdir,
execute_tool=AsyncMock(
return_value={
'ok': True,
'stdout': 'a' * 60000,
'stderr': 'b' * 60000,
'exit_code': 0,
}
),
)
loader = NativeToolLoader(SimpleNamespace(box_service=box_service, logger=Mock()))
result = await loader.invoke_tool('exec', {'command': 'python -V'}, _make_query())
assert result['ok'] is True
assert len(result['stdout'].encode('utf-8')) == 50 * 1024
assert len(result['stderr'].encode('utf-8')) == 50 * 1024
assert len(result['preview'].encode('utf-8')) == 50 * 1024
assert result['stdout_truncated'] is True
assert result['stderr_truncated'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
@pytest.mark.asyncio
async def test_glob_caps_match_count_and_returns_preview():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
for index in range(105):
with open(os.path.join(tmpdir, f'file-{index:03d}.txt'), 'w', encoding='utf-8') as f:
f.write(str(index))
result = await loader.invoke_tool('glob', {'path': '/workspace', 'pattern': '*.txt'}, _make_query())
assert result['ok'] is True
assert result['total'] == 105
assert len(result['matches']) == 100
assert result['preview'] == '\n'.join(result['matches'])
assert result['truncated'] is True
assert result['truncated_by'] == 'matches'
@pytest.mark.asyncio
async def test_grep_reports_invalid_regex_and_truncates_long_matching_lines():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'data.txt'), 'w', encoding='utf-8') as f:
f.write('needle ' + ('x' * 600) + '\n')
invalid = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': '['}, _make_query())
result = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': 'needle'}, _make_query())
assert invalid['ok'] is False
assert 'Invalid regex' in invalid['error']
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'line'
assert result['matches'][0]['file'] == '/workspace/data.txt'
assert result['matches'][0]['content'].endswith('... [truncated]')