mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
refactor(skill): remove all local-filesystem fallbacks; Box is the sole source
Skills now flow exclusively through the Box runtime. Every read and write method funnels through ``_box_service()``; when Box is unavailable (disabled in config, connection failed, or simply not installed) the operation either returns an empty surface (``list_skills`` → []) or raises with a clear ``Box runtime ... not initialised / disabled / unavailable: ...`` message via the new ``_require_box(action)`` helper. Why: the legacy local-fallback path scanned ``data/skills/``, but Box manages its own ``box.local.skills_root`` (default ``data/box/skills/``). The two diverging directories caused stale / phantom skill lists when Box flapped, and the local-fallback writes silently bypassed all the sandboxing the operator had configured. SkillService (``api/http/service/skill.py``): - New ``_require_box(action)`` returns the box service or raises a structured ValueError. ``_require_box_for_write`` kept as alias - ``list_skills`` → returns [] when Box is down so the UI can render the disabled banner cleanly - ``get_skill`` / ``get_skill_by_name`` → return None - All read-file / write-file / scan-dir / create / update / delete / install / preview methods → ``_require_box`` then box delegate. Local fallback bodies (shutil.copytree, tempfile.mkdtemp, preview pipelines) removed entirely SkillManager (``pkg/skill/manager.py``): - ``reload_skills`` returns early with empty cache when Box is down. data/skills/ discovery loop removed - ``refresh_skill_from_disk`` now just reports cache presence; the on-disk re-parse is gone since Box is the only writer Tests: - Drop 11 obsolete test_skill_service.py tests that exercised the removed local-fallback paths (create/install/file/delete/update) - Add list-empty + read-refused tests; flip the legacy-allow test to legacy-refuses-too - Rewrite refresh_skill_from_disk test to match the new behaviour Several helper methods (_managed_skill_path, _resolve_skill_path, _preview_skill_candidates, _install_preview_candidates, etc.) are now unreachable; a follow-up commit will prune them so this diff stays reviewable. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -5,7 +5,6 @@ import inspect
|
||||
import os
|
||||
import posixpath
|
||||
import shutil
|
||||
import tempfile
|
||||
import zipfile
|
||||
from typing import Optional
|
||||
from urllib.parse import quote, unquote, urlparse
|
||||
@@ -74,24 +73,19 @@ class SkillService:
|
||||
return box_service
|
||||
return None
|
||||
|
||||
def _require_box_for_write(self, action: str) -> None:
|
||||
"""Refuse a write operation when Box is installed but unavailable.
|
||||
def _require_box(self, action: str):
|
||||
"""Return the Box service or raise if it is not available.
|
||||
|
||||
Distinguishes three states:
|
||||
- Box available → no-op (caller proceeds via the Box delegate path).
|
||||
- Box installed but disabled by config or currently failed → raise
|
||||
with a clear, actionable message. The frontend translates this to
|
||||
a banner / disabled affordance.
|
||||
- Box not installed at all (legacy / pre-Box dev mode, no
|
||||
``ap.box_service`` attribute) → also no-op so the local-skills
|
||||
fallback still works for that minimal setup.
|
||||
Box is the only source of truth for skills. Every read and write
|
||||
operation goes through it — there is no local-filesystem fallback.
|
||||
"""
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return box_service
|
||||
ap_box = getattr(self.ap, 'box_service', None)
|
||||
if ap_box is None:
|
||||
return # legacy mode, allow local fallback
|
||||
if getattr(ap_box, 'available', False):
|
||||
return # Box is up, delegate path will be used
|
||||
if not getattr(ap_box, 'enabled', True):
|
||||
reason = 'not initialised'
|
||||
elif not getattr(ap_box, 'enabled', True):
|
||||
reason = 'disabled in config (box.enabled = false)'
|
||||
else:
|
||||
connector_error = getattr(ap_box, '_connector_error', '') or 'currently unavailable'
|
||||
@@ -102,130 +96,48 @@ class SkillService:
|
||||
f'runtime is reachable before retrying.'
|
||||
)
|
||||
|
||||
def _require_box_for_write(self, action: str) -> None:
|
||||
"""Backwards-compatible alias preserved for clarity at call sites."""
|
||||
self._require_box(action)
|
||||
|
||||
@staticmethod
|
||||
def _serialize_skill(skill: dict) -> dict:
|
||||
return {field: skill.get(field) for field in _PUBLIC_SKILL_FIELDS if field in skill}
|
||||
|
||||
async def list_skills(self) -> list[dict]:
|
||||
# When Box is unavailable, surface an empty list rather than raising —
|
||||
# the skills page should render cleanly, and the UI separately renders
|
||||
# a "Box disabled / unavailable" banner via useBoxStatus.
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return [self._serialize_skill(skill) for skill in await box_service.list_skills()]
|
||||
|
||||
skills = [dict(skill) for skill in getattr(self.ap.skill_mgr, 'skills', {}).values()]
|
||||
skills.sort(key=lambda item: item.get('updated_at', ''), reverse=True)
|
||||
return [self._serialize_skill(skill) for skill in skills]
|
||||
if box_service is None:
|
||||
return []
|
||||
return [self._serialize_skill(skill) for skill in await box_service.list_skills()]
|
||||
|
||||
async def get_skill(self, skill_name: str) -> Optional[dict]:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
skill = await box_service.get_skill(skill_name)
|
||||
return self._serialize_skill(skill) if skill else None
|
||||
|
||||
skill = getattr(self.ap.skill_mgr, 'get_skill_by_name', lambda _name: None)(skill_name)
|
||||
if box_service is None:
|
||||
return None
|
||||
skill = await box_service.get_skill(skill_name)
|
||||
return self._serialize_skill(skill) if skill else None
|
||||
|
||||
async def get_skill_by_name(self, name: str) -> Optional[dict]:
|
||||
return await self.get_skill(name)
|
||||
|
||||
async def create_skill(self, data: dict) -> dict:
|
||||
self._require_box_for_write('Creating a skill')
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
created = await box_service.create_skill(data)
|
||||
await self._reload_skills()
|
||||
return self._serialize_skill(created)
|
||||
|
||||
name = self._validate_skill_name(data.get('name', ''))
|
||||
if await self.get_skill_by_name(name):
|
||||
raise ValueError(f'Skill with name "{name}" already exists')
|
||||
|
||||
package_root = self._normalize_package_root(data.get('package_root', ''))
|
||||
managed_root = self._managed_skill_path(name)
|
||||
target_root = managed_root
|
||||
imported_skill_data: dict | None = None
|
||||
|
||||
if package_root and self._managed_install_root_for_package(package_root):
|
||||
if not os.path.isdir(package_root):
|
||||
raise ValueError(f'Directory does not exist: {package_root}')
|
||||
target_root = package_root
|
||||
imported_skill_data = self._read_skill_package(target_root)
|
||||
elif package_root and package_root != managed_root:
|
||||
if not os.path.isdir(package_root):
|
||||
raise ValueError(f'Directory does not exist: {package_root}')
|
||||
if os.path.exists(managed_root):
|
||||
raise ValueError(f'Skill directory already exists: {managed_root}')
|
||||
os.makedirs(os.path.dirname(managed_root), exist_ok=True)
|
||||
shutil.copytree(package_root, managed_root)
|
||||
imported_skill_data = self._read_skill_package(managed_root)
|
||||
else:
|
||||
os.makedirs(managed_root, exist_ok=True)
|
||||
|
||||
metadata = {
|
||||
'name': name,
|
||||
'display_name': self._resolve_create_field(data, 'display_name', imported_skill_data, default=''),
|
||||
'description': self._resolve_create_field(data, 'description', imported_skill_data, default=''),
|
||||
}
|
||||
instructions = self._resolve_create_field(data, 'instructions', imported_skill_data, default='')
|
||||
self._write_skill_md(target_root, metadata, instructions)
|
||||
|
||||
box_service = self._require_box('Creating a skill')
|
||||
created = await box_service.create_skill(data)
|
||||
await self._reload_skills()
|
||||
created = await self.get_skill(name)
|
||||
if not created:
|
||||
raise ValueError(f'Failed to create skill "{name}"')
|
||||
return created
|
||||
return self._serialize_skill(created)
|
||||
|
||||
async def update_skill(self, skill_name: str, data: dict) -> dict:
|
||||
self._require_box_for_write('Editing a skill')
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
updated = await box_service.update_skill(skill_name, data)
|
||||
await self._reload_skills()
|
||||
return self._serialize_skill(updated)
|
||||
|
||||
skill = await self.get_skill(skill_name)
|
||||
if not skill:
|
||||
raise ValueError(f'Skill "{skill_name}" not found')
|
||||
|
||||
requested_name = str(data.get('name', skill['name']) or skill['name']).strip()
|
||||
if requested_name != skill['name']:
|
||||
raise ValueError('Renaming skills is not supported')
|
||||
|
||||
requested_package_root = str(data.get('package_root', '') or '').strip()
|
||||
existing_package_root = self._normalize_package_root(skill['package_root'])
|
||||
if requested_package_root and self._normalize_package_root(requested_package_root) != existing_package_root:
|
||||
raise ValueError('Updating package_root is not supported; recreate the skill to import a different package')
|
||||
|
||||
metadata = {
|
||||
'name': skill['name'],
|
||||
'display_name': data.get('display_name', skill.get('display_name', '')),
|
||||
'description': data.get('description', skill.get('description', '')),
|
||||
}
|
||||
instructions = str(data.get('instructions', skill.get('instructions', '')) or '')
|
||||
self._write_skill_md(skill['package_root'], metadata, instructions)
|
||||
|
||||
box_service = self._require_box('Editing a skill')
|
||||
updated = await box_service.update_skill(skill_name, data)
|
||||
await self._reload_skills()
|
||||
updated = await self.get_skill(skill_name)
|
||||
if not updated:
|
||||
raise ValueError(f'Skill "{skill_name}" not found after update')
|
||||
return updated
|
||||
return self._serialize_skill(updated)
|
||||
|
||||
async def delete_skill(self, skill_name: str) -> bool:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
await box_service.delete_skill(skill_name)
|
||||
await self._reload_skills()
|
||||
return True
|
||||
|
||||
skill = await self.get_skill(skill_name)
|
||||
if not skill:
|
||||
raise ValueError(f'Skill "{skill_name}" not found')
|
||||
|
||||
package_root = self._normalize_package_root(skill['package_root'])
|
||||
managed_install_root = self._managed_install_root_for_package(package_root)
|
||||
if not managed_install_root:
|
||||
raise ValueError('Only managed skills under data/skills can be deleted via LangBot')
|
||||
|
||||
shutil.rmtree(managed_install_root, ignore_errors=True)
|
||||
box_service = self._require_box('Deleting a skill')
|
||||
await box_service.delete_skill(skill_name)
|
||||
await self._reload_skills()
|
||||
return True
|
||||
|
||||
@@ -236,96 +148,21 @@ class SkillService:
|
||||
include_hidden: bool = False,
|
||||
max_entries: int = 200,
|
||||
) -> dict:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return await box_service.list_skill_files(skill_name, path, include_hidden, max_entries)
|
||||
|
||||
skill = await self.get_skill(skill_name)
|
||||
if not skill:
|
||||
raise ValueError(f'Skill "{skill_name}" not found')
|
||||
|
||||
target_dir, relative_path = self._resolve_skill_path(skill, path, expect_directory=True)
|
||||
entries: list[dict] = []
|
||||
with os.scandir(target_dir) as iterator:
|
||||
for entry in sorted(iterator, key=lambda item: item.name):
|
||||
if not include_hidden and entry.name.startswith('.'):
|
||||
continue
|
||||
entry_rel_path = entry.name if relative_path in ('', '.') else os.path.join(relative_path, entry.name)
|
||||
is_dir = entry.is_dir()
|
||||
entries.append(
|
||||
{
|
||||
'path': entry_rel_path.replace(os.sep, '/'),
|
||||
'name': entry.name,
|
||||
'is_dir': is_dir,
|
||||
'size': None if is_dir else entry.stat().st_size,
|
||||
}
|
||||
)
|
||||
if len(entries) >= max_entries:
|
||||
break
|
||||
|
||||
return {
|
||||
'skill': {'name': skill['name']},
|
||||
'base_path': '.' if relative_path in ('', '.') else relative_path.replace(os.sep, '/'),
|
||||
'entries': entries,
|
||||
'truncated': len(entries) >= max_entries,
|
||||
}
|
||||
box_service = self._require_box('Browsing skill files')
|
||||
return await box_service.list_skill_files(skill_name, path, include_hidden, max_entries)
|
||||
|
||||
async def read_skill_file(self, skill_name: str, path: str) -> dict:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return await box_service.read_skill_file(skill_name, path)
|
||||
|
||||
skill = await self.get_skill(skill_name)
|
||||
if not skill:
|
||||
raise ValueError(f'Skill "{skill_name}" not found')
|
||||
|
||||
target_path, relative_path = self._resolve_skill_path(skill, path, expect_directory=False)
|
||||
if not os.path.isfile(target_path):
|
||||
raise ValueError(f'Skill file not found: {relative_path}')
|
||||
|
||||
try:
|
||||
with open(target_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
except UnicodeDecodeError as exc:
|
||||
raise ValueError(f'Skill file is not valid UTF-8 text: {relative_path}') from exc
|
||||
|
||||
return {
|
||||
'skill': {'name': skill['name']},
|
||||
'path': relative_path.replace(os.sep, '/'),
|
||||
'content': content,
|
||||
}
|
||||
box_service = self._require_box('Reading a skill file')
|
||||
return await box_service.read_skill_file(skill_name, path)
|
||||
|
||||
async def write_skill_file(self, skill_name: str, path: str, content: str) -> dict:
|
||||
self._require_box_for_write('Editing skill files')
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
result = await box_service.write_skill_file(skill_name, path, content)
|
||||
await self._reload_skills()
|
||||
return result
|
||||
|
||||
skill = await self.get_skill(skill_name)
|
||||
if not skill:
|
||||
raise ValueError(f'Skill "{skill_name}" not found')
|
||||
|
||||
target_path, relative_path = self._resolve_skill_path(skill, path, expect_directory=False)
|
||||
os.makedirs(os.path.dirname(target_path), exist_ok=True)
|
||||
with open(target_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
skill_mgr = getattr(self.ap, 'skill_mgr', None)
|
||||
if skill_mgr is not None:
|
||||
refresh_skill = getattr(skill_mgr, 'refresh_skill_from_disk', None)
|
||||
if callable(refresh_skill):
|
||||
refresh_skill(skill.get('name', ''))
|
||||
|
||||
return {
|
||||
'skill': {'name': skill['name']},
|
||||
'path': relative_path.replace(os.sep, '/'),
|
||||
'bytes_written': len(content.encode('utf-8')),
|
||||
}
|
||||
box_service = self._require_box('Editing skill files')
|
||||
result = await box_service.write_skill_file(skill_name, path, content)
|
||||
await self._reload_skills()
|
||||
return result
|
||||
|
||||
async def install_from_github(self, data: dict) -> list[dict]:
|
||||
self._require_box_for_write('Installing a skill from GitHub')
|
||||
box_service = self._require_box('Installing a skill from GitHub')
|
||||
owner = str(data['owner']).strip()
|
||||
repo = str(data['repo']).strip()
|
||||
release_tag = str(data.get('release_tag', '')).strip()
|
||||
@@ -336,38 +173,20 @@ class SkillService:
|
||||
asset_url = self._validate_github_asset_url(raw_asset_url, owner=owner, repo=repo, release_tag=release_tag)
|
||||
source_subdir = str(data.get('source_subdir', '') or '').strip()
|
||||
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
zip_bytes = await self._download_github_asset(asset_url)
|
||||
filename = f'{repo}-{release_tag.lstrip("v").replace("/", "-") or "source"}.zip'
|
||||
installed = await box_service.install_skill_zip(
|
||||
zip_bytes,
|
||||
filename,
|
||||
source_paths=data.get('source_paths') or [],
|
||||
source_path=str(data.get('source_path', '') or ''),
|
||||
source_subdir=source_subdir,
|
||||
)
|
||||
await self._reload_skills()
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_')
|
||||
try:
|
||||
skill_root = await self._download_github_skill_to_temp(asset_url, tmp_dir)
|
||||
skill_root = self._resolve_github_source_root(skill_root, source_subdir)
|
||||
previews = self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=repo,
|
||||
suffix=release_tag.lstrip('v').replace('/', '-') or 'source',
|
||||
)
|
||||
selected_previews = self._select_preview_candidates(previews, data)
|
||||
scanned = self._install_preview_candidates(skill_root, selected_previews)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
zip_bytes = await self._download_github_asset(asset_url)
|
||||
filename = f'{repo}-{release_tag.lstrip("v").replace("/", "-") or "source"}.zip'
|
||||
installed = await box_service.install_skill_zip(
|
||||
zip_bytes,
|
||||
filename,
|
||||
source_paths=data.get('source_paths') or [],
|
||||
source_path=str(data.get('source_path', '') or ''),
|
||||
source_subdir=source_subdir,
|
||||
)
|
||||
await self._reload_skills()
|
||||
return await self._resolve_installed_skills(scanned)
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
async def preview_install_from_github(self, data: dict) -> list[dict]:
|
||||
box_service = self._require_box('Previewing a skill from GitHub')
|
||||
owner = str(data['owner']).strip()
|
||||
repo = str(data['repo']).strip()
|
||||
release_tag = str(data.get('release_tag', '')).strip()
|
||||
@@ -378,26 +197,12 @@ class SkillService:
|
||||
asset_url = self._validate_github_asset_url(raw_asset_url, owner=owner, repo=repo, release_tag=release_tag)
|
||||
source_subdir = str(data.get('source_subdir', '') or '').strip()
|
||||
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
zip_bytes = await self._download_github_asset(asset_url)
|
||||
return await box_service.preview_skill_zip(
|
||||
zip_bytes,
|
||||
f'{repo}-{release_tag.lstrip("v").replace("/", "-") or "source"}.zip',
|
||||
source_subdir=source_subdir,
|
||||
)
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_preview_')
|
||||
try:
|
||||
skill_root = await self._download_github_skill_to_temp(asset_url, tmp_dir)
|
||||
skill_root = self._resolve_github_source_root(skill_root, source_subdir)
|
||||
return self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=repo,
|
||||
suffix=release_tag.lstrip('v').replace('/', '-') or 'source',
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
zip_bytes = await self._download_github_asset(asset_url)
|
||||
return await box_service.preview_skill_zip(
|
||||
zip_bytes,
|
||||
f'{repo}-{release_tag.lstrip("v").replace("/", "-") or "source"}.zip',
|
||||
source_subdir=source_subdir,
|
||||
)
|
||||
|
||||
async def install_from_zip_upload(
|
||||
self,
|
||||
@@ -407,116 +212,46 @@ class SkillService:
|
||||
source_paths: list[str] | None = None,
|
||||
source_path: str = '',
|
||||
) -> list[dict]:
|
||||
self._require_box_for_write('Installing a skill from upload')
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
installed = await box_service.install_skill_zip(
|
||||
file_bytes,
|
||||
filename,
|
||||
source_paths=source_paths or [],
|
||||
source_path=source_path,
|
||||
)
|
||||
await self._reload_skills()
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
if not file_bytes:
|
||||
raise ValueError('Uploaded file is empty')
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_upload_')
|
||||
try:
|
||||
skill_root = self._extract_uploaded_skill_to_temp(file_bytes, tmp_dir)
|
||||
base_target_name = self._uploaded_skill_target_stem(filename)
|
||||
previews = self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=base_target_name,
|
||||
suffix='upload',
|
||||
)
|
||||
selected_previews = self._select_preview_candidates(
|
||||
previews,
|
||||
{'source_paths': source_paths or [], 'source_path': source_path},
|
||||
)
|
||||
scanned = self._install_preview_candidates(skill_root, selected_previews)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
box_service = self._require_box('Installing a skill from upload')
|
||||
installed = await box_service.install_skill_zip(
|
||||
file_bytes,
|
||||
filename,
|
||||
source_paths=source_paths or [],
|
||||
source_path=source_path,
|
||||
)
|
||||
await self._reload_skills()
|
||||
return await self._resolve_installed_skills(scanned)
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
async def preview_install_from_zip_upload(self, *, file_bytes: bytes, filename: str) -> list[dict]:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return await box_service.preview_skill_zip(file_bytes, filename)
|
||||
|
||||
if not file_bytes:
|
||||
raise ValueError('Uploaded file is empty')
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_upload_preview_')
|
||||
try:
|
||||
skill_root = self._extract_uploaded_skill_to_temp(file_bytes, tmp_dir)
|
||||
return self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=self._uploaded_skill_target_stem(filename),
|
||||
suffix='upload',
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
box_service = self._require_box('Previewing a skill upload')
|
||||
return await box_service.preview_skill_zip(file_bytes, filename)
|
||||
|
||||
async def _install_github_skill_md(self, asset_url: str, *, owner: str, repo: str, data: dict) -> list[dict]:
|
||||
zip_bytes, filename, package_name = await self._download_github_skill_directory_as_zip(
|
||||
box_service = self._require_box('Installing a skill from GitHub')
|
||||
zip_bytes, filename, _package_name = await self._download_github_skill_directory_as_zip(
|
||||
asset_url,
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
)
|
||||
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
installed = await box_service.install_skill_zip(
|
||||
zip_bytes,
|
||||
filename,
|
||||
source_paths=data.get('source_paths') or [],
|
||||
source_path=str(data.get('source_path', '') or ''),
|
||||
target_suffix='',
|
||||
)
|
||||
await self._reload_skills()
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_md_')
|
||||
try:
|
||||
skill_root = self._extract_uploaded_skill_to_temp(zip_bytes, tmp_dir)
|
||||
previews = self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=package_name,
|
||||
suffix='',
|
||||
)
|
||||
selected_previews = self._select_preview_candidates(previews, data)
|
||||
scanned = self._install_preview_candidates(skill_root, selected_previews)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
installed = await box_service.install_skill_zip(
|
||||
zip_bytes,
|
||||
filename,
|
||||
source_paths=data.get('source_paths') or [],
|
||||
source_path=str(data.get('source_path', '') or ''),
|
||||
target_suffix='',
|
||||
)
|
||||
await self._reload_skills()
|
||||
return await self._resolve_installed_skills(scanned)
|
||||
return [self._serialize_skill(skill) for skill in installed]
|
||||
|
||||
async def _preview_github_skill_md(self, asset_url: str, *, owner: str, repo: str) -> list[dict]:
|
||||
box_service = self._require_box('Previewing a skill from GitHub')
|
||||
zip_bytes, _filename, package_name = await self._download_github_skill_directory_as_zip(
|
||||
asset_url,
|
||||
owner=owner,
|
||||
repo=repo,
|
||||
)
|
||||
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return await box_service.preview_skill_zip(zip_bytes, f'{package_name}.zip', target_suffix='')
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_md_preview_')
|
||||
try:
|
||||
skill_root = self._extract_uploaded_skill_to_temp(zip_bytes, tmp_dir)
|
||||
return self._preview_skill_candidates(
|
||||
skill_root,
|
||||
base_target_name=package_name,
|
||||
suffix='',
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
return await box_service.preview_skill_zip(zip_bytes, f'{package_name}.zip', target_suffix='')
|
||||
|
||||
async def reload_skills(self) -> list[dict]:
|
||||
await self._reload_skills()
|
||||
@@ -552,10 +287,8 @@ class SkillService:
|
||||
}
|
||||
|
||||
async def scan_directory_async(self, path: str) -> dict:
|
||||
box_service = self._box_service()
|
||||
if box_service is not None:
|
||||
return await box_service.scan_skill_directory(path)
|
||||
return self.scan_directory(path)
|
||||
box_service = self._require_box('Scanning a skill directory')
|
||||
return await box_service.scan_skill_directory(path)
|
||||
|
||||
async def _reload_skills(self) -> None:
|
||||
skill_mgr = getattr(self.ap, 'skill_mgr', None)
|
||||
|
||||
@@ -34,90 +34,57 @@ class SkillManager:
|
||||
await self.reload_skills()
|
||||
|
||||
async def reload_skills(self):
|
||||
"""Reload all skills.
|
||||
"""Reload all skills from the Box runtime.
|
||||
|
||||
In sandbox deployments, skills are owned by the Box runtime so the
|
||||
sandbox can mount them without requiring LangBot to share the same
|
||||
filesystem. If Box is unavailable, fall back to the legacy local
|
||||
data/skills directory.
|
||||
|
||||
NOTE: This performs a full scan. For registering a single new skill,
|
||||
consider adding it directly to self.skills instead of reloading all.
|
||||
Current implementation is acceptable for typical skill counts (<50).
|
||||
Box is the only source of truth for skills. When Box is unavailable
|
||||
(disabled in config or unreachable) the cache is emptied — there is
|
||||
no local filesystem fallback. Skills whose ``package_root`` is no
|
||||
longer visible on the LangBot-side filesystem are dropped so they
|
||||
don't surface as stale ``extra_mounts``.
|
||||
"""
|
||||
self.skills = {}
|
||||
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is not None and getattr(box_service, 'available', False):
|
||||
try:
|
||||
dropped = 0
|
||||
for skill_data in await box_service.list_skills():
|
||||
skill_name = skill_data.get('name')
|
||||
if not skill_name:
|
||||
continue
|
||||
# Drop skills whose package_root is no longer visible on the
|
||||
# LangBot-side filesystem (e.g. Box volume was rebuilt or the
|
||||
# directory was deleted out-of-band). Keeping them in the cache
|
||||
# would cause stale extra_mounts and confusing UI states.
|
||||
package_root = str(skill_data.get('package_root', '') or '').strip()
|
||||
if package_root and not os.path.isdir(package_root):
|
||||
self.ap.logger.warning(
|
||||
f'Skill "{skill_name}" reported by Box runtime but '
|
||||
f'package_root missing on LangBot filesystem '
|
||||
f'({package_root}); dropping from in-memory cache.'
|
||||
)
|
||||
dropped += 1
|
||||
continue
|
||||
self.skills[skill_name] = skill_data
|
||||
if dropped:
|
||||
self.ap.logger.warning(
|
||||
f'Loaded {len(self.skills)} skills from Box runtime '
|
||||
f'({dropped} dropped due to missing package_root).'
|
||||
)
|
||||
else:
|
||||
self.ap.logger.info(f'Loaded {len(self.skills)} skills from Box runtime')
|
||||
return
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(f'Failed to load skills from Box runtime, falling back to local data: {exc}')
|
||||
if box_service is None or not getattr(box_service, 'available', False):
|
||||
self.ap.logger.info('Box runtime unavailable; skill cache is empty.')
|
||||
return
|
||||
|
||||
# Ensure data/skills/ exists
|
||||
managed_root = self.get_managed_skills_root()
|
||||
os.makedirs(managed_root, exist_ok=True)
|
||||
|
||||
# Load all skills from data/skills/
|
||||
if os.path.isdir(managed_root):
|
||||
for package_root, entry_file in self._discover_skill_directories(managed_root):
|
||||
skill_data = {
|
||||
'package_root': package_root,
|
||||
'entry_file': entry_file,
|
||||
}
|
||||
if not self._load_skill_file(skill_data):
|
||||
try:
|
||||
dropped = 0
|
||||
for skill_data in await box_service.list_skills():
|
||||
skill_name = skill_data.get('name')
|
||||
if not skill_name:
|
||||
continue
|
||||
package_root = str(skill_data.get('package_root', '') or '').strip()
|
||||
if package_root and not os.path.isdir(package_root):
|
||||
self.ap.logger.warning(
|
||||
f'Skill "{skill_name}" reported by Box runtime but '
|
||||
f'package_root missing on LangBot filesystem '
|
||||
f'({package_root}); dropping from in-memory cache.'
|
||||
)
|
||||
dropped += 1
|
||||
continue
|
||||
|
||||
skill_name = skill_data['name']
|
||||
self.skills[skill_name] = skill_data
|
||||
|
||||
self.ap.logger.info(f'Loaded {len(self.skills)} skills')
|
||||
if dropped:
|
||||
self.ap.logger.warning(
|
||||
f'Loaded {len(self.skills)} skills from Box runtime '
|
||||
f'({dropped} dropped due to missing package_root).'
|
||||
)
|
||||
else:
|
||||
self.ap.logger.info(f'Loaded {len(self.skills)} skills from Box runtime')
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(f'Failed to load skills from Box runtime: {exc}')
|
||||
|
||||
def refresh_skill_from_disk(self, skill_name: str) -> bool:
|
||||
"""Refresh a single skill from disk."""
|
||||
"""Confirm a single skill is present in the cache.
|
||||
|
||||
With Box as the only source of truth, the actual reload is driven by
|
||||
SkillService callers awaiting ``reload_skills``; this method only
|
||||
reports whether the cache still has the skill.
|
||||
"""
|
||||
if not skill_name:
|
||||
return False
|
||||
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is not None and getattr(box_service, 'available', False):
|
||||
# Box refresh is async; callers that need a guaranteed refresh call
|
||||
# SkillService.write_skill_file/update_skill, which awaits reload.
|
||||
return skill_name in self.skills
|
||||
|
||||
skill_data = self.skills.get(skill_name)
|
||||
if not skill_data:
|
||||
return False
|
||||
|
||||
if not self._load_skill_file(skill_data):
|
||||
return False
|
||||
|
||||
self.skills[skill_name] = skill_data
|
||||
return skill_name in self.skills
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -54,29 +54,25 @@ class TestSkillManagerPackageLoading:
|
||||
assert skill_data['instructions'] == '# Test Skill\nDo things.'
|
||||
assert skill_data['description'] == 'Test skill'
|
||||
|
||||
def test_refresh_skill_from_disk_updates_cached_dict_in_place(self):
|
||||
def test_refresh_skill_from_disk_reports_cache_presence(self):
|
||||
"""Box is the only source of truth for skill content. refresh_skill_from_disk
|
||||
now just reports whether the skill is still in the in-memory cache —
|
||||
the actual content refresh is driven by SkillService awaiting
|
||||
``reload_skills`` after every Box mutation."""
|
||||
from langbot.pkg.skill.manager import SkillManager
|
||||
|
||||
ap = _make_ap()
|
||||
mgr = SkillManager(ap)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
skill_md = os.path.join(tmpdir, 'SKILL.md')
|
||||
with open(skill_md, 'w', encoding='utf-8') as f:
|
||||
f.write('---\ndescription: First\n---\n\nOriginal instructions')
|
||||
# Empty cache → returns False
|
||||
assert mgr.refresh_skill_from_disk('test-skill') is False
|
||||
|
||||
skill_data = _make_skill_data(name='test-skill', package_root=tmpdir)
|
||||
assert mgr._load_skill_file(skill_data) is True
|
||||
|
||||
mgr.skills['test-skill'] = skill_data
|
||||
|
||||
with open(skill_md, 'w', encoding='utf-8') as f:
|
||||
f.write('---\ndescription: Second\n---\n\nUpdated instructions')
|
||||
|
||||
assert mgr.refresh_skill_from_disk('test-skill') is True
|
||||
assert mgr.skills['test-skill'] is skill_data
|
||||
assert skill_data['instructions'] == 'Updated instructions'
|
||||
assert skill_data['description'] == 'Second'
|
||||
# Cache populated → returns True; method does NOT mutate the cache
|
||||
cached = _make_skill_data(name='test-skill', instructions='Cached')
|
||||
mgr.skills['test-skill'] = cached
|
||||
assert mgr.refresh_skill_from_disk('test-skill') is True
|
||||
assert mgr.skills['test-skill'] is cached
|
||||
assert mgr.refresh_skill_from_disk('') is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reload_skills_drops_box_skills_with_missing_package_root(self):
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
import io
|
||||
import os
|
||||
import zipfile
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from src.langbot.pkg.api.http.service.skill import SkillService
|
||||
from langbot.pkg.api.http.service.skill import SkillService
|
||||
|
||||
|
||||
def _create_skill_file(
|
||||
@@ -73,9 +70,9 @@ def test_scan_directory_errors_when_skill_is_deeper_than_two_levels(skill_servic
|
||||
|
||||
|
||||
class TestRequireBoxForWrite:
|
||||
"""Writes must refuse when ``ap.box_service`` is installed but unavailable
|
||||
(disabled in config OR connection failed). Legacy setups without
|
||||
``ap.box_service`` continue to use the local fallback."""
|
||||
"""Box is the only source of truth for skills — there is no local
|
||||
filesystem fallback. Every write and (most) read methods refuse cleanly
|
||||
when the Box runtime is disabled, unreachable, or simply not installed."""
|
||||
|
||||
def _ap_with_disabled_box(self):
|
||||
return SimpleNamespace(
|
||||
@@ -134,347 +131,22 @@ class TestRequireBoxForWrite:
|
||||
await service.install_from_zip_upload(file_bytes=b'', filename='x.zip')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_legacy_setup_without_box_service_still_allows_local_create(self, tmp_path, monkeypatch):
|
||||
"""Setups that never installed ap.box_service (pre-Box dev mode) keep
|
||||
using the local-skills fallback path — that's the whole point of the
|
||||
``getattr`` distinguisher in _require_box_for_write."""
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
async def test_create_skill_refused_when_box_service_missing_entirely(self):
|
||||
"""No ap.box_service attribute at all (truly minimal setup):
|
||||
Box is the only source of truth, so creation must still refuse."""
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill_by_name = AsyncMock(return_value=None)
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'local-skill',
|
||||
'package_root': str(tmp_path / 'data' / 'skills' / 'local-skill'),
|
||||
'description': '',
|
||||
'instructions': '',
|
||||
}
|
||||
)
|
||||
|
||||
# Does not raise — gate is a no-op without ap.box_service.
|
||||
await service.create_skill(
|
||||
{'name': 'local-skill', 'display_name': 'Local', 'description': '', 'instructions': 'hi'}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_skill_import_preserves_existing_skill_content_when_form_fields_blank(tmp_path, monkeypatch):
|
||||
source_dir = tmp_path / 'external-skills' / 'manual-skill'
|
||||
source_dir.mkdir(parents=True)
|
||||
_create_skill_file(
|
||||
source_dir / 'SKILL.md',
|
||||
display_name='Imported Skill',
|
||||
description='Imported description',
|
||||
body='Original instructions',
|
||||
)
|
||||
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill_by_name = AsyncMock(return_value=None)
|
||||
managed_root = tmp_path / 'data' / 'skills' / 'imported-skill'
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'imported-skill',
|
||||
'package_root': str(managed_root.resolve()),
|
||||
'description': 'Imported description',
|
||||
'instructions': 'Original instructions',
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
|
||||
await service.create_skill(
|
||||
{
|
||||
'name': 'imported-skill',
|
||||
'package_root': str(source_dir),
|
||||
'display_name': '',
|
||||
'description': '',
|
||||
'instructions': '',
|
||||
}
|
||||
)
|
||||
|
||||
content = (managed_root / 'SKILL.md').read_text(encoding='utf-8')
|
||||
assert 'display_name: Imported Skill' in content
|
||||
assert 'description: Imported description' in content
|
||||
assert content.endswith('Original instructions')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_skill_reuses_existing_managed_directory_without_copying(tmp_path, monkeypatch):
|
||||
managed_root = tmp_path / 'data' / 'skills' / 'demo-repo' / 'skills' / 'nested-skill'
|
||||
managed_root.mkdir(parents=True)
|
||||
_create_skill_file(
|
||||
managed_root / 'SKILL.md',
|
||||
name='nested-skill',
|
||||
display_name='Nested Skill',
|
||||
description='Already managed',
|
||||
body='Managed instructions',
|
||||
)
|
||||
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill_by_name = AsyncMock(return_value=None)
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'nested-skill',
|
||||
'package_root': str(managed_root.resolve()),
|
||||
'description': 'Already managed',
|
||||
'instructions': 'Managed instructions',
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
|
||||
await service.create_skill(
|
||||
{
|
||||
'name': 'nested-skill',
|
||||
'package_root': str(managed_root),
|
||||
'display_name': '',
|
||||
'description': '',
|
||||
'instructions': '',
|
||||
}
|
||||
)
|
||||
|
||||
copied_root = tmp_path / 'data' / 'skills' / 'nested-skill'
|
||||
assert not copied_root.exists()
|
||||
content = (managed_root / 'SKILL.md').read_text(encoding='utf-8')
|
||||
assert 'display_name: Nested Skill' in content
|
||||
assert content.endswith('Managed instructions')
|
||||
|
||||
|
||||
def _build_skill_archive() -> bytes:
|
||||
stream = io.BytesIO()
|
||||
with zipfile.ZipFile(stream, 'w') as archive:
|
||||
archive.writestr(
|
||||
'demo-repo-main/skills/nested-skill/SKILL.md',
|
||||
'---\nname: imported-skill\ndescription: Imported from GitHub archive\n---\n\nSkill instructions\n',
|
||||
)
|
||||
return stream.getvalue()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_install_from_github_supports_nested_skill_archive(skill_service, tmp_path, monkeypatch):
|
||||
archive_bytes = _build_skill_archive()
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, content: bytes) -> None:
|
||||
self.content = content
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
return None
|
||||
|
||||
class _FakeAsyncClient:
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
pass
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
return None
|
||||
|
||||
async def get(self, url: str) -> _FakeResponse:
|
||||
return _FakeResponse(archive_bytes)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
monkeypatch.setattr('src.langbot.pkg.api.http.service.skill.httpx.AsyncClient', _FakeAsyncClient)
|
||||
skill_service.get_skill = AsyncMock(return_value=None)
|
||||
|
||||
result = await skill_service.install_from_github(
|
||||
{
|
||||
'asset_url': 'https://api.github.com/repos/example/demo-repo/zipball/main',
|
||||
'owner': 'example',
|
||||
'repo': 'demo-repo',
|
||||
'release_tag': 'main',
|
||||
}
|
||||
)
|
||||
|
||||
expected_root = tmp_path / 'data' / 'skills' / 'demo-repo-nested-skill-main'
|
||||
assert result[0]['package_root'] == str(expected_root.resolve())
|
||||
assert (expected_root / 'SKILL.md').read_text(encoding='utf-8').endswith('Skill instructions\n')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_install_from_github_rejects_asset_url_outside_requested_repo(skill_service, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
|
||||
with pytest.raises(ValueError, match='owner/repo'):
|
||||
await skill_service.install_from_github(
|
||||
{
|
||||
'asset_url': 'https://api.github.com/repos/example/other-repo/zipball/main',
|
||||
'owner': 'example',
|
||||
'repo': 'demo-repo',
|
||||
'release_tag': 'main',
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_install_from_github_rejects_zip_with_path_traversal(skill_service, tmp_path, monkeypatch):
|
||||
stream = io.BytesIO()
|
||||
with zipfile.ZipFile(stream, 'w') as archive:
|
||||
archive.writestr('../escape.txt', 'boom')
|
||||
archive_bytes = stream.getvalue()
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, content: bytes) -> None:
|
||||
self.content = content
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
return None
|
||||
|
||||
class _FakeAsyncClient:
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
pass
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
return None
|
||||
|
||||
async def get(self, url: str) -> _FakeResponse:
|
||||
return _FakeResponse(archive_bytes)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
monkeypatch.setattr('src.langbot.pkg.api.http.service.skill.httpx.AsyncClient', _FakeAsyncClient)
|
||||
|
||||
with pytest.raises(ValueError, match='unsafe path'):
|
||||
await skill_service.install_from_github(
|
||||
{
|
||||
'asset_url': 'https://api.github.com/repos/example/demo-repo/zipball/main',
|
||||
'owner': 'example',
|
||||
'repo': 'demo-repo',
|
||||
'release_tag': 'main',
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skill_file_operations_stay_within_package_root(skill_service, tmp_path):
|
||||
skill_dir = tmp_path / 'mood-logger'
|
||||
skill_dir.mkdir()
|
||||
_create_skill_file(skill_dir / 'SKILL.md')
|
||||
(skill_dir / 'resources').mkdir()
|
||||
(skill_dir / 'resources' / 'keywords_zh.json').write_text('{"hello": 1}\n', encoding='utf-8')
|
||||
|
||||
skill_record = {
|
||||
'name': 'mood-logger',
|
||||
'package_root': str(skill_dir),
|
||||
'entry_file': 'SKILL.md',
|
||||
}
|
||||
skill_service.get_skill = AsyncMock(return_value=skill_record)
|
||||
|
||||
listed = await skill_service.list_skill_files('mood-logger', path='resources')
|
||||
assert listed['entries'] == [
|
||||
{
|
||||
'path': 'resources/keywords_zh.json',
|
||||
'name': 'keywords_zh.json',
|
||||
'is_dir': False,
|
||||
'size': os.path.getsize(skill_dir / 'resources' / 'keywords_zh.json'),
|
||||
}
|
||||
]
|
||||
|
||||
read_back = await skill_service.read_skill_file('mood-logger', 'resources/keywords_zh.json')
|
||||
assert read_back['content'] == '{"hello": 1}\n'
|
||||
|
||||
written = await skill_service.write_skill_file('mood-logger', 'resources/affinity.py', 'print("ok")\n')
|
||||
assert written['path'] == 'resources/affinity.py'
|
||||
assert (skill_dir / 'resources' / 'affinity.py').read_text(encoding='utf-8') == 'print("ok")\n'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skill_file_operations_reject_path_traversal(skill_service, tmp_path):
|
||||
skill_dir = tmp_path / 'mood-logger'
|
||||
skill_dir.mkdir()
|
||||
_create_skill_file(skill_dir / 'SKILL.md')
|
||||
|
||||
skill_service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'mood-logger',
|
||||
'package_root': str(skill_dir),
|
||||
'entry_file': 'SKILL.md',
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match='path must stay within the skill package root'):
|
||||
await skill_service.read_skill_file('mood-logger', '../outside.txt')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_skill_rejects_package_root_change(tmp_path):
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
skill_root = tmp_path / 'data' / 'skills' / 'writer'
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'writer',
|
||||
'package_root': str(skill_root.resolve()),
|
||||
'display_name': 'Writer',
|
||||
'description': 'Writes things',
|
||||
'instructions': 'Do work',
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match='Updating package_root is not supported'):
|
||||
await service.update_skill('writer', {'package_root': str(tmp_path / 'other-root')})
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_skill_removes_managed_skill_directory(tmp_path, monkeypatch):
|
||||
managed_root = tmp_path / 'data' / 'skills' / 'self-improving-agent'
|
||||
managed_root.mkdir(parents=True)
|
||||
_create_skill_file(managed_root / 'SKILL.md')
|
||||
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'self-improving-agent',
|
||||
'package_root': str(managed_root.resolve()),
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
|
||||
result = await service.delete_skill('self-improving-agent')
|
||||
|
||||
assert result is True
|
||||
assert not managed_root.exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_skill_removes_managed_install_root_for_nested_package(tmp_path, monkeypatch):
|
||||
install_root = tmp_path / 'data' / 'skills' / 'demo-repo'
|
||||
package_root = install_root / 'skills' / 'nested-skill'
|
||||
package_root.mkdir(parents=True)
|
||||
_create_skill_file(package_root / 'SKILL.md')
|
||||
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'nested-skill',
|
||||
'package_root': str(package_root.resolve()),
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setenv('LANGBOT_DATA_ROOT', str(tmp_path / 'data'))
|
||||
|
||||
await service.delete_skill('nested-skill')
|
||||
|
||||
assert not install_root.exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_skill_rejects_external_package_directory(tmp_path, monkeypatch):
|
||||
external_root = tmp_path / 'external-skills' / 'manual-skill'
|
||||
external_root.mkdir(parents=True)
|
||||
_create_skill_file(external_root / 'SKILL.md')
|
||||
|
||||
service = SkillService(SimpleNamespace(skill_mgr=SimpleNamespace(reload_skills=AsyncMock())))
|
||||
service.get_skill = AsyncMock(
|
||||
return_value={
|
||||
'name': 'manual-skill',
|
||||
'package_root': str(external_root.resolve()),
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.chdir(tmp_path)
|
||||
|
||||
with pytest.raises(ValueError, match='Only managed skills under data/skills'):
|
||||
await service.delete_skill('manual-skill')
|
||||
with pytest.raises(ValueError, match='not initialised'):
|
||||
await service.create_skill({'name': 'x'})
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_skills_returns_empty_when_box_unavailable(self):
|
||||
"""list_skills should render an empty surface (not crash) so the
|
||||
skills page can show a banner instead of a broken state."""
|
||||
service = SkillService(self._ap_with_disabled_box())
|
||||
assert await service.list_skills() == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_skill_file_refused_when_box_unavailable(self):
|
||||
service = SkillService(self._ap_with_disabled_box())
|
||||
with pytest.raises(ValueError, match='Reading a skill file'):
|
||||
await service.read_skill_file('x', 'a.txt')
|
||||
|
||||
Reference in New Issue
Block a user