feat(box): configurable sandbox scope and unified skill containers

Replace the per-message session_id with a template-based system
configurable per pipeline via 'Sandbox Scope' in the local-agent panel.
Default scope is per-chat ({launcher_type}_{launcher_id}).

Unify skill exec into the same container as default exec — skills are
mounted at /workspace/.skills/{name}/ via extra_mounts instead of
getting separate containers. All pipeline-bound skills are injected
at container creation time.

- Add box-session-id-template to pipeline metadata (select, 4 options, 8 languages)
- Add resolve_box_session_id() and build_skill_extra_mounts() to BoxService
- Rewrite native.py skill exec path to use execute_tool with shared session
- Update tests for new session_id format
- Add design doc: docs/review/box-session-scope.md
This commit is contained in:
Junyan Qin
2026-04-18 22:11:28 +08:00
committed by WangCham
parent ec00e49ef1
commit 7e50063731
5 changed files with 466 additions and 44 deletions
+45 -2
View File
@@ -32,11 +32,11 @@ def _is_path_under(path: str, root: str) -> bool:
return path == root or path.startswith(f'{root}{os.sep}')
def _is_path_under(path: str, root: str) -> bool:
"""Check whether *path* equals *root* or is a child of *root*."""
return path == root or path.startswith(f'{root}{os.sep}')
if TYPE_CHECKING:
from ..core import app as core_app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -123,6 +123,45 @@ class BoxService:
)
return self._serialize_result(result)
def resolve_box_session_id(self, query: pipeline_query.Query) -> str:
"""Resolve the Box session_id from the pipeline's template and query variables."""
template = (
(query.pipeline_config or {})
.get('ai', {})
.get('local-agent', {})
.get('box-session-id-template', '{launcher_type}_{launcher_id}')
)
variables = query.variables or {}
return template.format_map(collections.defaultdict(lambda: 'unknown', variables))
def build_skill_extra_mounts(self, query: pipeline_query.Query) -> list[dict]:
"""Build extra_mounts entries for all pipeline-bound skills.
This ensures that when a container is first created it already has
all skill packages mounted, regardless of which skill is currently
activated.
"""
skill_mgr = getattr(self.ap, 'skill_mgr', None)
if skill_mgr is None:
return []
from ..provider.tools.loaders import skill as skill_loader
visible_skills = skill_loader.get_visible_skills(self.ap, query)
mounts: list[dict] = []
for skill_name, skill_data in visible_skills.items():
package_root = str(skill_data.get('package_root', '') or '').strip()
if not package_root:
continue
mounts.append(
{
'host_path': package_root,
'mount_path': f'/workspace/.skills/{skill_name}',
'mode': 'rw',
}
)
return mounts
async def execute_tool(self, parameters: dict, query: pipeline_query.Query) -> dict:
"""Execute an agent-facing ``exec`` tool call.
@@ -137,7 +176,11 @@ class BoxService:
spec_payload[key] = parameters[key]
# Inject context the agent must not control
spec_payload.setdefault('session_id', str(query.query_id))
spec_payload.setdefault('session_id', self.resolve_box_session_id(query))
# Mount all pipeline-bound skills so they are available in the container
if 'extra_mounts' not in spec_payload:
spec_payload['extra_mounts'] = self.build_skill_extra_mounts(query)
return await self.execute_spec_payload(spec_payload, query)
@@ -6,7 +6,6 @@ import os
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from ....box.workspace import BoxWorkspaceSession
from .. import loader
from . import skill as skill_loader
@@ -61,7 +60,8 @@ class NativeToolLoader(loader.ToolLoader):
command = str(parameters['command'])
workdir = str(parameters.get('workdir', '/workspace') or '/workspace')
selected_skill, rewritten_workdir = skill_loader.resolve_virtual_skill_path(
# Validate that skill references target activated skills.
selected_skill, _ = skill_loader.resolve_virtual_skill_path(
self.ap,
query,
workdir,
@@ -78,37 +78,30 @@ class NativeToolLoader(loader.ToolLoader):
raise ValueError(
f'Skill "{referenced_skill_names[0]}" must be activated before exec can run in its package.'
)
rewritten_workdir = '/workspace'
if selected_skill is None:
return await self.ap.box_service.execute_tool(parameters, query)
if selected_skill is not None:
selected_skill_name = str(selected_skill.get('name', '') or '')
if referenced_skill_names and any(name != selected_skill_name for name in referenced_skill_names):
raise ValueError('exec can reference files from only one activated skill package per call.')
selected_skill_name = str(selected_skill.get('name', '') or '')
if referenced_skill_names and any(name != selected_skill_name for name in referenced_skill_names):
raise ValueError('exec can reference files from only one activated skill package per call.')
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
raise ValueError(f'Activated skill "{selected_skill_name}" has no package_root.')
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
raise ValueError(f'Activated skill "{selected_skill_name}" has no package_root.')
# Wrap command with Python venv bootstrap if the skill has a Python project.
# The venv is created inside the skill's mount path.
skill_mount = f'/workspace/.skills/{selected_skill_name}'
if skill_loader.should_prepare_skill_python_env(package_root):
parameters = dict(parameters)
parameters['command'] = skill_loader.wrap_skill_command_with_python_env(command, mount_path=skill_mount)
rewritten_command = skill_loader.rewrite_command_for_skill_mount(command, selected_skill_name)
if skill_loader.should_prepare_skill_python_env(package_root):
rewritten_command = skill_loader.wrap_skill_command_with_python_env(rewritten_command)
# All exec calls (with or without skills) go through the same container
# via execute_tool. Skills are mounted at /workspace/.skills/{name}/
# via extra_mounts built by BoxService.
result = await self.ap.box_service.execute_tool(parameters, query)
workspace = BoxWorkspaceSession(
self.ap.box_service,
skill_loader.build_skill_session_id(selected_skill, query),
host_path=package_root,
host_path_mode='rw',
)
result = await workspace.execute_for_query(
query,
rewritten_command,
workdir=rewritten_workdir,
timeout_sec=parameters.get('timeout_sec'),
env=parameters.get('env'),
)
self._refresh_skill_from_disk(selected_skill)
if selected_skill is not None:
self._refresh_skill_from_disk(selected_skill)
return result
def _resolve_host_path(
@@ -1,6 +1,5 @@
from __future__ import annotations
import os
import re
import typing
@@ -14,6 +13,8 @@ ACTIVATED_SKILLS_KEY = '_activated_skills'
PIPELINE_BOUND_SKILLS_KEY = '_pipeline_bound_skills'
SKILL_MOUNT_PREFIX = '/workspace/.skills'
_SKILL_MOUNT_PATTERN = re.compile(r'/workspace/\.skills/([A-Za-z0-9_-]+)')
def get_virtual_skill_mount_path(skill_name: str) -> str:
return f'{SKILL_MOUNT_PREFIX}/{skill_name}'
@@ -152,5 +153,5 @@ def should_prepare_skill_python_env(package_root: str | None) -> bool:
return box_workspace.should_prepare_python_env(package_root)
def wrap_skill_command_with_python_env(command: str) -> str:
return box_workspace.wrap_python_command_with_env(command).rstrip()
def wrap_skill_command_with_python_env(command: str, *, mount_path: str = '/workspace') -> str:
return box_workspace.wrap_python_command_with_env(command, mount_path=mount_path).rstrip()