From 2c09af406edfb2df6413ff3544c94b800a5c6f03 Mon Sep 17 00:00:00 2001
From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com>
Date: Tue, 16 Jun 2026 23:29:04 +0800
Subject: [PATCH] feat(agent-runner): expose stats for control plane

Add get_run_stats, get_runtime_stats and get_runner_stats to
RunLedgerStore and expose them through the admin-only RUN_STATS,
RUNTIME_STATS and RUNNER_STATS plugin runtime actions.

list_runs and list_runtimes now also return a total_count, and
list_runtimes takes over the label filtering that the handler used to
do after the fact, so the limit is applied to the filtered rows.

---
 .../pkg/agent/runner/run_ledger_store.py      | 355 +++++++++++++++++-
 src/langbot/pkg/plugin/handler.py             | 171 ++++++++-
 .../unit_tests/agent/test_run_ledger_store.py |   3 +-
 3 files changed, 510 insertions(+), 19 deletions(-)

diff --git a/src/langbot/pkg/agent/runner/run_ledger_store.py b/src/langbot/pkg/agent/runner/run_ledger_store.py
index 5f0b2de7c..8f89df941 100644
--- a/src/langbot/pkg/agent/runner/run_ledger_store.py
+++ b/src/langbot/pkg/agent/runner/run_ledger_store.py
@@ -466,17 +466,56 @@ class RunLedgerStore:
         self,
         *,
         statuses: list[str] | None = None,
+        labels: dict[str, str] | None = None,
         limit: int = 100,
-    ) -> list[dict[str, typing.Any]]:
-        """List runtime registry rows."""
+    ) -> tuple[list[dict[str, typing.Any]], int]:
+        """List runtime registry rows.
+
+        Args:
+            statuses: Filter by status list
+            labels: Filter by labels (key-value pairs)
+            limit: Maximum number of rows to return
+
+        Returns:
+            Tuple of (runtimes, total_count).
+        """
         limit = min(max(int(limit), 1), 500)
         async with self._session_factory() as session:
-            query = sqlalchemy.select(AgentRuntime)
+            # Build base query with status filter
+            base_query = sqlalchemy.select(AgentRuntime)
             if statuses:
-                query = query.where(AgentRuntime.status.in_(statuses))
-            query = query.order_by(AgentRuntime.id.asc()).limit(limit)
-            result = await session.execute(query)
-            return [self._runtime_to_dict(row) for row in result.scalars().all()]
+                base_query = base_query.where(AgentRuntime.status.in_(statuses))
+
+            # Get total count (all filters applied, before the limit)
+            if not labels:
+                # Simple case - can count directly in DB
+                count_query = sqlalchemy.select(sqlalchemy.func.count(AgentRuntime.id))
+                if statuses:
+                    count_query = count_query.where(AgentRuntime.status.in_(statuses))
+                count_result = await session.execute(count_query)
+                total_count = count_result.scalar() or 0
+
+                # Get items
+                query = base_query.order_by(AgentRuntime.id.asc()).limit(limit)
+                result = await session.execute(query)
+                runtimes = [self._runtime_to_dict(row) for row in result.scalars().all()]
+            else:
+                # Need to fetch all and filter by labels in Python
+                query = base_query.order_by(AgentRuntime.id.asc())
+                result = await session.execute(query)
+                all_runtimes = [self._runtime_to_dict(row) for row in result.scalars().all()]
+
+                # Filter by labels
+                runtimes = [
+                    rt for rt in all_runtimes
+                    if all((rt.get('labels') or {}).get(k) == v for k, v in labels.items())
+                ]
+                total_count = len(runtimes)
+
+                # Apply limit after filtering
+                runtimes = runtimes[:limit]
+
+            return runtimes, total_count
 
     async def mark_stale_runtimes(
         self,
@@ -532,10 +571,25 @@ class RunLedgerStore:
         workspace_id: str | None = None,
         thread_id: str | None = None,
         strict_thread: bool = False,
-    ) -> tuple[list[dict[str, typing.Any]], int | None, bool]:
-        """Page runs by scope."""
+    ) -> tuple[list[dict[str, typing.Any]], int | None, bool, int]:
+        """Page runs by scope.
+
+        Returns:
+            Tuple of (items, next_cursor, has_more, total_count).
+        """
         limit = min(max(int(limit), 1), 100)
         async with self._session_factory() as session:
+            # First get total count
+            count_query = sqlalchemy.select(sqlalchemy.func.count(AgentRun.id))
+            if conversation_id is not None:
+                count_query = count_query.where(AgentRun.conversation_id == conversation_id)
+            if statuses:
+                count_query = count_query.where(AgentRun.status.in_(statuses))
+            count_query = self._apply_scope_filters(count_query, bot_id, workspace_id, thread_id, strict_thread)
+            count_result = await session.execute(count_query)
+            total_count = count_result.scalar() or 0
+
+            # Then get items
             query = sqlalchemy.select(AgentRun)
             if conversation_id is not None:
                 query = query.where(AgentRun.conversation_id == conversation_id)
@@ -551,7 +605,8 @@ class RunLedgerStore:
             items = [self._run_to_dict(row) for row in rows[:limit]]
             has_more = len(rows) > limit
             next_cursor = items[-1]['id'] if items and has_more else None
-            return items, next_cursor, has_more
+
+            return items, next_cursor, has_more, total_count
 
     async def page_run_events(
         self,
@@ -689,3 +744,283 @@ class RunLedgerStore:
             'artifact_refs': _json_loads(row.artifact_refs_json, []),
             'metadata': _json_loads(row.metadata_json, {}),
         }
+
+    async def get_run_stats(
+        self,
+        *,
+        start_time: int,
+        end_time: int,
+        runner_id: str | None = None,
+    ) -> dict[str, typing.Any]:
+        """Get run statistics within a time window.
+
+        Args:
+            start_time: Unix timestamp for start of window
+            end_time: Unix timestamp for end of window
+            runner_id: Optional filter by runner
+
+        Returns:
+            Dict with status counts, rates, and duration stats.
+        """
+        from sqlalchemy import func
+
+        start_dt = _epoch_to_datetime(start_time)
+        end_dt = _epoch_to_datetime(end_time)
+
+        async with self._session_factory() as session:
+            # Base filter for time window
+            base_filter = [
+                AgentRun.created_at >= start_dt,
+                AgentRun.created_at <= end_dt,
+            ]
+            if runner_id:
+                base_filter.append(AgentRun.runner_id == runner_id)
+
+            # Count by status
+            status_query = (
+                sqlalchemy.select(
+                    AgentRun.status,
+                    func.count(AgentRun.id).label('count')
+                )
+                .where(*base_filter)
+                .group_by(AgentRun.status)
+            )
+            status_result = await session.execute(status_query)
+            status_counts = {row.status: row.count for row in status_result}
+
+            total_count = sum(status_counts.values())
+            completed_count = status_counts.get('completed', 0)
+            failed_count = status_counts.get('failed', 0) + status_counts.get('timeout', 0)
+
+            # Calculate rates
+            window_hours = max((end_time - start_time) / 3600, 0.001)
+            throughput = total_count / window_hours if total_count > 0 else 0
+            success_rate = completed_count / total_count if total_count > 0 else None
+            failure_rate = failed_count / total_count if total_count > 0 else None
+
+            # Duration stats for completed runs - compute in Python for DB compatibility
+            avg_duration_seconds = None
+            avg_queue_wait_seconds = None
+
+            # Fetch completed runs with timing data
+            timing_query = (
+                sqlalchemy.select(
+                    AgentRun.started_at,
+                    AgentRun.finished_at,
+                    AgentRun.created_at,
+                )
+                .where(
+                    AgentRun.status == 'completed',
+                    AgentRun.started_at.is_not(None),
+                    AgentRun.finished_at.is_not(None),
+                    *base_filter
+                )
+            )
+            timing_result = await session.execute(timing_query)
+            timing_rows = timing_result.all()
+
+            if timing_rows:
+                durations = []
+                for row in timing_rows:
+                    if row.finished_at and row.started_at:
+                        delta = row.finished_at - row.started_at
+                        durations.append(delta.total_seconds())
+                if durations:
+                    avg_duration_seconds = round(sum(durations) / len(durations), 2)
+
+            # Queue wait time - compute in Python
+            queue_query = (
+                sqlalchemy.select(
+                    AgentRun.created_at,
+                    AgentRun.started_at,
+                )
+                .where(
+                    AgentRun.started_at.is_not(None),
+                    *base_filter
+                )
+            )
+            queue_result = await session.execute(queue_query)
+            queue_rows = queue_result.all()
+
+            if queue_rows:
+                waits = []
+                for row in queue_rows:
+                    if row.started_at and row.created_at:
+                        delta = row.started_at - row.created_at
+                        wait_seconds = delta.total_seconds()
+                        if wait_seconds >= 0:  # Only count positive waits
+                            waits.append(wait_seconds)
+                if waits:
+                    avg_queue_wait_seconds = round(sum(waits) / len(waits), 2)
+
+            return {
+                'start_time': start_time,
+                'end_time': end_time,
+                'total_count': total_count,
+                'created_count': status_counts.get('created', 0),
+                'queued_count': status_counts.get('queued', 0),
+                'claimed_count': status_counts.get('claimed', 0),
+                'running_count': status_counts.get('running', 0),
+                'completed_count': completed_count,
+                'failed_count': status_counts.get('failed', 0),
+                'cancelled_count': status_counts.get('cancelled', 0),
+                'timeout_count': status_counts.get('timeout', 0),
+                'throughput_per_hour': round(throughput, 2),
+                'success_rate': round(success_rate, 4) if success_rate is not None else None,
+                'failure_rate': round(failure_rate, 4) if failure_rate is not None else None,
+                'avg_duration_seconds': avg_duration_seconds,
+                'p50_duration_seconds': None,  # Requires more complex calculation
+                'p95_duration_seconds': None,
+                'p99_duration_seconds': None,
+                'avg_queue_wait_seconds': avg_queue_wait_seconds,
+            }
+
+    async def get_runtime_stats(self) -> dict[str, typing.Any]:
+        """Get runtime registry statistics.
+
+        Returns:
+            Dict with counts, heartbeat health, and capacity.
+        """
+        from sqlalchemy import func
+
+        now = _utc_now()
+
+        async with self._session_factory() as session:
+            # Count by status
+            status_query = (
+                sqlalchemy.select(
+                    AgentRuntime.status,
+                    func.count(AgentRuntime.id).label('count')
+                )
+                .group_by(AgentRuntime.status)
+            )
+            status_result = await session.execute(status_query)
+            status_counts = {row.status: row.count for row in status_result}
+
+            total_count = sum(status_counts.values())
+            online_count = status_counts.get('online', 0)
+            stale_count = status_counts.get('stale', 0)
+
+            # Heartbeat age stats - compute in Python for DB compatibility
+            avg_heartbeat_age = None
+            max_heartbeat_age = None
+
+            heartbeat_query = (
+                sqlalchemy.select(AgentRuntime.last_heartbeat_at)
+                .where(AgentRuntime.last_heartbeat_at.is_not(None))
+            )
+            heartbeat_result = await session.execute(heartbeat_query)
+            heartbeat_rows = heartbeat_result.all()
+
+            if heartbeat_rows:
+                ages = []
+                for row in heartbeat_rows:
+                    if row.last_heartbeat_at:
+                        delta = now - row.last_heartbeat_at
+                        age_seconds = delta.total_seconds()
+                        if age_seconds >= 0:
+                            ages.append(age_seconds)
+                if ages:
+                    avg_heartbeat_age = round(sum(ages) / len(ages), 2)
+                    max_heartbeat_age = round(max(ages), 2)
+
+            # Count active runs (claimed by runtimes)
+            active_runs_query = (
+                sqlalchemy.select(func.count(AgentRun.id))
+                .where(AgentRun.status.in_(['running', 'claimed']))
+            )
+            active_runs_result = await session.execute(active_runs_query)
+            active_runs = active_runs_result.scalar() or 0
+
+            return {
+                'total_count': total_count,
+                'online_count': online_count,
+                'stale_count': stale_count,
+                'avg_heartbeat_age_seconds': avg_heartbeat_age,
+                'max_heartbeat_age_seconds': max_heartbeat_age,
+                'active_runs': active_runs,
+                'claimed_runs': active_runs,  # Same as active_runs for now
+            }
+
+    async def get_runner_stats(
+        self,
+        *,
+        start_time: int,
+        end_time: int,
+        limit: int = 50,
+    ) -> list[dict[str, typing.Any]]:
+        """Get runner-aggregated statistics.
+
+        Args:
+            start_time: Unix timestamp for start of window
+            end_time: Unix timestamp for end of window
+            limit: Maximum number of runners to return
+
+        Returns:
+            List of dicts with per-runner statistics.
+        """
+        from sqlalchemy import func
+
+        start_dt = _epoch_to_datetime(start_time)
+        end_dt = _epoch_to_datetime(end_time)
+        limit = min(max(int(limit), 1), 100)
+
+        async with self._session_factory() as session:
+            # Aggregate runs by runner_id
+            query = (
+                sqlalchemy.select(
+                    AgentRun.runner_id,
+                    func.count(AgentRun.id).label('total'),
+                    func.sum(
+                        sqlalchemy.case(
+                            (AgentRun.status.in_(['queued', 'claimed', 'running']), 1),
+                            else_=0
+                        )
+                    ).label('active'),
+                    func.sum(
+                        sqlalchemy.case(
+                            (AgentRun.status == 'completed', 1),
+                            else_=0
+                        )
+                    ).label('completed'),
+                    func.sum(
+                        sqlalchemy.case(
+                            (AgentRun.status.in_(['failed', 'timeout']), 1),
+                            else_=0
+                        )
+                    ).label('failed'),
+                )
+                .where(
+                    AgentRun.created_at >= start_dt,
+                    AgentRun.created_at <= end_dt,
+                    AgentRun.runner_id.is_not(None),
+                )
+                .group_by(AgentRun.runner_id)
+                .order_by(func.count(AgentRun.id).desc())
+                .limit(limit)
+            )
+
+            result = await session.execute(query)
+            rows = result.all()
+
+            stats = []
+            for row in rows:
+                runner_id = row.runner_id or 'unknown'
+                total = row.total or 0
+                completed = row.completed or 0
+                failed = row.failed or 0
+                success_rate = completed / total if total > 0 else None
+
+                stats.append({
+                    'runner_id': runner_id,
+                    'runner_label': None,  # Would need to join with runner descriptors
+                    'plugin_identity': None,
+                    'total_runs': total,
+                    'active_runs': row.active or 0,
+                    'completed_runs': completed,
+                    'failed_runs': failed,
+                    'success_rate': round(success_rate, 4) if success_rate is not None else None,
+                    'avg_duration_seconds': None,  # Would need more complex query
+                })
+
+            return stats
diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py
index 2d78cb9b3..05e9ad195 100644
--- a/src/langbot/pkg/plugin/handler.py
+++ b/src/langbot/pkg/plugin/handler.py
@@ -2194,7 +2194,7 @@ class RuntimeConnectionHandler(handler.Handler):
             store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
 
             try:
-                items, next_cursor, has_more = await store.list_runs(
+                items, next_cursor, has_more, total_count = await store.list_runs(
                     conversation_id=conversation_id,
                     statuses=[str(status) for status in statuses] if statuses else None,
                     before_id=before_id,
@@ -2219,6 +2219,7 @@ class RuntimeConnectionHandler(handler.Handler):
                         'next_cursor': str(next_cursor) if next_cursor else None,
                         'prev_cursor': None,
                         'has_more': has_more,
+                        'total_count': total_count,
                     }
                 )
             except Exception as e:
@@ -2723,16 +2724,11 @@ class RuntimeConnectionHandler(handler.Handler):
             store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
 
             try:
-                runtimes = await store.list_runtimes(
+                runtimes, total_count = await store.list_runtimes(
                     statuses=[str(status) for status in statuses] if statuses else None,
+                    labels=labels,
                     limit=data.get('limit', 50),
                 )
-                if labels:
-                    runtimes = [
-                        runtime
-                        for runtime in runtimes
-                        if all(runtime.get('labels', {}).get(key) == value for key, value in labels.items())
-                    ]
                 if is_admin:
                     await _record_agent_runner_admin_action(
                         self.ap,
@@ -2751,6 +2747,7 @@ class RuntimeConnectionHandler(handler.Handler):
                         'next_cursor': None,
                         'prev_cursor': None,
                         'has_more': False,
+                        'total_count': total_count,
                     }
                 )
             except Exception as e:
@@ -2822,6 +2819,164 @@ class RuntimeConnectionHandler(handler.Handler):
                 self.ap.logger.error(f'RUNTIME_RECONCILE error: {e}', exc_info=True)
                 return handler.ActionResponse.error(message=f'Runtime reconcile error: {e}')
 
+        @self.action(_plugin_runtime_action('RUN_STATS', 'run_stats'))
+        async def run_stats(data: dict[str, Any]) -> handler.ActionResponse:
+            """Get run statistics within a time window (admin-only)."""
+            run_id = data.get('run_id')
+            caller_plugin_identity = data.get('caller_plugin_identity')
+            is_admin = _has_agent_runner_admin_permission(
+                self.ap,
+                caller_plugin_identity,
+                AGENT_RUN_ADMIN_PERMISSION,
+            )
+
+            if not is_admin:
+                return handler.ActionResponse.error(message='Run stats access not authorized')
+
+            _session, error = await _validate_agent_run_session(
+                run_id,
+                caller_plugin_identity,
+                self.ap,
+                'Run stats',
+                api_capability='run_stats',
+                admin_permission=AGENT_RUN_ADMIN_PERMISSION,
+            )
+            if error:
+                return error
+
+            import time
+            end_time = data.get('end_time') or int(time.time())
+            start_time = data.get('start_time') or (end_time - 3600)  # Default: 1 hour
+            runner_id = data.get('runner_id')
+
+            from ..agent.runner.run_ledger_store import RunLedgerStore
+
+            store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
+
+            try:
+                stats = await store.get_run_stats(
+                    start_time=start_time,
+                    end_time=end_time,
+                    runner_id=runner_id,
+                )
+                await _record_agent_runner_admin_action(
+                    self.ap,
+                    store,
+                    action='run_stats',
+                    caller_plugin_identity=caller_plugin_identity,
+                    permission=AGENT_RUN_ADMIN_PERMISSION,
+                    detail={
+                        'start_time': start_time,
+                        'end_time': end_time,
+                        'runner_id': runner_id,
+                    },
+                )
+                return handler.ActionResponse.success(data=stats)
+            except Exception as e:
+                self.ap.logger.error(f'RUN_STATS error: {e}', exc_info=True)
+                return handler.ActionResponse.error(message=f'Run stats error: {e}')
+
+        @self.action(_plugin_runtime_action('RUNTIME_STATS', 'runtime_stats'))
+        async def runtime_stats(data: dict[str, Any]) -> handler.ActionResponse:
+            """Get runtime registry statistics (admin-only)."""
+            run_id = data.get('run_id')
+            caller_plugin_identity = data.get('caller_plugin_identity')
+            is_admin = _has_agent_runner_admin_permission(
+                self.ap,
+                caller_plugin_identity,
+                RUNTIME_ADMIN_PERMISSION,
+            )
+
+            if not is_admin:
+                return handler.ActionResponse.error(message='Runtime stats access not authorized')
+
+            _session, error = await _validate_agent_run_session(
+                run_id,
+                caller_plugin_identity,
+                self.ap,
+                'Runtime stats',
+                api_capability='runtime_stats',
+                admin_permission=RUNTIME_ADMIN_PERMISSION,
+            )
+            if error:
+                return error
+
+            from ..agent.runner.run_ledger_store import RunLedgerStore
+
+            store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
+
+            try:
+                stats = await store.get_runtime_stats()
+                await _record_agent_runner_admin_action(
+                    self.ap,
+                    store,
+                    action='runtime_stats',
+                    caller_plugin_identity=caller_plugin_identity,
+                    permission=RUNTIME_ADMIN_PERMISSION,
+                    detail={},
+                )
+                return handler.ActionResponse.success(data=stats)
+            except Exception as e:
+                self.ap.logger.error(f'RUNTIME_STATS error: {e}', exc_info=True)
+                return handler.ActionResponse.error(message=f'Runtime stats error: {e}')
+
+        @self.action(_plugin_runtime_action('RUNNER_STATS', 'runner_stats'))
+        async def runner_stats(data: dict[str, Any]) -> handler.ActionResponse:
+            """Get runner-aggregated statistics (admin-only)."""
+            run_id = data.get('run_id')
+            caller_plugin_identity = data.get('caller_plugin_identity')
+            is_admin = _has_agent_runner_admin_permission(
+                self.ap,
+                caller_plugin_identity,
+                AGENT_RUN_ADMIN_PERMISSION,
+            )
+
+            if not is_admin:
+                return handler.ActionResponse.error(message='Runner stats access not authorized')
+
+            _session, error = await _validate_agent_run_session(
+                run_id,
+                caller_plugin_identity,
+                self.ap,
+                'Runner stats',
+                api_capability='runner_stats',
+                admin_permission=AGENT_RUN_ADMIN_PERMISSION,
+            )
+            if error:
+                return error
+
+            import time
+            end_time = data.get('end_time') or int(time.time())
+            start_time = data.get('start_time') or (end_time - 3600)  # Default: 1 hour
+            limit = min(int(data.get('limit', 50)), 100)
+
+            from ..agent.runner.run_ledger_store import RunLedgerStore
+
+            store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
+
+            try:
+                stats = await store.get_runner_stats(
+                    start_time=start_time,
+                    end_time=end_time,
+                    limit=limit,
+                )
+                await _record_agent_runner_admin_action(
+                    self.ap,
+                    store,
+                    action='runner_stats',
+                    caller_plugin_identity=caller_plugin_identity,
+                    permission=AGENT_RUN_ADMIN_PERMISSION,
+                    detail={
+                        'start_time': start_time,
+                        'end_time': end_time,
+                        'limit': limit,
+                    },
+                )
+                return handler.ActionResponse.success(data={'items': stats, 'total_count': len(stats), 'has_more': False})
+            except Exception as e:
+                self.ap.logger.error(f'RUNNER_STATS error: {e}', exc_info=True)
+                return handler.ActionResponse.error(message=f'Runner stats error: {e}')
+
         @self.action(_plugin_runtime_action('RUN_CLAIM', 'run_claim'))
         async def run_claim(data: dict[str, Any]) -> handler.ActionResponse:
             """Claim one queued run for a runtime lease."""
diff --git a/tests/unit_tests/agent/test_run_ledger_store.py b/tests/unit_tests/agent/test_run_ledger_store.py
index d8d51d176..f5692ee66 100644
--- a/tests/unit_tests/agent/test_run_ledger_store.py
+++ b/tests/unit_tests/agent/test_run_ledger_store.py
@@ -233,8 +233,9 @@ async def test_runtime_register_heartbeat_list_and_mark_stale(store):
     assert heartbeat is not None
     assert heartbeat['metadata'] == {'slot_count': 2, 'active_runs': 1}
 
-    runtimes = await store.list_runtimes(statuses=['online'])
+    runtimes, total_count = await store.list_runtimes(statuses=['online'])
     assert [runtime['runtime_id'] for runtime in runtimes] == ['runtime-a']
+    assert total_count == 1
 
     stale = await store.mark_stale_runtimes(
         now=datetime.datetime.now(UTC) + datetime.timedelta(seconds=31),