Compare commits

..

1 Commits

Author SHA1 Message Date
huanghuoguoguo
e27e26b071 test: add frontend smoke and backend e2e CI 2026-06-16 10:54:57 +08:00
25 changed files with 994 additions and 806 deletions

46
.github/workflows/frontend-tests.yml vendored Normal file
View File

@@ -0,0 +1,46 @@
name: Frontend Tests
on:
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
paths:
- 'web/**'
- '.github/workflows/frontend-tests.yml'
push:
branches:
- master
- develop
paths:
- 'web/**'
- '.github/workflows/frontend-tests.yml'
jobs:
playwright-smoke:
name: Playwright Smoke
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '25'
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 8.9.2
- name: Install dependencies
working-directory: web
run: pnpm install --frozen-lockfile
- name: Install Playwright browsers
working-directory: web
run: pnpm exec playwright install --with-deps chromium
- name: Run Playwright smoke tests
working-directory: web
run: pnpm test:e2e

View File

@@ -29,7 +29,7 @@ jobs:
run: uv sync --dev
- name: Run ruff check
run: uv run ruff check src
run: uv run ruff check src/langbot/ tests/ --output-format=concise
- name: Run ruff format
run: uv run ruff format src --check

View File

@@ -84,6 +84,67 @@ jobs:
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
e2e:
name: E2E Startup Tests
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies
run: uv sync --dev
- name: Run E2E startup tests
run: uv run pytest tests/e2e -q --tb=short
- name: E2E Test Summary
if: always()
run: |
echo "## E2E Startup Test Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
box-integration:
name: Box Integration Tests
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies
run: uv sync --dev
- name: Check Docker runtime
run: docker info
- name: Run Box integration tests
run: uv run pytest tests/integration_tests -q --tb=short
- name: Box Integration Test Summary
if: always()
run: |
echo "## Box Integration Test Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
coverage:
name: Coverage Gate
runs-on: ubuntu-latest
@@ -129,4 +190,4 @@ jobs:
echo "## Coverage Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Threshold: 18%" >> $GITHUB_STEP_SUMMARY
echo "Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
echo "Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY

View File

@@ -146,19 +146,13 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_PIP_CACHE_DIR="{mount_path}/.cache/pip"
mkdir -p "$_LB_META_DIR" "$_LB_TMP_DIR" "$_LB_PIP_CACHE_DIR"
_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"
if [ -z "$_LB_SYSTEM_PYTHON" ]; then
echo "python3 or python is required to prepare the workspace Python environment" >&2
exit 127
fi
export TMPDIR="$_LB_TMP_DIR"
export TEMP="$_LB_TMP_DIR"
export TMP="$_LB_TMP_DIR"
export PIP_CACHE_DIR="$_LB_PIP_CACHE_DIR"
_lb_python_meta() {{
"$_LB_SYSTEM_PYTHON" - <<'PY'
python - <<'PY'
import hashlib
import json
import os
@@ -207,26 +201,15 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_LOCK_WAIT=0
while ! mkdir "$_LB_LOCK_DIR" 2>/dev/null; do
if [ "$_LB_LOCK_WAIT" -ge 120 ]; then
_LB_LOCK_OWNER="$(cat "$_LB_LOCK_DIR/pid" 2>/dev/null || true)"
if [ -n "$_LB_LOCK_OWNER" ] && kill -0 "$_LB_LOCK_OWNER" 2>/dev/null; then
echo "Timed out waiting for active Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
echo "Timed out waiting for Python environment lock, clearing stale lock: $_LB_LOCK_DIR" >&2
rm -rf "$_LB_LOCK_DIR" 2>/dev/null || true
if mkdir "$_LB_LOCK_DIR" 2>/dev/null; then
break
fi
echo "Timed out waiting for Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
sleep 1
_LB_LOCK_WAIT=$((_LB_LOCK_WAIT + 1))
done
printf '%s\\n' "$$" > "$_LB_LOCK_DIR/pid" 2>/dev/null || true
_lb_cleanup_lock() {{
rm -rf "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
rmdir "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
}}
trap _lb_cleanup_lock EXIT INT TERM
@@ -242,7 +225,7 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
rm -rf "$_LB_VENV_DIR"
"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"
python -m venv "$_LB_VENV_DIR"
. "$_LB_VENV_DIR/bin/activate"
python -m pip install --upgrade pip setuptools wheel
if [ -f "{mount_path}/requirements.txt" ]; then

View File

@@ -1,18 +0,0 @@
from __future__ import annotations
from typing import Any
async def is_box_backend_available(ap: Any) -> bool:
"""Return whether the configured Box backend is ready for tool execution."""
box_service = getattr(ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return bool(backend_info.get('available', False))
except Exception:
return False

View File

@@ -5,8 +5,6 @@ import asyncio
import os
import shutil
import shlex
import threading
from contextlib import suppress
from typing import TYPE_CHECKING, Any
import pydantic
@@ -20,26 +18,12 @@ from ....box.workspace import (
rewrite_mounted_path,
rewrite_venv_command,
unwrap_venv_path,
wrap_python_command_with_env,
)
if TYPE_CHECKING:
from .mcp import RuntimeMCPSession
_WORKSPACE_COPY_LOCKS: dict[str, threading.Lock] = {}
_WORKSPACE_COPY_LOCKS_GUARD = threading.Lock()
def _workspace_copy_lock(path: str) -> threading.Lock:
with _WORKSPACE_COPY_LOCKS_GUARD:
lock = _WORKSPACE_COPY_LOCKS.get(path)
if lock is None:
lock = threading.Lock()
_WORKSPACE_COPY_LOCKS[path] = lock
return lock
class MCPSessionErrorPhase(enum.Enum):
"""Which phase of the MCP lifecycle failed."""
@@ -65,7 +49,7 @@ class MCPServerBoxConfig(pydantic.BaseModel):
host_path: str | None = None
host_path_mode: str = 'ro' # MCP servers default to read-write mount only when explicitly requested
env: dict[str, str] = pydantic.Field(default_factory=dict)
startup_timeout_sec: int = 300 # First Docker bootstrap may need to build a venv and install MCP deps.
startup_timeout_sec: int = 120 # Longer default to allow dependency bootstrap
cpus: float | None = None
memory_mb: int | None = None
pids_limit: int | None = None
@@ -144,7 +128,6 @@ class BoxStdioSessionRuntime:
workspace = self._build_workspace(host_path=None)
host_path = self.resolve_host_path()
process_cwd = '/workspace'
install_cmd: str | None = None
try:
await workspace.create_session()
@@ -185,8 +168,6 @@ class BoxStdioSessionRuntime:
env=self.server_config.get('env', {}),
cwd=process_cwd,
)
if install_cmd:
payload = self._wrap_process_payload_with_python_env(payload, process_cwd)
payload['process_id'] = self.process_id
await workspace.box_service.start_managed_process(workspace.session_id, payload)
except Exception:
@@ -272,42 +253,14 @@ class BoxStdioSessionRuntime:
@staticmethod
def _copy_workspace_tree(source_path: str, process_host_root: str, process_host_workspace: str) -> None:
# Docker-backed bootstrap writes root-owned runtime directories such as
# .venv/.tmp into the staged workspace. The host process may not be able
# to delete them, so refresh source files in place and preserve runtime
# directories instead of rmtree'ing the whole staging root.
with _workspace_copy_lock(process_host_root):
preserved_names = {'.venv', 'venv', 'env', '.cache', '.tmp', '.langbot'}
os.makedirs(process_host_workspace, exist_ok=True)
for name in os.listdir(process_host_workspace):
if name in preserved_names:
continue
path = os.path.join(process_host_workspace, name)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, ignore_errors=True)
else:
# The entry may disappear between listdir and unlink if cleanup races us.
with suppress(FileNotFoundError):
os.unlink(path)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(
'.git',
'__pycache__',
'.pytest_cache',
'.mypy_cache',
'.ruff_cache',
'.venv',
'venv',
'env',
'.cache',
'.tmp',
'.langbot',
),
)
shutil.rmtree(process_host_root, ignore_errors=True)
os.makedirs(process_host_root, exist_ok=True)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '.pytest_cache', '.mypy_cache', '.ruff_cache'),
)
async def _cleanup_staged_workspace(self) -> None:
if not self.resolve_host_path():
@@ -390,25 +343,23 @@ class BoxStdioSessionRuntime:
@staticmethod
def detect_install_command(host_path: str, workspace_path: str = '/workspace') -> str | None:
workspace_kind = classify_python_workspace(host_path)
if workspace_kind in {'package', 'requirements'}:
return wrap_python_command_with_env('python -c "pass"', mount_path=workspace_path).rstrip()
quoted_workspace_path = shlex.quote(workspace_path)
if workspace_kind == 'package':
return (
'mkdir -p /opt/_lb_src'
f' && tar -C {quoted_workspace_path}'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_lb_src -xf -'
' && pip install --no-cache-dir /opt/_lb_src'
' && rm -rf /opt/_lb_src'
)
if workspace_kind == 'requirements':
return f'pip install --no-cache-dir -r {quoted_workspace_path}/requirements.txt'
return None
@staticmethod
def _wrap_process_payload_with_python_env(payload: dict[str, Any], workspace_path: str) -> dict[str, Any]:
"""Start a prepared Python workspace without writing bootstrap output to MCP stdio."""
workspace_root = workspace_path.rstrip('/') or '/workspace'
venv_dir = f'{workspace_root}/.venv'
venv_bin = f'{venv_dir}/bin'
command = ' '.join([shlex.quote(payload['command']), *[shlex.quote(arg) for arg in payload.get('args', [])]])
wrapped = dict(payload)
wrapped['command'] = 'sh'
wrapped['args'] = [
'-lc',
(f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; export PATH={shlex.quote(venv_bin)}:$PATH; exec {command}'),
]
return wrapped
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:
workspace = self._build_workspace()
workspace.session_id = session_id

View File

@@ -8,7 +8,6 @@ from langbot_plugin.api.entities.events import pipeline_query
from .. import loader
from ..errors import ToolNotFoundError
from .availability import is_box_backend_available
from . import skill as skill_loader
EXEC_TOOL_NAME = 'exec'
@@ -23,15 +22,6 @@ _ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NA
# Skip these dirs during grep walk to avoid noise
_SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'}
_DEFAULT_READ_MAX_LINES = 2000
_MAX_READ_MAX_LINES = 10000
_DEFAULT_TOOL_RESULT_MAX_BYTES = 50 * 1024
_BOX_FILE_SCRIPT_MAX_BYTES = 2048
_GLOB_MAX_MATCHES = 100
_GREP_MAX_MATCHES = 200
_GREP_MAX_FILES = 5000
_GREP_MAX_LINE_CHARS = 500
class NativeToolLoader(loader.ToolLoader):
def __init__(self, ap):
@@ -53,7 +43,18 @@ class NativeToolLoader(loader.ToolLoader):
async def _check_backend_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
return await is_box_backend_available(self.ap)
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_sandbox_available():
@@ -138,7 +139,6 @@ class NativeToolLoader(loader.ToolLoader):
# via execute_tool. Skills are mounted at /workspace/.skills/{name}/
# via extra_mounts built by BoxService.
result = await self.ap.box_service.execute_tool(parameters, query)
result = self._normalize_exec_result(result)
if selected_skill is not None:
self._refresh_skill_from_disk(selected_skill)
@@ -227,65 +227,19 @@ class NativeToolLoader(loader.ToolLoader):
except Exception:
return {'ok': False, 'error': stdout or 'Box file operation returned no result'}
async def _read_workspace_via_box(self, path: str, parameters: dict, query: pipeline_query.Query) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
# Box file fallback returns through exec stdout, which is already capped
# by BoxService. Keep this payload small enough to remain valid JSON.
max_bytes = min(
self._positive_int(parameters.get('max_bytes'), default=_DEFAULT_TOOL_RESULT_MAX_BYTES),
_BOX_FILE_SCRIPT_MAX_BYTES,
)
async def _read_workspace_via_box(self, path: str, query: pipeline_query.Query) -> dict:
script = f"""
import json, os
path = {json.dumps(path)}
offset = {offset}
max_lines = {max_lines}
max_bytes = {max_bytes}
if not path.startswith('/workspace'):
print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}}))
elif not os.path.exists(path):
print(json.dumps({{'ok': False, 'error': f'File not found: {{path}}'}}))
elif os.path.isdir(path):
entries = sorted(os.listdir(path))
content = '\\n'.join(entries)
print(json.dumps({{'ok': True, 'content': content, 'is_directory': True, 'total': len(entries), 'truncated': False}}))
print(json.dumps({{'ok': True, 'content': '\\n'.join(sorted(os.listdir(path))), 'is_directory': True}}))
else:
lines = []
output_bytes = 0
end_line = offset - 1
truncated = False
next_offset = None
with open(path, 'r', encoding='utf-8', errors='replace') as f:
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
next_offset = line_number
break
lines.append(line.rstrip('\\n'))
output_bytes += line_bytes
end_line = line_number
print(json.dumps({{
'ok': True,
'content': '\\n'.join(lines),
'truncated': truncated,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}}))
print(json.dumps({{'ok': True, 'content': f.read()}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -353,27 +307,12 @@ else:
if not any(part in skip_dirs for part in item.parts)
]
hits.sort(key=lambda item: item.stat().st_mtime if item.exists() else 0, reverse=True)
shown = hits[:{_GLOB_MAX_MATCHES}]
shown = hits[:100]
matches = []
output_bytes = 0
truncated_by_bytes = False
for item in shown:
rel = os.path.relpath(str(item), path)
sandbox_path = os.path.join(path, rel).replace(os.sep, '/')
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if matches else 0)
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by_bytes = True
break
matches.append(sandbox_path)
output_bytes += entry_bytes
print(json.dumps({{
'ok': True,
'matches': matches,
'preview': '\\n'.join(matches),
'total': len(hits),
'truncated': len(hits) > len(matches) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if len(hits) > len(matches) else None),
}}))
matches.append(os.path.join(path, rel).replace(os.sep, '/'))
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(hits), 'truncated': len(hits) > 100}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -411,54 +350,29 @@ else:
continue
if item.is_file():
files.append(item)
if len(files) >= {_GREP_MAX_FILES}:
if len(files) >= 5000:
break
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
handle = fp.open('r', encoding='utf-8', errors='ignore')
text = fp.read_text(errors='ignore')
except OSError:
continue
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
content = line.rstrip()
line_truncated = False
if len(content) > {_GREP_MAX_LINE_CHARS}:
content = content[:{_GREP_MAX_LINE_CHARS}] + '... [truncated]'
line_truncated = True
entry = {{'file': file_path, 'line': lineno, 'content': content}}
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= {_GREP_MAX_MATCHES}:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
matches.append({{'file': file_path, 'line': lineno, 'content': line.rstrip()}})
if len(matches) >= 200:
break
if len(matches) >= 200:
break
print(json.dumps({{
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}}))
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(matches), 'truncated': len(matches) >= 200}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -473,20 +387,14 @@ else:
)
if skill_request is not None and hasattr(self.ap.box_service, 'read_skill_file'):
selected_skill, relative = skill_request
host_path = self._resolve_skill_host_path(selected_skill, relative)
if host_path and os.path.exists(host_path):
if os.path.isdir(host_path):
return self._build_directory_result(os.listdir(host_path))
return self._read_text_file_preview(host_path, parameters)
try:
result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative)
return self._build_read_result_from_text(str(result.get('content', '')), parameters)
return {'ok': True, 'content': result.get('content', '')}
except Exception:
try:
result = await self.ap.box_service.list_skill_files(selected_skill['name'], relative)
entries = [entry['name'] for entry in result.get('entries', [])]
return self._build_directory_result(entries)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
except Exception as exc:
return {'ok': False, 'error': str(exc)}
@@ -497,13 +405,15 @@ else:
include_activated=True,
)
if self._should_use_box_workspace_files(selected_skill):
return await self._read_workspace_via_box(path, parameters, query)
return await self._read_workspace_via_box(path, query)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return self._build_directory_result(entries)
return self._read_text_file_preview(host_path, parameters)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
return {'ok': True, 'content': content}
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
@@ -674,28 +584,6 @@ else:
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'offset': {
'type': 'integer',
'description': '1-indexed line number to start reading from. Defaults to 1.',
'default': 1,
'minimum': 1,
},
'limit': {
'type': 'integer',
'description': f'Maximum number of lines to return. Defaults to {_DEFAULT_READ_MAX_LINES}.',
'default': _DEFAULT_READ_MAX_LINES,
'minimum': 1,
'maximum': _MAX_READ_MAX_LINES,
},
'max_bytes': {
'type': 'integer',
'description': (
f'Maximum bytes of file content to return. Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
),
'default': _DEFAULT_TOOL_RESULT_MAX_BYTES,
'minimum': 1,
'maximum': _DEFAULT_TOOL_RESULT_MAX_BYTES,
},
},
'required': ['path'],
'additionalProperties': False,
@@ -852,30 +740,22 @@ else:
hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)
total = len(hits)
shown = hits[:_GLOB_MAX_MATCHES]
shown = hits[:100]
# Convert back to sandbox paths
sandbox_paths = []
output_bytes = 0
truncated_by_bytes = False
for h in shown:
rel = os.path.relpath(str(h), host_path)
sandbox_path = os.path.join(path, rel)
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if sandbox_paths else 0)
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by_bytes = True
break
sandbox_paths.append(sandbox_path)
output_bytes += entry_bytes
return {
'ok': True,
'matches': sandbox_paths,
'preview': '\n'.join(sandbox_paths),
'total': total,
'truncated': total > len(sandbox_paths) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if total > len(sandbox_paths) else None),
}
result_lines = sandbox_paths
result = '\n'.join(result_lines)
if total > 100:
result += f'\n... ({total} matches, showing first 100)'
return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100}
async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
@@ -911,46 +791,32 @@ else:
files = self._grep_walk(base, include)
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
handle = fp.open('r', encoding='utf-8', errors='ignore')
text = fp.read_text(errors='ignore')
except OSError:
continue
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
content, line_truncated = self._truncate_grep_line(line.rstrip())
entry = {
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
matches.append(
{
'file': sandbox_path,
'line': lineno,
'content': content,
'content': line.rstrip(),
}
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= _GREP_MAX_MATCHES:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
)
if len(matches) >= 200:
break
if len(matches) >= 200:
break
return {
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
'truncated': len(matches) >= 200,
}
@staticmethod
@@ -962,207 +828,10 @@ else:
continue
if item.is_file():
results.append(item)
if len(results) >= _GREP_MAX_FILES:
if len(results) >= 5000:
break
return results
@staticmethod
def _resolve_skill_host_path(selected_skill: dict, relative: str) -> str | None:
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
return None
host_root = os.path.realpath(package_root)
host_path = os.path.realpath(os.path.join(host_root, relative))
if not (host_path == host_root or host_path.startswith(host_root + os.sep)):
raise ValueError('Path escapes the skill package boundary.')
return host_path
def _normalize_exec_result(self, result: dict) -> dict:
normalized = dict(result)
stdout = str(normalized.get('stdout') or '')
stderr = str(normalized.get('stderr') or '')
stdout, stdout_capped = self._truncate_text_to_bytes_with_flag(stdout, _DEFAULT_TOOL_RESULT_MAX_BYTES)
stderr, stderr_capped = self._truncate_text_to_bytes_with_flag(stderr, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['stdout'] = stdout
normalized['stderr'] = stderr
normalized['stdout_truncated'] = bool(normalized.get('stdout_truncated') or stdout_capped)
normalized['stderr_truncated'] = bool(normalized.get('stderr_truncated') or stderr_capped)
if stdout and stderr:
preview_raw = f'stdout:\n{stdout}\n\nstderr:\n{stderr}'
else:
preview_raw = stdout or stderr
preview, preview_capped = self._truncate_text_to_bytes_with_flag(preview_raw, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['preview'] = preview
normalized['truncated'] = bool(
normalized['stdout_truncated'] or normalized['stderr_truncated'] or preview_capped
)
if preview_capped and not normalized.get('truncated_by'):
normalized['truncated_by'] = 'bytes'
return normalized
def _build_directory_result(self, entries: list[str]) -> dict:
sorted_entries = sorted(str(entry) for entry in entries)
content = '\n'.join(sorted_entries)
preview = self._truncate_text_to_bytes(content, _DEFAULT_TOOL_RESULT_MAX_BYTES)
truncated = preview != content
return {
'ok': True,
'content': preview,
'is_directory': True,
'total': len(sorted_entries),
'truncated': truncated,
'truncated_by': 'bytes' if truncated else None,
}
def _read_text_file_preview(self, host_path: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
lines: list[str] = []
output_bytes = 0
end_line = offset - 1
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
with open(host_path, 'r', encoding='utf-8', errors='replace') as f:
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = line_number
break
lines.append(line.rstrip('\n'))
output_bytes += line_bytes
end_line = line_number
if not lines and truncated_by == 'bytes':
content = (
f'[Line {next_offset or offset} exceeds the {self._format_size(max_bytes)} read limit. '
'Use exec with a byte-range command for this line, or read a different offset.]'
)
else:
content = '\n'.join(lines)
return {
'ok': True,
'content': content,
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
def _build_read_result_from_text(self, content: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
all_lines = content.splitlines()
start_index = offset - 1
if start_index >= len(all_lines) and all_lines:
return {'ok': False, 'error': f'Offset {offset} is beyond end of file ({len(all_lines)} lines total)'}
output_lines: list[str] = []
output_bytes = 0
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
for index, line in enumerate(all_lines[start_index:], start_index + 1):
if len(output_lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = index
break
line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = index
break
output_lines.append(line)
output_bytes += line_bytes
end_line = offset + len(output_lines) - 1
return {
'ok': True,
'content': '\n'.join(output_lines),
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
@staticmethod
def _positive_int(value, *, default: int, max_value: int | None = None) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = default
if parsed <= 0:
parsed = default
if max_value is not None:
parsed = min(parsed, max_value)
return parsed
@staticmethod
def _truncate_grep_line(line: str) -> tuple[str, bool]:
if len(line) <= _GREP_MAX_LINE_CHARS:
return line, False
return f'{line[:_GREP_MAX_LINE_CHARS]}... [truncated]', True
@staticmethod
def _truncate_text_to_bytes(text: str, max_bytes: int) -> str:
return NativeToolLoader._truncate_text_to_bytes_with_flag(text, max_bytes)[0]
@staticmethod
def _truncate_text_to_bytes_with_flag(text: str, max_bytes: int) -> tuple[str, bool]:
data = text.encode('utf-8')
if len(data) <= max_bytes:
return text, False
truncated = data[:max_bytes]
while truncated and (truncated[-1] & 0xC0) == 0x80:
truncated = truncated[:-1]
return truncated.decode('utf-8', errors='ignore'), True
@staticmethod
def _format_size(bytes_count: int) -> str:
if bytes_count < 1024:
return f'{bytes_count}B'
return f'{bytes_count / 1024:.1f}KB'
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('command', '')).strip()

View File

@@ -72,45 +72,6 @@ def register_activated_skill(query: pipeline_query.Query, skill_data: dict) -> N
activated[skill_name] = skill_data
def normalize_skill_names(value: typing.Any) -> list[str]:
"""Return a de-duplicated list of non-empty skill names."""
if not isinstance(value, list):
return []
names: list[str] = []
for item in value:
skill_name = str(item or '').strip()
if skill_name and skill_name not in names:
names.append(skill_name)
return names
def get_activated_skill_names(query: pipeline_query.Query) -> list[str]:
"""Return activated skill names for callers that own persistence policy."""
return normalize_skill_names(list(get_activated_skills(query).keys()))
def restore_activated_skills(
ap: app.Application,
query: pipeline_query.Query,
skill_names: typing.Any,
) -> list[str]:
"""Restore caller-provided activated skill names into Query variables.
Persistence and state scope ownership belong to higher-level flows. This
helper only rebuilds current Query state from pipeline-visible skills, so
removed or unbound skills stay unavailable to native exec/write/edit.
"""
restored: list[str] = []
for skill_name in normalize_skill_names(skill_names):
skill_data = get_visible_skill(ap, query, skill_name)
if skill_data is None:
continue
register_activated_skill(query, skill_data)
restored.append(skill_name)
return restored
def parse_skill_mount_path(sandbox_path: str) -> tuple[str | None, str]:
normalized_path = str(sandbox_path or '/workspace').strip() or '/workspace'
if normalized_path == SKILL_MOUNT_PREFIX:

View File

@@ -6,7 +6,6 @@ import typing
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from .. import loader
from .availability import is_box_backend_available
# Align with Claude Code's Skill tool design:
# - activate: Activate a skill via Tool Call, returns SKILL.md content
@@ -46,7 +45,18 @@ class SkillToolLoader(loader.ToolLoader):
async def _check_sandbox_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
return await is_box_backend_available(self.ap)
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_available():
@@ -82,15 +92,16 @@ class SkillToolLoader(loader.ToolLoader):
if not skill_name:
raise ValueError('skill_name is required')
from . import skill as skill_loader
skill_data = skill_loader.get_visible_skill(self.ap, query, skill_name)
skill_mgr = self.ap.skill_mgr
skill_data = skill_mgr.get_skill_by_name(skill_name)
if skill_data is None:
visible_skills = skill_loader.get_visible_skills(self.ap, query)
visible_skills = getattr(skill_mgr, 'skills', {})
available_names = ', '.join(sorted(visible_skills.keys())) or 'none'
raise ValueError(f'Skill "{skill_name}" not found. Available skills: {available_names}')
# Register activated skill for sandbox mount path resolution
from . import skill as skill_loader
skill_loader.register_activated_skill(query, skill_data)
# Return SKILL.md content as Tool Result (injects into context)
@@ -116,7 +127,6 @@ class SkillToolLoader(loader.ToolLoader):
'activated': True,
'skill_name': skill_name,
'mount_path': mount_path,
'activated_skill_names': skill_loader.get_activated_skill_names(query),
'content': result_content,
}
@@ -191,13 +201,13 @@ class SkillToolLoader(loader.ToolLoader):
return resource_tool.LLMTool(
name=ACTIVATE_SKILL_TOOL_NAME,
human_desc='Activate a skill',
description='Activate a pipeline-visible skill by name and return its instructions as a tool result.',
description=self._build_activate_tool_description(),
parameters={
'type': 'object',
'properties': {
'skill_name': {
'type': 'string',
'description': 'The skill name to activate.',
'description': 'The skill name to activate (no arguments). E.g., "pdf" or "data-analysis"',
},
},
'required': ['skill_name'],
@@ -245,3 +255,50 @@ class SkillToolLoader(loader.ToolLoader):
},
func=lambda parameters: parameters,
)
def _build_activate_tool_description(self) -> str:
"""Build tool description with embedded available_skills list."""
skill_mgr = getattr(self.ap, 'skill_mgr', None)
if skill_mgr is None:
return 'Activate a skill. No skills are currently available.'
skills = getattr(skill_mgr, 'skills', {})
if not skills:
return 'Activate a skill. No skills are currently available.'
# Build <available_skills> section
available_skills_lines = ['<available_skills>']
for skill_name, skill_data in sorted(skills.items()):
description = skill_data.get('description', '')
available_skills_lines.append('<skill>')
available_skills_lines.append(f'<name>{skill_name}</name>')
available_skills_lines.append(f'<description>{description}</description>')
available_skills_lines.append('</skill>')
available_skills_lines.append('</available_skills>')
available_skills_block = '\n'.join(available_skills_lines)
return f"""Activate a skill within the main conversation.
<skills_instructions>
When users ask you to perform tasks, check if any of the available skills
below can help complete the task more effectively. Skills provide specialized
capabilities and domain knowledge.
How to use skills:
- Invoke skills using this tool with the skill name only (no arguments)
- When you invoke a skill, you will see <command-message>
The skill is activated
</command-message>
- The skill's instructions will be provided in the tool result
- Examples:
- skill_name: "pdf" - invoke the pdf skill
- skill_name: "data-analysis" - invoke the data-analysis skill
Important:
- Only use skills listed in <available_skills> below
- Do not invoke a skill that is already running
- To create a new skill: prepare it in /workspace, then use register_skill tool
</skills_instructions>
{available_skills_block}"""

View File

@@ -1,6 +1,7 @@
# LangBot Test Suite
This directory contains the test suite for LangBot, with a focus on comprehensive unit testing of pipeline stages.
This directory contains the LangBot backend test suite, including unit tests,
integration tests, startup E2E tests, and container-backed Box runtime tests.
## Quality Gate Layers
@@ -10,10 +11,15 @@ LangBot uses a layered quality gate system for developers and CI:
|-------|---------|--------------|-------------|
| **Quick** | `make test-quick` or `bash scripts/test-quick.sh` | Ruff lint + Unit tests + Smoke tests | Before every commit |
| **Fast Integration** | `make test-integration-fast` or `bash scripts/test-integration-fast.sh` | SQLite/API/Pipeline integration (no external services) | Before PR, weekly |
| **Backend E2E** | `uv run --python 3.12 pytest tests/e2e -q --tb=short` | Starts a real LangBot process with minimal config | Before release, CI |
| **Box Integration** | `uv run --python 3.12 pytest tests/integration_tests -q --tb=short` | Real Box sandbox/runtime integration | Before Box/runtime changes, CI |
| **Frontend E2E** | `cd web && pnpm test:e2e` | Playwright smoke tests with mocked backend and Space APIs | Before web changes, CI |
| **Coverage Gate** | `make test-coverage` or `bash scripts/test-coverage.sh` | All tests with coverage, threshold: 18% | Before merge, CI |
| **Full Local** | `make test-all-local` | Quick + Integration + Coverage | Before major changes |
**Note**: PostgreSQL migration tests and slow tests are NOT in local default gates. They run in separate CI workflows.
**Note**: PostgreSQL migration tests and slow tests are NOT in local default
gates. They run in separate CI workflows. Frontend Playwright tests live under
`web/tests/e2e` and are documented in `web/README.md`.
### Developer Workflow
@@ -28,6 +34,9 @@ make test-all-local
bash scripts/test-quick.sh # ~2 min
bash scripts/test-integration-fast.sh # ~3 min
bash scripts/test-coverage.sh # ~8 min
uv run --python 3.12 pytest tests/e2e -q --tb=short
uv run --python 3.12 pytest tests/integration_tests -q --tb=short
cd web && pnpm test:e2e
```
### Coverage Baseline
@@ -70,6 +79,12 @@ tests/
│ └── persistence/ # Database/persistence tests
│ ├── __init__.py
│ └── test_migrations.py # Alembic migration tests
├── e2e/ # Real LangBot startup E2E tests
│ ├── conftest.py
│ ├── test_startup.py
│ └── utils/
├── integration_tests/ # Container-backed integration tests
│ └── box/ # Box runtime and MCP process tests
├── smoke/ # Smoke tests (quick validation)
│ └── test_fake_message_flow.py
├── unit_tests/ # Unit tests
@@ -303,6 +318,44 @@ These tests:
- Test prevent_default, exception handling, and full message flow
- Do not require real LLM provider keys
### Running backend E2E startup tests
Backend E2E tests start a real LangBot process with a generated minimal
`data/config.yaml`, SQLite database, local storage, and embedded Chroma path.
They do not require provider keys or external services.
```bash
uv run --python 3.12 pytest tests/e2e -q --tb=short
```
These tests verify startup orchestration, migrations, API route registration,
and the minimal no-LLM startup path. The E2E process manager disables ambient
proxy variables for subprocess startup and uses direct localhost HTTP clients,
so local proxy settings should not affect the health checks.
### Running Box integration tests
Box integration tests exercise the real sandbox runtime path, including command
execution, session persistence, managed process WebSocket attachment, and
cleanup behavior.
```bash
uv run --python 3.12 pytest tests/integration_tests -q --tb=short
```
These tests require a working Docker or Podman runtime. In CI, the dedicated
Box integration job checks Docker availability before running the tests.
### Running frontend E2E tests
Frontend E2E tests live in `web/tests/e2e` and use Playwright. They start Vite
and mock the LangBot backend and Space APIs, so no backend process is required.
```bash
cd web
pnpm test:e2e
```
### Known Issues
Some tests may encounter circular import errors. This is a known issue with the current module structure. The test infrastructure is designed to work around this using lazy imports, but if you encounter issues:
@@ -320,6 +373,9 @@ Tests are automatically run on:
- Push to master/develop branches
The workflow runs tests on Python 3.11, 3.12, and 3.13 to ensure compatibility.
Startup E2E and Box integration tests run as separate Python 3.12 jobs because
they exercise process/container behavior instead of pure Python compatibility.
Frontend Playwright smoke tests run in `.github/workflows/frontend-tests.yml`.
## Adding New Tests
@@ -406,4 +462,4 @@ Check that you're mocking at the right level and using `AsyncMock` for async fun
- [ ] Add E2E tests
- [ ] Add performance benchmarks
- [ ] Add mutation testing for better coverage quality
- [ ] Add property-based testing with Hypothesis
- [ ] Add property-based testing with Hypothesis

View File

@@ -92,11 +92,11 @@ def e2e_client(e2e_port, langbot_process):
base_url = f'http://127.0.0.1:{e2e_port}'
with httpx.Client(base_url=base_url, timeout=10.0) as client:
with httpx.Client(base_url=base_url, timeout=10.0, trust_env=False) as client:
yield client
@pytest.fixture(scope='session')
def e2e_db_path(e2e_tmpdir):
"""Path to SQLite database file."""
return e2e_tmpdir / 'data' / 'langbot.db'
return e2e_tmpdir / 'data' / 'langbot.db'

View File

@@ -38,7 +38,7 @@ class TestStartupFlow:
# System info should contain version info
assert 'version' in data['data'] or 'edition' in data['data']
def test_database_initialized(self, e2e_db_path):
def test_database_initialized(self, langbot_process, e2e_db_path):
"""Verify SQLite database was created and initialized."""
assert e2e_db_path.exists()
@@ -75,7 +75,7 @@ class TestStartupFlow:
"""Test auth endpoint."""
# First startup may allow initial setup
response = e2e_client.post('/api/v1/user/auth', json={
'username': 'admin',
'user': 'admin',
'password': 'admin',
})
@@ -94,7 +94,7 @@ class TestStartupStages:
# If API responds on e2e_port, config was loaded
assert e2e_client.get('/api/v1/system/info').status_code == 200
def test_migrations_applied(self, e2e_db_path):
def test_migrations_applied(self, langbot_process, e2e_db_path):
"""Verify database migrations were applied."""
import sqlite3
conn = sqlite3.connect(str(e2e_db_path))

View File

@@ -44,6 +44,17 @@ class LangBotProcess:
# Prepare environment
env = os.environ.copy()
env['PYTHONPATH'] = str(self.project_root / 'src')
for proxy_key in (
'HTTP_PROXY',
'HTTPS_PROXY',
'ALL_PROXY',
'http_proxy',
'https_proxy',
'all_proxy',
):
env.pop(proxy_key, None)
env['NO_PROXY'] = '127.0.0.1,localhost'
env['no_proxy'] = '127.0.0.1,localhost'
# Set API port via environment variable
env['API__PORT'] = str(self.port)
@@ -113,6 +124,8 @@ precision = 2
r = httpx.get(
f'http://127.0.0.1:{self.port}/api/v1/system/info',
timeout=2.0,
follow_redirects=False,
trust_env=False,
)
if r.status_code == 200:
logger.info(f'LangBot started successfully on port {self.port}')
@@ -185,6 +198,8 @@ precision = 2
r = httpx.get(
f'http://127.0.0.1:{self.port}/api/v1/system/info',
timeout=5.0,
follow_redirects=False,
trust_env=False,
)
return r.status_code == 200
except Exception:
@@ -201,4 +216,4 @@ def find_project_root() -> Path:
return parent
# Fallback to LangBot-test-build directory
return Path('/home/glwuy/langbot-app/LangBot-test-build')
return Path('/home/glwuy/langbot-app/LangBot-test-build')

View File

@@ -54,9 +54,7 @@ def test_classify_python_workspace_detects_package_and_requirements():
def test_wrap_python_command_with_env_contains_bootstrap_and_command():
command = wrap_python_command_with_env('python script.py')
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'kill -0 "$_LB_LOCK_OWNER"' in command
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python script.py')

View File

@@ -180,7 +180,7 @@ class TestMCPServerBoxConfig:
assert cfg.host_path is None
assert cfg.host_path_mode == 'ro'
assert cfg.env == {}
assert cfg.startup_timeout_sec == 300
assert cfg.startup_timeout_sec == 120
assert cfg.cpus is None
assert cfg.memory_mb is None
assert cfg.pids_limit is None
@@ -494,84 +494,6 @@ class TestBuildBoxProcessPayload:
assert payload['args'] == ['/opt/other/server.py', '--flag']
# ── Python Workspace Preparation ────────────────────────────────────
class TestPythonWorkspacePreparation:
def test_requirements_workspace_uses_venv_bootstrap(self, mcp_module, tmp_path):
host_path = tmp_path / 'mcp-source'
host_path.mkdir()
(host_path / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
command = mcp_module.BoxStdioSessionRuntime.detect_install_command(
str(host_path),
'/workspace/.mcp/u1/workspace',
)
assert command is not None
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'python -m pip install -r "/workspace/.mcp/u1/workspace/requirements.txt"' in command
assert 'pip install --no-cache-dir -r' not in command
def test_staging_refresh_removes_stale_source_files_but_preserves_runtime_dirs(self, mcp_module, tmp_path):
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
(source / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
(source / '.env').write_text('TOKEN=new\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
(workspace / '.venv' / 'bin').mkdir(parents=True)
(workspace / '.venv' / 'bin' / 'python').write_text('', encoding='utf-8')
(workspace / '.langbot').mkdir()
(workspace / '.langbot' / 'python-env.lock').mkdir()
(workspace / '.env').write_text('TOKEN=old\n', encoding='utf-8')
(workspace / 'server.py').write_text('print("old")\n', encoding='utf-8')
(workspace / 'removed.py').write_text('stale\n', encoding='utf-8')
(workspace / 'removed_dir').mkdir()
(workspace / 'removed_dir' / 'old.txt').write_text('stale\n', encoding='utf-8')
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
assert (workspace / 'requirements.txt').read_text(encoding='utf-8') == 'mcp==1.26.0\n'
assert (workspace / '.env').read_text(encoding='utf-8') == 'TOKEN=new\n'
assert not (workspace / 'removed.py').exists()
assert not (workspace / 'removed_dir').exists()
assert (workspace / '.venv' / 'bin' / 'python').exists()
assert (workspace / '.langbot' / 'python-env.lock').is_dir()
def test_staging_refresh_ignores_unlink_race(self, mcp_module, tmp_path, monkeypatch):
mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio']
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
workspace.mkdir(parents=True)
stale_file = workspace / 'removed.py'
stale_file.write_text('stale\n', encoding='utf-8')
real_unlink = os.unlink
def unlink_with_race(path):
if os.fspath(path) == str(stale_file):
real_unlink(path)
raise FileNotFoundError(path)
real_unlink(path)
monkeypatch.setattr(mcp_stdio_module.os, 'unlink', unlink_with_race)
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert not stale_file.exists()
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
# ── get_runtime_info_dict ───────────────────────────────────────────

View File

@@ -193,29 +193,6 @@ class TestSkillPathHelpers:
assert list(result.keys()) == ['visible']
def test_restore_activated_skills_uses_caller_provided_names_and_visibility(self):
from langbot.pkg.provider.tools.loaders.skill import (
ACTIVATED_SKILLS_KEY,
PIPELINE_BOUND_SKILLS_KEY,
get_activated_skill_names,
restore_activated_skills,
)
ap = _make_ap()
ap.skill_mgr = SimpleNamespace(
skills={
'visible': _make_skill_data(name='visible'),
'hidden': _make_skill_data(name='hidden'),
}
)
query = SimpleNamespace(variables={PIPELINE_BOUND_SKILLS_KEY: ['visible']})
restored = restore_activated_skills(ap, query, ['visible', 'hidden', 'visible', ''])
assert restored == ['visible']
assert list(query.variables[ACTIVATED_SKILLS_KEY].keys()) == ['visible']
assert get_activated_skill_names(query) == ['visible']
def test_resolve_virtual_skill_path_allows_visible_skill_reads(self):
from langbot.pkg.provider.tools.loaders.skill import (
PIPELINE_BOUND_SKILLS_KEY,
@@ -268,8 +245,7 @@ class TestSkillPathHelpers:
command = wrap_skill_command_with_python_env('python scripts/run.py')
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python scripts/run.py')
@@ -305,7 +281,6 @@ class TestSkillToolLoader:
assert result['activated'] is True
assert result['skill_name'] == 'demo'
assert result['mount_path'] == '/workspace/.skills/demo'
assert result['activated_skill_names'] == ['demo']
assert 'Step 1' in result['content']
assert set(query.variables[ACTIVATED_SKILLS_KEY].keys()) == {'demo'}
@@ -481,9 +456,7 @@ class TestNativeToolLoaderSkillPaths:
SimpleNamespace(query_id='q1', variables={PIPELINE_BOUND_SKILLS_KEY: ['demo']}),
)
assert result['ok'] is True
assert result['content'] == 'demo instructions'
assert result['truncated'] is False
assert result == {'ok': True, 'content': 'demo instructions'}
@pytest.mark.asyncio
async def test_exec_in_activated_skill_mount_rewrites_command_and_refreshes(self):
@@ -512,7 +485,7 @@ class TestNativeToolLoaderSkillPaths:
query,
)
assert result['ok'] is True
assert result == {'ok': True}
tool_parameters = ap.box_service.execute_tool.await_args.args[0]
assert tool_parameters['command'] == 'python /workspace/.skills/demo/scripts/run.py'
assert tool_parameters['workdir'] == '/workspace/.skills/demo'

View File

@@ -248,135 +248,3 @@ async def test_path_escape_blocked():
with pytest.raises(ValueError, match='escapes'):
await loader.invoke_tool('read', {'path': '/workspace/../../etc/passwd'}, _make_query())
@pytest.mark.asyncio
async def test_box_availability_helper_handles_unavailable_and_errors():
from langbot.pkg.provider.tools.loaders.availability import is_box_backend_available
assert await is_box_backend_available(SimpleNamespace()) is False
assert await is_box_backend_available(SimpleNamespace(box_service=SimpleNamespace(available=False))) is False
unavailable_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(return_value={'backend': {'available': False}}),
)
assert await is_box_backend_available(SimpleNamespace(box_service=unavailable_backend)) is False
failing_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(side_effect=RuntimeError('box unavailable')),
)
assert await is_box_backend_available(SimpleNamespace(box_service=failing_backend)) is False
@pytest.mark.asyncio
async def test_read_file_supports_offset_limit_and_truncation_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'lines.txt'), 'w', encoding='utf-8') as f:
f.write('one\ntwo\nthree\nfour\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/lines.txt', 'offset': 2, 'limit': 2},
_make_query(),
)
assert result == {
'ok': True,
'content': 'two\nthree',
'truncated': True,
'truncated_by': 'lines',
'start_line': 2,
'end_line': 3,
'next_offset': 4,
'max_lines': 2,
'max_bytes': 50 * 1024,
}
@pytest.mark.asyncio
async def test_read_file_handles_line_larger_than_byte_limit():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'long-line.txt'), 'w', encoding='utf-8') as f:
f.write('abcdef\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/long-line.txt', 'max_bytes': 3},
_make_query(),
)
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
assert result['next_offset'] == 1
assert 'exceeds the 3B read limit' in result['content']
@pytest.mark.asyncio
async def test_exec_result_is_capped_and_exposes_preview_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
box_service = SimpleNamespace(
available=True,
default_workspace=tmpdir,
execute_tool=AsyncMock(
return_value={
'ok': True,
'stdout': 'a' * 60000,
'stderr': 'b' * 60000,
'exit_code': 0,
}
),
)
loader = NativeToolLoader(SimpleNamespace(box_service=box_service, logger=Mock()))
result = await loader.invoke_tool('exec', {'command': 'python -V'}, _make_query())
assert result['ok'] is True
assert len(result['stdout'].encode('utf-8')) == 50 * 1024
assert len(result['stderr'].encode('utf-8')) == 50 * 1024
assert len(result['preview'].encode('utf-8')) == 50 * 1024
assert result['stdout_truncated'] is True
assert result['stderr_truncated'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
@pytest.mark.asyncio
async def test_glob_caps_match_count_and_returns_preview():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
for index in range(105):
with open(os.path.join(tmpdir, f'file-{index:03d}.txt'), 'w', encoding='utf-8') as f:
f.write(str(index))
result = await loader.invoke_tool('glob', {'path': '/workspace', 'pattern': '*.txt'}, _make_query())
assert result['ok'] is True
assert result['total'] == 105
assert len(result['matches']) == 100
assert result['preview'] == '\n'.join(result['matches'])
assert result['truncated'] is True
assert result['truncated_by'] == 'matches'
@pytest.mark.asyncio
async def test_grep_reports_invalid_regex_and_truncates_long_matching_lines():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'data.txt'), 'w', encoding='utf-8') as f:
f.write('needle ' + ('x' * 600) + '\n')
invalid = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': '['}, _make_query())
result = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': 'needle'}, _make_query())
assert invalid['ok'] is False
assert 'Invalid regex' in invalid['error']
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'line'
assert result['matches'][0]['file'] == '/workspace/data.txt'
assert result['matches'][0]['content'].endswith('... [truncated]')

2
web/.gitignore vendored
View File

@@ -12,6 +12,8 @@
# testing
/coverage
/playwright-report
/test-results
# next.js
/dist/

View File

@@ -1,3 +1,13 @@
# Debug LangBot Frontend
Please refer to the [Development Guide](https://link.langbot.app/en/docs/dev-config) for more information.
## Tests
Run the frontend smoke tests without a backend process:
```bash
pnpm test:e2e
```
The Playwright suite starts Vite and mocks the LangBot backend and Space APIs.

View File

@@ -6,6 +6,7 @@
"dev": "vite",
"build": "tsc && vite build",
"preview": "vite preview",
"test:e2e": "playwright test",
"lint": "eslint .",
"format": "prettier --write ."
},
@@ -86,6 +87,7 @@
"zod": "^3.24.4"
},
"devDependencies": {
"@playwright/test": "^1.61.0",
"@types/debug": "^4.1.12",
"@types/estree": "^1.0.8",
"@types/estree-jsx": "^1.0.5",

25
web/playwright.config.ts Normal file
View File

@@ -0,0 +1,25 @@
import { defineConfig, devices } from '@playwright/test';
export default defineConfig({
testDir: './tests/e2e',
fullyParallel: true,
forbidOnly: !!process.env.CI,
retries: process.env.CI ? 1 : 0,
reporter: process.env.CI ? [['github'], ['list']] : 'list',
use: {
baseURL: 'http://127.0.0.1:4173',
trace: 'on-first-retry',
},
projects: [
{
name: 'chromium',
use: { ...devices['Desktop Chrome'] },
},
],
webServer: {
command: 'pnpm exec vite --host 127.0.0.1 --port 4173',
url: 'http://127.0.0.1:4173',
reuseExistingServer: !process.env.CI,
timeout: 120_000,
},
});

35
web/pnpm-lock.yaml generated
View File

@@ -192,6 +192,9 @@ dependencies:
version: 3.25.76
devDependencies:
'@playwright/test':
specifier: ^1.61.0
version: 1.61.0
'@types/debug':
specifier: ^4.1.12
version: 4.1.12
@@ -529,6 +532,14 @@ packages:
engines: {node: ^12.20.0 || ^14.18.0 || >=16.0.0}
dev: true
/@playwright/test@1.61.0:
resolution: {integrity: sha512-cKA5B6lpFEMyMGjxF54QihfYpB4FkEGH+qZhtArDEG+wezQAJY8Pq6C7T1SjWz+FFzt3TbyoXBQYk/0292TdJA==}
engines: {node: '>=18'}
hasBin: true
dependencies:
playwright: 1.61.0
dev: true
/@radix-ui/number@1.1.1:
resolution: {integrity: sha512-MkKCwxlXTgz6CFoJx3pCwn07GKp36+aZyu/u2Ln2VrA5DcdyCZkASEDBTd8x5whTQQL5CiYf4prXKLcgQdv29g==}
dev: false
@@ -3204,6 +3215,14 @@ packages:
engines: {node: '>=0.4.x'}
dev: false
/fsevents@2.3.2:
resolution: {integrity: sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==}
engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0}
os: [darwin]
requiresBuild: true
dev: true
optional: true
/fsevents@2.3.3:
resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==}
engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0}
@@ -4940,6 +4959,22 @@ packages:
hasBin: true
dev: true
/playwright-core@1.61.0:
resolution: {integrity: sha512-caX7TrY3Ml6egyDX0WUcTHDxodl/b51y5wJOdCEA36QviK/s2g081hvmGs8eaE3DWb6NYZQ6BjO/QkNRPenoPA==}
engines: {node: '>=18'}
hasBin: true
dev: true
/playwright@1.61.0:
resolution: {integrity: sha512-Z+7BeeqQPRRzklHsVFP4KTGIyMxKUmfeRA4WisM6G3/XW6nwGeX6fX9qYaDa+CiUqpOkb2f6X3nar05R3kSuJQ==}
engines: {node: '>=18'}
hasBin: true
dependencies:
playwright-core: 1.61.0
optionalDependencies:
fsevents: 2.3.2
dev: true
/pngjs@5.0.0:
resolution: {integrity: sha512-40QW5YalBNfQo5yRYmiw7Yz6TKKVr3h6970B2YE+3fQpsWcrbj1PzJgxeJ19DRQjhMbKPIuMY8rFaXc8moolVw==}
engines: {node: '>=10.13.0'}

View File

@@ -0,0 +1,417 @@
import { Page, Route } from '@playwright/test';
type JsonRecord = Record<string, unknown>;
interface SkillMock {
name: string;
display_name: string;
description: string;
instructions: string;
package_root: string;
updated_at: string;
}
interface LangBotApiMockState {
skills: SkillMock[];
}
function ok(data: unknown) {
return {
code: 0,
message: 'ok',
data,
timestamp: Date.now(),
};
}
async function fulfillJson(route: Route, data: unknown) {
await route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify(ok(data)),
});
}
function routePath(route: Route) {
return new URL(route.request().url()).pathname;
}
function emptyMonitoringData() {
return {
overview: {
total_messages: 0,
llm_calls: 0,
embedding_calls: 0,
model_calls: 0,
success_rate: 0,
active_sessions: 0,
},
messages: [],
llmCalls: [],
embeddingCalls: [],
sessions: [],
errors: [],
totalCount: {
messages: 0,
llmCalls: 0,
embeddingCalls: 0,
sessions: 0,
errors: 0,
},
};
}
function emptyTokenStatistics() {
return {
summary: {
total_calls: 0,
success_calls: 0,
error_calls: 0,
total_input_tokens: 0,
total_output_tokens: 0,
total_tokens: 0,
total_cost: 0,
avg_tokens_per_call: 0,
avg_duration_ms: 0,
avg_tokens_per_second: 0,
zero_token_success_calls: 0,
},
by_model: [],
timeseries: [],
bucket: 'day',
};
}
function makeSkill(data: JsonRecord): SkillMock {
return {
name: String(data.name || ''),
display_name: String(data.display_name || ''),
description: String(data.description || ''),
instructions: String(data.instructions || ''),
package_root: String(data.package_root || ''),
updated_at: new Date().toISOString(),
};
}
async function handleBackendApi(route: Route, state: LangBotApiMockState) {
const request = route.request();
const url = new URL(request.url());
const path = url.pathname;
const method = request.method();
if (path === '/api/v1/system/info') {
return fulfillJson(route, {
debug: false,
version: 'frontend-smoke',
edition: 'community',
cloud_service_url: 'https://space.langbot.app',
enable_marketplace: true,
allow_modify_login_info: true,
disable_models_service: false,
limitation: {
max_bots: -1,
max_pipelines: -1,
max_extensions: -1,
},
outbound_ips: [],
wizard_status: 'completed',
wizard_progress: null,
});
}
if (path === '/api/v1/user/account-info') {
return fulfillJson(route, {
initialized: true,
account_type: 'local',
has_password: true,
});
}
if (path === '/api/v1/user/check-token') {
return fulfillJson(route, { token: '' });
}
if (path === '/api/v1/user/auth') {
return fulfillJson(route, { token: 'playwright-token' });
}
if (path === '/api/v1/user/info') {
return fulfillJson(route, {
user: 'admin@example.com',
account_type: 'local',
has_password: true,
});
}
if (path === '/api/v1/user/space-credits') {
return fulfillJson(route, { credits: null });
}
if (path === '/api/v1/platform/bots') {
return fulfillJson(route, { bots: [] });
}
if (path === '/api/v1/pipelines') {
return fulfillJson(route, { pipelines: [] });
}
if (path === '/api/v1/knowledge/bases') {
return fulfillJson(route, { bases: [] });
}
if (path === '/api/v1/knowledge/migration/status') {
return fulfillJson(route, {
needed: false,
internal_kb_count: 0,
external_kb_count: 0,
});
}
if (path === '/api/v1/plugins') {
return fulfillJson(route, { plugins: [] });
}
if (path === '/api/v1/extensions') {
return fulfillJson(route, { extensions: [] });
}
if (path === '/api/v1/mcp/servers') {
return fulfillJson(route, { servers: [] });
}
if (path === '/api/v1/skills') {
if (method === 'POST') {
const skill = makeSkill(
JSON.parse(request.postData() || '{}') as JsonRecord,
);
state.skills = [
...state.skills.filter((item) => item.name !== skill.name),
skill,
];
return fulfillJson(route, { skill });
}
return fulfillJson(route, { skills: state.skills });
}
const skillFileMatch = path.match(
/^\/api\/v1\/skills\/([^/]+)\/files\/(.+)$/,
);
if (skillFileMatch) {
const skillName = decodeURIComponent(skillFileMatch[1]);
const filePath = decodeURIComponent(skillFileMatch[2]);
const skill = state.skills.find((item) => item.name === skillName);
return fulfillJson(route, {
skill: { name: skillName },
path: filePath,
content: skill?.instructions || '',
});
}
const skillFilesMatch = path.match(/^\/api\/v1\/skills\/([^/]+)\/files$/);
if (skillFilesMatch) {
const skillName = decodeURIComponent(skillFilesMatch[1]);
return fulfillJson(route, {
skill: { name: skillName },
base_path: '.',
entries: [
{
path: 'SKILL.md',
name: 'SKILL.md',
is_dir: false,
size: null,
},
],
truncated: false,
});
}
const skillMatch = path.match(/^\/api\/v1\/skills\/([^/]+)$/);
if (skillMatch) {
const skillName = decodeURIComponent(skillMatch[1]);
const skill = state.skills.find((item) => item.name === skillName) || {
name: skillName,
display_name: '',
description: '',
instructions: '',
package_root: '',
updated_at: new Date().toISOString(),
};
return fulfillJson(route, { skill });
}
if (path === '/api/v1/system/status/plugin-system') {
return fulfillJson(route, {
is_enable: true,
is_connected: true,
plugin_connector_error: '',
});
}
if (path === '/api/v1/plugins/debug-info') {
return fulfillJson(route, {
debug_url: 'ws://127.0.0.1:5300/plugin/debug',
plugin_debug_key: 'test-debug-key',
});
}
if (path === '/api/v1/box/status') {
return fulfillJson(route, {
available: true,
enabled: true,
profile: 'playwright',
recent_error_count: 0,
active_sessions: 0,
managed_processes: 0,
session_ttl_sec: 3600,
backend: {
name: 'playwright',
available: true,
},
});
}
if (path === '/api/v1/box/sessions') {
return fulfillJson(route, []);
}
if (path === '/api/v1/monitoring/data') {
return fulfillJson(route, emptyMonitoringData());
}
if (path === '/api/v1/monitoring/overview') {
return fulfillJson(route, emptyMonitoringData().overview);
}
if (path === '/api/v1/monitoring/token-statistics') {
return fulfillJson(route, emptyTokenStatistics());
}
if (path === '/api/v1/monitoring/feedback/stats') {
return fulfillJson(route, {
total_feedback: 0,
total_likes: 0,
total_dislikes: 0,
satisfaction_rate: 0,
});
}
if (path === '/api/v1/monitoring/feedback') {
return fulfillJson(route, { feedback: [], total: 0 });
}
if (path === '/api/v1/survey/pending') {
return fulfillJson(route, { survey: null });
}
if (path === '/api/v1/system/tasks') {
return fulfillJson(route, { tasks: [] });
}
if (
path === '/api/v1/marketplace/plugins' ||
path === '/api/v1/marketplace/plugins/search' ||
path === '/api/v1/marketplace/extensions/search' ||
path === '/api/v1/marketplace/mcps/search' ||
path === '/api/v1/marketplace/skills/search'
) {
return fulfillJson(route, { plugins: [], total: 0 });
}
if (path === '/api/v1/marketplace/tags') {
return fulfillJson(route, { tags: [] });
}
if (path === '/api/v1/marketplace/recommendation-lists') {
return fulfillJson(route, { lists: [] });
}
if (path === '/api/v1/dist/info/releases') {
return fulfillJson(route, []);
}
if (path === '/api/v1/dist/info/repo') {
return fulfillJson(route, {
repo: {
stargazers_count: 0,
forks_count: 0,
open_issues_count: 0,
},
contributors: [],
});
}
await fulfillJson(route, {});
}
async function handleCloudApi(route: Route) {
const path = routePath(route);
if (
path === '/api/v1/marketplace/plugins' ||
path === '/api/v1/marketplace/plugins/search' ||
path === '/api/v1/marketplace/extensions/search' ||
path === '/api/v1/marketplace/mcps/search' ||
path === '/api/v1/marketplace/skills/search'
) {
return fulfillJson(route, { plugins: [], total: 0 });
}
if (path === '/api/v1/marketplace/tags') {
return fulfillJson(route, { tags: [] });
}
if (path === '/api/v1/marketplace/recommendation-lists') {
return fulfillJson(route, { lists: [] });
}
if (path === '/api/v1/dist/info/releases') {
return fulfillJson(route, []);
}
if (path === '/api/v1/dist/info/repo') {
return fulfillJson(route, {
repo: {
stargazers_count: 0,
forks_count: 0,
open_issues_count: 0,
},
contributors: [],
});
}
await fulfillJson(route, {});
}
export async function installLangBotApiMocks(
page: Page,
options: { authenticated?: boolean; storage?: JsonRecord } = {},
) {
const { authenticated = false, storage = {} } = options;
const state: LangBotApiMockState = {
skills: [],
};
await page.addInitScript(
({ authenticated, storage }) => {
localStorage.setItem('langbot_language', 'en-US');
localStorage.setItem('extensions_group_by_type', 'false');
if (authenticated) {
localStorage.setItem('token', 'playwright-token');
localStorage.setItem('userEmail', 'admin@example.com');
} else {
localStorage.removeItem('token');
localStorage.removeItem('userEmail');
}
for (const [key, value] of Object.entries(storage)) {
localStorage.setItem(key, String(value));
}
},
{ authenticated, storage },
);
await page.route('**/api/v1/**', (route) => handleBackendApi(route, state));
await page.route('https://space.langbot.app/**', handleCloudApi);
}

View File

@@ -0,0 +1,133 @@
import { expect, test } from '@playwright/test';
import { installLangBotApiMocks } from './fixtures/langbot-api';
const appRoutes = [
{
path: '/home/bots',
heading: 'Bots',
bodyText: 'Select a bot from the sidebar',
},
{
path: '/home/pipelines',
heading: 'Pipelines',
bodyText: 'Select a pipeline from the sidebar',
},
{
path: '/home/extensions',
heading: 'Extensions',
bodyText: 'No extensions installed',
},
{
path: '/home/mcp',
heading: 'MCP',
bodyText: 'Select an MCP server from the sidebar',
},
{
path: '/home/knowledge',
heading: 'Knowledge',
bodyText: 'Select a knowledge base from the sidebar',
},
];
test.describe('authenticated app shell', () => {
for (const route of appRoutes) {
test(`${route.path} renders without a backend process`, async ({
page,
}) => {
await installLangBotApiMocks(page, { authenticated: true });
await page.goto(route.path);
await expect(page).toHaveURL(new RegExp(`${route.path}$`));
await expect(page.getByText('Home').first()).toBeVisible();
await expect(
page.getByRole('button', { name: 'Dashboard' }),
).toBeVisible();
await expect(page.getByText('Extensions').first()).toBeVisible();
await expect(page.getByText(route.heading).first()).toBeVisible();
await expect(page.getByText(route.bodyText)).toBeVisible();
await expect(page.getByText('Backend unavailable')).toHaveCount(0);
});
}
test('/home/monitoring loads dashboard data from mocked APIs', async ({
page,
}) => {
await installLangBotApiMocks(page, { authenticated: true });
await page.goto('/home/monitoring');
await expect(page).toHaveURL(/\/home\/monitoring$/);
await expect(page.getByText('Total Messages').first()).toBeVisible();
await expect(
page.getByRole('tab', { name: 'Message Records' }),
).toBeVisible();
await expect(
page.getByRole('tab', { name: 'Token Monitoring' }),
).toBeVisible();
await page.getByRole('tab', { name: 'Token Monitoring' }).click();
await expect(
page.getByText('No token usage in the selected time range'),
).toBeVisible();
await expect(page.getByText('Unable to connect to server')).toHaveCount(0);
});
test('/home/extensions shows plugin debug information from the backend', async ({
page,
}) => {
await installLangBotApiMocks(page, { authenticated: true });
await page.goto('/home/extensions');
await page.getByRole('button', { name: 'Debug Info' }).click();
await expect(page.getByText('Plugin Debug Information')).toBeVisible();
await expect(page.getByRole('textbox').nth(0)).toHaveValue(
'ws://127.0.0.1:5300/plugin/debug',
);
await expect(page.getByRole('textbox').nth(1)).toHaveValue(
'test-debug-key',
);
});
test('/home/skills?action=create creates a manual skill', async ({
page,
}) => {
await installLangBotApiMocks(page, { authenticated: true });
await page.goto('/home/skills?action=create');
await expect(page).toHaveURL(/\/home\/skills\?action=create$/);
await expect(page.getByText('Create Skill').first()).toBeVisible();
await expect(page.getByText('Import Local Skill Directory')).toBeVisible();
const saveButton = page.getByRole('button', { name: 'Save' });
await expect(saveButton).toBeEnabled();
await saveButton.click();
await expect(page.getByText('Skill name cannot be empty')).toBeVisible();
await page.locator('#display_name').fill('Daily Summary');
await page.locator('#name').fill('daily_summary');
await page
.locator('#description')
.fill('Summarizes the current conversation for handoff.');
await page
.locator('#instructions')
.fill('Summarize the conversation in five concise bullet points.');
await saveButton.click();
await expect(page).toHaveURL(/\/home\/skills\?id=daily_summary$/);
await expect(
page.getByRole('heading', { name: 'Daily Summary' }),
).toBeVisible();
await expect(page.locator('#name')).toHaveValue('daily_summary');
await expect(page.locator('#description')).toHaveValue(
'Summarizes the current conversation for handoff.',
);
await expect(page.locator('#instructions')).toHaveValue(
'Summarize the conversation in five concise bullet points.',
);
});
});

View File

@@ -0,0 +1,22 @@
import { expect, test } from '@playwright/test';
import { installLangBotApiMocks } from './fixtures/langbot-api';
test('local account login reaches the authenticated home shell', async ({
page,
}) => {
await installLangBotApiMocks(page);
await page.goto('/login');
await expect(page.getByText('Welcome')).toBeVisible();
await page.getByPlaceholder('Enter email address').fill('admin@example.com');
await page.getByPlaceholder('Enter password').fill('password');
await page.getByRole('button', { name: 'Login with password' }).click();
await expect(page).toHaveURL(/\/home$/);
await expect(page.getByText('Home').first()).toBeVisible();
await expect(page.getByRole('button', { name: 'Dashboard' })).toBeVisible();
await expect(page.getByText('Total Messages').first()).toBeVisible();
await expect(page.getByText('Unable to connect to server')).toHaveCount(0);
});