Files
LangBot/tests/unit_tests/agent/test_run_ledger_store.py
2026-06-15 18:09:05 +08:00

168 lines
5.6 KiB
Python

"""Tests for RunLedgerStore host primitives."""
from __future__ import annotations
import datetime
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from langbot.pkg.agent.runner.run_ledger_store import RunLedgerStore
from langbot.pkg.entity.persistence.agent_run import AgentRun
from langbot.pkg.entity.persistence.base import Base
UTC = datetime.timezone.utc
@pytest.fixture
async def db_engine(tmp_path):
db_path = tmp_path / 'run_ledger_store.db'
engine = create_async_engine(f'sqlite+aiosqlite:///{db_path}', echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest.fixture
def store(db_engine):
return RunLedgerStore(db_engine)
@pytest.mark.asyncio
async def test_create_queued_run_claim_renew_release(store):
run = await store.create_run(
run_id='run-queued',
event_id='evt-1',
binding_id='binding-1',
runner_id='runner-a',
status='queued',
queue_name='default',
priority=10,
requested_runtime_id='runtime-a',
)
assert run['status'] == 'queued'
assert run['started_at'] is None
assert run['queue_name'] == 'default'
assert run['priority'] == 10
assert run['requested_runtime_id'] == 'runtime-a'
assert await store.claim_next_run(runtime_id='runtime-b', queue_name='default') is None
claimed = await store.claim_next_run(runtime_id='runtime-a', queue_name='default', lease_seconds=30)
assert claimed is not None
assert claimed['run_id'] == 'run-queued'
assert claimed['status'] == 'claimed'
assert claimed['claimed_by_runtime_id'] == 'runtime-a'
assert claimed['claim_token']
assert claimed['dispatch_attempts'] == 1
assert claimed['claim_lease_expires_at'] is not None
assert claimed['last_claimed_at'] is not None
token = claimed['claim_token']
assert await store.renew_claim(run_id='run-queued', claim_token='wrong-token') is None
renewed = await store.renew_claim(run_id='run-queued', claim_token=token, lease_seconds=90)
assert renewed is not None
assert renewed['claim_token'] == token
assert renewed['claim_lease_expires_at'] >= claimed['claim_lease_expires_at']
released = await store.release_claim(
run_id='run-queued',
claim_token=token,
status='queued',
status_reason='runtime released capacity',
)
assert released is not None
assert released['status'] == 'queued'
assert released['status_reason'] == 'runtime released capacity'
assert released['claimed_by_runtime_id'] is None
assert released['claim_token'] is None
assert released['claim_lease_expires_at'] is None
assert released['dispatch_attempts'] == 1
@pytest.mark.asyncio
async def test_expired_claim_can_be_reclaimed(store, db_engine):
await store.create_run(
run_id='run-expired',
event_id='evt-2',
binding_id='binding-1',
runner_id='runner-a',
status='queued',
queue_name='default',
)
first_claim = await store.claim_next_run(runtime_id='runtime-a', queue_name='default', lease_seconds=60)
assert first_claim is not None
session_factory = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
async with session_factory() as session:
await session.execute(
sqlalchemy.update(AgentRun)
.where(AgentRun.run_id == 'run-expired')
.values(claim_lease_expires_at=datetime.datetime.now(UTC) - datetime.timedelta(seconds=1))
)
await session.commit()
reclaimed = await store.claim_next_run(runtime_id='runtime-b', queue_name='default', lease_seconds=60)
assert reclaimed is not None
assert reclaimed['run_id'] == 'run-expired'
assert reclaimed['claimed_by_runtime_id'] == 'runtime-b'
assert reclaimed['claim_token'] != first_claim['claim_token']
assert reclaimed['dispatch_attempts'] == 2
@pytest.mark.asyncio
async def test_runtime_register_heartbeat_list_and_mark_stale(store):
registered = await store.register_runtime(
runtime_id='runtime-a',
display_name='Runtime A',
endpoint='http://runtime-a',
version='1.0.0',
capabilities={'stream': True},
labels={'region': 'test'},
metadata={'slot_count': 2},
heartbeat_deadline_seconds=30,
)
assert registered['runtime_id'] == 'runtime-a'
assert registered['status'] == 'online'
assert registered['display_name'] == 'Runtime A'
assert registered['capabilities'] == {'stream': True}
assert registered['labels'] == {'region': 'test'}
assert registered['metadata'] == {'slot_count': 2}
assert registered['last_heartbeat_at'] is not None
assert registered['heartbeat_deadline_at'] is not None
heartbeat = await store.heartbeat_runtime(
runtime_id='runtime-a',
metadata={'active_runs': 1},
heartbeat_deadline_seconds=30,
)
assert heartbeat is not None
assert heartbeat['metadata'] == {'slot_count': 2, 'active_runs': 1}
runtimes = await store.list_runtimes(statuses=['online'])
assert [runtime['runtime_id'] for runtime in runtimes] == ['runtime-a']
stale = await store.mark_stale_runtimes(
now=datetime.datetime.now(UTC) + datetime.timedelta(seconds=31),
)
assert [runtime['runtime_id'] for runtime in stale] == ['runtime-a']
assert stale[0]['status'] == 'stale'
assert (await store.get_runtime('runtime-a'))['status'] == 'stale'