mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
feat(box): add session workspace quota enforcement and SDK quota metadata
This commit is contained in:
@@ -24,6 +24,7 @@ from langbot_plugin.box.models import (
|
||||
_INT_ADAPTER = pydantic.TypeAdapter(int)
|
||||
_UTC = _dt.timezone.utc
|
||||
_MAX_RECENT_ERRORS = 50
|
||||
_MIB = 1024 * 1024
|
||||
|
||||
|
||||
def _is_path_under(path: str, root: str) -> bool:
|
||||
@@ -54,6 +55,7 @@ class BoxService:
|
||||
self.allowed_host_mount_roots = self._load_allowed_host_mount_roots()
|
||||
self.default_host_workspace = self._load_default_host_workspace()
|
||||
self.profile = self._load_profile()
|
||||
self.workspace_quota_mb = self._load_workspace_quota_mb()
|
||||
self._recent_errors: collections.deque[dict] = collections.deque(maxlen=_MAX_RECENT_ERRORS)
|
||||
self._shutdown_task = None
|
||||
self._available = False
|
||||
@@ -93,11 +95,22 @@ class BoxService:
|
||||
f'query_id={query.query_id} '
|
||||
f'spec={json.dumps(self._summarize_spec(spec), ensure_ascii=False)}'
|
||||
)
|
||||
try:
|
||||
self._enforce_workspace_quota(spec, phase='before execution')
|
||||
except BoxError as exc:
|
||||
self._record_error(exc, query)
|
||||
raise
|
||||
try:
|
||||
result = await self.client.execute(spec)
|
||||
except BoxError as exc:
|
||||
self._record_error(exc, query)
|
||||
raise
|
||||
try:
|
||||
self._enforce_workspace_quota(spec, phase='after execution')
|
||||
except BoxError as exc:
|
||||
await self._cleanup_exceeded_session(spec)
|
||||
self._record_error(exc, query)
|
||||
raise
|
||||
self.ap.logger.info(
|
||||
'LangBot Box result: '
|
||||
f'query_id={query.query_id} '
|
||||
@@ -141,6 +154,8 @@ class BoxService:
|
||||
spec_payload.setdefault('env', {})
|
||||
if spec_payload.get('host_path') in (None, '') and self.default_host_workspace is not None:
|
||||
spec_payload['host_path'] = self.default_host_workspace
|
||||
if spec_payload.get('workspace_quota_mb') in (None, '') and self.workspace_quota_mb is not None:
|
||||
spec_payload['workspace_quota_mb'] = self.workspace_quota_mb
|
||||
|
||||
self._apply_profile(spec_payload)
|
||||
|
||||
@@ -241,6 +256,7 @@ class BoxService:
|
||||
'memory_mb': spec.memory_mb,
|
||||
'pids_limit': spec.pids_limit,
|
||||
'read_only_rootfs': spec.read_only_rootfs,
|
||||
'workspace_quota_mb': spec.workspace_quota_mb,
|
||||
'env_keys': sorted(spec.env.keys()),
|
||||
'cmd': cmd,
|
||||
}
|
||||
@@ -292,6 +308,18 @@ class BoxService:
|
||||
default_host_workspace = os.path.join(self.shared_host_root, 'default')
|
||||
return os.path.realpath(os.path.abspath(default_host_workspace))
|
||||
|
||||
def _load_workspace_quota_mb(self) -> int | None:
    """Read the optional ``workspace_quota_mb`` setting from the box config.

    Returns:
        ``None`` when the setting is absent or an empty string, otherwise
        the value validated as an ``int`` through ``_INT_ADAPTER``.

    Raises:
        BoxValidationError: if the value fails integer validation or is
            negative.
    """
    configured = _get_box_config(self.ap).get('workspace_quota_mb')
    if configured in (None, ''):
        # Unset: no service-level default quota is configured.
        return None

    try:
        quota_mb = _INT_ADAPTER.validate_python(configured)
    except pydantic.ValidationError as err:
        raise BoxValidationError('workspace_quota_mb must be an integer greater than or equal to 0') from err
    else:
        if quota_mb >= 0:
            return quota_mb

    raise BoxValidationError('workspace_quota_mb must be greater than or equal to 0')
|
||||
|
||||
def _ensure_default_host_workspace(self):
|
||||
if self.default_host_workspace is None:
|
||||
return
|
||||
@@ -356,6 +384,7 @@ class BoxService:
|
||||
'memory_mb',
|
||||
'pids_limit',
|
||||
'read_only_rootfs',
|
||||
'workspace_quota_mb',
|
||||
)
|
||||
|
||||
for field in _PROFILE_FIELDS:
|
||||
@@ -376,6 +405,58 @@ class BoxService:
|
||||
if normalized_timeout > profile.max_timeout_sec:
|
||||
params['timeout_sec'] = profile.max_timeout_sec
|
||||
|
||||
def _get_workspace_size_bytes(self, root: str) -> int:
    """Return the total apparent size, in bytes, of the entries under *root*.

    Sizes are taken from ``st_size`` without following symlinks: a symlink
    contributes the size of the link itself and a symlinked directory is
    never entered. Directory entries themselves are not counted, only what
    they contain.

    The tree is walked with an explicit stack instead of recursion. The
    workspace content is produced by sandboxed commands, so its nesting
    depth is not under our control; an iterative walk cannot raise
    ``RecursionError`` and keeps a single directory handle open at a time.

    Args:
        root: Directory to measure.

    Returns:
        Sum of the sizes of all non-directory entries found. ``0`` when
        *root* does not exist.

    Raises:
        OSError: for filesystem errors other than ``FileNotFoundError``
            (for example ``PermissionError``); these are deliberately not
            swallowed so unreadable content is not silently under-counted.
    """
    total = 0
    pending = [root]

    while pending:
        current = pending.pop()
        try:
            with os.scandir(current) as entries:
                for entry in entries:
                    try:
                        # is_dir(follow_symlinks=False) is False for symlinks,
                        # so links fall through to the lstat-size branch.
                        if entry.is_dir(follow_symlinks=False):
                            # Defer the subdirectory so this handle is closed
                            # before the next one is opened.
                            pending.append(entry.path)
                        else:
                            total += entry.stat(follow_symlinks=False).st_size
                    except FileNotFoundError:
                        # Entry vanished between listing and stat; skip it.
                        continue
        except FileNotFoundError:
            # Directory vanished (or never existed); nothing to add.
            continue

    return total
|
||||
|
||||
def _enforce_workspace_quota(self, spec: BoxSpec, *, phase: str) -> None:
    """Raise if the host workspace of *spec* is larger than its quota.

    The check is skipped when the spec has no ``host_path``, when
    ``workspace_quota_mb`` is not positive, or when the resolved host path
    is not an existing directory.

    Args:
        spec: Execution spec carrying ``host_path``, ``workspace_quota_mb``
            and ``session_id``.
        phase: Label inserted into the error message (callers pass values
            such as ``'before execution'``).

    Raises:
        BoxValidationError: when the measured size exceeds the quota.
    """
    quota_enabled = spec.host_path is not None and spec.workspace_quota_mb > 0
    if not quota_enabled:
        return

    workspace_dir = os.path.realpath(spec.host_path)
    if os.path.isdir(workspace_dir):
        consumed = self._get_workspace_size_bytes(workspace_dir)
        allowed = spec.workspace_quota_mb * _MIB
        # Usage exactly equal to the limit is still within quota.
        if consumed > allowed:
            raise BoxValidationError(
                f'workspace quota exceeded {phase}: '
                f'used={consumed} bytes limit={allowed} bytes '
                f'host_path={workspace_dir} session_id={spec.session_id}'
            )
|
||||
|
||||
async def _cleanup_exceeded_session(self, spec: BoxSpec) -> None:
    """Best-effort deletion of the session identified by ``spec.session_id``.

    Any exception raised by ``client.delete_session`` is logged as a
    warning and not re-raised, so the caller can still report the original
    quota error.
    """
    try:
        await self.client.delete_session(spec.session_id)
    except Exception as err:
        warning = (
            'Failed to clean up Box session after workspace quota was exceeded: '
            f'session_id={spec.session_id} error={err}'
        )
        self.ap.logger.warning(warning)
|
||||
|
||||
# ── Observability ─────────────────────────────────────────────────
|
||||
|
||||
def _record_error(self, exc: Exception, query: pipeline_query.Query):
|
||||
|
||||
@@ -133,18 +133,21 @@ def make_app(
|
||||
allowed_host_mount_roots: list[str] | None = None,
|
||||
profile: str = 'default',
|
||||
shared_host_root: str = '',
|
||||
workspace_quota_mb: int | None = None,
|
||||
):
|
||||
box_config = {
|
||||
'profile': profile,
|
||||
'shared_host_root': shared_host_root,
|
||||
'allowed_host_mount_roots': allowed_host_mount_roots or [],
|
||||
'default_host_workspace': '',
|
||||
}
|
||||
if workspace_quota_mb is not None:
|
||||
box_config['workspace_quota_mb'] = workspace_quota_mb
|
||||
|
||||
return SimpleNamespace(
|
||||
logger=logger,
|
||||
instance_config=SimpleNamespace(
|
||||
data={
|
||||
'box': {
|
||||
'profile': profile,
|
||||
'shared_host_root': shared_host_root,
|
||||
'allowed_host_mount_roots': allowed_host_mount_roots or [],
|
||||
'default_host_workspace': '',
|
||||
}
|
||||
}
|
||||
data={'box': box_config}
|
||||
),
|
||||
)
|
||||
|
||||
@@ -429,6 +432,32 @@ class FakeBackendWithOutput(FakeBackend):
|
||||
)
|
||||
|
||||
|
||||
class FakeBackendWritingFiles(FakeBackend):
    """Fake backend whose ``exec`` drops files into the session's host workspace.

    ``files_to_write`` is a list of ``(relative_path, size)`` pairs; each
    file is created under ``session.host_path`` and filled with ``size``
    bytes of ``b'x'``.
    """

    def __init__(self, logger: Mock, files_to_write: list[tuple[str, int]]):
        super().__init__(logger)
        self._files_to_write = files_to_write

    def _write_workspace_files(self, workspace: str) -> None:
        """Create every configured file beneath *workspace*."""
        for rel_name, byte_count in self._files_to_write:
            target = os.path.join(workspace, rel_name)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            with open(target, 'wb') as handle:
                handle.write(b'x' * byte_count)

    async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
        """Record the call, write the files if a workspace is mounted, report success."""
        self.exec_calls.append((session.session_id, spec.cmd))

        workspace = session.host_path
        if workspace:
            self._write_workspace_files(workspace)

        outcome = BoxExecutionResult(
            session_id=session.session_id,
            backend_name=self.name,
            status=BoxExecutionStatus.COMPLETED,
            exit_code=0,
            stdout='wrote files',
            stderr='',
            duration_ms=5,
        )
        return outcome
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_truncate_short_output_unchanged():
|
||||
logger = Mock()
|
||||
@@ -648,6 +677,64 @@ async def test_profile_default_applies_resource_limits():
|
||||
assert spec.memory_mb == profile.memory_mb
|
||||
assert spec.pids_limit == profile.pids_limit
|
||||
assert spec.read_only_rootfs == profile.read_only_rootfs
|
||||
assert spec.workspace_quota_mb == profile.workspace_quota_mb
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_box_service_applies_workspace_quota_from_config(tmp_path):
    """The quota set in the box config ends up on the spec the backend starts with."""
    # Arrange: an existing default workspace and a config quota of 32.
    workspace = tmp_path / 'default-workspace'
    workspace.mkdir()

    log = Mock()
    fake_backend = FakeBackend(log)
    box_runtime = BoxRuntime(logger=log, backends=[fake_backend], session_ttl_sec=300)

    application = make_app(log, [str(tmp_path)], workspace_quota_mb=32)
    application.instance_config.data['box']['default_host_workspace'] = str(workspace)
    runtime_client = _InProcessBoxRuntimeClient(log, box_runtime)
    box_service = BoxService(application, client=runtime_client)

    # Act
    await box_service.initialize()
    await box_service.execute_tool({'command': 'echo hi'}, make_query(43))

    # Assert
    first_spec = fake_backend.start_specs[0]
    assert first_spec.workspace_quota_mb == 32
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_box_service_rejects_execution_when_workspace_already_exceeds_quota(tmp_path):
    """A workspace already over quota is rejected before any backend session starts."""
    # Arrange: a 2 MiB file in a workspace limited to 1 MiB.
    workspace = tmp_path / 'quota-workspace'
    workspace.mkdir()
    oversized_file = workspace / 'already-too-large.bin'
    oversized_file.write_bytes(b'x' * (2 * 1024 * 1024))

    log = Mock()
    fake_backend = FakeBackend(log)
    box_runtime = BoxRuntime(logger=log, backends=[fake_backend], session_ttl_sec=300)

    application = make_app(log, [str(tmp_path)], workspace_quota_mb=1)
    application.instance_config.data['box']['default_host_workspace'] = str(workspace)
    runtime_client = _InProcessBoxRuntimeClient(log, box_runtime)
    box_service = BoxService(application, client=runtime_client)

    await box_service.initialize()

    # Act / Assert: the pre-execution check fires.
    with pytest.raises(BoxValidationError, match='workspace quota exceeded before execution'):
        await box_service.execute_tool({'command': 'echo hi'}, make_query(44))

    # The backend was never asked to start a session.
    assert fake_backend.start_calls == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_box_service_rejects_and_cleans_up_when_execution_exceeds_workspace_quota(tmp_path):
    """Output that pushes the workspace over quota fails the call and stops the session."""
    # Arrange: an empty workspace limited to 1 MiB and a backend that
    # writes a 2 MiB file while executing.
    workspace = tmp_path / 'quota-workspace-post'
    workspace.mkdir()

    log = Mock()
    planned_output = [('output.bin', 2 * 1024 * 1024)]
    writing_backend = FakeBackendWritingFiles(log, files_to_write=planned_output)
    box_runtime = BoxRuntime(logger=log, backends=[writing_backend], session_ttl_sec=300)

    application = make_app(log, [str(tmp_path)], workspace_quota_mb=1)
    application.instance_config.data['box']['default_host_workspace'] = str(workspace)
    runtime_client = _InProcessBoxRuntimeClient(log, box_runtime)
    box_service = BoxService(application, client=runtime_client)

    await box_service.initialize()

    # Act / Assert: the post-execution check fires.
    with pytest.raises(BoxValidationError, match='workspace quota exceeded after execution'):
        await box_service.execute_tool({'command': 'generate-output'}, make_query(45))

    # The session for query 45 was started and then stopped again.
    assert writing_backend.start_calls == ['45']
    assert writing_backend.stop_calls == ['45']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -695,6 +782,8 @@ def test_box_spec_validates_resource_limits():
|
||||
BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'memory_mb': 10})
|
||||
with pytest.raises(Exception):
|
||||
BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'pids_limit': 0})
|
||||
with pytest.raises(Exception):
|
||||
BoxSpec.model_validate({'cmd': 'echo', 'session_id': 's1', 'workspace_quota_mb': -1})
|
||||
|
||||
|
||||
# ── Observability tests ───────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user