feat: support github skill installation

This commit is contained in:
Junyan Qin
2026-05-17 23:09:10 +08:00
parent e814f359cb
commit bf8b51569f
10 changed files with 560 additions and 48 deletions

View File

@@ -98,10 +98,13 @@ class SkillsRouterGroup(group.RouterGroup):
@self.route('/install/github', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def install_skill_from_github() -> quart.Response:
data = await quart.request.json
required_fields = ['asset_url', 'owner', 'repo', 'release_tag']
required_fields = ['asset_url', 'owner', 'repo']
for field in required_fields:
if field not in data or not data[field]:
return self.http_status(400, -1, f'Missing required field: {field}')
asset_url = str(data['asset_url']).strip().lower().split('?', 1)[0].split('#', 1)[0]
if not asset_url.endswith('skill.md') and not data.get('release_tag'):
return self.http_status(400, -1, 'Missing required field: release_tag')
try:
skill = await self.ap.skill_service.install_from_github(data)
@@ -114,10 +117,13 @@ class SkillsRouterGroup(group.RouterGroup):
@self.route('/install/github/preview', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def preview_skill_from_github() -> quart.Response:
data = await quart.request.json
required_fields = ['asset_url', 'owner', 'repo', 'release_tag']
required_fields = ['asset_url', 'owner', 'repo']
for field in required_fields:
if field not in data or not data[field]:
return self.http_status(400, -1, f'Missing required field: {field}')
asset_url = str(data['asset_url']).strip().lower().split('?', 1)[0].split('#', 1)[0]
if not asset_url.endswith('skill.md') and not data.get('release_tag'):
return self.http_status(400, -1, 'Missing required field: release_tag')
try:
preview = await self.ap.skill_service.preview_install_from_github(data)

View File

@@ -297,7 +297,11 @@ class SkillService:
owner = str(data['owner']).strip()
repo = str(data['repo']).strip()
release_tag = str(data.get('release_tag', '')).strip()
asset_url = self._validate_github_asset_url(data['asset_url'], owner=owner, repo=repo, release_tag=release_tag)
raw_asset_url = str(data['asset_url']).strip()
if self._is_github_skill_md_url(raw_asset_url):
return await self._install_github_skill_md(raw_asset_url, owner=owner, repo=repo, data=data)
asset_url = self._validate_github_asset_url(raw_asset_url, owner=owner, repo=repo, release_tag=release_tag)
source_subdir = str(data.get('source_subdir', '') or '').strip()
box_service = self._box_service()
@@ -335,7 +339,11 @@ class SkillService:
owner = str(data['owner']).strip()
repo = str(data['repo']).strip()
release_tag = str(data.get('release_tag', '')).strip()
asset_url = self._validate_github_asset_url(data['asset_url'], owner=owner, repo=repo, release_tag=release_tag)
raw_asset_url = str(data['asset_url']).strip()
if self._is_github_skill_md_url(raw_asset_url):
return await self._preview_github_skill_md(raw_asset_url, owner=owner, repo=repo)
asset_url = self._validate_github_asset_url(raw_asset_url, owner=owner, repo=repo, release_tag=release_tag)
source_subdir = str(data.get('source_subdir', '') or '').strip()
box_service = self._box_service()
@@ -420,6 +428,63 @@ class SkillService:
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
async def _install_github_skill_md(self, asset_url: str, *, owner: str, repo: str, data: dict) -> list[dict]:
zip_bytes, filename, package_name = await self._download_github_skill_md_as_zip(
asset_url,
owner=owner,
repo=repo,
)
box_service = self._box_service()
if box_service is not None:
installed = await box_service.install_skill_zip(
zip_bytes,
filename,
source_paths=data.get('source_paths') or [],
source_path=str(data.get('source_path', '') or ''),
target_suffix='',
)
await self._reload_skills()
return [self._serialize_skill(skill) for skill in installed]
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_md_')
try:
skill_root = self._extract_uploaded_skill_to_temp(zip_bytes, tmp_dir)
previews = self._preview_skill_candidates(
skill_root,
base_target_name=package_name,
suffix='',
)
selected_previews = self._select_preview_candidates(previews, data)
scanned = self._install_preview_candidates(skill_root, selected_previews)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
await self._reload_skills()
return await self._resolve_installed_skills(scanned)
async def _preview_github_skill_md(self, asset_url: str, *, owner: str, repo: str) -> list[dict]:
zip_bytes, _filename, package_name = await self._download_github_skill_md_as_zip(
asset_url,
owner=owner,
repo=repo,
)
box_service = self._box_service()
if box_service is not None:
return await box_service.preview_skill_zip(zip_bytes, f'{package_name}.zip', target_suffix='')
tmp_dir = tempfile.mkdtemp(prefix='langbot_skill_md_preview_')
try:
skill_root = self._extract_uploaded_skill_to_temp(zip_bytes, tmp_dir)
return self._preview_skill_candidates(
skill_root,
base_target_name=package_name,
suffix='',
)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
async def reload_skills(self) -> list[dict]:
await self._reload_skills()
return await self.list_skills()
@@ -507,6 +572,31 @@ class SkillService:
resp.raise_for_status()
return resp.content
async def _download_github_skill_md_as_zip(
self, asset_url: str, *, owner: str, repo: str
) -> tuple[bytes, str, str]:
info = self._parse_github_skill_md_url(asset_url, owner=owner, repo=repo)
content = await self._download_github_skill_md(info['raw_url'])
package_name = self._resolve_github_skill_md_package_name(content, info['package_name'])
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr(f'{package_name}/SKILL.md', content)
return buffer.getvalue(), f'{package_name}.zip', package_name
async def _download_github_skill_md(self, raw_url: str) -> str:
async with httpx.AsyncClient(follow_redirects=True, timeout=60) as client:
try:
resp = await client.get(raw_url)
resp.raise_for_status()
except httpx.HTTPError as exc:
raise ValueError(f'Failed to download SKILL.md from GitHub: {exc}') from exc
try:
return resp.content.decode('utf-8')
except UnicodeDecodeError as exc:
raise ValueError('GitHub SKILL.md must be valid UTF-8 text') from exc
def _extract_uploaded_skill_to_temp(self, file_bytes: bytes, tmp_dir: str) -> str:
extract_dir = os.path.join(tmp_dir, 'extracted')
try:
@@ -656,6 +746,54 @@ class SkillService:
return root_path
return os.path.join(root_path, normalized)
@staticmethod
def _is_github_skill_md_url(asset_url: str) -> bool:
parsed = urlparse(str(asset_url or '').strip())
normalized_path = posixpath.normpath(parsed.path or '/')
return normalized_path.lower().endswith('/skill.md')
def _parse_github_skill_md_url(self, asset_url: str, *, owner: str, repo: str) -> dict:
parsed = urlparse(str(asset_url or '').strip())
if parsed.scheme != 'https' or not parsed.netloc:
raise ValueError('asset_url must be a valid HTTPS GitHub SKILL.md URL')
host = parsed.netloc.lower()
path_parts = [part for part in (parsed.path or '').split('/') if part]
if host == 'github.com':
if len(path_parts) < 5 or path_parts[0] != owner or path_parts[1] != repo or path_parts[2] != 'blob':
raise ValueError('GitHub SKILL.md URL must point to the requested owner/repo blob path')
ref = path_parts[3]
file_path = '/'.join(path_parts[4:])
raw_url = f'https://raw.githubusercontent.com/{owner}/{repo}/{ref}/{file_path}'
elif host == 'raw.githubusercontent.com':
if len(path_parts) < 4 or path_parts[0] != owner or path_parts[1] != repo:
raise ValueError('GitHub SKILL.md URL must point to the requested owner/repo raw path')
ref = path_parts[2]
file_path = '/'.join(path_parts[3:])
raw_url = parsed.geturl()
else:
raise ValueError('asset_url must point to a GitHub SKILL.md file')
normalized_file_path = posixpath.normpath(file_path).lower()
if normalized_file_path != 'skill.md' and not normalized_file_path.endswith('/skill.md'):
raise ValueError('GitHub skill import requires a URL ending with SKILL.md')
parent_dir = posixpath.basename(posixpath.dirname(file_path)) or repo
return {
'raw_url': raw_url,
'ref': ref,
'file_path': file_path,
'package_name': self._uploaded_skill_target_stem(parent_dir),
}
def _resolve_github_skill_md_package_name(self, content: str, fallback: str) -> str:
metadata, _instructions = parse_frontmatter(content)
candidate = str(metadata.get('name') or fallback or '').strip()
try:
return self._validate_skill_name(candidate)
except ValueError:
return self._validate_skill_name(fallback)
@staticmethod
def _validate_github_asset_url(asset_url: str, *, owner: str, repo: str, release_tag: str) -> str:
parsed = urlparse(str(asset_url).strip())