chore(agent-runner): stack on tool runtime fixes

# Conflicts:
#	src/langbot/pkg/provider/tools/loaders/mcp_stdio.py
#	tests/unit_tests/provider/test_mcp_box_integration.py
This commit is contained in:
huanghuoguoguo
2026-06-14 11:19:12 +08:00
3 changed files with 495 additions and 84 deletions

View File

@@ -276,7 +276,19 @@ class BoxStdioSessionRuntime:
# to delete them, so refresh source files in place and preserve runtime
# directories instead of rmtree'ing the whole staging root.
with _workspace_copy_lock(process_host_root):
preserved_names = {'.venv', 'venv', 'env', '.env', '.cache', '.tmp', '.langbot'}
os.makedirs(process_host_workspace, exist_ok=True)
for name in os.listdir(process_host_workspace):
if name in preserved_names:
continue
path = os.path.join(process_host_workspace, name)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, ignore_errors=True)
else:
try:
os.unlink(path)
except FileNotFoundError:
pass
shutil.copytree(
source_path,
process_host_workspace,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import mimetypes
import os
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
@@ -8,6 +9,7 @@ from langbot_plugin.api.entities.events import pipeline_query
from .. import loader
from ..errors import ToolNotFoundError
from .availability import is_box_backend_available
from . import skill as skill_loader
EXEC_TOOL_NAME = 'exec'
@@ -22,6 +24,15 @@ _ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NA
# Skip these dirs during grep walk to avoid noise
_SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'}
_DEFAULT_READ_MAX_LINES = 2000
_MAX_READ_MAX_LINES = 10000
_DEFAULT_TOOL_RESULT_MAX_BYTES = 50 * 1024
_BOX_FILE_SCRIPT_MAX_BYTES = 2048
_GLOB_MAX_MATCHES = 100
_GREP_MAX_MATCHES = 200
_GREP_MAX_FILES = 5000
_GREP_MAX_LINE_CHARS = 500
class NativeToolLoader(loader.ToolLoader):
def __init__(self, ap):
@@ -43,18 +54,7 @@ class NativeToolLoader(loader.ToolLoader):
async def _check_backend_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
return await is_box_backend_available(self.ap)
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_sandbox_available():
@@ -139,6 +139,7 @@ class NativeToolLoader(loader.ToolLoader):
# via execute_tool. Skills are mounted at /workspace/.skills/{name}/
# via extra_mounts built by BoxService.
result = await self.ap.box_service.execute_tool(parameters, query)
result = self._normalize_exec_result(result)
if selected_skill is not None:
self._refresh_skill_from_disk(selected_skill)
@@ -227,19 +228,65 @@ class NativeToolLoader(loader.ToolLoader):
except Exception:
return {'ok': False, 'error': stdout or 'Box file operation returned no result'}
async def _read_workspace_via_box(self, path: str, query: pipeline_query.Query) -> dict:
async def _read_workspace_via_box(self, path: str, parameters: dict, query: pipeline_query.Query) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
# Box file fallback returns through exec stdout, which is already capped
# by BoxService. Keep this payload small enough to remain valid JSON.
max_bytes = min(
self._positive_int(parameters.get('max_bytes'), default=_DEFAULT_TOOL_RESULT_MAX_BYTES),
_BOX_FILE_SCRIPT_MAX_BYTES,
)
script = f"""
import json, os
path = {json.dumps(path)}
offset = {offset}
max_lines = {max_lines}
max_bytes = {max_bytes}
if not path.startswith('/workspace'):
print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}}))
elif not os.path.exists(path):
print(json.dumps({{'ok': False, 'error': f'File not found: {{path}}'}}))
elif os.path.isdir(path):
print(json.dumps({{'ok': True, 'content': '\\n'.join(sorted(os.listdir(path))), 'is_directory': True}}))
entries = sorted(os.listdir(path))
content = '\\n'.join(entries)
print(json.dumps({{'ok': True, 'content': content, 'is_directory': True, 'total': len(entries), 'truncated': False}}))
else:
lines = []
output_bytes = 0
end_line = offset - 1
truncated = False
next_offset = None
with open(path, 'r', encoding='utf-8', errors='replace') as f:
print(json.dumps({{'ok': True, 'content': f.read()}}))
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
next_offset = line_number
break
lines.append(line.rstrip('\\n'))
output_bytes += line_bytes
end_line = line_number
print(json.dumps({{
'ok': True,
'content': '\\n'.join(lines),
'truncated': truncated,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -307,12 +354,27 @@ else:
if not any(part in skip_dirs for part in item.parts)
]
hits.sort(key=lambda item: item.stat().st_mtime if item.exists() else 0, reverse=True)
shown = hits[:100]
shown = hits[:{_GLOB_MAX_MATCHES}]
matches = []
output_bytes = 0
truncated_by_bytes = False
for item in shown:
rel = os.path.relpath(str(item), path)
matches.append(os.path.join(path, rel).replace(os.sep, '/'))
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(hits), 'truncated': len(hits) > 100}}))
sandbox_path = os.path.join(path, rel).replace(os.sep, '/')
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if matches else 0)
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by_bytes = True
break
matches.append(sandbox_path)
output_bytes += entry_bytes
print(json.dumps({{
'ok': True,
'matches': matches,
'preview': '\\n'.join(matches),
'total': len(hits),
'truncated': len(hits) > len(matches) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if len(hits) > len(matches) else None),
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -350,29 +412,54 @@ else:
continue
if item.is_file():
files.append(item)
if len(files) >= 5000:
if len(files) >= {_GREP_MAX_FILES}:
break
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
matches.append({{'file': file_path, 'line': lineno, 'content': line.rstrip()}})
if len(matches) >= 200:
break
if len(matches) >= 200:
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
content = line.rstrip()
line_truncated = False
if len(content) > {_GREP_MAX_LINE_CHARS}:
content = content[:{_GREP_MAX_LINE_CHARS}] + '... [truncated]'
line_truncated = True
entry = {{'file': file_path, 'line': lineno, 'content': content}}
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= {_GREP_MAX_MATCHES}:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(matches), 'truncated': len(matches) >= 200}}))
print(json.dumps({{
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -387,14 +474,22 @@ else:
)
if skill_request is not None and hasattr(self.ap.box_service, 'read_skill_file'):
selected_skill, relative = skill_request
host_path = self._resolve_skill_host_path(selected_skill, relative)
if host_path and os.path.exists(host_path):
if os.path.isdir(host_path):
return self._build_directory_result(os.listdir(host_path))
result = self._read_text_file_preview(host_path, parameters)
host_root = str(selected_skill.get('package_root', '') or '')
return await self._attach_file_artifact_ref(result, host_path, host_root, path, query)
try:
result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative)
return {'ok': True, 'content': result.get('content', '')}
return self._build_read_result_from_text(str(result.get('content', '')), parameters)
except Exception:
try:
result = await self.ap.box_service.list_skill_files(selected_skill['name'], relative)
entries = [entry['name'] for entry in result.get('entries', [])]
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
return self._build_directory_result(entries)
except Exception as exc:
return {'ok': False, 'error': str(exc)}
@@ -405,15 +500,15 @@ else:
include_activated=True,
)
if self._should_use_box_workspace_files(selected_skill):
return await self._read_workspace_via_box(path, query)
return await self._read_workspace_via_box(path, parameters, query)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
return {'ok': True, 'content': content}
return self._build_directory_result(entries)
result = self._read_text_file_preview(host_path, parameters)
host_root = self._get_host_root(selected_skill)
return await self._attach_file_artifact_ref(result, host_path, host_root, path, query)
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
@@ -584,6 +679,29 @@ else:
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'offset': {
'type': 'integer',
'description': '1-indexed line number to start reading from. Defaults to 1.',
'default': 1,
'minimum': 1,
},
'limit': {
'type': 'integer',
'description': f'Maximum number of lines to return. Defaults to {_DEFAULT_READ_MAX_LINES}.',
'default': _DEFAULT_READ_MAX_LINES,
'minimum': 1,
'maximum': _MAX_READ_MAX_LINES,
},
'max_bytes': {
'type': 'integer',
'description': (
'Maximum bytes of file content to return. '
f'Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
),
'default': _DEFAULT_TOOL_RESULT_MAX_BYTES,
'minimum': 1,
'maximum': _DEFAULT_TOOL_RESULT_MAX_BYTES,
},
},
'required': ['path'],
'additionalProperties': False,
@@ -740,22 +858,30 @@ else:
hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)
total = len(hits)
shown = hits[:100]
shown = hits[:_GLOB_MAX_MATCHES]
# Convert back to sandbox paths
sandbox_paths = []
output_bytes = 0
truncated_by_bytes = False
for h in shown:
rel = os.path.relpath(str(h), host_path)
sandbox_path = os.path.join(path, rel)
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if sandbox_paths else 0)
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by_bytes = True
break
sandbox_paths.append(sandbox_path)
output_bytes += entry_bytes
result_lines = sandbox_paths
result = '\n'.join(result_lines)
if total > 100:
result += f'\n... ({total} matches, showing first 100)'
return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100}
return {
'ok': True,
'matches': sandbox_paths,
'preview': '\n'.join(sandbox_paths),
'total': total,
'truncated': total > len(sandbox_paths) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if total > len(sandbox_paths) else None),
}
async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
@@ -791,32 +917,46 @@ else:
files = self._grep_walk(base, include)
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
matches.append(
{
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
content, line_truncated = self._truncate_grep_line(line.rstrip())
entry = {
'file': sandbox_path,
'line': lineno,
'content': line.rstrip(),
'content': content,
}
)
if len(matches) >= 200:
break
if len(matches) >= 200:
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= _GREP_MAX_MATCHES:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
return {
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': len(matches) >= 200,
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}
@staticmethod
@@ -828,10 +968,285 @@ else:
continue
if item.is_file():
results.append(item)
if len(results) >= 5000:
if len(results) >= _GREP_MAX_FILES:
break
return results
@staticmethod
def _resolve_skill_host_path(selected_skill: dict, relative: str) -> str | None:
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
return None
host_root = os.path.realpath(package_root)
host_path = os.path.realpath(os.path.join(host_root, relative))
if not (host_path == host_root or host_path.startswith(host_root + os.sep)):
raise ValueError('Path escapes the skill package boundary.')
return host_path
def _get_host_root(self, selected_skill: dict | None) -> str:
if selected_skill is not None:
return str(selected_skill.get('package_root', '') or '')
return str(getattr(self.ap.box_service, 'default_workspace', '') or '')
async def _attach_file_artifact_ref(
self,
result: dict,
host_path: str,
host_root: str,
sandbox_path: str,
query: pipeline_query.Query,
) -> dict:
if not result.get('ok') or not result.get('truncated') or result.get('artifact_refs'):
return result
if not host_root or not os.path.isfile(host_path):
return result
run_session = self._get_agent_run_session(query)
if not run_session:
return result
persistence_mgr = getattr(self.ap, 'persistence_mgr', None)
get_db_engine = getattr(persistence_mgr, 'get_db_engine', None)
if not callable(get_db_engine):
return result
try:
from langbot.pkg.agent.runner.artifact_store import ArtifactStore
authorization = run_session.get('authorization', {}) if isinstance(run_session, dict) else {}
mime_type = mimetypes.guess_type(host_path)[0] or 'text/plain'
size_bytes = os.path.getsize(host_path)
metadata = {
'tool_name': READ_TOOL_NAME,
'sandbox_path': sandbox_path,
'truncated_by': result.get('truncated_by'),
'start_line': result.get('start_line'),
'end_line': result.get('end_line'),
'next_offset': result.get('next_offset'),
}
artifact_id = await ArtifactStore(get_db_engine()).register_file_artifact(
artifact_id=None,
host_path=host_path,
host_root=host_root,
artifact_type='file',
source='tool',
mime_type=mime_type,
name=os.path.basename(host_path),
size_bytes=size_bytes,
conversation_id=authorization.get('conversation_id'),
run_id=run_session.get('run_id') if isinstance(run_session, dict) else None,
runner_id=run_session.get('runner_id') if isinstance(run_session, dict) else None,
bot_id=getattr(query, 'bot_uuid', None),
workspace_id=authorization.get('workspace_id'),
thread_id=authorization.get('thread_id'),
metadata=metadata,
)
artifact_ref = {
'artifact_id': artifact_id,
'artifact_type': 'file',
'mime_type': mime_type,
'name': os.path.basename(host_path),
'size_bytes': size_bytes,
}
enriched = dict(result)
enriched['preview'] = str(result.get('content') or '')
enriched['artifact_refs'] = [artifact_ref]
return enriched
except Exception as exc:
self.ap.logger.warning(f'Failed to register read artifact for {sandbox_path}: {exc}')
return result
@staticmethod
def _get_agent_run_session(query: pipeline_query.Query) -> dict | None:
session = getattr(query, '_agent_run_session', None)
return session if isinstance(session, dict) else None
def _normalize_exec_result(self, result: dict) -> dict:
normalized = dict(result)
stdout = str(normalized.get('stdout') or '')
stderr = str(normalized.get('stderr') or '')
stdout, stdout_capped = self._truncate_text_to_bytes_with_flag(stdout, _DEFAULT_TOOL_RESULT_MAX_BYTES)
stderr, stderr_capped = self._truncate_text_to_bytes_with_flag(stderr, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['stdout'] = stdout
normalized['stderr'] = stderr
normalized['stdout_truncated'] = bool(normalized.get('stdout_truncated') or stdout_capped)
normalized['stderr_truncated'] = bool(normalized.get('stderr_truncated') or stderr_capped)
if stdout and stderr:
preview_raw = f'stdout:\n{stdout}\n\nstderr:\n{stderr}'
else:
preview_raw = stdout or stderr
preview, preview_capped = self._truncate_text_to_bytes_with_flag(preview_raw, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['preview'] = preview
normalized['truncated'] = bool(
normalized['stdout_truncated'] or normalized['stderr_truncated'] or preview_capped
)
if preview_capped and not normalized.get('truncated_by'):
normalized['truncated_by'] = 'bytes'
return normalized
def _build_directory_result(self, entries: list[str]) -> dict:
sorted_entries = sorted(str(entry) for entry in entries)
content = '\n'.join(sorted_entries)
preview = self._truncate_text_to_bytes(content, _DEFAULT_TOOL_RESULT_MAX_BYTES)
truncated = preview != content
return {
'ok': True,
'content': preview,
'is_directory': True,
'total': len(sorted_entries),
'truncated': truncated,
'truncated_by': 'bytes' if truncated else None,
}
def _read_text_file_preview(self, host_path: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
lines: list[str] = []
output_bytes = 0
end_line = offset - 1
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
with open(host_path, 'r', encoding='utf-8', errors='replace') as f:
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = line_number
break
lines.append(line.rstrip('\n'))
output_bytes += line_bytes
end_line = line_number
if not lines and truncated_by == 'bytes':
content = (
f'[Line {next_offset or offset} exceeds the {self._format_size(max_bytes)} read limit. '
'Use exec with a byte-range command for this line, or read a different offset.]'
)
else:
content = '\n'.join(lines)
return {
'ok': True,
'content': content,
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
def _build_read_result_from_text(self, content: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
all_lines = content.splitlines()
start_index = offset - 1
if start_index >= len(all_lines) and all_lines:
return {'ok': False, 'error': f'Offset {offset} is beyond end of file ({len(all_lines)} lines total)'}
output_lines: list[str] = []
output_bytes = 0
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
for index, line in enumerate(all_lines[start_index:], start_index + 1):
if len(output_lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = index
break
line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = index
break
output_lines.append(line)
output_bytes += line_bytes
end_line = offset + len(output_lines) - 1
return {
'ok': True,
'content': '\n'.join(output_lines),
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
@staticmethod
def _positive_int(value, *, default: int, max_value: int | None = None) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = default
if parsed <= 0:
parsed = default
if max_value is not None:
parsed = min(parsed, max_value)
return parsed
@staticmethod
def _truncate_grep_line(line: str) -> tuple[str, bool]:
if len(line) <= _GREP_MAX_LINE_CHARS:
return line, False
return f'{line[:_GREP_MAX_LINE_CHARS]}... [truncated]', True
@staticmethod
def _truncate_text_to_bytes(text: str, max_bytes: int) -> str:
return NativeToolLoader._truncate_text_to_bytes_with_flag(text, max_bytes)[0]
@staticmethod
def _truncate_text_to_bytes_with_flag(text: str, max_bytes: int) -> tuple[str, bool]:
data = text.encode('utf-8')
if len(data) <= max_bytes:
return text, False
truncated = data[:max_bytes]
while truncated and (truncated[-1] & 0xC0) == 0x80:
truncated = truncated[:-1]
return truncated.decode('utf-8', errors='ignore'), True
@staticmethod
def _format_size(bytes_count: int) -> str:
if bytes_count < 1024:
return f'{bytes_count}B'
return f'{bytes_count / 1024:.1f}KB'
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('command', '')).strip()

View File

@@ -509,32 +509,11 @@ class TestPythonWorkspacePreparation:
)
assert command is not None
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert 'python -m pip install -r "/workspace/.mcp/u1/workspace/requirements.txt"' in command
assert 'pip install --no-cache-dir -r' not in command
def test_process_payload_can_start_from_prepared_python_env(self, mcp_module):
payload = {
'command': 'python',
'args': ['/workspace/.mcp/u1/workspace/server.py'],
'env': {},
'cwd': '/workspace/.mcp/u1/workspace',
}
wrapped = mcp_module.BoxStdioSessionRuntime._wrap_process_payload_with_python_env(
payload,
'/workspace/.mcp/u1/workspace',
)
assert wrapped['command'] == 'sh'
assert wrapped['args'][0] == '-lc'
assert 'export VIRTUAL_ENV=/workspace/.mcp/u1/workspace/.venv' in wrapped['args'][1]
assert 'export PATH=/workspace/.mcp/u1/workspace/.venv/bin:$PATH' in wrapped['args'][1]
assert 'exec python /workspace/.mcp/u1/workspace/server.py' in wrapped['args'][1]
assert wrapped['cwd'] == '/workspace/.mcp/u1/workspace'
def test_staging_refresh_preserves_box_runtime_dirs(self, mcp_module, tmp_path):
def test_staging_refresh_removes_stale_source_files_but_preserves_runtime_dirs(self, mcp_module, tmp_path):
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
@@ -547,11 +526,16 @@ class TestPythonWorkspacePreparation:
(workspace / '.langbot').mkdir()
(workspace / '.langbot' / 'python-env.lock').mkdir()
(workspace / 'server.py').write_text('print("old")\n', encoding='utf-8')
(workspace / 'removed.py').write_text('stale\n', encoding='utf-8')
(workspace / 'removed_dir').mkdir()
(workspace / 'removed_dir' / 'old.txt').write_text('stale\n', encoding='utf-8')
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
assert (workspace / 'requirements.txt').read_text(encoding='utf-8') == 'mcp==1.26.0\n'
assert not (workspace / 'removed.py').exists()
assert not (workspace / 'removed_dir').exists()
assert (workspace / '.venv' / 'bin' / 'python').exists()
assert (workspace / '.langbot' / 'python-env.lock').is_dir()