From 36292102f9646d9eb9dc6efb14390b64fd576b7b Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Thu, 4 Jun 2026 11:10:29 +0800 Subject: [PATCH] feat(agent-runner): add bounded native tool artifacts --- .../pkg/agent/runner/artifact_store.py | 132 ++++- src/langbot/pkg/plugin/handler.py | 5 +- .../pkg/provider/tools/loaders/native.py | 521 ++++++++++++++++-- src/langbot/pkg/provider/tools/toolmgr.py | 30 +- tests/unit_tests/agent/test_artifact_store.py | 56 +- .../unit_tests/plugin/test_handler_actions.py | 1 + .../provider/test_tool_manager_native.py | 276 +++++++++- 7 files changed, 952 insertions(+), 69 deletions(-) diff --git a/src/langbot/pkg/agent/runner/artifact_store.py b/src/langbot/pkg/agent/runner/artifact_store.py index 299f2ff0..888e8c0e 100644 --- a/src/langbot/pkg/agent/runner/artifact_store.py +++ b/src/langbot/pkg/agent/runner/artifact_store.py @@ -6,6 +6,7 @@ import datetime import typing import uuid import base64 +import os import sqlalchemy from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession @@ -14,6 +15,8 @@ from sqlalchemy.orm import sessionmaker from ...entity.persistence.artifact import AgentArtifact from ...entity.persistence.bstorage import BinaryStorage +_FILE_ARTIFACT_METADATA_KEY = '_langbot_file_artifact' + class ArtifactStore: """Store for AgentArtifact records. @@ -36,6 +39,64 @@ class ArtifactStore: engine, class_=AsyncSession, expire_on_commit=False ) + async def register_file_artifact( + self, + *, + artifact_id: str | None, + host_path: str, + host_root: str, + artifact_type: str = 'file', + source: str = 'tool', + mime_type: str | None = None, + name: str | None = None, + size_bytes: int | None = None, + sha256: str | None = None, + conversation_id: str | None = None, + run_id: str | None = None, + runner_id: str | None = None, + bot_id: str | None = None, + workspace_id: str | None = None, + expires_at: datetime.datetime | None = None, + metadata: dict[str, typing.Any] | None = None, + ) -> str: + """Register a Host-owned artifact backed by a bounded local file path. + + The public metadata intentionally excludes the real host path. Reads go + through read_artifact(), which revalidates the path against host_root. + """ + real_path, real_root = self._validate_file_artifact_path(host_path, host_root) + if not os.path.isfile(real_path): + raise ValueError('file artifact path must point to a file') + + public_metadata = dict(metadata or {}) + public_metadata[_FILE_ARTIFACT_METADATA_KEY] = { + 'path': real_path, + 'root': real_root, + } + + if size_bytes is None: + size_bytes = os.path.getsize(real_path) + + return await self.register_artifact( + artifact_id=artifact_id, + artifact_type=artifact_type, + source=source, + storage_key=f'file:{uuid.uuid4().hex}', + storage_type='file', + mime_type=mime_type, + name=name or os.path.basename(real_path), + size_bytes=size_bytes, + sha256=sha256, + conversation_id=conversation_id, + run_id=run_id, + runner_id=runner_id, + bot_id=bot_id, + workspace_id=workspace_id, + expires_at=expires_at, + metadata=public_metadata, + content=None, + ) + async def register_artifact( self, artifact_id: str | None, @@ -244,6 +305,9 @@ class ArtifactStore: 'has_more': has_more, } + if storage_type == 'file': + return self._read_file_storage(record, artifact_id, offset, limit) + # For other storage types, return storage reference # (caller can use file_key for chunked transfer) return { @@ -277,6 +341,72 @@ class ArtifactStore: return None return row.value + def _read_file_storage( + self, + record: AgentArtifact, + artifact_id: str, + offset: int, + limit: int, + ) -> dict[str, typing.Any] | None: + metadata = self._load_metadata(record.metadata_json) + file_info = metadata.get(_FILE_ARTIFACT_METADATA_KEY) + if not isinstance(file_info, dict): + return None + + host_path = file_info.get('path') + host_root = file_info.get('root') + if not isinstance(host_path, str) or not isinstance(host_root, str): + return None + + real_path, _ = self._validate_file_artifact_path(host_path, host_root) + if not os.path.isfile(real_path): + return None + + file_size = os.path.getsize(real_path) + if offset >= file_size: + content = b'' + else: + with open(real_path, 'rb') as f: + f.seek(offset) + content = f.read(limit) + + return { + 'artifact_id': artifact_id, + 'mime_type': record.mime_type, + 'size_bytes': file_size, + 'offset': offset, + 'length': len(content), + 'content_base64': base64.b64encode(content).decode('utf-8'), + 'file_key': None, + 'has_more': offset + len(content) < file_size, + } + + @staticmethod + def _validate_file_artifact_path(host_path: str, host_root: str) -> tuple[str, str]: + real_path = os.path.realpath(host_path) + real_root = os.path.realpath(host_root) + if not real_root: + raise ValueError('file artifact root is required') + if not (real_path == real_root or real_path.startswith(real_root + os.sep)): + raise ValueError('file artifact path escapes allowed root') + return real_path, real_root + + @staticmethod + def _load_metadata(metadata_json: str | None) -> dict[str, typing.Any]: + if not metadata_json: + return {} + try: + metadata = json.loads(metadata_json) + except Exception: + return {} + return metadata if isinstance(metadata, dict) else {} + + @staticmethod + def _public_metadata(metadata_json: str | None) -> dict[str, typing.Any]: + metadata = ArtifactStore._load_metadata(metadata_json) + metadata.pop(_FILE_ARTIFACT_METADATA_KEY, None) + return metadata + def _row_to_public_dict(self, row: AgentArtifact) -> dict[str, typing.Any]: """Convert an AgentArtifact row to public dict. @@ -296,5 +426,5 @@ class ArtifactStore: 'runner_id': row.runner_id, 'created_at': int(row.created_at.timestamp()) if row.created_at else None, 'expires_at': int(row.expires_at.timestamp()) if row.expires_at else None, - 'metadata': json.loads(row.metadata_json) if row.metadata_json else {}, + 'metadata': self._public_metadata(row.metadata_json), } diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 2fb78cb0..46eb9891 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -354,7 +354,10 @@ def _resolve_action_query(data: dict[str, Any], session: Any | None, ap: app.App query_id = session.get('query_id') if query_id is None: query_id = data.get('query_id') - return _get_cached_query(ap, query_id) + query = _get_cached_query(ap, query_id) + if query is not None and session is not None: + object.__setattr__(query, '_agent_run_session', session) + return query def _resolve_remove_think(data: dict[str, Any], query: Any | None) -> bool: diff --git a/src/langbot/pkg/provider/tools/loaders/native.py b/src/langbot/pkg/provider/tools/loaders/native.py index d6ef11d1..ca2b3f97 100644 --- a/src/langbot/pkg/provider/tools/loaders/native.py +++ b/src/langbot/pkg/provider/tools/loaders/native.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import mimetypes import os import langbot_plugin.api.entities.builtin.resource.tool as resource_tool @@ -21,6 +22,15 @@ _ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NA # Skip these dirs during grep walk to avoid noise _SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'} +_DEFAULT_READ_MAX_LINES = 2000 +_MAX_READ_MAX_LINES = 10000 +_DEFAULT_TOOL_RESULT_MAX_BYTES = 50 * 1024 +_BOX_FILE_SCRIPT_MAX_BYTES = 2048 +_GLOB_MAX_MATCHES = 100 +_GREP_MAX_MATCHES = 200 +_GREP_MAX_FILES = 5000 +_GREP_MAX_LINE_CHARS = 500 + class NativeToolLoader(loader.ToolLoader): def __init__(self, ap): @@ -138,6 +148,7 @@ class NativeToolLoader(loader.ToolLoader): # via execute_tool. Skills are mounted at /workspace/.skills/{name}/ # via extra_mounts built by BoxService. result = await self.ap.box_service.execute_tool(parameters, query) + result = self._normalize_exec_result(result) if selected_skill is not None: self._refresh_skill_from_disk(selected_skill) @@ -226,19 +237,65 @@ class NativeToolLoader(loader.ToolLoader): except Exception: return {'ok': False, 'error': stdout or 'Box file operation returned no result'} - async def _read_workspace_via_box(self, path: str, query: pipeline_query.Query) -> dict: + async def _read_workspace_via_box(self, path: str, parameters: dict, query: pipeline_query.Query) -> dict: + offset = self._positive_int(parameters.get('offset'), default=1) + max_lines = self._positive_int( + parameters.get('limit'), + default=_DEFAULT_READ_MAX_LINES, + max_value=_MAX_READ_MAX_LINES, + ) + # Box file fallback returns through exec stdout, which is already capped + # by BoxService. Keep this payload small enough to remain valid JSON. + max_bytes = min( + self._positive_int(parameters.get('max_bytes'), default=_DEFAULT_TOOL_RESULT_MAX_BYTES), + _BOX_FILE_SCRIPT_MAX_BYTES, + ) script = f""" import json, os path = {json.dumps(path)} +offset = {offset} +max_lines = {max_lines} +max_bytes = {max_bytes} if not path.startswith('/workspace'): print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}})) elif not os.path.exists(path): print(json.dumps({{'ok': False, 'error': f'File not found: {{path}}'}})) elif os.path.isdir(path): - print(json.dumps({{'ok': True, 'content': '\\n'.join(sorted(os.listdir(path))), 'is_directory': True}})) + entries = sorted(os.listdir(path)) + content = '\\n'.join(entries) + print(json.dumps({{'ok': True, 'content': content, 'is_directory': True, 'total': len(entries), 'truncated': False}})) else: + lines = [] + output_bytes = 0 + end_line = offset - 1 + truncated = False + next_offset = None with open(path, 'r', encoding='utf-8', errors='replace') as f: - print(json.dumps({{'ok': True, 'content': f.read()}})) + for line_number, line in enumerate(f, 1): + if line_number < offset: + continue + if len(lines) >= max_lines: + truncated = True + next_offset = line_number + break + line_bytes = len(line.encode('utf-8')) + if output_bytes + line_bytes > max_bytes: + truncated = True + next_offset = line_number + break + lines.append(line.rstrip('\\n')) + output_bytes += line_bytes + end_line = line_number + print(json.dumps({{ + 'ok': True, + 'content': '\\n'.join(lines), + 'truncated': truncated, + 'start_line': offset, + 'end_line': end_line, + 'next_offset': next_offset, + 'max_lines': max_lines, + 'max_bytes': max_bytes, + }})) """.strip() return await self._run_workspace_file_script(script, query) @@ -306,12 +363,27 @@ else: if not any(part in skip_dirs for part in item.parts) ] hits.sort(key=lambda item: item.stat().st_mtime if item.exists() else 0, reverse=True) - shown = hits[:100] + shown = hits[:{_GLOB_MAX_MATCHES}] matches = [] + output_bytes = 0 + truncated_by_bytes = False for item in shown: rel = os.path.relpath(str(item), path) - matches.append(os.path.join(path, rel).replace(os.sep, '/')) - print(json.dumps({{'ok': True, 'matches': matches, 'total': len(hits), 'truncated': len(hits) > 100}})) + sandbox_path = os.path.join(path, rel).replace(os.sep, '/') + entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if matches else 0) + if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}: + truncated_by_bytes = True + break + matches.append(sandbox_path) + output_bytes += entry_bytes + print(json.dumps({{ + 'ok': True, + 'matches': matches, + 'preview': '\\n'.join(matches), + 'total': len(hits), + 'truncated': len(hits) > len(matches) or truncated_by_bytes, + 'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if len(hits) > len(matches) else None), + }})) """.strip() return await self._run_workspace_file_script(script, query) @@ -349,29 +421,54 @@ else: continue if item.is_file(): files.append(item) - if len(files) >= 5000: + if len(files) >= {_GREP_MAX_FILES}: break matches = [] + output_bytes = 0 + truncated_by = None for fp in files: try: - text = fp.read_text(errors='ignore') + handle = fp.open('r', encoding='utf-8', errors='ignore') except OSError: continue - for lineno, line in enumerate(text.splitlines(), 1): - if regex.search(line): - if base.is_file(): - file_path = path - else: - rel = os.path.relpath(str(fp), path) - file_path = os.path.join(path, rel).replace(os.sep, '/') - matches.append({{'file': file_path, 'line': lineno, 'content': line.rstrip()}}) - if len(matches) >= 200: - break - if len(matches) >= 200: + with handle: + for lineno, line in enumerate(handle, 1): + if regex.search(line): + if base.is_file(): + file_path = path + else: + rel = os.path.relpath(str(fp), path) + file_path = os.path.join(path, rel).replace(os.sep, '/') + content = line.rstrip() + line_truncated = False + if len(content) > {_GREP_MAX_LINE_CHARS}: + content = content[:{_GREP_MAX_LINE_CHARS}] + '... [truncated]' + line_truncated = True + entry = {{'file': file_path, 'line': lineno, 'content': content}} + entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1 + if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}: + truncated_by = 'bytes' + break + if line_truncated and truncated_by is None: + truncated_by = 'line' + matches.append(entry) + output_bytes += entry_bytes + if len(matches) >= {_GREP_MAX_MATCHES}: + truncated_by = truncated_by or 'matches' + break + if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}: + break + if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}: break - print(json.dumps({{'ok': True, 'matches': matches, 'total': len(matches), 'truncated': len(matches) >= 200}})) + print(json.dumps({{ + 'ok': True, + 'matches': matches, + 'total': len(matches), + 'truncated': truncated_by is not None, + 'truncated_by': truncated_by, + }})) """.strip() return await self._run_workspace_file_script(script, query) @@ -386,14 +483,22 @@ else: ) if skill_request is not None and hasattr(self.ap.box_service, 'read_skill_file'): selected_skill, relative = skill_request + host_path = self._resolve_skill_host_path(selected_skill, relative) + if host_path and os.path.exists(host_path): + if os.path.isdir(host_path): + return self._build_directory_result(os.listdir(host_path)) + result = self._read_text_file_preview(host_path, parameters) + host_root = str(selected_skill.get('package_root', '') or '') + return await self._attach_file_artifact_ref(result, host_path, host_root, path, query) + try: result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative) - return {'ok': True, 'content': result.get('content', '')} + return self._build_read_result_from_text(str(result.get('content', '')), parameters) except Exception: try: result = await self.ap.box_service.list_skill_files(selected_skill['name'], relative) entries = [entry['name'] for entry in result.get('entries', [])] - return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True} + return self._build_directory_result(entries) except Exception as exc: return {'ok': False, 'error': str(exc)} @@ -404,15 +509,15 @@ else: include_activated=True, ) if self._should_use_box_workspace_files(selected_skill): - return await self._read_workspace_via_box(path, query) + return await self._read_workspace_via_box(path, parameters, query) if not os.path.exists(host_path): return {'ok': False, 'error': f'File not found: {path}'} if os.path.isdir(host_path): entries = os.listdir(host_path) - return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True} - with open(host_path, 'r', errors='replace') as f: - content = f.read() - return {'ok': True, 'content': content} + return self._build_directory_result(entries) + result = self._read_text_file_preview(host_path, parameters) + host_root = self._get_host_root(selected_skill) + return await self._attach_file_artifact_ref(result, host_path, host_root, path, query) async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict: path = parameters['path'] @@ -583,6 +688,29 @@ else: 'type': 'string', 'description': 'Absolute path to the file (must be under /workspace).', }, + 'offset': { + 'type': 'integer', + 'description': '1-indexed line number to start reading from. Defaults to 1.', + 'default': 1, + 'minimum': 1, + }, + 'limit': { + 'type': 'integer', + 'description': f'Maximum number of lines to return. Defaults to {_DEFAULT_READ_MAX_LINES}.', + 'default': _DEFAULT_READ_MAX_LINES, + 'minimum': 1, + 'maximum': _MAX_READ_MAX_LINES, + }, + 'max_bytes': { + 'type': 'integer', + 'description': ( + 'Maximum bytes of file content to return. ' + f'Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.' + ), + 'default': _DEFAULT_TOOL_RESULT_MAX_BYTES, + 'minimum': 1, + 'maximum': _DEFAULT_TOOL_RESULT_MAX_BYTES, + }, }, 'required': ['path'], 'additionalProperties': False, @@ -739,22 +867,30 @@ else: hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True) total = len(hits) - shown = hits[:100] + shown = hits[:_GLOB_MAX_MATCHES] # Convert back to sandbox paths sandbox_paths = [] + output_bytes = 0 + truncated_by_bytes = False for h in shown: rel = os.path.relpath(str(h), host_path) sandbox_path = os.path.join(path, rel) + entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if sandbox_paths else 0) + if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES: + truncated_by_bytes = True + break sandbox_paths.append(sandbox_path) + output_bytes += entry_bytes - result_lines = sandbox_paths - result = '\n'.join(result_lines) - - if total > 100: - result += f'\n... ({total} matches, showing first 100)' - - return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100} + return { + 'ok': True, + 'matches': sandbox_paths, + 'preview': '\n'.join(sandbox_paths), + 'total': total, + 'truncated': total > len(sandbox_paths) or truncated_by_bytes, + 'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if total > len(sandbox_paths) else None), + } async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict: pattern = parameters['pattern'] @@ -790,32 +926,46 @@ else: files = self._grep_walk(base, include) matches = [] + output_bytes = 0 + truncated_by = None for fp in files: try: - text = fp.read_text(errors='ignore') + handle = fp.open('r', encoding='utf-8', errors='ignore') except OSError: continue - for lineno, line in enumerate(text.splitlines(), 1): - if regex.search(line): - rel = os.path.relpath(str(fp), host_path) - sandbox_path = os.path.join(path, rel) - matches.append( - { + with handle: + for lineno, line in enumerate(handle, 1): + if regex.search(line): + rel = os.path.relpath(str(fp), host_path) + sandbox_path = os.path.join(path, rel) + content, line_truncated = self._truncate_grep_line(line.rstrip()) + entry = { 'file': sandbox_path, 'line': lineno, - 'content': line.rstrip(), + 'content': content, } - ) - if len(matches) >= 200: - break - if len(matches) >= 200: + entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1 + if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES: + truncated_by = 'bytes' + break + if line_truncated and truncated_by is None: + truncated_by = 'line' + matches.append(entry) + output_bytes += entry_bytes + if len(matches) >= _GREP_MAX_MATCHES: + truncated_by = truncated_by or 'matches' + break + if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES: + break + if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES: break return { 'ok': True, 'matches': matches, 'total': len(matches), - 'truncated': len(matches) >= 200, + 'truncated': truncated_by is not None, + 'truncated_by': truncated_by, } @staticmethod @@ -827,10 +977,283 @@ else: continue if item.is_file(): results.append(item) - if len(results) >= 5000: + if len(results) >= _GREP_MAX_FILES: break return results + @staticmethod + def _resolve_skill_host_path(selected_skill: dict, relative: str) -> str | None: + package_root = str(selected_skill.get('package_root', '') or '').strip() + if not package_root: + return None + + host_root = os.path.realpath(package_root) + host_path = os.path.realpath(os.path.join(host_root, relative)) + if not (host_path == host_root or host_path.startswith(host_root + os.sep)): + raise ValueError('Path escapes the skill package boundary.') + return host_path + + def _get_host_root(self, selected_skill: dict | None) -> str: + if selected_skill is not None: + return str(selected_skill.get('package_root', '') or '') + return str(getattr(self.ap.box_service, 'default_workspace', '') or '') + + async def _attach_file_artifact_ref( + self, + result: dict, + host_path: str, + host_root: str, + sandbox_path: str, + query: pipeline_query.Query, + ) -> dict: + if not result.get('ok') or not result.get('truncated') or result.get('artifact_refs'): + return result + if not host_root or not os.path.isfile(host_path): + return result + + run_session = self._get_agent_run_session(query) + if not run_session: + return result + + persistence_mgr = getattr(self.ap, 'persistence_mgr', None) + get_db_engine = getattr(persistence_mgr, 'get_db_engine', None) + if not callable(get_db_engine): + return result + + try: + from langbot.pkg.agent.runner.artifact_store import ArtifactStore + + authorization = run_session.get('authorization', {}) if isinstance(run_session, dict) else {} + mime_type = mimetypes.guess_type(host_path)[0] or 'text/plain' + size_bytes = os.path.getsize(host_path) + metadata = { + 'tool_name': READ_TOOL_NAME, + 'sandbox_path': sandbox_path, + 'truncated_by': result.get('truncated_by'), + 'start_line': result.get('start_line'), + 'end_line': result.get('end_line'), + 'next_offset': result.get('next_offset'), + } + artifact_id = await ArtifactStore(get_db_engine()).register_file_artifact( + artifact_id=None, + host_path=host_path, + host_root=host_root, + artifact_type='file', + source='tool', + mime_type=mime_type, + name=os.path.basename(host_path), + size_bytes=size_bytes, + conversation_id=authorization.get('conversation_id'), + run_id=run_session.get('run_id') if isinstance(run_session, dict) else None, + runner_id=run_session.get('runner_id') if isinstance(run_session, dict) else None, + bot_id=getattr(query, 'bot_uuid', None), + metadata=metadata, + ) + artifact_ref = { + 'artifact_id': artifact_id, + 'artifact_type': 'file', + 'mime_type': mime_type, + 'name': os.path.basename(host_path), + 'size_bytes': size_bytes, + } + enriched = dict(result) + enriched['preview'] = str(result.get('content') or '') + enriched['artifact_refs'] = [artifact_ref] + return enriched + except Exception as exc: + self.ap.logger.warning(f'Failed to register read artifact for {sandbox_path}: {exc}') + return result + + @staticmethod + def _get_agent_run_session(query: pipeline_query.Query) -> dict | None: + session = getattr(query, '_agent_run_session', None) + return session if isinstance(session, dict) else None + + def _normalize_exec_result(self, result: dict) -> dict: + normalized = dict(result) + stdout = str(normalized.get('stdout') or '') + stderr = str(normalized.get('stderr') or '') + stdout, stdout_capped = self._truncate_text_to_bytes_with_flag(stdout, _DEFAULT_TOOL_RESULT_MAX_BYTES) + stderr, stderr_capped = self._truncate_text_to_bytes_with_flag(stderr, _DEFAULT_TOOL_RESULT_MAX_BYTES) + normalized['stdout'] = stdout + normalized['stderr'] = stderr + normalized['stdout_truncated'] = bool(normalized.get('stdout_truncated') or stdout_capped) + normalized['stderr_truncated'] = bool(normalized.get('stderr_truncated') or stderr_capped) + + if stdout and stderr: + preview_raw = f'stdout:\n{stdout}\n\nstderr:\n{stderr}' + else: + preview_raw = stdout or stderr + preview, preview_capped = self._truncate_text_to_bytes_with_flag(preview_raw, _DEFAULT_TOOL_RESULT_MAX_BYTES) + normalized['preview'] = preview + normalized['truncated'] = bool( + normalized['stdout_truncated'] or normalized['stderr_truncated'] or preview_capped + ) + if preview_capped and not normalized.get('truncated_by'): + normalized['truncated_by'] = 'bytes' + return normalized + + def _build_directory_result(self, entries: list[str]) -> dict: + sorted_entries = sorted(str(entry) for entry in entries) + content = '\n'.join(sorted_entries) + preview = self._truncate_text_to_bytes(content, _DEFAULT_TOOL_RESULT_MAX_BYTES) + truncated = preview != content + return { + 'ok': True, + 'content': preview, + 'is_directory': True, + 'total': len(sorted_entries), + 'truncated': truncated, + 'truncated_by': 'bytes' if truncated else None, + } + + def _read_text_file_preview(self, host_path: str, parameters: dict) -> dict: + offset = self._positive_int(parameters.get('offset'), default=1) + max_lines = self._positive_int( + parameters.get('limit'), + default=_DEFAULT_READ_MAX_LINES, + max_value=_MAX_READ_MAX_LINES, + ) + max_bytes = self._positive_int( + parameters.get('max_bytes'), + default=_DEFAULT_TOOL_RESULT_MAX_BYTES, + max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES, + ) + lines: list[str] = [] + output_bytes = 0 + end_line = offset - 1 + truncated = False + truncated_by: str | None = None + next_offset: int | None = None + + with open(host_path, 'r', encoding='utf-8', errors='replace') as f: + for line_number, line in enumerate(f, 1): + if line_number < offset: + continue + if len(lines) >= max_lines: + truncated = True + truncated_by = 'lines' + next_offset = line_number + break + + line_bytes = len(line.encode('utf-8')) + if output_bytes + line_bytes > max_bytes: + truncated = True + truncated_by = 'bytes' + next_offset = line_number + break + + lines.append(line.rstrip('\n')) + output_bytes += line_bytes + end_line = line_number + + if not lines and truncated_by == 'bytes': + content = ( + f'[Line {next_offset or offset} exceeds the {self._format_size(max_bytes)} read limit. ' + 'Use exec with a byte-range command for this line, or read a different offset.]' + ) + else: + content = '\n'.join(lines) + + return { + 'ok': True, + 'content': content, + 'truncated': truncated, + 'truncated_by': truncated_by, + 'start_line': offset, + 'end_line': end_line, + 'next_offset': next_offset, + 'max_lines': max_lines, + 'max_bytes': max_bytes, + } + + def _build_read_result_from_text(self, content: str, parameters: dict) -> dict: + offset = self._positive_int(parameters.get('offset'), default=1) + max_lines = self._positive_int( + parameters.get('limit'), + default=_DEFAULT_READ_MAX_LINES, + max_value=_MAX_READ_MAX_LINES, + ) + max_bytes = self._positive_int( + parameters.get('max_bytes'), + default=_DEFAULT_TOOL_RESULT_MAX_BYTES, + max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES, + ) + all_lines = content.splitlines() + start_index = offset - 1 + if start_index >= len(all_lines) and all_lines: + return {'ok': False, 'error': f'Offset {offset} is beyond end of file ({len(all_lines)} lines total)'} + output_lines: list[str] = [] + output_bytes = 0 + truncated = False + truncated_by: str | None = None + next_offset: int | None = None + for index, line in enumerate(all_lines[start_index:], start_index + 1): + if len(output_lines) >= max_lines: + truncated = True + truncated_by = 'lines' + next_offset = index + break + line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0) + if output_bytes + line_bytes > max_bytes: + truncated = True + truncated_by = 'bytes' + next_offset = index + break + output_lines.append(line) + output_bytes += line_bytes + + end_line = offset + len(output_lines) - 1 + return { + 'ok': True, + 'content': '\n'.join(output_lines), + 'truncated': truncated, + 'truncated_by': truncated_by, + 'start_line': offset, + 'end_line': end_line, + 'next_offset': next_offset, + 'max_lines': max_lines, + 'max_bytes': max_bytes, + } + + @staticmethod + def _positive_int(value, *, default: int, max_value: int | None = None) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + parsed = default + if parsed <= 0: + parsed = default + if max_value is not None: + parsed = min(parsed, max_value) + return parsed + + @staticmethod + def _truncate_grep_line(line: str) -> tuple[str, bool]: + if len(line) <= _GREP_MAX_LINE_CHARS: + return line, False + return f'{line[:_GREP_MAX_LINE_CHARS]}... [truncated]', True + + @staticmethod + def _truncate_text_to_bytes(text: str, max_bytes: int) -> str: + return NativeToolLoader._truncate_text_to_bytes_with_flag(text, max_bytes)[0] + + @staticmethod + def _truncate_text_to_bytes_with_flag(text: str, max_bytes: int) -> tuple[str, bool]: + data = text.encode('utf-8') + if len(data) <= max_bytes: + return text, False + truncated = data[:max_bytes] + while truncated and (truncated[-1] & 0xC0) == 0x80: + truncated = truncated[:-1] + return truncated.decode('utf-8', errors='ignore'), True + + @staticmethod + def _format_size(bytes_count: int) -> str: + if bytes_count < 1024: + return f'{bytes_count}B' + return f'{bytes_count / 1024:.1f}KB' + def _summarize_parameters(self, parameters: dict) -> dict: summary = dict(parameters) cmd = str(summary.get('command', '')).strip() diff --git a/src/langbot/pkg/provider/tools/toolmgr.py b/src/langbot/pkg/provider/tools/toolmgr.py index 51bc8217..7e59d56f 100644 --- a/src/langbot/pkg/provider/tools/toolmgr.py +++ b/src/langbot/pkg/provider/tools/toolmgr.py @@ -68,23 +68,33 @@ class ToolManager: return all_functions async def get_tool_by_name(self, name: str) -> resource_tool.LLMTool | None: - """Get tool by name from plugin or MCP loaders. + """Get tool by name from any active loader. Args: - name: Tool name (format: plugin_author/plugin_name/tool_name or mcp_server/tool_name) + name: Tool name. Returns: LLMTool if found, None otherwise """ - # Try plugin loader first - tool = await self.plugin_tool_loader._get_tool(name) - if tool: - return tool + for tool_loader in ( + self.native_tool_loader, + self.plugin_tool_loader, + self.mcp_tool_loader, + self.skill_tool_loader, + ): + tool = await self._get_tool_from_loader(tool_loader, name) + if tool: + return tool - # Try MCP loader - tool = await self.mcp_tool_loader._get_tool(name) - if tool: - return tool + return None + + async def _get_tool_from_loader(self, tool_loader: typing.Any, name: str) -> resource_tool.LLMTool | None: + if hasattr(tool_loader, '_get_tool'): + return await tool_loader._get_tool(name) + + for tool in await tool_loader.get_tools(): + if tool.name == name: + return tool return None diff --git a/tests/unit_tests/agent/test_artifact_store.py b/tests/unit_tests/agent/test_artifact_store.py index 8d599362..13ea9b8a 100644 --- a/tests/unit_tests/agent/test_artifact_store.py +++ b/tests/unit_tests/agent/test_artifact_store.py @@ -5,11 +5,9 @@ import pytest from unittest.mock import MagicMock, AsyncMock, patch import base64 import datetime -import asyncio from langbot.pkg.agent.runner.artifact_store import ArtifactStore from langbot.pkg.agent.runner.session_registry import ( - AgentRunSessionRegistry, get_session_registry, ) from .conftest import make_session @@ -24,7 +22,6 @@ class TestArtifactStore: Note: The new store uses AsyncSession, so we need to mock the session factory behavior. """ - from unittest.mock import MagicMock, AsyncMock, patch from sqlalchemy.ext.asyncio import AsyncEngine engine = MagicMock(spec=AsyncEngine) @@ -452,10 +449,7 @@ class TestArtifactStoreRealSQLite: async def db_engine(self): """Create an in-memory SQLite database for testing.""" from sqlalchemy.ext.asyncio import create_async_engine - from sqlalchemy import text from langbot.pkg.entity.persistence.base import Base - from langbot.pkg.entity.persistence.artifact import AgentArtifact - from langbot.pkg.entity.persistence.bstorage import BinaryStorage engine = create_async_engine("sqlite+aiosqlite:///:memory:") @@ -580,6 +574,56 @@ class TestArtifactStoreRealSQLite: assert result["has_more"] is True assert result["length"] == 100 + @pytest.mark.asyncio + async def test_file_artifact_range_read_and_public_metadata(self, db_engine, tmp_path): + """File-backed artifacts read ranges without exposing host paths.""" + store = ArtifactStore(db_engine) + content = b"0123456789" * 20 + file_path = tmp_path / "large.txt" + file_path.write_bytes(content) + + artifact_id = await store.register_file_artifact( + artifact_id="art_file_001", + host_path=str(file_path), + host_root=str(tmp_path), + source="tool", + mime_type="text/plain", + name="large.txt", + conversation_id="conv_001", + run_id="run_001", + metadata={"sandbox_path": "/workspace/large.txt"}, + ) + + metadata = await store.get_metadata(artifact_id) + assert metadata is not None + assert metadata["artifact_id"] == "art_file_001" + assert metadata["metadata"] == {"sandbox_path": "/workspace/large.txt"} + assert str(file_path) not in str(metadata) + + result = await store.read_artifact(artifact_id, offset=10, limit=15) + assert result is not None + assert result["offset"] == 10 + assert result["length"] == 15 + assert result["size_bytes"] == len(content) + assert result["has_more"] is True + assert base64.b64decode(result["content_base64"]) == content[10:25] + + @pytest.mark.asyncio + async def test_register_file_artifact_rejects_path_escape(self, db_engine, tmp_path): + """File-backed artifacts must stay inside their declared host root.""" + store = ArtifactStore(db_engine) + root = tmp_path / "root" + root.mkdir() + outside = tmp_path / "outside.txt" + outside.write_text("outside") + + with pytest.raises(ValueError, match="escapes"): + await store.register_file_artifact( + artifact_id="art_file_escape", + host_path=str(outside), + host_root=str(root), + ) + @pytest.mark.asyncio async def test_metadata_sdk_validation(self, db_engine): """Test that metadata can be validated by SDK ArtifactMetadata.""" diff --git a/tests/unit_tests/plugin/test_handler_actions.py b/tests/unit_tests/plugin/test_handler_actions.py index 60b52552..6c7480b9 100644 --- a/tests/unit_tests/plugin/test_handler_actions.py +++ b/tests/unit_tests/plugin/test_handler_actions.py @@ -589,6 +589,7 @@ class TestAgentRunProxyActions: await registry.unregister(run_id) assert response.code == 0 + assert getattr(query, '_agent_run_session')['run_id'] == run_id app.tool_mgr.execute_func_call.assert_awaited_once_with( name='test/search', parameters={'q': 'langbot'}, diff --git a/tests/unit_tests/provider/test_tool_manager_native.py b/tests/unit_tests/provider/test_tool_manager_native.py index 117a20fd..2d654fac 100644 --- a/tests/unit_tests/provider/test_tool_manager_native.py +++ b/tests/unit_tests/provider/test_tool_manager_native.py @@ -3,13 +3,18 @@ from __future__ import annotations import os import tempfile from types import SimpleNamespace -from unittest.mock import AsyncMock, Mock +from unittest.mock import AsyncMock, Mock, patch import pytest import langbot_plugin.api.entities.builtin.resource.tool as resource_tool -from langbot.pkg.provider.tools.loaders.native import NativeToolLoader +from langbot.pkg.provider.tools.loaders.native import ( + _DEFAULT_TOOL_RESULT_MAX_BYTES, + _GLOB_MAX_MATCHES, + _GREP_MAX_MATCHES, + NativeToolLoader, +) from langbot.pkg.provider.tools.toolmgr import ToolManager @@ -81,6 +86,23 @@ async def test_tool_manager_routes_native_tool_calls(): assert result == {'backend': 'fake'} +@pytest.mark.asyncio +async def test_tool_manager_get_tool_by_name_resolves_native_and_skill_tools(): + manager = ToolManager(SimpleNamespace()) + manager.native_tool_loader = StubLoader([make_tool('exec')]) + manager.skill_tool_loader = StubLoader([make_tool('activate')]) + manager.plugin_tool_loader = StubLoader([make_tool('plugin_tool')]) + manager.mcp_tool_loader = StubLoader([make_tool('mcp_tool')]) + + native_tool = await manager.get_tool_by_name('exec') + skill_tool = await manager.get_tool_by_name('activate') + + assert native_tool is not None + assert native_tool.name == 'exec' + assert skill_tool is not None + assert skill_tool.name == 'activate' + + @pytest.mark.asyncio async def test_native_tool_loader_hides_tools_when_box_unavailable(): loader = NativeToolLoader(SimpleNamespace(box_service=SimpleNamespace(available=False))) @@ -119,6 +141,7 @@ def _make_loader_with_workspace(tmpdir: str) -> tuple[NativeToolLoader, Mock]: def _make_query() -> Mock: q = Mock() q.query_id = 'test-query-1' + q.variables = {} return q @@ -133,6 +156,9 @@ async def test_read_file(): assert result['ok'] is True assert result['content'] == 'hello world' + assert result['truncated'] is False + assert result['start_line'] == 1 + assert result['end_line'] == 1 @pytest.mark.asyncio @@ -159,6 +185,136 @@ async def test_read_directory(): assert result['ok'] is True assert result['is_directory'] is True assert 'a.txt' in result['content'] + assert result['total'] == 2 + assert result['truncated'] is False + + +@pytest.mark.asyncio +async def test_read_file_supports_line_window(): + with tempfile.TemporaryDirectory() as tmpdir: + loader, _ = _make_loader_with_workspace(tmpdir) + content = '\n'.join(f'line-{line_no}' for line_no in range(1, 7)) + with open(os.path.join(tmpdir, 'large.txt'), 'w') as f: + f.write(content) + + result = await loader.invoke_tool( + 'read', + {'path': '/workspace/large.txt', 'offset': 2, 'limit': 3}, + _make_query(), + ) + + assert result['ok'] is True + assert result['content'] == 'line-2\nline-3\nline-4' + assert result['truncated'] is True + assert result['truncated_by'] == 'lines' + assert result['start_line'] == 2 + assert result['end_line'] == 4 + assert result['next_offset'] == 5 + + +@pytest.mark.asyncio +async def test_read_file_is_bounded_by_bytes(): + with tempfile.TemporaryDirectory() as tmpdir: + loader, _ = _make_loader_with_workspace(tmpdir) + with open(os.path.join(tmpdir, 'wide.txt'), 'w') as f: + f.write(('x' * 128) + '\nsecond line') + + result = await loader.invoke_tool( + 'read', + {'path': '/workspace/wide.txt', 'max_bytes': 32}, + _make_query(), + ) + + assert result['ok'] is True + assert result['truncated'] is True + assert result['truncated_by'] == 'bytes' + assert result['next_offset'] == 1 + assert result['content'].startswith('[Line 1 exceeds') + assert len(result['content']) < 200 + + +@pytest.mark.asyncio +async def test_skill_read_uses_host_preview_when_package_root_available(): + with tempfile.TemporaryDirectory() as tmpdir: + skill_root = os.path.join(tmpdir, 'skill-demo') + os.makedirs(skill_root) + with open(os.path.join(skill_root, 'large.txt'), 'w') as f: + f.write('first\nsecond\nthird') + + box_service = SimpleNamespace( + available=True, + default_workspace=tmpdir, + read_skill_file=AsyncMock(return_value={'content': 'should not be used'}), + ) + skill_mgr = SimpleNamespace(skills={'demo': {'name': 'demo', 'package_root': skill_root}}) + loader = NativeToolLoader(SimpleNamespace(box_service=box_service, skill_mgr=skill_mgr, logger=Mock())) + + result = await loader.invoke_tool( + 'read', + {'path': '/workspace/.skills/demo/large.txt', 'limit': 1}, + _make_query(), + ) + + assert result['ok'] is True + assert result['content'] == 'first' + assert result['truncated'] is True + assert result['next_offset'] == 2 + box_service.read_skill_file.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_read_truncated_file_returns_host_artifact_ref_for_agent_run(): + with tempfile.TemporaryDirectory() as tmpdir: + engine = object() + logger = Mock() + box_service = SimpleNamespace(available=True, default_workspace=tmpdir) + persistence_mgr = SimpleNamespace(get_db_engine=Mock(return_value=engine)) + loader = NativeToolLoader( + SimpleNamespace(box_service=box_service, persistence_mgr=persistence_mgr, logger=logger) + ) + with open(os.path.join(tmpdir, 'large.txt'), 'w') as f: + f.write('first\nsecond\nthird') + + query = _make_query() + query.bot_uuid = 'bot-001' + query._agent_run_session = { + 'run_id': 'run-001', + 'runner_id': 'plugin:test/runner/default', + 'authorization': {'conversation_id': 'conv-001'}, + } + + with patch('langbot.pkg.agent.runner.artifact_store.ArtifactStore') as store_cls: + store = store_cls.return_value + store.register_file_artifact = AsyncMock(return_value='artifact-file-001') + + result = await loader.invoke_tool( + 'read', + {'path': '/workspace/large.txt', 'limit': 1}, + query, + ) + + assert result['ok'] is True + assert result['content'] == 'first' + assert result['preview'] == 'first' + assert result['truncated'] is True + assert result['artifact_refs'] == [ + { + 'artifact_id': 'artifact-file-001', + 'artifact_type': 'file', + 'mime_type': 'text/plain', + 'name': 'large.txt', + 'size_bytes': os.path.getsize(os.path.join(tmpdir, 'large.txt')), + } + ] + store_cls.assert_called_once_with(engine) + store.register_file_artifact.assert_awaited_once() + call_kwargs = store.register_file_artifact.await_args.kwargs + assert call_kwargs['host_path'] == os.path.realpath(os.path.join(tmpdir, 'large.txt')) + assert call_kwargs['host_root'] == tmpdir + assert call_kwargs['conversation_id'] == 'conv-001' + assert call_kwargs['run_id'] == 'run-001' + assert call_kwargs['runner_id'] == 'plugin:test/runner/default' + assert call_kwargs['metadata']['sandbox_path'] == '/workspace/large.txt' @pytest.mark.asyncio @@ -248,3 +404,119 @@ async def test_path_escape_blocked(): with pytest.raises(ValueError, match='escapes'): await loader.invoke_tool('read', {'path': '/workspace/../../etc/passwd'}, _make_query()) + + +@pytest.mark.asyncio +async def test_glob_result_is_bounded(): + with tempfile.TemporaryDirectory() as tmpdir: + loader, _ = _make_loader_with_workspace(tmpdir) + for index in range(_GLOB_MAX_MATCHES + 5): + with open(os.path.join(tmpdir, f'file-{index:03d}.txt'), 'w') as f: + f.write(str(index)) + + result = await loader.invoke_tool( + 'glob', + {'path': '/workspace', 'pattern': '*.txt'}, + _make_query(), + ) + + assert result['ok'] is True + assert len(result['matches']) == _GLOB_MAX_MATCHES + assert result['total'] == _GLOB_MAX_MATCHES + 5 + assert result['truncated'] is True + assert result['truncated_by'] == 'matches' + assert result['preview'].splitlines() == result['matches'] + + +@pytest.mark.asyncio +async def test_grep_result_is_bounded_by_match_count(): + with tempfile.TemporaryDirectory() as tmpdir: + loader, _ = _make_loader_with_workspace(tmpdir) + with open(os.path.join(tmpdir, 'hits.txt'), 'w') as f: + for index in range(_GREP_MAX_MATCHES + 5): + f.write(f'needle {index}\n') + + result = await loader.invoke_tool( + 'grep', + {'path': '/workspace', 'pattern': 'needle', 'include': '*.txt'}, + _make_query(), + ) + + assert result['ok'] is True + assert len(result['matches']) == _GREP_MAX_MATCHES + assert result['total'] == _GREP_MAX_MATCHES + assert result['truncated'] is True + assert result['truncated_by'] == 'matches' + + +@pytest.mark.asyncio +async def test_grep_truncates_long_matching_line(): + with tempfile.TemporaryDirectory() as tmpdir: + loader, _ = _make_loader_with_workspace(tmpdir) + with open(os.path.join(tmpdir, 'wide.txt'), 'w') as f: + f.write('needle ' + ('x' * 600)) + + result = await loader.invoke_tool( + 'grep', + {'path': '/workspace', 'pattern': 'needle', 'include': '*.txt'}, + _make_query(), + ) + + assert result['ok'] is True + assert len(result['matches']) == 1 + assert result['matches'][0]['content'].endswith('... [truncated]') + assert result['truncated'] is True + assert result['truncated_by'] == 'line' + + +@pytest.mark.asyncio +async def test_exec_result_adds_preview_and_truncated_flag(): + with tempfile.TemporaryDirectory() as tmpdir: + box_service = SimpleNamespace( + available=True, + default_workspace=tmpdir, + execute_tool=AsyncMock( + return_value={ + 'ok': True, + 'stdout': 'stdout text', + 'stderr': 'stderr text', + 'stdout_truncated': True, + 'stderr_truncated': False, + } + ), + ) + loader = NativeToolLoader(SimpleNamespace(box_service=box_service, logger=Mock())) + + result = await loader.invoke_tool('exec', {'command': 'echo ok'}, _make_query()) + + assert result['ok'] is True + assert result['truncated'] is True + assert result['preview'] == 'stdout:\nstdout text\n\nstderr:\nstderr text' + box_service.execute_tool.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_exec_result_caps_untrusted_large_output(): + with tempfile.TemporaryDirectory() as tmpdir: + box_service = SimpleNamespace( + available=True, + default_workspace=tmpdir, + execute_tool=AsyncMock( + return_value={ + 'ok': True, + 'stdout': 'x' * (_DEFAULT_TOOL_RESULT_MAX_BYTES + 128), + 'stderr': '', + 'stdout_truncated': False, + 'stderr_truncated': False, + } + ), + ) + loader = NativeToolLoader(SimpleNamespace(box_service=box_service, logger=Mock())) + + result = await loader.invoke_tool('exec', {'command': 'echo ok'}, _make_query()) + + assert result['ok'] is True + assert len(result['stdout'].encode('utf-8')) <= _DEFAULT_TOOL_RESULT_MAX_BYTES + assert result['stdout_truncated'] is True + assert result['truncated'] is True + assert result['preview'] == result['stdout']