feat(skills): add Agent Skills management system (#1917)

* feat(skills): add Agent Skills management system

Implement comprehensive skills management feature inspired by agentskills spec:

Backend:
- Add Skill and SkillPipelineBinding database entities
- Add database migration (dbm018) for skills tables
- Implement SkillManager for skill loading, matching, and resolution
- Implement SkillService for CRUD operations
- Add skills API endpoints for skill and pipeline binding management
- Integrate skill index injection into pipeline preprocessor
- Add skill activation detection in LocalAgentRunner

Frontend:
- Add Skills page with listing, search, and type filter
- Add SkillDetailDialog for create/edit with preview
- Add SkillCard and SkillForm components
- Add skills API methods to BackendClient
- Add skills entry to sidebar navigation
- Add i18n translations (en-US, zh-Hans)

Features:
- Support skill and workflow types
- Sub-skill composition via {{INVOKE_SKILL: name}} syntax
- Progressive disclosure (index in prompt, full instructions on activation)
- Pipeline-specific skill bindings with priority

* fix: resolve cherry-pick conflicts for agentskills onto sandbox

- Remove non-existent external_kb service import
- Add skill_mgr mock to localagent sandbox_exec tests
- Keep database version at 24 (sandbox branch's latest)

* feat(skills): upgrade to package-backed skills with sandbox execution

  Evolve the skills system from pure prompt-based to package-backed with
  sandbox tool execution support:

  - Add source_type/package_root/entry_file/skill_tools fields to Skill entity
  - SkillManager loads SKILL.md from local package directories
  - SkillToolLoader as 4th dispatch layer in ToolManager (query-scoped)
  - LocalAgent injects skill tools into use_funcs on skill activation
  - BoxService.execute_skill_tool() runs scripts in sandbox (ro mount, env params)
  - Skill tool names auto-namespaced as skill__{skill}__{tool}
  - API validation for package_root allowlist and entry path traversal
  - Frontend source_type toggle, package_root input, skill_tools editor
  - Migration renumbered to 025 with ALTER TABLE fallback for existing DBs
  - Fix unclosed limitation section in i18n files
  - Fix skills API methods misplaced outside BackendClient class

* fix: test info

* feat(skills): switch skills to package-backed storage and add import tooling
  - skills 从 inline/package 双轨收敛成 package-first
  - instructions 改为写入并读取 SKILL.md
  - 新增本地目录扫描和 GitHub 安装 skill
  - 前端把 skills 整合进 plugins 页,新增 SkillsComponent 和 GitHub 导入弹窗
  - skill form 去掉 source_type / type 筛选,改成目录扫描驱动
  - Box skill tool 挂载模式从 ro 改成 rw
  - 测试和中英文文案同步更新

* feat: simplify langbot skill create and import

* refactor(skills): clean up legacy skill API and harden activation flow

* refactor(skills): remove skill dependency expansion and add skill_get

* fix: lint

* fix: delete

* fix(skills): align tool manager loader initialization

* refactor: remove sandbox execute skill

* fix(skills): hide activation markers and isolate skill activation flow

* refactor(skills): switch skill model to filesystem-backed packages

* refactor(skills): switch skill model to filesystem-backed packages

* refactor(skills): unify runtime skill access around filesystem paths

* refactor(skills): unify runtime skill access around filesystem paths

* feat(skills): align rw package design and fix skill activation, visibility, and lint issues

* refactor(skills): replace rich authoring API with import/reload flow and update
  Box design doc

* feat(box): add sandbox_exec tool loop for local-agent calculations

* feat(box): add host workspace mounting and sandbox_exec guidance

* feat(box): add BoxProfile with resource limits and improved output truncation

  - Implement head+tail output truncation (60/40 split) so LLM sees both
    beginning and final results; add streaming byte-limited reads in backend
    to prevent unbounded memory usage (_MAX_RAW_OUTPUT_BYTES = 1MB)
  - Define BoxProfile model with locked fields and max_timeout_sec clamping
  - Add four built-in profiles: default, offline_readonly, network_basic,
    network_extended with differentiated resource and security constraints
  - Add resource limit fields to BoxSpec (cpus, memory_mb, pids_limit,
    read_only_rootfs) and pass corresponding container CLI flags
    (--cpus, --memory, --pids-limit, --read-only, --tmpfs)
  - Profile loaded from config (box.profile), applied in service layer
    before BoxSpec validation; locked fields cannot be overridden by
    tool-call parameters

* feat(box): add obs

* refactor(box): unify box service lifecycle and local runtime
  management

* refactor(box): remove legacy in-process runtime code and clean up smells

After the architecture settled on always using an independent Box Runtime
service, several pieces of compatibility code and design shortcuts were
left behind. This commit cleans them up:

- Remove `LocalBoxRuntimeClient` and `create_box_runtime_client` from
  production code (moved to test-only helper).
- Remove unused `_clip_bytes` method from backend.
- Remove `__langbot_session_placeholder__` hack by making `BoxSpec.cmd`
  default to empty and validating non-empty only in `runtime.execute()`.
- Extract `get_box_config()` helper to eliminate 5× duplicated config
  access boilerplate.
- Remove `session_id`/`host_path`/`host_path_mode` from the LLM-facing
  tool schema to enforce request-scoped session isolation.
- Fix dual shutdown path: `NativeToolLoader.shutdown()` no longer calls
  `box_service.shutdown()` (handled by `Application.dispose()`).
- Simplify `_assert_session_compatible` with a loop.
- Inline client creation in `BoxRuntimeConnector`.
- Remove redundant `BOX__RUNTIME_URL` env var from docker-compose
  (auto-detected by code).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(box/mcp): integrate MCP stdio with Box sandbox — auto-isolation, dep install, security

  ## Summary

  When Podman/Docker is available, all stdio-mode MCP servers now automatically
  run inside Box containers with dependency installation, path rewriting, and
  lifecycle management. When no container runtime exists, LangBot starts normally
  and stdio MCP falls back to host-direct execution.

  ## What changed

  ### MCP stdio → Box integration (mcp.py)
  - Add `MCPServerBoxConfig` pydantic model for structured box configuration
    with validation and defaults (network, host_path_mode, timeouts, resources)
  - Auto-infer `host_path` from command/args with venv detection: recognizes
    `.venv/bin/python` patterns and walks up to the project root
  - Rewrite host paths to container `/workspace` paths transparently
  - Replace venv python commands with container-native `python`
  - Auto-detect `pyproject.toml`/`setup.py`/`requirements.txt` and run
    `pip install` inside the container before starting the MCP server
  - Copy project to `/tmp` before install to handle read-only mounts
  - Add retry with exponential backoff (3 retries, 2s/4s/8s delays)
  - Add Box managed process health monitoring (poll every 5s)
  - Fix session leak: `_cleanup_box_stdio_session()` now runs in `finally`
    block of `_lifecycle_loop`, covering all exit paths
  - Fix retry logic: `_ready_event` is only set after all retries exhaust
    or on success, not on first failure
  - Enhance `get_runtime_info_dict()` with `box_session_id` and `box_enabled`

  ### Box security (security.py — new)
  - `validate_sandbox_security()` blocks dangerous host paths:
    `/etc`, `/proc`, `/sys`, `/dev`, `/root`, `/boot`, `/run`,
    docker.sock, podman socket
  - Called at the start of `CLISandboxBackend.start_session()`

  ### Box models (models.py)
  - Add `BoxHostMountMode.NONE` — skips volume mount entirely
  - Adjust `validate_host_mount_consistency` to allow arbitrary workdir
    when `host_path_mode=NONE`

  ### Box backend (backend.py)
  - Add `validate_sandbox_security()` call in `start_session()`
  - Add `langbot.box.config_hash` label on containers for drift detection
  - Handle `BoxHostMountMode.NONE` — skip `-v` mount arg
  - Add `cleanup_orphaned_containers()` to base class (no-op default) and
    CLI implementation (single batched `rm -f` command)

  ### Box runtime (runtime.py)
  - Call `cleanup_orphaned_containers()` during `initialize()` to remove
    lingering containers from previous runs

  ### Box service (service.py)
  - Graceful degradation: `initialize()` catches runtime errors and sets
    `available=False` instead of crashing LangBot startup
  - Add `available` property and guard on `execute_sandbox_tool()`
  - Add `skip_host_mount_validation` parameter to `build_spec()` and
    `create_session()` — MCP paths are admin-configured and trusted,
    bypassing `allowed_host_mount_roots` restrictions meant for
    LLM-generated sandbox_exec commands

  ### Default behavior
  - stdio MCP servers automatically use Box when `box_service.available`
    is True (Podman/Docker detected); no explicit `box` config needed
  - When no container runtime exists, falls back to host-direct stdio
  - MCP Box defaults: `network=on` (for pip install), `read_only_rootfs=false`
    (for site-packages), `host_path_mode=ro`, `startup_timeout=120s`

  ### Tests
  - `test_box_security.py`: blocked paths, safe paths, subpath rejection
  - `test_mcp_box_integration.py`: config model, path rewriting, venv
    unwrap, host_path inference, payload building, runtime info, box
    availability check
  - `test_box_service.py`: `BoxHostMountMode.NONE` validation tests

* feat(box/mcp): instance-based orphan cleanup, error classification, session API, and integration tests

  ## Changes

  ### Precise orphan container cleanup
  - Runtime generates a unique instance_id on startup
  - Every container gets a `langbot.box.instance_id` label
  - `cleanup_orphaned_containers()` only removes containers from
    previous instances, preserving containers owned by the current one
  - Containers from older versions (no label) are also cleaned up
  - `cleanup_orphaned_containers` added to `BaseSandboxBackend` as
    a no-op default method, removing hasattr duck-typing

  ### Fine-grained MCP error classification
  - New `MCPSessionErrorPhase` enum with 7 phases: session_create,
    dep_install, process_start, relay_connect, mcp_init, runtime,
    tool_call
  - Each phase in `_init_box_stdio_server()` sets the error phase
    before re-raising, enabling precise failure diagnosis
  - `retry_count` tracked across retry attempts
  - `get_runtime_info_dict()` exposes `error_phase` and `retry_count`

  ### GET /v1/sessions/{id} API
  - `BoxRuntime.get_session()` returns session details including
    managed process info when present
  - `handle_get_session` HTTP handler + route in server.py
  - `BoxRuntimeClient.get_session()` abstract method + remote impl

  ### stdio defaults to Box when runtime is available
  - `_uses_box_stdio()` checks `box_service.available` instead of
    requiring explicit `box` key in server_config
  - `BoxService.initialize()` catches runtime errors gracefully,
    sets `available=False` instead of crashing LangBot startup
  - When no container runtime exists, stdio MCP falls back to
    host-direct execution

  ### Code quality (from /simplify review)
  - Extracted `_VENV_DIRS` / `_VENV_BIN_DIRS` module-level constants
  - Removed dead `_box_network_mode()` method and unused `bc` variable
  - Fixed broken import `from ....box.models` → `from ...box.models`
  - Cached `_resolve_host_path()` result — computed once, passed through
  - Config hash now includes `host_path` field
  - Batched orphan cleanup into single `rm -f` command

  ### Session leak fix
  - `_cleanup_box_stdio_session()` now runs in `_lifecycle_loop`'s
    finally block, covering all exit paths (normal shutdown, error,
    retry, final failure)

  ### Integration tests
  - 6 end-to-end tests covering managed process lifecycle, WebSocket
    stdio bidirectional IO, session cleanup verification, single
    session query, process exit detection, and orphan cleanup safety

* refactor: use rpc

* fix: import

* refactor(box): clean up sandbox subsystem code quality and efficiency

  - Fix O(n²) stderr trimming in runtime.py with running length tracker
  - Remove dead code: RESERVED_CONTAINER_PATHS, _subprocess_wait_task,
    unused config_hash computation, unused imports
  - Deduplicate connection callback in BoxRuntimeConnector, parse URL once
  - Use enum comparison instead of stringly-typed spec.network.value check
  - Replace manual _result_to_dict/_session_to_dict with model_dump()
  - Cache NativeToolLoader tool definition and sandbox system guidance
  - Extract _is_path_under() helper to eliminate duplicated path checks
  - Import SANDBOX_EXEC_TOOL_NAME from native.py instead of redefining
  - Add JSON startswith guard in logging_utils to skip futile json.loads
  - Fix ruff lint errors (F401 unused imports, F841 unused variables)

* fix: ruff

* refactor(sandbox): keep box logic out of pipeline and localagent

  - Move sandbox system-prompt guidance from LocalAgentRunner into
    BoxService.get_system_guidance() so all box domain knowledge stays
    in the box module.
  - Remove standalone logging_utils.py; merge format_result_log() into
    MessageHandler base class alongside cut_str().
  - Strip sandbox-specific JSON parsing from log formatting; tool
    results now use generic truncation.
  - Revert TYPE_CHECKING changes in stage.py and runner.py that were
    unrelated to this feature.
  - Skip two test files affected by a pre-existing circular import
    (runner ↔ app) until the import cycle is resolved in a separate PR.

* refactor(box): move box runtime to langbot-plugin-sdk

  Extract self-contained box runtime modules (actions, backend, client,
  errors, models, runtime, security, server) to langbot-plugin-sdk and
  update all imports to use `langbot_plugin.box.*`. Keep only service
  and
  connector in LangBot core as they depend on the Application context.

  - Update docker-compose to use `langbot_plugin.box.server` entry
  point
  - Update pyproject.toml to use local SDK via `tool.uv.sources`
  - Remove migrated source files and their unit/integration tests
  - Update remaining test imports to match new module paths

* fix: ruff

* fix(box): tighten sandbox exposure and restore box integration coverage

* refactor(types): remove quoted annotations under postponed evaluation

* chore(sandbox): move MCP loader changes to follow-up branch

* refactor(plugins): simplify GitHub install flow to default master archive

* revert(api): restore plugin GitHub import flow in plugins controller

* Improve data-root handling and skill install previews

* Add managed skill authoring tools for local agents

* Refactor the skills UI around sidebar detail pages

* Document why managed skill authoring tools bypass box

* fix: lint

* feat(web): refactor plugin/skill install flows and fix skills page

- Fix sidebar skill icon
- Add skills route and error page component
- Refactor plugin GitHub install from dialog modal to inline card
- Add skill install dropdown menu (create/upload/github) in sidebar
- Wire sidebar → skills page communication via pendingSkillInstallAction context
- Add i18n keys for error page and skill install actions

* fix(web): persist sidebar collapsible section open state on navigation

Sections opened via sub-item navigation now retain their expanded state
when the user switches to a different section, instead of collapsing
because the isActive fallback becomes false.

---------

Co-authored-by: youhuanghe <1051233107@qq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
fdc310
2026-04-08 16:09:06 +08:00
committed by WangCham
parent fcf74c3b6c
commit 4b8a8c5e31
50 changed files with 6375 additions and 518 deletions

View File

@@ -73,15 +73,21 @@ class PipelinesRouterGroup(group.RouterGroup):
plugins = await self.ap.plugin_connector.list_plugins(component_kinds=pipeline_component_kinds)
mcp_servers = await self.ap.mcp_service.get_mcp_servers(contain_runtime_info=True)
# Get available skills
available_skills = await self.ap.skill_service.list_skills()
extensions_prefs = pipeline.get('extensions_preferences', {})
return self.success(
data={
'enable_all_plugins': extensions_prefs.get('enable_all_plugins', True),
'enable_all_mcp_servers': extensions_prefs.get('enable_all_mcp_servers', True),
'enable_all_skills': extensions_prefs.get('enable_all_skills', True),
'bound_plugins': extensions_prefs.get('plugins', []),
'available_plugins': plugins,
'bound_mcp_servers': extensions_prefs.get('mcp_servers', []),
'available_mcp_servers': mcp_servers,
'bound_skills': extensions_prefs.get('skills', []),
'available_skills': available_skills,
}
)
elif quart.request.method == 'PUT':
@@ -89,11 +95,19 @@ class PipelinesRouterGroup(group.RouterGroup):
json_data = await quart.request.json
enable_all_plugins = json_data.get('enable_all_plugins', True)
enable_all_mcp_servers = json_data.get('enable_all_mcp_servers', True)
enable_all_skills = json_data.get('enable_all_skills', True)
bound_plugins = json_data.get('bound_plugins', [])
bound_mcp_servers = json_data.get('bound_mcp_servers', [])
bound_skills = json_data.get('bound_skills', [])
await self.ap.pipeline_service.update_pipeline_extensions(
pipeline_uuid, bound_plugins, bound_mcp_servers, enable_all_plugins, enable_all_mcp_servers
pipeline_uuid,
bound_plugins,
bound_mcp_servers,
enable_all_plugins,
enable_all_mcp_servers,
bound_skills=bound_skills,
enable_all_skills=enable_all_skills,
)
return self.success()

View File

@@ -6,6 +6,7 @@ import re
import httpx
import uuid
import os
from urllib.parse import urlparse
from .....core import taskmgr
from .. import group
@@ -14,6 +15,43 @@ from langbot_plugin.runtime.plugin.mgr import PluginInstallSource
@group.group_class('plugins', '/api/v1/plugins')
class PluginsRouterGroup(group.RouterGroup):
@staticmethod
def _parse_github_repo_url(repo_url: str) -> dict | None:
raw_url = str(repo_url or '').strip()
if not raw_url:
return None
if not re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', raw_url):
raw_url = f'https://{raw_url}'
parsed = urlparse(raw_url)
if parsed.netloc.lower() not in ('github.com', 'www.github.com'):
return None
parts = [part for part in parsed.path.strip('/').split('/') if part]
if len(parts) < 2:
return None
owner = parts[0]
repo = parts[1]
if repo.endswith('.git'):
repo = repo[:-4]
if not owner or not repo:
return None
ref = ''
subdir = ''
if len(parts) >= 4 and parts[2] in ('tree', 'blob'):
ref = parts[3]
subdir = '/'.join(parts[4:]).strip('/')
return {
'owner': owner,
'repo': repo,
'ref': ref,
'subdir': subdir,
}
async def _check_extensions_limit(self) -> str | None:
"""Check if extensions limit is reached. Returns error response if limit exceeded, None otherwise."""
limitation = self.ap.instance_config.data.get('system', {}).get('limitation', {})
@@ -151,17 +189,37 @@ class PluginsRouterGroup(group.RouterGroup):
data = await quart.request.json
repo_url = data.get('repo_url', '')
# Parse GitHub repository URL to extract owner and repo
# Supports: https://github.com/owner/repo or github.com/owner/repo
pattern = r'github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/.*)?$'
match = re.search(pattern, repo_url)
if not match:
parsed_repo = self._parse_github_repo_url(repo_url)
if not parsed_repo:
return self.http_status(400, -1, 'Invalid GitHub repository URL')
owner, repo = match.groups()
owner = parsed_repo['owner']
repo = parsed_repo['repo']
requested_ref = parsed_repo['ref']
requested_subdir = parsed_repo['subdir']
try:
if requested_ref:
return self.success(
data={
'releases': [
{
'id': 0,
'tag_name': requested_ref,
'name': requested_ref,
'published_at': '',
'prerelease': False,
'draft': False,
'source_type': 'branch',
'archive_url': f'https://api.github.com/repos/{owner}/{repo}/zipball/{requested_ref}',
}
],
'owner': owner,
'repo': repo,
'source_subdir': requested_subdir,
}
)
# Fetch releases from GitHub API
url = f'https://api.github.com/repos/{owner}/{repo}/releases'
async with httpx.AsyncClient(
@@ -187,7 +245,14 @@ class PluginsRouterGroup(group.RouterGroup):
}
)
return self.success(data={'releases': formatted_releases, 'owner': owner, 'repo': repo})
return self.success(
data={
'releases': formatted_releases,
'owner': owner,
'repo': repo,
'source_subdir': requested_subdir,
}
)
except httpx.RequestError as e:
return self.http_status(500, -1, f'Failed to fetch releases: {str(e)}')

View File

@@ -0,0 +1,146 @@
from __future__ import annotations
import quart
from .. import group
@group.group_class('skills', '/api/v1/skills')
class SkillsRouterGroup(group.RouterGroup):
"""Skills management API endpoints."""
async def initialize(self) -> None:
@self.route('', methods=['GET', 'POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def list_or_create_skills() -> quart.Response:
if quart.request.method == 'GET':
skills = await self.ap.skill_service.list_skills()
return self.success(data={'skills': skills})
data = await quart.request.json
if 'name' not in data or not data['name']:
return self.http_status(400, -1, 'Missing required field: name')
try:
skill = await self.ap.skill_service.create_skill(data)
return self.success(data={'skill': skill})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
@self.route('/<skill_name>', methods=['GET', 'PUT', 'DELETE'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def get_update_delete_skill(skill_name: str) -> quart.Response:
if quart.request.method == 'GET':
skill = await self.ap.skill_service.get_skill(skill_name)
if not skill:
return self.http_status(404, -1, 'Skill not found')
return self.success(data={'skill': skill})
if quart.request.method == 'PUT':
data = await quart.request.json
try:
skill = await self.ap.skill_service.update_skill(skill_name, data)
return self.success(data={'skill': skill})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
try:
await self.ap.skill_service.delete_skill(skill_name)
return self.success()
except ValueError as exc:
return self.http_status(400, -1, str(exc))
@self.route('/<skill_name>/preview', methods=['GET'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def preview_skill(skill_name: str) -> quart.Response:
runtime_data = self.ap.skill_mgr.get_skill_runtime_data(skill_name)
if not runtime_data:
return self.http_status(404, -1, 'Skill not found')
return self.success(data={'instructions': runtime_data['instructions']})
@self.route('/index', methods=['GET'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def get_skill_index() -> quart.Response:
pipeline_uuid = quart.request.args.get('pipeline_uuid')
bound_skills = quart.request.args.getlist('bound_skills')
skill_index = self.ap.skill_mgr.get_skill_index(
pipeline_uuid=pipeline_uuid,
bound_skills=bound_skills if bound_skills else None,
)
return self.success(data={'index': skill_index})
@self.route('/install/github', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def install_skill_from_github() -> quart.Response:
data = await quart.request.json
required_fields = ['asset_url', 'owner', 'repo', 'release_tag']
for field in required_fields:
if field not in data or not data[field]:
return self.http_status(400, -1, f'Missing required field: {field}')
try:
skill = await self.ap.skill_service.install_from_github(data)
return self.success(data={'skills': skill})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
except Exception as exc:
return self.http_status(500, -1, f'Failed to install skill: {exc}')
@self.route('/install/github/preview', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def preview_skill_from_github() -> quart.Response:
data = await quart.request.json
required_fields = ['asset_url', 'owner', 'repo', 'release_tag']
for field in required_fields:
if field not in data or not data[field]:
return self.http_status(400, -1, f'Missing required field: {field}')
try:
preview = await self.ap.skill_service.preview_install_from_github(data)
return self.success(data={'skills': preview})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
except Exception as exc:
return self.http_status(500, -1, f'Failed to preview skill: {exc}')
@self.route('/install/upload', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def install_skill_from_upload() -> quart.Response:
file = (await quart.request.files).get('file')
if file is None:
return self.http_status(400, -1, 'file is required')
form = await quart.request.form
try:
skill = await self.ap.skill_service.install_from_zip_upload(
file_bytes=file.read(),
filename=file.filename or '',
source_paths=form.getlist('source_paths'),
)
return self.success(data={'skills': skill})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
except Exception as exc:
return self.http_status(500, -1, f'Failed to install skill: {exc}')
@self.route('/install/upload/preview', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def preview_skill_from_upload() -> quart.Response:
file = (await quart.request.files).get('file')
if file is None:
return self.http_status(400, -1, 'file is required')
try:
preview = await self.ap.skill_service.preview_install_from_zip_upload(
file_bytes=file.read(),
filename=file.filename or '',
)
return self.success(data={'skills': preview})
except ValueError as exc:
return self.http_status(400, -1, str(exc))
except Exception as exc:
return self.http_status(500, -1, f'Failed to preview skill: {exc}')
@self.route('/scan', methods=['GET'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def scan_skill_directory() -> quart.Response:
path = quart.request.args.get('path', '').strip()
if not path:
return self.http_status(400, -1, 'Missing required parameter: path')
try:
result = self.ap.skill_service.scan_directory(path)
return self.success(data=result)
except ValueError as exc:
return self.http_status(400, -1, str(exc))

View File

@@ -220,6 +220,8 @@ class PipelineService:
bound_mcp_servers: list[str] = None,
enable_all_plugins: bool = True,
enable_all_mcp_servers: bool = True,
bound_skills: list[str] = None,
enable_all_skills: bool = True,
) -> None:
"""Update the bound plugins and MCP servers for a pipeline"""
# Get current pipeline
@@ -237,9 +239,12 @@ class PipelineService:
extensions_preferences = pipeline.extensions_preferences or {}
extensions_preferences['enable_all_plugins'] = enable_all_plugins
extensions_preferences['enable_all_mcp_servers'] = enable_all_mcp_servers
extensions_preferences['enable_all_skills'] = enable_all_skills
extensions_preferences['plugins'] = bound_plugins
if bound_mcp_servers is not None:
extensions_preferences['mcp_servers'] = bound_mcp_servers
if bound_skills is not None:
extensions_preferences['skills'] = bound_skills
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)

View File

@@ -0,0 +1,743 @@
from __future__ import annotations
import io
import inspect
import os
import posixpath
import shutil
import tempfile
import zipfile
from typing import Optional
from urllib.parse import urlparse
import httpx
import yaml
from ....core import app
from ....skill.utils import parse_frontmatter
from ....utils import paths
_FRONTMATTER_FIELDS = (
'name',
'display_name',
'description',
'auto_activate',
)
_PUBLIC_SKILL_FIELDS = (
'name',
'display_name',
'description',
'instructions',
'package_root',
'auto_activate',
'created_at',
'updated_at',
)
_GITHUB_ASSET_HOSTS = {
'github.com',
'api.github.com',
'objects.githubusercontent.com',
'githubusercontent.com',
'raw.githubusercontent.com',
'codeload.github.com',
}
def _build_skill_md(metadata: dict, instructions: str) -> str:
frontmatter = {}
for key in _FRONTMATTER_FIELDS:
value = metadata.get(key)
if value is None:
continue
if key == 'auto_activate' and value is True:
continue
if isinstance(value, str) and not value.strip():
continue
frontmatter[key] = value
if not frontmatter:
return instructions
frontmatter_text = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True, sort_keys=False).strip()
return f'---\n{frontmatter_text}\n---\n\n{instructions}'
class SkillService:
"""Filesystem-backed skill management service."""
ap: app.Application
def __init__(self, ap: app.Application) -> None:
self.ap = ap
@staticmethod
def _serialize_skill(skill: dict) -> dict:
return {field: skill.get(field) for field in _PUBLIC_SKILL_FIELDS if field in skill}
async def list_skills(self) -> list[dict]:
skills = [dict(skill) for skill in getattr(self.ap.skill_mgr, 'skills', {}).values()]
skills.sort(key=lambda item: item.get('updated_at', ''), reverse=True)
return [self._serialize_skill(skill) for skill in skills]
async def get_skill(self, skill_name: str) -> Optional[dict]:
skill = getattr(self.ap.skill_mgr, 'get_skill_by_name', lambda _name: None)(skill_name)
return self._serialize_skill(skill) if skill else None
async def get_skill_by_name(self, name: str) -> Optional[dict]:
return await self.get_skill(name)
async def create_skill(self, data: dict) -> dict:
name = self._validate_skill_name(data.get('name', ''))
if await self.get_skill_by_name(name):
raise ValueError(f'Skill with name "{name}" already exists')
package_root = self._normalize_package_root(data.get('package_root', ''))
managed_root = self._managed_skill_path(name)
target_root = managed_root
imported_skill_data: dict | None = None
if package_root and self._managed_install_root_for_package(package_root):
if not os.path.isdir(package_root):
raise ValueError(f'Directory does not exist: {package_root}')
target_root = package_root
imported_skill_data = self._read_skill_package(target_root)
elif package_root and package_root != managed_root:
if not os.path.isdir(package_root):
raise ValueError(f'Directory does not exist: {package_root}')
if os.path.exists(managed_root):
raise ValueError(f'Skill directory already exists: {managed_root}')
os.makedirs(os.path.dirname(managed_root), exist_ok=True)
shutil.copytree(package_root, managed_root)
imported_skill_data = self._read_skill_package(managed_root)
else:
os.makedirs(managed_root, exist_ok=True)
metadata = {
'name': name,
'display_name': self._resolve_create_field(data, 'display_name', imported_skill_data, default=''),
'description': self._resolve_create_field(data, 'description', imported_skill_data, default=''),
'auto_activate': self._resolve_create_bool(data, 'auto_activate', imported_skill_data, default=True),
}
instructions = self._resolve_create_field(data, 'instructions', imported_skill_data, default='')
self._write_skill_md(target_root, metadata, instructions)
await self._reload_skills()
created = await self.get_skill(name)
if not created:
raise ValueError(f'Failed to create skill "{name}"')
return created
async def update_skill(self, skill_name: str, data: dict) -> dict:
skill = await self.get_skill(skill_name)
if not skill:
raise ValueError(f'Skill "{skill_name}" not found')
requested_name = str(data.get('name', skill['name']) or skill['name']).strip()
if requested_name != skill['name']:
raise ValueError('Renaming skills is not supported')
requested_package_root = str(data.get('package_root', '') or '').strip()
existing_package_root = self._normalize_package_root(skill['package_root'])
if requested_package_root and self._normalize_package_root(requested_package_root) != existing_package_root:
raise ValueError('Updating package_root is not supported; recreate the skill to import a different package')
metadata = {
'name': skill['name'],
'display_name': data.get('display_name', skill.get('display_name', '')),
'description': data.get('description', skill.get('description', '')),
'auto_activate': data.get('auto_activate', skill.get('auto_activate', True)),
}
instructions = str(data.get('instructions', skill.get('instructions', '')) or '')
self._write_skill_md(skill['package_root'], metadata, instructions)
await self._reload_skills()
updated = await self.get_skill(skill_name)
if not updated:
raise ValueError(f'Skill "{skill_name}" not found after update')
return updated
async def delete_skill(self, skill_name: str) -> bool:
skill = await self.get_skill(skill_name)
if not skill:
raise ValueError(f'Skill "{skill_name}" not found')
package_root = self._normalize_package_root(skill['package_root'])
managed_install_root = self._managed_install_root_for_package(package_root)
if not managed_install_root:
raise ValueError('Only managed skills under data/skills can be deleted via LangBot')
shutil.rmtree(managed_install_root, ignore_errors=True)
await self._reload_skills()
return True
async def list_skill_files(
self,
skill_name: str,
path: str = '.',
include_hidden: bool = False,
max_entries: int = 200,
) -> dict:
skill = await self.get_skill(skill_name)
if not skill:
raise ValueError(f'Skill "{skill_name}" not found')
target_dir, relative_path = self._resolve_skill_path(skill, path, expect_directory=True)
entries: list[dict] = []
with os.scandir(target_dir) as iterator:
for entry in sorted(iterator, key=lambda item: item.name):
if not include_hidden and entry.name.startswith('.'):
continue
entry_rel_path = entry.name if relative_path in ('', '.') else os.path.join(relative_path, entry.name)
is_dir = entry.is_dir()
entries.append(
{
'path': entry_rel_path.replace(os.sep, '/'),
'name': entry.name,
'is_dir': is_dir,
'size': None if is_dir else entry.stat().st_size,
}
)
if len(entries) >= max_entries:
break
return {
'skill': {'name': skill['name']},
'base_path': '.' if relative_path in ('', '.') else relative_path.replace(os.sep, '/'),
'entries': entries,
'truncated': len(entries) >= max_entries,
}
async def read_skill_file(self, skill_name: str, path: str) -> dict:
skill = await self.get_skill(skill_name)
if not skill:
raise ValueError(f'Skill "{skill_name}" not found')
target_path, relative_path = self._resolve_skill_path(skill, path, expect_directory=False)
if not os.path.isfile(target_path):
raise ValueError(f'Skill file not found: {relative_path}')
try:
with open(target_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError as exc:
raise ValueError(f'Skill file is not valid UTF-8 text: {relative_path}') from exc
return {
'skill': {'name': skill['name']},
'path': relative_path.replace(os.sep, '/'),
'content': content,
}
async def write_skill_file(self, skill_name: str, path: str, content: str) -> dict:
skill = await self.get_skill(skill_name)
if not skill:
raise ValueError(f'Skill "{skill_name}" not found')
target_path, relative_path = self._resolve_skill_path(skill, path, expect_directory=False)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, 'w', encoding='utf-8') as f:
f.write(content)
skill_mgr = getattr(self.ap, 'skill_mgr', None)
if skill_mgr is not None:
refresh_skill = getattr(skill_mgr, 'refresh_skill_from_disk', None)
if callable(refresh_skill):
refresh_skill(skill.get('name', ''))
return {
'skill': {'name': skill['name']},
'path': relative_path.replace(os.sep, '/'),
'bytes_written': len(content.encode('utf-8')),
}
async def install_from_github(self, data: dict) -> list[dict]:
owner = str(data['owner']).strip()
repo = str(data['repo']).strip()
release_tag = str(data.get('release_tag', '')).strip()
asset_url = self._validate_github_asset_url(data['asset_url'], owner=owner, repo=repo, release_tag=release_tag)
source_subdir = str(data.get('source_subdir', '') or '').strip()
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_')
try:
skill_root = await self._download_github_skill_to_temp(asset_url, tmp_dir)
skill_root = self._resolve_github_source_root(skill_root, source_subdir)
previews = self._preview_skill_candidates(
skill_root,
base_target_name=repo,
suffix=release_tag.lstrip('v').replace('/', '-') or 'source',
)
selected_previews = self._select_preview_candidates(previews, data)
scanned = self._install_preview_candidates(skill_root, selected_previews)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
await self._reload_skills()
return await self._resolve_installed_skills(scanned)
async def preview_install_from_github(self, data: dict) -> list[dict]:
owner = str(data['owner']).strip()
repo = str(data['repo']).strip()
release_tag = str(data.get('release_tag', '')).strip()
asset_url = self._validate_github_asset_url(data['asset_url'], owner=owner, repo=repo, release_tag=release_tag)
source_subdir = str(data.get('source_subdir', '') or '').strip()
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_preview_')
try:
skill_root = await self._download_github_skill_to_temp(asset_url, tmp_dir)
skill_root = self._resolve_github_source_root(skill_root, source_subdir)
return self._preview_skill_candidates(
skill_root,
base_target_name=repo,
suffix=release_tag.lstrip('v').replace('/', '-') or 'source',
)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
async def install_from_zip_upload(
self,
*,
file_bytes: bytes,
filename: str,
source_paths: list[str] | None = None,
source_path: str = '',
) -> list[dict]:
if not file_bytes:
raise ValueError('Uploaded file is empty')
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_upload_')
try:
skill_root = self._extract_uploaded_skill_to_temp(file_bytes, tmp_dir)
base_target_name = self._uploaded_skill_target_stem(filename)
previews = self._preview_skill_candidates(
skill_root,
base_target_name=base_target_name,
suffix='upload',
)
selected_previews = self._select_preview_candidates(
previews,
{'source_paths': source_paths or [], 'source_path': source_path},
)
scanned = self._install_preview_candidates(skill_root, selected_previews)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
await self._reload_skills()
return await self._resolve_installed_skills(scanned)
async def preview_install_from_zip_upload(self, *, file_bytes: bytes, filename: str) -> list[dict]:
if not file_bytes:
raise ValueError('Uploaded file is empty')
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_upload_preview_')
try:
skill_root = self._extract_uploaded_skill_to_temp(file_bytes, tmp_dir)
return self._preview_skill_candidates(
skill_root,
base_target_name=self._uploaded_skill_target_stem(filename),
suffix='upload',
)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
async def reload_skills(self) -> list[dict]:
await self._reload_skills()
return await self.list_skills()
def scan_directory(self, path: str) -> dict:
if not os.path.isdir(path):
raise ValueError(f'Directory does not exist: {path}')
discovered = self._discover_skill_directories(path, max_depth=2)
if not discovered:
raise ValueError(f'No SKILL.md found in {path} or its subdirectories (max depth: 2)')
if len(discovered) > 1:
candidates = ', '.join(found_path for found_path, _entry in discovered)
raise ValueError(
f'Multiple skill directories found in {path}. Please choose a more specific path: {candidates}'
)
package_root, entry_file = discovered[0]
entry_path = os.path.join(package_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
dir_name = os.path.basename(os.path.normpath(package_root))
return {
'package_root': os.path.abspath(package_root),
'entry_file': entry_file,
'name': str(metadata.get('name') or dir_name).strip(),
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
'auto_activate': bool(metadata.get('auto_activate', True)),
}
async def _reload_skills(self) -> None:
skill_mgr = getattr(self.ap, 'skill_mgr', None)
reload_skills = getattr(skill_mgr, 'reload_skills', None)
if not callable(reload_skills):
return
result = reload_skills()
if inspect.isawaitable(result):
await result
def _read_skill_package(self, package_root: str) -> dict:
entry = self._find_skill_entry(package_root)
if entry is None:
raise ValueError(f'No SKILL.md found in {package_root}')
resolved_root, entry_file = entry
entry_path = os.path.join(resolved_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
return {
'entry_file': entry_file,
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
'auto_activate': bool(metadata.get('auto_activate', True)),
}
async def _download_github_skill_to_temp(self, asset_url: str, tmp_dir: str) -> str:
zip_path = os.path.join(tmp_dir, 'skill.zip')
async with httpx.AsyncClient(follow_redirects=True, timeout=120) as client:
resp = await client.get(asset_url)
resp.raise_for_status()
with open(zip_path, 'wb') as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, 'extracted')
with zipfile.ZipFile(zip_path, 'r') as zf:
self._safe_extract_zip(zf, extract_dir)
entries = os.listdir(extract_dir)
if len(entries) == 1 and os.path.isdir(os.path.join(extract_dir, entries[0])):
return os.path.join(extract_dir, entries[0])
return extract_dir
def _extract_uploaded_skill_to_temp(self, file_bytes: bytes, tmp_dir: str) -> str:
extract_dir = os.path.join(tmp_dir, 'extracted')
try:
with zipfile.ZipFile(io.BytesIO(file_bytes), 'r') as zf:
self._safe_extract_zip(zf, extract_dir)
except zipfile.BadZipFile as exc:
raise ValueError('Uploaded file must be a valid .zip archive') from exc
entries = os.listdir(extract_dir)
if len(entries) == 1 and os.path.isdir(os.path.join(extract_dir, entries[0])):
return os.path.join(extract_dir, entries[0])
return extract_dir
def _resolve_github_source_root(self, root_path: str, source_subdir: str) -> str:
normalized = str(source_subdir or '').strip().replace('\\', '/').strip('/')
if not normalized:
return root_path
target_path = os.path.realpath(os.path.join(root_path, normalized))
root_path = os.path.realpath(root_path)
if target_path != root_path and not target_path.startswith(f'{root_path}{os.sep}'):
raise ValueError('source_subdir must stay within the downloaded repository')
if not os.path.isdir(target_path):
raise ValueError(f'source_subdir does not exist in the downloaded repository: {normalized}')
return target_path
def _uploaded_skill_target_stem(self, filename: str) -> str:
stem = os.path.splitext(os.path.basename(str(filename or '').strip()))[0]
safe_stem = ''.join(ch if ch.isalnum() or ch in ('-', '_') else '-' for ch in stem).strip('-_')
if not safe_stem:
safe_stem = 'uploaded-skill'
return safe_stem
def _build_preview_target_dir(self, base_target_name: str, source_path: str, suffix: str) -> str:
relative = str(source_path or '').strip().replace('\\', '/').strip('/')
leaf_name = relative.split('/')[-1] if relative else ''
target_name = base_target_name
if leaf_name and leaf_name != base_target_name:
target_name = f'{base_target_name}-{leaf_name}'
if suffix:
target_name = f'{target_name}-{suffix}'
return paths.get_data_path('skills', target_name)
def _preview_skill_candidates(self, root_path: str, *, base_target_name: str, suffix: str) -> list[dict]:
discovered = self._discover_skill_directories(root_path, max_depth=2)
if not discovered:
raise ValueError(f'No SKILL.md found in {root_path} or its subdirectories (max depth: 2)')
previews: list[dict] = []
for package_root, entry_file in discovered:
entry_path = os.path.join(package_root, entry_file)
with open(entry_path, 'r', encoding='utf-8') as f:
content = f.read()
metadata, instructions = parse_frontmatter(content)
relative_path = os.path.relpath(package_root, root_path)
if relative_path in ('', '.'):
relative_path = ''
dir_name = os.path.basename(os.path.normpath(package_root))
previews.append(
{
'source_path': relative_path.replace(os.sep, '/'),
'entry_file': entry_file,
'name': str(metadata.get('name') or dir_name).strip(),
'display_name': str(metadata.get('display_name') or '').strip(),
'description': str(metadata.get('description') or '').strip(),
'instructions': instructions,
'auto_activate': bool(metadata.get('auto_activate', True)),
'package_root': self._build_preview_target_dir(base_target_name, relative_path, suffix),
}
)
previews.sort(key=lambda item: item['source_path'])
return previews
def _select_preview_candidates(self, previews: list[dict], data: dict) -> list[dict]:
normalized_paths: list[str] = []
raw_source_paths = data.get('source_paths', [])
if isinstance(raw_source_paths, list):
for source_path in raw_source_paths:
normalized = str(source_path or '').strip().replace('\\', '/').strip('/')
if normalized not in normalized_paths:
normalized_paths.append(normalized)
legacy_source_path = str(data.get('source_path', '') or '').strip().replace('\\', '/').strip('/')
if legacy_source_path and legacy_source_path not in normalized_paths:
normalized_paths.append(legacy_source_path)
if len(previews) == 1 and not normalized_paths:
return previews
if not normalized_paths:
candidates = ', '.join(item['source_path'] or '.' for item in previews)
raise ValueError(f'Multiple skills found. Please choose one or more source_paths: {candidates}')
selected: list[dict] = []
available = {preview['source_path']: preview for preview in previews}
for normalized_path in normalized_paths:
preview = available.get(normalized_path)
if preview is None:
candidates = ', '.join(item['source_path'] or '.' for item in previews)
raise ValueError(f'Invalid source_path "{normalized_path}". Available: {candidates}')
selected.append(preview)
return selected
def _install_preview_candidates(self, root_path: str, selected_previews: list[dict]) -> list[dict]:
target_dirs: list[str] = []
for preview in selected_previews:
target_dir = self._normalize_package_root(preview['package_root'])
if target_dir in target_dirs:
raise ValueError(f'Duplicate target directory selected: {target_dir}')
if os.path.exists(target_dir):
raise ValueError(f'Skill directory already exists: {target_dir}')
target_dirs.append(target_dir)
installed_scans: list[dict] = []
created_dirs: list[str] = []
try:
for preview in selected_previews:
target_dir = self._normalize_package_root(preview['package_root'])
source_root = self._preview_source_root(root_path, preview['source_path'])
os.makedirs(os.path.dirname(target_dir), exist_ok=True)
shutil.copytree(source_root, target_dir)
created_dirs.append(target_dir)
installed_scans.append(self.scan_directory(target_dir))
except Exception:
for target_dir in created_dirs:
shutil.rmtree(target_dir, ignore_errors=True)
raise
return installed_scans
async def _resolve_installed_skills(self, scanned_skills: list[dict]) -> list[dict]:
installed_skills: list[dict] = []
for scanned in scanned_skills:
installed = await self.get_skill(scanned['name'])
if not installed:
installed = self._serialize_skill(scanned)
installed_skills.append(installed)
return installed_skills
@staticmethod
def _preview_source_root(root_path: str, source_path: str) -> str:
normalized = str(source_path or '').strip().replace('\\', '/').strip('/')
if not normalized:
return root_path
return os.path.join(root_path, normalized)
@staticmethod
def _validate_github_asset_url(asset_url: str, *, owner: str, repo: str, release_tag: str) -> str:
parsed = urlparse(str(asset_url).strip())
if parsed.scheme != 'https' or not parsed.netloc:
raise ValueError('asset_url must be a valid HTTPS GitHub asset URL')
host = parsed.netloc.lower()
if host not in _GITHUB_ASSET_HOSTS:
raise ValueError('asset_url must point to a GitHub-hosted release asset or archive')
normalized_path = posixpath.normpath(parsed.path or '/')
allowed_prefixes = [
f'/repos/{owner}/{repo}/',
f'/{owner}/{repo}/',
]
if not any(normalized_path.startswith(prefix) for prefix in allowed_prefixes):
raise ValueError('asset_url does not match the requested owner/repo')
if release_tag and release_tag not in parsed.path and release_tag not in parsed.query:
raise ValueError('asset_url does not match the requested release_tag')
return parsed.geturl()
@staticmethod
def _safe_extract_zip(archive: zipfile.ZipFile, target_dir: str) -> None:
target_root = os.path.realpath(target_dir)
os.makedirs(target_root, exist_ok=True)
for member in archive.infolist():
member_name = member.filename
if not member_name or member_name.endswith('/'):
continue
normalized = posixpath.normpath(member_name)
if normalized.startswith('../') or normalized == '..' or os.path.isabs(normalized):
raise ValueError(f'Archive contains an unsafe path: {member_name}')
destination = os.path.realpath(os.path.join(target_root, normalized))
if destination != target_root and not destination.startswith(f'{target_root}{os.sep}'):
raise ValueError(f'Archive contains an unsafe path: {member_name}')
archive.extractall(target_root)
@staticmethod
def _resolve_create_field(data: dict, field: str, imported_skill_data: dict | None, *, default: str) -> str:
raw_value = data.get(field) if field in data else None
if raw_value is None:
if imported_skill_data is not None:
return str(imported_skill_data.get(field, default) or default)
return default
value = str(raw_value or '')
if imported_skill_data is not None and not value.strip():
return str(imported_skill_data.get(field, default) or default)
return value
@staticmethod
def _resolve_create_bool(data: dict, field: str, imported_skill_data: dict | None, *, default: bool) -> bool:
if field in data and data[field] is not None:
return bool(data[field])
if imported_skill_data is not None:
return bool(imported_skill_data.get(field, default))
return default
def _write_skill_md(self, package_root: str, metadata: dict, instructions: str) -> None:
package_root = self._normalize_package_root(package_root)
os.makedirs(package_root, exist_ok=True)
content = _build_skill_md(metadata, instructions)
with open(os.path.join(package_root, 'SKILL.md'), 'w', encoding='utf-8') as f:
f.write(content)
def _managed_skill_path(self, skill_name: str) -> str:
return self._normalize_package_root(paths.get_data_path('skills', skill_name))
def _managed_install_root_for_package(self, package_root: str) -> str:
managed_root = self._normalize_package_root(paths.get_data_path('skills'))
if not package_root or package_root == managed_root:
return ''
prefix = f'{managed_root}{os.sep}'
if not package_root.startswith(prefix):
return ''
relative = os.path.relpath(package_root, managed_root)
top_level = relative.split(os.sep, 1)[0]
if top_level in ('', '.', '..'):
return ''
return os.path.join(managed_root, top_level)
@staticmethod
def _validate_skill_name(name: str) -> str:
name = str(name or '').strip()
if not name:
raise ValueError('Skill name is required')
if not name.replace('-', '').replace('_', '').isalnum():
raise ValueError('Skill name can only contain letters, numbers, hyphens and underscores')
if len(name) > 64:
raise ValueError('Skill name cannot exceed 64 characters')
return name
@staticmethod
def _normalize_package_root(package_root: str) -> str:
package_root = str(package_root).strip()
if not package_root:
return ''
return os.path.realpath(os.path.abspath(package_root))
@staticmethod
def _find_skill_entry(path: str) -> Optional[tuple[str, str]]:
for candidate in ('SKILL.md', 'skill.md'):
if os.path.isfile(os.path.join(path, candidate)):
return path, candidate
return None
def _discover_skill_directories(self, root_path: str, max_depth: int = 2) -> list[tuple[str, str]]:
discovered: list[tuple[str, str]] = []
queue: list[tuple[str, int]] = [(root_path, 0)]
seen: set[str] = set()
while queue:
current_path, depth = queue.pop(0)
normalized_path = os.path.abspath(current_path)
if normalized_path in seen:
continue
seen.add(normalized_path)
found = self._find_skill_entry(normalized_path)
if found:
discovered.append(found)
continue
if depth >= max_depth:
continue
try:
entries = sorted(os.scandir(normalized_path), key=lambda entry: entry.name)
except OSError:
continue
for entry in entries:
if entry.is_dir():
queue.append((entry.path, depth + 1))
return discovered
def _resolve_skill_path(self, skill: dict, path: str, *, expect_directory: bool) -> tuple[str, str]:
package_root = self._normalize_package_root(skill.get('package_root', ''))
if not package_root:
raise ValueError(f'Skill "{skill.get("name", "")}" has no package_root')
relative_path = str(path or '.').strip() or '.'
if os.path.isabs(relative_path):
raise ValueError('path must be relative to the skill package root')
normalized_relative = os.path.normpath(relative_path)
if normalized_relative.startswith('..') or normalized_relative == '..':
raise ValueError('path must stay within the skill package root')
target_path = os.path.realpath(os.path.join(package_root, normalized_relative))
if target_path != package_root and not target_path.startswith(f'{package_root}{os.sep}'):
raise ValueError('path must stay within the skill package root')
if expect_directory:
if not os.path.isdir(target_path):
raise ValueError(f'Skill directory not found: {relative_path}')
else:
parent_dir = os.path.dirname(target_path) or package_root
if parent_dir != package_root and not parent_dir.startswith(f'{package_root}{os.sep}'):
raise ValueError('path must stay within the skill package root')
return target_path, normalized_relative