diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 82eb60b4..e25b86d9 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -24,6 +24,7 @@ from langbot_plugin.box.models import ( _INT_ADAPTER = pydantic.TypeAdapter(int) _UTC = _dt.timezone.utc _MAX_RECENT_ERRORS = 50 +_MIB = 1024 * 1024 def _is_path_under(path: str, root: str) -> bool: @@ -54,6 +55,7 @@ class BoxService: self.allowed_host_mount_roots = self._load_allowed_host_mount_roots() self.default_host_workspace = self._load_default_host_workspace() self.profile = self._load_profile() + self.workspace_quota_mb = self._load_workspace_quota_mb() self._recent_errors: collections.deque[dict] = collections.deque(maxlen=_MAX_RECENT_ERRORS) self._shutdown_task = None self._available = False @@ -93,11 +95,22 @@ class BoxService: f'query_id={query.query_id} ' f'spec={json.dumps(self._summarize_spec(spec), ensure_ascii=False)}' ) + try: + self._enforce_workspace_quota(spec, phase='before execution') + except BoxError as exc: + self._record_error(exc, query) + raise try: result = await self.client.execute(spec) except BoxError as exc: self._record_error(exc, query) raise + try: + self._enforce_workspace_quota(spec, phase='after execution') + except BoxError as exc: + await self._cleanup_exceeded_session(spec) + self._record_error(exc, query) + raise self.ap.logger.info( 'LangBot Box result: ' f'query_id={query.query_id} ' @@ -141,6 +154,8 @@ class BoxService: spec_payload.setdefault('env', {}) if spec_payload.get('host_path') in (None, '') and self.default_host_workspace is not None: spec_payload['host_path'] = self.default_host_workspace + if spec_payload.get('workspace_quota_mb') in (None, '') and self.workspace_quota_mb is not None: + spec_payload['workspace_quota_mb'] = self.workspace_quota_mb self._apply_profile(spec_payload) @@ -241,6 +256,7 @@ class BoxService: 'memory_mb': spec.memory_mb, 'pids_limit': spec.pids_limit, 'read_only_rootfs': spec.read_only_rootfs, + 'workspace_quota_mb': spec.workspace_quota_mb, 'env_keys': sorted(spec.env.keys()), 'cmd': cmd, } @@ -292,6 +308,18 @@ class BoxService: default_host_workspace = os.path.join(self.shared_host_root, 'default') return os.path.realpath(os.path.abspath(default_host_workspace)) + def _load_workspace_quota_mb(self) -> int | None: + raw_value = _get_box_config(self.ap).get('workspace_quota_mb') + if raw_value in (None, ''): + return None + try: + value = _INT_ADAPTER.validate_python(raw_value) + except pydantic.ValidationError as exc: + raise BoxValidationError('workspace_quota_mb must be an integer greater than or equal to 0') from exc + if value < 0: + raise BoxValidationError('workspace_quota_mb must be greater than or equal to 0') + return value + def _ensure_default_host_workspace(self): if self.default_host_workspace is None: return @@ -356,6 +384,7 @@ class BoxService: 'memory_mb', 'pids_limit', 'read_only_rootfs', + 'workspace_quota_mb', ) for field in _PROFILE_FIELDS: @@ -376,6 +405,58 @@ class BoxService: if normalized_timeout > profile.max_timeout_sec: params['timeout_sec'] = profile.max_timeout_sec + def _get_workspace_size_bytes(self, root: str) -> int: + total = 0 + + def _walk(path: str): + nonlocal total + try: + with os.scandir(path) as entries: + for entry in entries: + try: + if entry.is_symlink(): + total += entry.stat(follow_symlinks=False).st_size + continue + if entry.is_dir(follow_symlinks=False): + _walk(entry.path) + continue + total += entry.stat(follow_symlinks=False).st_size + except FileNotFoundError: + continue + except FileNotFoundError: + return + + _walk(root) + return total + + def _enforce_workspace_quota(self, spec: BoxSpec, *, phase: str) -> None: + if spec.host_path is None or spec.workspace_quota_mb <= 0: + return + + host_path = os.path.realpath(spec.host_path) + if not os.path.isdir(host_path): + return + + used_bytes = self._get_workspace_size_bytes(host_path) + limit_bytes = spec.workspace_quota_mb * _MIB + if used_bytes <= limit_bytes: + return + + raise BoxValidationError( + f'workspace quota exceeded {phase}: ' + f'used={used_bytes} bytes limit={limit_bytes} bytes ' + f'host_path={host_path} session_id={spec.session_id}' + ) + + async def _cleanup_exceeded_session(self, spec: BoxSpec) -> None: + try: + await self.client.delete_session(spec.session_id) + except Exception as exc: + self.ap.logger.warning( + 'Failed to clean up Box session after workspace quota was exceeded: ' + f'session_id={spec.session_id} error={exc}' + ) + # ── Observability ───────────────────────────────────────────────── def _record_error(self, exc: Exception, query: pipeline_query.Query): diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index 71f61dea..e6ddd0e3 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -133,18 +133,21 @@ def make_app( allowed_host_mount_roots: list[str] | None = None, profile: str = 'default', shared_host_root: str = '', + workspace_quota_mb: int | None = None, ): + box_config = { + 'profile': profile, + 'shared_host_root': shared_host_root, + 'allowed_host_mount_roots': allowed_host_mount_roots or [], + 'default_host_workspace': '', + } + if workspace_quota_mb is not None: + box_config['workspace_quota_mb'] = workspace_quota_mb + return SimpleNamespace( logger=logger, instance_config=SimpleNamespace( - data={ - 'box': { - 'profile': profile, - 'shared_host_root': shared_host_root, - 'allowed_host_mount_roots': allowed_host_mount_roots or [], - 'default_host_workspace': '', - } - } + data={'box': box_config} ), ) @@ -429,6 +432,32 @@ class FakeBackendWithOutput(FakeBackend): ) +class FakeBackendWritingFiles(FakeBackend): + """Fake backend that writes files into the mounted host workspace during exec.""" + + def __init__(self, logger: Mock, files_to_write: list[tuple[str, int]]): + super().__init__(logger) + self._files_to_write = files_to_write + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + self.exec_calls.append((session.session_id, spec.cmd)) + if session.host_path: + for relative_path, size in self._files_to_write: + host_path = os.path.join(session.host_path, relative_path) + os.makedirs(os.path.dirname(host_path), exist_ok=True) + with open(host_path, 'wb') as f: + f.write(b'x' * size) + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.COMPLETED, + exit_code=0, + stdout='wrote files', + stderr='', + duration_ms=5, + ) + + @pytest.mark.asyncio async def test_truncate_short_output_unchanged(): logger = Mock() @@ -648,6 +677,64 @@ async def test_profile_default_applies_resource_limits(): assert spec.memory_mb == profile.memory_mb assert spec.pids_limit == profile.pids_limit assert spec.read_only_rootfs == profile.read_only_rootfs + assert spec.workspace_quota_mb == profile.workspace_quota_mb + + +@pytest.mark.asyncio +async def test_box_service_applies_workspace_quota_from_config(tmp_path): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + host_dir = tmp_path / 'default-workspace' + host_dir.mkdir() + app = make_app(logger, [str(tmp_path)], workspace_quota_mb=32) + app.instance_config.data['box']['default_host_workspace'] = str(host_dir) + service = BoxService(app, client=_InProcessBoxRuntimeClient(logger, runtime)) + + await service.initialize() + await service.execute_tool({'command': 'echo hi'}, make_query(43)) + + assert backend.start_specs[0].workspace_quota_mb == 32 + + +@pytest.mark.asyncio +async def test_box_service_rejects_execution_when_workspace_already_exceeds_quota(tmp_path): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + host_dir = tmp_path / 'quota-workspace' + host_dir.mkdir() + (host_dir / 'already-too-large.bin').write_bytes(b'x' * (2 * 1024 * 1024)) + app = make_app(logger, [str(tmp_path)], workspace_quota_mb=1) + app.instance_config.data['box']['default_host_workspace'] = str(host_dir) + service = BoxService(app, client=_InProcessBoxRuntimeClient(logger, runtime)) + + await service.initialize() + + with pytest.raises(BoxValidationError, match='workspace quota exceeded before execution'): + await service.execute_tool({'command': 'echo hi'}, make_query(44)) + + assert backend.start_calls == [] + + +@pytest.mark.asyncio +async def test_box_service_rejects_and_cleans_up_when_execution_exceeds_workspace_quota(tmp_path): + logger = Mock() + backend = FakeBackendWritingFiles(logger, files_to_write=[('output.bin', 2 * 1024 * 1024)]) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + host_dir = tmp_path / 'quota-workspace-post' + host_dir.mkdir() + app = make_app(logger, [str(tmp_path)], workspace_quota_mb=1) + app.instance_config.data['box']['default_host_workspace'] = str(host_dir) + service = BoxService(app, client=_InProcessBoxRuntimeClient(logger, runtime)) + + await service.initialize() + + with pytest.raises(BoxValidationError, match='workspace quota exceeded after execution'): + await service.execute_tool({'command': 'generate-output'}, make_query(45)) + + assert backend.start_calls == ['45'] + assert backend.stop_calls == ['45'] @pytest.mark.asyncio @@ -695,6 +782,8 @@ def test_box_spec_validates_resource_limits(): BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'memory_mb': 10}) with pytest.raises(Exception): BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'pids_limit': 0}) + with pytest.raises(Exception): + BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'workspace_quota_mb': -1}) # ── Observability tests ───────────────────────────────────────────────