chore(skill): prune dead local-filesystem helpers left over from Box migration

Follow-up to the Box-only refactor. The previous commit removed the
local-fallback BRANCHES from every public method; this one removes the
HELPERS those branches called, which are now unreachable.

SkillService (service/skill.py): 787 → 449 lines
  Removed: scan_directory (sync), _read_skill_package, _write_skill_md,
  _resolve_create_field, _managed_skill_path,
  _managed_install_root_for_package, _normalize_package_root,
  _resolve_skill_path, _find_skill_entry, _discover_skill_directories,
  _safe_extract_zip, _extract_uploaded_skill_to_temp,
  _download_github_skill_to_temp, _resolve_github_source_root,
  _build_preview_target_dir, _preview_skill_candidates,
  _select_preview_candidates, _install_preview_candidates,
  _preview_source_root, _resolve_installed_skills, plus the
  module-level _FRONTMATTER_FIELDS and _build_skill_md.
  Kept (still needed by the surviving GitHub-import path):
  _download_github_asset, _download_github_skill_directory_as_zip,
  _find_github_skill_archive_entry, _copy_github_skill_directory_to_zip,
  _is_github_skill_md_url, _parse_github_skill_md_url,
  _resolve_github_skill_md_package_name, _validate_github_asset_url,
  _uploaded_skill_target_stem, _validate_skill_name.
  Imports dropped: shutil, tempfile, yaml, ....utils.paths.

SkillManager (skill/manager.py): 187 → 88 lines
  Removed: get_managed_skills_root, _discover_skill_directories,
  _find_skill_entry, _load_skill_file, _normalize_package_root.
  Imports dropped: datetime, parse_frontmatter, paths.

Tests:
  - test_skill_service.py: drop the 3 sync scan_directory tests +
    skill_service fixture + _create_skill_file helper
  - test_skill_tools.py: drop test_load_skill_file_success; rename
    TestSkillManagerPackageLoading → TestSkillManagerCache

Full unit suite: 277 passed, 1 skipped. ``ruff check`` clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Junyan Qin
2026-05-20 22:24:08 +08:00
parent cc072be7f7
commit 42855cf4cc
4 changed files with 4 additions and 526 deletions

View File

@@ -4,23 +4,15 @@ import io
import inspect import inspect
import os import os
import posixpath import posixpath
import shutil
import zipfile import zipfile
from typing import Optional from typing import Optional
from urllib.parse import quote, unquote, urlparse from urllib.parse import quote, unquote, urlparse
import httpx import httpx
import yaml
from ....core import app from ....core import app
from ....skill.utils import parse_frontmatter from ....skill.utils import parse_frontmatter
from ....utils import paths
_FRONTMATTER_FIELDS = (
'name',
'display_name',
'description',
)
_PUBLIC_SKILL_FIELDS = ( _PUBLIC_SKILL_FIELDS = (
'name', 'name',
@@ -42,23 +34,6 @@ _GITHUB_ASSET_HOSTS = {
} }
def _build_skill_md(metadata: dict, instructions: str) -> str:
frontmatter = {}
for key in _FRONTMATTER_FIELDS:
value = metadata.get(key)
if value is None:
continue
if isinstance(value, str) and not value.strip():
continue
frontmatter[key] = value
if not frontmatter:
return instructions
frontmatter_text = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True, sort_keys=False).strip()
return f'---\n{frontmatter_text}\n---\n\n{instructions}'
class SkillService: class SkillService:
"""Filesystem-backed skill management service.""" """Filesystem-backed skill management service."""
@@ -257,35 +232,6 @@ class SkillService:
await self._reload_skills() await self._reload_skills()
return await self.list_skills() return await self.list_skills()
def scan_directory(self, path: str) -> dict:
if not os.path.isdir(path):
raise ValueError(f'Directory does not exist: {path}')
discovered = self._discover_skill_directories(path, max_depth=2)
if not discovered:
raise ValueError(f'No SKILL.md found in {path} or its subdirectories (max depth: 2)')
if len(discovered) > 1:
candidates = ', '.join(found_path for found_path, _entry in discovered)
raise ValueError(
f'Multiple skill directories found in {path}. Please choose a more specific path: {candidates}'
)
package_root, entry_file = discovered[0]
entry_path = os.path.join(package_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
dir_name = os.path.basename(os.path.normpath(package_root))
return {
'package_root': os.path.abspath(package_root),
'entry_file': entry_file,
'name': str(metadata.get('name') or dir_name).strip(),
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
}
async def scan_directory_async(self, path: str) -> dict: async def scan_directory_async(self, path: str) -> dict:
box_service = self._require_box('Scanning a skill directory') box_service = self._require_box('Scanning a skill directory')
return await box_service.scan_skill_directory(path) return await box_service.scan_skill_directory(path)
@@ -299,39 +245,6 @@ class SkillService:
if inspect.isawaitable(result): if inspect.isawaitable(result):
await result await result
def _read_skill_package(self, package_root: str) -> dict:
entry = self._find_skill_entry(package_root)
if entry is None:
raise ValueError(f'No SKILL.md found in {package_root}')
resolved_root, entry_file = entry
entry_path = os.path.join(resolved_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
return {
'entry_file': entry_file,
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
}
async def _download_github_skill_to_temp(self, asset_url: str, tmp_dir: str) -> str:
zip_path = os.path.join(tmp_dir, 'skill.zip')
content = await self._download_github_asset(asset_url)
with open(zip_path, 'wb') as f:
f.write(content)
extract_dir = os.path.join(tmp_dir, 'extracted')
with zipfile.ZipFile(zip_path, 'r') as zf:
self._safe_extract_zip(zf, extract_dir)
entries = os.listdir(extract_dir)
if len(entries) == 1 and os.path.isdir(os.path.join(extract_dir, entries[0])):
return os.path.join(extract_dir, entries[0])
return extract_dir
async def _download_github_asset(self, asset_url: str) -> bytes: async def _download_github_asset(self, asset_url: str) -> bytes:
async with httpx.AsyncClient(follow_redirects=True, timeout=120) as client: async with httpx.AsyncClient(follow_redirects=True, timeout=120) as client:
resp = await client.get(asset_url) resp = await client.get(asset_url)
@@ -418,32 +331,6 @@ class SkillService:
if copied_files == 0: if copied_files == 0:
raise ValueError('GitHub skill directory is empty') raise ValueError('GitHub skill directory is empty')
def _extract_uploaded_skill_to_temp(self, file_bytes: bytes, tmp_dir: str) -> str:
extract_dir = os.path.join(tmp_dir, 'extracted')
try:
with zipfile.ZipFile(io.BytesIO(file_bytes), 'r') as zf:
self._safe_extract_zip(zf, extract_dir)
except zipfile.BadZipFile as exc:
raise ValueError('Uploaded file must be a valid .zip archive') from exc
entries = os.listdir(extract_dir)
if len(entries) == 1 and os.path.isdir(os.path.join(extract_dir, entries[0])):
return os.path.join(extract_dir, entries[0])
return extract_dir
def _resolve_github_source_root(self, root_path: str, source_subdir: str) -> str:
normalized = str(source_subdir or '').strip().replace('\\', '/').strip('/')
if not normalized:
return root_path
target_path = os.path.realpath(os.path.join(root_path, normalized))
root_path = os.path.realpath(root_path)
if target_path != root_path and not target_path.startswith(f'{root_path}{os.sep}'):
raise ValueError('source_subdir must stay within the downloaded repository')
if not os.path.isdir(target_path):
raise ValueError(f'source_subdir does not exist in the downloaded repository: {normalized}')
return target_path
def _uploaded_skill_target_stem(self, filename: str) -> str: def _uploaded_skill_target_stem(self, filename: str) -> str:
stem = os.path.splitext(os.path.basename(str(filename or '').strip()))[0] stem = os.path.splitext(os.path.basename(str(filename or '').strip()))[0]
safe_stem = ''.join(ch if ch.isalnum() or ch in ('-', '_') else '-' for ch in stem).strip('-_') safe_stem = ''.join(ch if ch.isalnum() or ch in ('-', '_') else '-' for ch in stem).strip('-_')
@@ -451,122 +338,6 @@ class SkillService:
safe_stem = 'uploaded-skill' safe_stem = 'uploaded-skill'
return safe_stem return safe_stem
def _build_preview_target_dir(self, base_target_name: str, source_path: str, suffix: str) -> str:
relative = str(source_path or '').strip().replace('\\', '/').strip('/')
leaf_name = relative.split('/')[-1] if relative else ''
target_name = base_target_name
if leaf_name and leaf_name != base_target_name:
target_name = f'{base_target_name}-{leaf_name}'
if suffix:
target_name = f'{target_name}-{suffix}'
return paths.get_data_path('skills', target_name)
def _preview_skill_candidates(self, root_path: str, *, base_target_name: str, suffix: str) -> list[dict]:
discovered = self._discover_skill_directories(root_path, max_depth=2)
if not discovered:
raise ValueError(f'No SKILL.md found in {root_path} or its subdirectories (max depth: 2)')
previews: list[dict] = []
for package_root, entry_file in discovered:
entry_path = os.path.join(package_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
relative_path = os.path.relpath(package_root, root_path)
if relative_path in ('', '.'):
relative_path = ''
dir_name = os.path.basename(os.path.normpath(package_root))
previews.append(
{
'source_path': relative_path.replace(os.sep, '/'),
'entry_file': entry_file,
'name': str(metadata.get('name') or dir_name).strip(),
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
'package_root': self._build_preview_target_dir(base_target_name, relative_path, suffix),
}
)
previews.sort(key=lambda item: item['source_path'])
return previews
def _select_preview_candidates(self, previews: list[dict], data: dict) -> list[dict]:
normalized_paths: list[str] = []
raw_source_paths = data.get('source_paths', [])
if isinstance(raw_source_paths, list):
for source_path in raw_source_paths:
normalized = str(source_path or '').strip().replace('\\', '/').strip('/')
if normalized not in normalized_paths:
normalized_paths.append(normalized)
legacy_source_path = str(data.get('source_path', '') or '').strip().replace('\\', '/').strip('/')
if legacy_source_path and legacy_source_path not in normalized_paths:
normalized_paths.append(legacy_source_path)
if len(previews) == 1 and not normalized_paths:
return previews
if not normalized_paths:
candidates = ', '.join(item['source_path'] or '.' for item in previews)
raise ValueError(f'Multiple skills found. Please choose one or more source_paths: {candidates}')
selected: list[dict] = []
available = {preview['source_path']: preview for preview in previews}
for normalized_path in normalized_paths:
preview = available.get(normalized_path)
if preview is None:
candidates = ', '.join(item['source_path'] or '.' for item in previews)
raise ValueError(f'Invalid source_path "{normalized_path}". Available: {candidates}')
selected.append(preview)
return selected
def _install_preview_candidates(self, root_path: str, selected_previews: list[dict]) -> list[dict]:
target_dirs: list[str] = []
for preview in selected_previews:
target_dir = self._normalize_package_root(preview['package_root'])
if target_dir in target_dirs:
raise ValueError(f'Duplicate target directory selected: {target_dir}')
if os.path.exists(target_dir):
raise ValueError(f'Skill directory already exists: {target_dir}')
target_dirs.append(target_dir)
installed_scans: list[dict] = []
created_dirs: list[str] = []
try:
for preview in selected_previews:
target_dir = self._normalize_package_root(preview['package_root'])
source_root = self._preview_source_root(root_path, preview['source_path'])
os.makedirs(os.path.dirname(target_dir), exist_ok=True)
shutil.copytree(source_root, target_dir)
created_dirs.append(target_dir)
installed_scans.append(self.scan_directory(target_dir))
except Exception:
for target_dir in created_dirs:
shutil.rmtree(target_dir, ignore_errors=True)
raise
return installed_scans
async def _resolve_installed_skills(self, scanned_skills: list[dict]) -> list[dict]:
installed_skills: list[dict] = []
for scanned in scanned_skills:
installed = await self.get_skill(scanned['name'])
if not installed:
installed = self._serialize_skill(scanned)
installed_skills.append(installed)
return installed_skills
@staticmethod
def _preview_source_root(root_path: str, source_path: str) -> str:
normalized = str(source_path or '').strip().replace('\\', '/').strip('/')
if not normalized:
return root_path
return os.path.join(root_path, normalized)
@staticmethod @staticmethod
def _is_github_skill_md_url(asset_url: str) -> bool: def _is_github_skill_md_url(asset_url: str) -> bool:
parsed = urlparse(str(asset_url or '').strip()) parsed = urlparse(str(asset_url or '').strip())
@@ -645,64 +416,6 @@ class SkillService:
return parsed.geturl() return parsed.geturl()
@staticmethod
def _safe_extract_zip(archive: zipfile.ZipFile, target_dir: str) -> None:
target_root = os.path.realpath(target_dir)
os.makedirs(target_root, exist_ok=True)
for member in archive.infolist():
member_name = member.filename
if not member_name or member_name.endswith('/'):
continue
normalized = posixpath.normpath(member_name)
if normalized.startswith('../') or normalized == '..' or os.path.isabs(normalized):
raise ValueError(f'Archive contains an unsafe path: {member_name}')
destination = os.path.realpath(os.path.join(target_root, normalized))
if destination != target_root and not destination.startswith(f'{target_root}{os.sep}'):
raise ValueError(f'Archive contains an unsafe path: {member_name}')
archive.extractall(target_root)
@staticmethod
def _resolve_create_field(data: dict, field: str, imported_skill_data: dict | None, *, default: str) -> str:
raw_value = data.get(field) if field in data else None
if raw_value is None:
if imported_skill_data is not None:
return str(imported_skill_data.get(field, default) or default)
return default
value = str(raw_value or '')
if imported_skill_data is not None and not value.strip():
return str(imported_skill_data.get(field, default) or default)
return value
def _write_skill_md(self, package_root: str, metadata: dict, instructions: str) -> None:
package_root = self._normalize_package_root(package_root)
os.makedirs(package_root, exist_ok=True)
content = _build_skill_md(metadata, instructions)
with open(os.path.join(package_root, 'SKILL.md'), 'w', encoding='utf-8') as f:
f.write(content)
def _managed_skill_path(self, skill_name: str) -> str:
return self._normalize_package_root(paths.get_data_path('skills', skill_name))
def _managed_install_root_for_package(self, package_root: str) -> str:
managed_root = self._normalize_package_root(paths.get_data_path('skills'))
if not package_root or package_root == managed_root:
return ''
prefix = f'{managed_root}{os.sep}'
if not package_root.startswith(prefix):
return ''
relative = os.path.relpath(package_root, managed_root)
top_level = relative.split(os.sep, 1)[0]
if top_level in ('', '.', '..'):
return ''
return os.path.join(managed_root, top_level)
@staticmethod @staticmethod
def _validate_skill_name(name: str) -> str: def _validate_skill_name(name: str) -> str:
name = str(name or '').strip() name = str(name or '').strip()
@@ -713,75 +426,3 @@ class SkillService:
if len(name) > 64: if len(name) > 64:
raise ValueError('Skill name cannot exceed 64 characters') raise ValueError('Skill name cannot exceed 64 characters')
return name return name
@staticmethod
def _normalize_package_root(package_root: str) -> str:
package_root = str(package_root).strip()
if not package_root:
return ''
return os.path.realpath(os.path.abspath(package_root))
@staticmethod
def _find_skill_entry(path: str) -> Optional[tuple[str, str]]:
for candidate in ('SKILL.md', 'skill.md'):
if os.path.isfile(os.path.join(path, candidate)):
return path, candidate
return None
def _discover_skill_directories(self, root_path: str, max_depth: int = 2) -> list[tuple[str, str]]:
discovered: list[tuple[str, str]] = []
queue: list[tuple[str, int]] = [(root_path, 0)]
seen: set[str] = set()
while queue:
current_path, depth = queue.pop(0)
normalized_path = os.path.abspath(current_path)
if normalized_path in seen:
continue
seen.add(normalized_path)
found = self._find_skill_entry(normalized_path)
if found:
discovered.append(found)
continue
if depth >= max_depth:
continue
try:
entries = sorted(os.scandir(normalized_path), key=lambda entry: entry.name)
except OSError:
continue
for entry in entries:
if entry.is_dir():
queue.append((entry.path, depth + 1))
return discovered
def _resolve_skill_path(self, skill: dict, path: str, *, expect_directory: bool) -> tuple[str, str]:
package_root = self._normalize_package_root(skill.get('package_root', ''))
if not package_root:
raise ValueError(f'Skill "{skill.get("name", "")}" has no package_root')
relative_path = str(path or '.').strip() or '.'
if os.path.isabs(relative_path):
raise ValueError('path must be relative to the skill package root')
normalized_relative = os.path.normpath(relative_path)
if normalized_relative.startswith('..') or normalized_relative == '..':
raise ValueError('path must stay within the skill package root')
target_path = os.path.realpath(os.path.join(package_root, normalized_relative))
if target_path != package_root and not target_path.startswith(f'{package_root}{os.sep}'):
raise ValueError('path must stay within the skill package root')
if expect_directory:
if not os.path.isdir(target_path):
raise ValueError(f'Skill directory not found: {relative_path}')
else:
parent_dir = os.path.dirname(target_path) or package_root
if parent_dir != package_root and not parent_dir.startswith(f'{package_root}{os.sep}'):
raise ValueError('path must stay within the skill package root')
return target_path, normalized_relative

View File

@@ -1,12 +1,9 @@
from __future__ import annotations from __future__ import annotations
import datetime as dt
import os import os
import typing import typing
from ..core import app from ..core import app
from .utils import parse_frontmatter
from ..utils import paths
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
pass pass
@@ -85,89 +82,6 @@ class SkillManager:
if not skill_name: if not skill_name:
return False return False
return skill_name in self.skills return skill_name in self.skills
return True
@staticmethod
def get_managed_skills_root() -> str:
"""Get the root directory for managed user skills."""
return paths.get_data_path('skills')
def _discover_skill_directories(self, root_path: str, max_depth: int = 6) -> list[tuple[str, str]]:
"""Discover all skill directories under root_path."""
discovered: list[tuple[str, str]] = []
root_path = os.path.realpath(os.path.abspath(root_path))
root_depth = root_path.rstrip(os.sep).count(os.sep)
for current_root, dirs, _files in os.walk(root_path):
current_root = os.path.realpath(current_root)
depth = current_root.rstrip(os.sep).count(os.sep) - root_depth
if depth > max_depth:
dirs[:] = []
continue
found = self._find_skill_entry(current_root)
if found is not None:
discovered.append(found)
dirs[:] = []
discovered.sort(key=lambda item: item[0])
return discovered
@staticmethod
def _find_skill_entry(path: str) -> tuple[str, str] | None:
"""Find SKILL.md entry file in a directory."""
for candidate in ('SKILL.md', 'skill.md'):
if os.path.isfile(os.path.join(path, candidate)):
return path, candidate
return None
def _load_skill_file(self, skill_data: dict) -> bool:
"""Load skill data from SKILL.md file."""
package_root = self._normalize_package_root(skill_data.get('package_root', ''))
entry_file = skill_data.get('entry_file', 'SKILL.md')
if not package_root:
self.ap.logger.warning('Skill package_root is empty, skipping')
return False
entry_path = os.path.join(package_root, entry_file)
try:
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
except FileNotFoundError:
self.ap.logger.warning(f'Skill entry file not found: {entry_path}, skipping')
return False
except OSError as exc:
self.ap.logger.warning(f'Failed to read skill entry file {entry_path}: {exc}, skipping')
return False
metadata, instructions = parse_frontmatter(content)
name = str(metadata.get('name') or os.path.basename(os.path.normpath(package_root))).strip()
if not name:
self.ap.logger.warning(f'Skill at {package_root} has no valid name, skipping')
return False
stat = os.stat(entry_path)
skill_data.clear()
skill_data.update(
{
'name': name,
'display_name': str(metadata.get('display_name') or name).strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
'raw_content': content,
'package_root': package_root,
'entry_file': entry_file,
'created_at': dt.datetime.fromtimestamp(stat.st_ctime, tz=dt.timezone.utc).isoformat(),
'updated_at': dt.datetime.fromtimestamp(stat.st_mtime, tz=dt.timezone.utc).isoformat(),
}
)
return True
@staticmethod
def _normalize_package_root(package_root: str) -> str:
if not package_root:
return ''
return os.path.realpath(os.path.abspath(package_root))
def get_skill_by_name(self, name: str) -> dict | None: def get_skill_by_name(self, name: str) -> dict | None:
"""Get skill data by name.""" """Get skill data by name."""

View File

@@ -35,24 +35,10 @@ def _make_skill_data(
} }
class TestSkillManagerPackageLoading: class TestSkillManagerCache:
def test_load_skill_file_success(self): """The Box runtime is the only source of truth — SkillManager just holds
from langbot.pkg.skill.manager import SkillManager an in-memory cache populated by ``reload_skills``. There is no local
filesystem reader anymore."""
ap = _make_ap()
mgr = SkillManager(ap)
with tempfile.TemporaryDirectory() as tmpdir:
skill_md = os.path.join(tmpdir, 'SKILL.md')
with open(skill_md, 'w', encoding='utf-8') as f:
f.write('---\ndescription: Test skill\n---\n\n# Test Skill\nDo things.')
skill_data = _make_skill_data(package_root=tmpdir)
result = mgr._load_skill_file(skill_data)
assert result is True
assert skill_data['instructions'] == '# Test Skill\nDo things.'
assert skill_data['description'] == 'Test skill'
def test_refresh_skill_from_disk_reports_cache_presence(self): def test_refresh_skill_from_disk_reports_cache_presence(self):
"""Box is the only source of truth for skill content. refresh_skill_from_disk """Box is the only source of truth for skill content. refresh_skill_from_disk

View File

@@ -6,69 +6,6 @@ import pytest
from langbot.pkg.api.http.service.skill import SkillService from langbot.pkg.api.http.service.skill import SkillService
def _create_skill_file(
path,
*,
name: str = 'imported-skill',
display_name: str = '',
description: str = 'Imported from local directory',
body: str = 'Skill instructions',
) -> None:
frontmatter = ['name: ' + name, 'description: ' + description]
if display_name:
frontmatter.insert(1, 'display_name: ' + display_name)
path.write_text(
'---\n' + '\n'.join(frontmatter) + f'\n---\n\n{body}\n',
encoding='utf-8',
)
@pytest.fixture
def skill_service():
app = SimpleNamespace(
skill_mgr=SimpleNamespace(
refresh_skill_from_disk=lambda *_args, **_kwargs: True,
reload_skills=AsyncMock(),
)
)
return SkillService(app)
def test_scan_directory_supports_nested_skill_within_two_levels(skill_service, tmp_path):
nested_dir = tmp_path / 'downloaded' / 'self-improving-agent'
nested_dir.mkdir(parents=True)
_create_skill_file(nested_dir / 'SKILL.md')
result = skill_service.scan_directory(str(tmp_path))
assert result['package_root'] == str(nested_dir.resolve())
assert result['entry_file'] == 'SKILL.md'
assert result['name'] == 'imported-skill'
assert result['instructions'] == 'Skill instructions'
def test_scan_directory_rejects_ambiguous_nested_skill_directories(skill_service, tmp_path):
first_dir = tmp_path / 'skills' / 'alpha'
second_dir = tmp_path / 'skills' / 'beta'
first_dir.mkdir(parents=True)
second_dir.mkdir(parents=True)
_create_skill_file(first_dir / 'SKILL.md', body='alpha instructions')
_create_skill_file(second_dir / 'SKILL.md', body='beta instructions')
with pytest.raises(ValueError, match='Multiple skill directories found'):
skill_service.scan_directory(str(tmp_path))
def test_scan_directory_errors_when_skill_is_deeper_than_two_levels(skill_service, tmp_path):
deep_dir = tmp_path / 'a' / 'b' / 'c'
deep_dir.mkdir(parents=True)
_create_skill_file(deep_dir / 'SKILL.md')
with pytest.raises(ValueError, match='max depth: 2'):
skill_service.scan_directory(str(tmp_path))
class TestRequireBoxForWrite: class TestRequireBoxForWrite:
"""Box is the only source of truth for skills — there is no local """Box is the only source of truth for skills — there is no local
filesystem fallback. Every write and (most) read methods refuse cleanly filesystem fallback. Every write and (most) read methods refuse cleanly