fix(tools): decouple runtime from agent runner

This commit is contained in:
huanghuoguoguo
2026-06-14 21:15:21 +08:00
parent 64b7e9c509
commit 9fa3251f3d
8 changed files with 57 additions and 190 deletions

View File

@@ -204,14 +204,14 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
fi
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
if [ -d "$_LB_LOCK_DIR" ] && [ ! -f "$_LB_LOCK_DIR/pid" ]; then
echo "Clearing stale Python environment lock without owner: $_LB_LOCK_DIR" >&2
rm -rf "$_LB_LOCK_DIR" 2>/dev/null || true
fi
_LB_LOCK_WAIT=0
while ! mkdir "$_LB_LOCK_DIR" 2>/dev/null; do
if [ "$_LB_LOCK_WAIT" -ge 120 ]; then
_LB_LOCK_OWNER="$(cat "$_LB_LOCK_DIR/pid" 2>/dev/null || true)"
if [ -n "$_LB_LOCK_OWNER" ] && kill -0 "$_LB_LOCK_OWNER" 2>/dev/null; then
echo "Timed out waiting for active Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
echo "Timed out waiting for Python environment lock, clearing stale lock: $_LB_LOCK_DIR" >&2
rm -rf "$_LB_LOCK_DIR" 2>/dev/null || true
if mkdir "$_LB_LOCK_DIR" 2>/dev/null; then

View File

@@ -276,7 +276,7 @@ class BoxStdioSessionRuntime:
# to delete them, so refresh source files in place and preserve runtime
# directories instead of rmtree'ing the whole staging root.
with _workspace_copy_lock(process_host_root):
preserved_names = {'.venv', 'venv', 'env', '.env', '.cache', '.tmp', '.langbot'}
preserved_names = {'.venv', 'venv', 'env', '.cache', '.tmp', '.langbot'}
os.makedirs(process_host_workspace, exist_ok=True)
for name in os.listdir(process_host_workspace):
if name in preserved_names:
@@ -288,6 +288,7 @@ class BoxStdioSessionRuntime:
try:
os.unlink(path)
except FileNotFoundError:
# The entry may disappear between listdir and unlink if cleanup races us.
pass
shutil.copytree(
source_path,
@@ -303,7 +304,6 @@ class BoxStdioSessionRuntime:
'.venv',
'venv',
'env',
'.env',
'.cache',
'.tmp',
'.langbot',
@@ -401,18 +401,12 @@ class BoxStdioSessionRuntime:
workspace_root = workspace_path.rstrip('/') or '/workspace'
venv_dir = f'{workspace_root}/.venv'
venv_bin = f'{venv_dir}/bin'
command = ' '.join(
[shlex.quote(payload['command']), *[shlex.quote(arg) for arg in payload.get('args', [])]]
)
command = ' '.join([shlex.quote(payload['command']), *[shlex.quote(arg) for arg in payload.get('args', [])]])
wrapped = dict(payload)
wrapped['command'] = 'sh'
wrapped['args'] = [
'-lc',
(
f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; '
f'export PATH={shlex.quote(venv_bin)}:$PATH; '
f'exec {command}'
),
(f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; export PATH={shlex.quote(venv_bin)}:$PATH; exec {command}'),
]
return wrapped

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import json
import mimetypes
import os
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
@@ -478,9 +477,7 @@ else:
if host_path and os.path.exists(host_path):
if os.path.isdir(host_path):
return self._build_directory_result(os.listdir(host_path))
result = self._read_text_file_preview(host_path, parameters)
host_root = str(selected_skill.get('package_root', '') or '')
return await self._attach_file_artifact_ref(result, host_path, host_root, path, query)
return self._read_text_file_preview(host_path, parameters)
try:
result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative)
@@ -506,9 +503,7 @@ else:
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return self._build_directory_result(entries)
result = self._read_text_file_preview(host_path, parameters)
host_root = self._get_host_root(selected_skill)
return await self._attach_file_artifact_ref(result, host_path, host_root, path, query)
return self._read_text_file_preview(host_path, parameters)
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
@@ -695,8 +690,7 @@ else:
'max_bytes': {
'type': 'integer',
'description': (
'Maximum bytes of file content to return. '
f'Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
f'Maximum bytes of file content to return. Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
),
'default': _DEFAULT_TOOL_RESULT_MAX_BYTES,
'minimum': 1,
@@ -984,84 +978,6 @@ else:
raise ValueError('Path escapes the skill package boundary.')
return host_path
def _get_host_root(self, selected_skill: dict | None) -> str:
if selected_skill is not None:
return str(selected_skill.get('package_root', '') or '')
return str(getattr(self.ap.box_service, 'default_workspace', '') or '')
async def _attach_file_artifact_ref(
self,
result: dict,
host_path: str,
host_root: str,
sandbox_path: str,
query: pipeline_query.Query,
) -> dict:
if not result.get('ok') or not result.get('truncated') or result.get('artifact_refs'):
return result
if not host_root or not os.path.isfile(host_path):
return result
run_session = self._get_agent_run_session(query)
if not run_session:
return result
persistence_mgr = getattr(self.ap, 'persistence_mgr', None)
get_db_engine = getattr(persistence_mgr, 'get_db_engine', None)
if not callable(get_db_engine):
return result
try:
from langbot.pkg.agent.runner.artifact_store import ArtifactStore
authorization = run_session.get('authorization', {}) if isinstance(run_session, dict) else {}
mime_type = mimetypes.guess_type(host_path)[0] or 'text/plain'
size_bytes = os.path.getsize(host_path)
metadata = {
'tool_name': READ_TOOL_NAME,
'sandbox_path': sandbox_path,
'truncated_by': result.get('truncated_by'),
'start_line': result.get('start_line'),
'end_line': result.get('end_line'),
'next_offset': result.get('next_offset'),
}
artifact_id = await ArtifactStore(get_db_engine()).register_file_artifact(
artifact_id=None,
host_path=host_path,
host_root=host_root,
artifact_type='file',
source='tool',
mime_type=mime_type,
name=os.path.basename(host_path),
size_bytes=size_bytes,
conversation_id=authorization.get('conversation_id'),
run_id=run_session.get('run_id') if isinstance(run_session, dict) else None,
runner_id=run_session.get('runner_id') if isinstance(run_session, dict) else None,
bot_id=getattr(query, 'bot_uuid', None),
workspace_id=authorization.get('workspace_id'),
thread_id=authorization.get('thread_id'),
metadata=metadata,
)
artifact_ref = {
'artifact_id': artifact_id,
'artifact_type': 'file',
'mime_type': mime_type,
'name': os.path.basename(host_path),
'size_bytes': size_bytes,
}
enriched = dict(result)
enriched['preview'] = str(result.get('content') or '')
enriched['artifact_refs'] = [artifact_ref]
return enriched
except Exception as exc:
self.ap.logger.warning(f'Failed to register read artifact for {sandbox_path}: {exc}')
return result
@staticmethod
def _get_agent_run_session(query: pipeline_query.Query) -> dict | None:
session = getattr(query, '_agent_run_session', None)
return session if isinstance(session, dict) else None
def _normalize_exec_result(self, result: dict) -> dict:
normalized = dict(result)
stdout = str(normalized.get('stdout') or '')

View File

@@ -10,7 +10,6 @@ if typing.TYPE_CHECKING:
from langbot_plugin.api.entities.events import pipeline_query
ACTIVATED_SKILLS_KEY = '_activated_skills'
ACTIVATED_SKILL_NAMES_STATE_KEY = 'host.activated_skills'
PIPELINE_BOUND_SKILLS_KEY = '_pipeline_bound_skills'
SKILL_MOUNT_PREFIX = '/workspace/.skills'
_SKILL_MOUNT_PATTERN = re.compile(r'/workspace/\.skills/([A-Za-z0-9_-]+)')
@@ -73,7 +72,8 @@ def register_activated_skill(query: pipeline_query.Query, skill_data: dict) -> N
activated[skill_name] = skill_data
def _normalize_skill_names(value: typing.Any) -> list[str]:
def normalize_skill_names(value: typing.Any) -> list[str]:
"""Return a de-duplicated list of non-empty skill names."""
if not isinstance(value, list):
return []
@@ -85,21 +85,24 @@ def _normalize_skill_names(value: typing.Any) -> list[str]:
return names
def restore_activated_skills_from_state(
def get_activated_skill_names(query: pipeline_query.Query) -> list[str]:
"""Return activated skill names for callers that own persistence policy."""
return normalize_skill_names(list(get_activated_skills(query).keys()))
def restore_activated_skills(
ap: app.Application,
query: pipeline_query.Query,
state: dict[str, dict[str, typing.Any]],
skill_names: typing.Any,
) -> list[str]:
"""Restore persisted activated skill names into Query variables.
"""Restore caller-provided activated skill names into Query variables.
The state value stores names only. Full skill metadata is rebuilt from the
current pipeline-visible skill cache so removed or unbound skills remain
unavailable to native exec/write/edit.
Persistence and state scope ownership belong to higher-level flows. This
helper only rebuilds current Query state from pipeline-visible skills, so
removed or unbound skills stay unavailable to native exec/write/edit.
"""
conversation_state = state.get('conversation', {}) if isinstance(state, dict) else {}
skill_names = _normalize_skill_names(conversation_state.get(ACTIVATED_SKILL_NAMES_STATE_KEY))
restored: list[str] = []
for skill_name in skill_names:
for skill_name in normalize_skill_names(skill_names):
skill_data = get_visible_skill(ap, query, skill_name)
if skill_data is None:
continue
@@ -108,81 +111,6 @@ def restore_activated_skills_from_state(
return restored
def _get_agent_run_authorization(query: pipeline_query.Query) -> dict[str, typing.Any] | None:
session = getattr(query, '_agent_run_session', None)
if not isinstance(session, dict):
return None
authorization = session.get('authorization')
return authorization if isinstance(authorization, dict) else None
def _get_conversation_state_target(query: pipeline_query.Query) -> tuple[str, str, str, dict[str, typing.Any]] | None:
session = getattr(query, '_agent_run_session', None)
if not isinstance(session, dict):
return None
authorization = _get_agent_run_authorization(query)
if authorization is None:
return None
state_policy = authorization.get('state_policy') or {}
if not state_policy.get('enable_state', True):
return None
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if 'conversation' not in state_scopes:
return None
state_context = authorization.get('state_context') or {}
scope_keys = state_context.get('scope_keys') or {}
scope_key = scope_keys.get('conversation')
if not scope_key:
return None
runner_id = str(session.get('runner_id') or 'unknown')
binding_identity = str(state_context.get('binding_identity') or 'unknown')
return scope_key, runner_id, binding_identity, state_context
async def persist_activated_skill(ap: app.Application, query: pipeline_query.Query, skill_name: str) -> bool:
"""Persist activated skill names for the current AgentRunner conversation.
Returns False when the call is outside an AgentRunner run or state policy
does not expose a conversation scope. The in-memory Query activation still
remains valid for the current turn.
"""
target = _get_conversation_state_target(query)
if target is None:
return False
persistence_mgr = getattr(ap, 'persistence_mgr', None)
if persistence_mgr is None or not hasattr(persistence_mgr, 'get_db_engine'):
return False
from ....agent.runner.persistent_state_store import get_persistent_state_store
scope_key, runner_id, binding_identity, state_context = target
store = get_persistent_state_store(persistence_mgr.get_db_engine())
existing_names = _normalize_skill_names(await store.state_get(scope_key, ACTIVATED_SKILL_NAMES_STATE_KEY))
if skill_name not in existing_names:
existing_names.append(skill_name)
success, error = await store.state_set(
scope_key=scope_key,
state_key=ACTIVATED_SKILL_NAMES_STATE_KEY,
value=existing_names,
runner_id=runner_id,
binding_identity=binding_identity,
scope='conversation',
context=state_context,
logger=getattr(ap, 'logger', None),
)
if not success:
logger = getattr(ap, 'logger', None)
if logger is not None:
logger.warning(f'Failed to persist activated skill "{skill_name}": {error}')
return success
def parse_skill_mount_path(sandbox_path: str) -> tuple[str | None, str]:
normalized_path = str(sandbox_path or '/workspace').strip() or '/workspace'
if normalized_path == SKILL_MOUNT_PREFIX:

View File

@@ -92,7 +92,6 @@ class SkillToolLoader(loader.ToolLoader):
# Register activated skill for sandbox mount path resolution
skill_loader.register_activated_skill(query, skill_data)
await skill_loader.persist_activated_skill(self.ap, query, skill_name)
# Return SKILL.md content as Tool Result (injects into context)
instructions = skill_data.get('instructions', '')
@@ -117,6 +116,7 @@ class SkillToolLoader(loader.ToolLoader):
'activated': True,
'skill_name': skill_name,
'mount_path': mount_path,
'activated_skill_names': skill_loader.get_activated_skill_names(query),
'content': result_content,
}

View File

@@ -54,7 +54,9 @@ def test_classify_python_workspace_detects_package_and_requirements():
def test_wrap_python_command_with_env_contains_bootstrap_and_command():
command = wrap_python_command_with_env('python script.py')
assert 'python -m venv "$_LB_VENV_DIR"' in command
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'kill -0 "$_LB_LOCK_OWNER"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python script.py')

View File

@@ -519,6 +519,7 @@ class TestPythonWorkspacePreparation:
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
(source / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
(source / '.env').write_text('TOKEN=new\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
@@ -526,6 +527,7 @@ class TestPythonWorkspacePreparation:
(workspace / '.venv' / 'bin' / 'python').write_text('', encoding='utf-8')
(workspace / '.langbot').mkdir()
(workspace / '.langbot' / 'python-env.lock').mkdir()
(workspace / '.env').write_text('TOKEN=old\n', encoding='utf-8')
(workspace / 'server.py').write_text('print("old")\n', encoding='utf-8')
(workspace / 'removed.py').write_text('stale\n', encoding='utf-8')
(workspace / 'removed_dir').mkdir()
@@ -535,6 +537,7 @@ class TestPythonWorkspacePreparation:
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
assert (workspace / 'requirements.txt').read_text(encoding='utf-8') == 'mcp==1.26.0\n'
assert (workspace / '.env').read_text(encoding='utf-8') == 'TOKEN=new\n'
assert not (workspace / 'removed.py').exists()
assert not (workspace / 'removed_dir').exists()
assert (workspace / '.venv' / 'bin' / 'python').exists()

View File

@@ -193,6 +193,29 @@ class TestSkillPathHelpers:
assert list(result.keys()) == ['visible']
def test_restore_activated_skills_uses_caller_provided_names_and_visibility(self):
from langbot.pkg.provider.tools.loaders.skill import (
ACTIVATED_SKILLS_KEY,
PIPELINE_BOUND_SKILLS_KEY,
get_activated_skill_names,
restore_activated_skills,
)
ap = _make_ap()
ap.skill_mgr = SimpleNamespace(
skills={
'visible': _make_skill_data(name='visible'),
'hidden': _make_skill_data(name='hidden'),
}
)
query = SimpleNamespace(variables={PIPELINE_BOUND_SKILLS_KEY: ['visible']})
restored = restore_activated_skills(ap, query, ['visible', 'hidden', 'visible', ''])
assert restored == ['visible']
assert list(query.variables[ACTIVATED_SKILLS_KEY].keys()) == ['visible']
assert get_activated_skill_names(query) == ['visible']
def test_resolve_virtual_skill_path_allows_visible_skill_reads(self):
from langbot.pkg.provider.tools.loaders.skill import (
PIPELINE_BOUND_SKILLS_KEY,
@@ -282,6 +305,7 @@ class TestSkillToolLoader:
assert result['activated'] is True
assert result['skill_name'] == 'demo'
assert result['mount_path'] == '/workspace/.skills/demo'
assert result['activated_skill_names'] == ['demo']
assert 'Step 1' in result['content']
assert set(query.variables[ACTIVATED_SKILLS_KEY].keys()) == {'demo'}