diff --git a/src/langbot/pkg/persistence/alembic/env.py b/src/langbot/pkg/persistence/alembic/env.py index ec76d8e9..2ac48ed0 100644 --- a/src/langbot/pkg/persistence/alembic/env.py +++ b/src/langbot/pkg/persistence/alembic/env.py @@ -16,23 +16,23 @@ from langbot.pkg.entity.persistence.base import Base # Import all ORM models so they are registered with Base.metadata # This is required for autogenerate to detect model changes from langbot.pkg.entity.persistence import ( - agent_runner_state, - apikey, - artifact, - bot, - bstorage, - event_log, - mcp, - metadata, - model, - monitoring, - pipeline, - plugin, - rag, - transcript, - user, - vector, - webhook, + agent_runner_state, # noqa: F401 + apikey, # noqa: F401 + artifact, # noqa: F401 + bot, # noqa: F401 + bstorage, # noqa: F401 + event_log, # noqa: F401 + mcp, # noqa: F401 + metadata, # noqa: F401 + model, # noqa: F401 + monitoring, # noqa: F401 + pipeline, # noqa: F401 + plugin, # noqa: F401 + rag, # noqa: F401 + transcript, # noqa: F401 + user, # noqa: F401 + vector, # noqa: F401 + webhook, # noqa: F401 ) target_metadata = Base.metadata diff --git a/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py index c26da0db..79644352 100644 --- a/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py +++ b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py @@ -14,89 +14,111 @@ branch_labels = None depends_on = None +def _table_exists(table_name: str) -> bool: + return table_name in sa.inspect(op.get_bind()).get_table_names() + + +def _index_exists(table_name: str, index_name: str) -> bool: + return index_name in {index['name'] for index in sa.inspect(op.get_bind()).get_indexes(table_name)} + + +def _create_index_if_missing(table_name: str, index_name: str, columns: list[str], *, unique: bool = False) -> None: + if not _table_exists(table_name) or _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.create_index(index_name, columns, unique=unique) + + +def _drop_index_if_exists(table_name: str, index_name: str) -> None: + if not _table_exists(table_name) or not _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.drop_index(index_name) + + def upgrade() -> None: # Create event_log table - op.create_table( - 'event_log', - sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), - sa.Column('event_id', sa.String(255), nullable=False, unique=True), - sa.Column('event_type', sa.String(100), nullable=False), - sa.Column('event_time', sa.DateTime(), nullable=True), - sa.Column('source', sa.String(50), nullable=False), - sa.Column('bot_id', sa.String(255), nullable=True), - sa.Column('workspace_id', sa.String(255), nullable=True), - sa.Column('conversation_id', sa.String(255), nullable=True), - sa.Column('thread_id', sa.String(255), nullable=True), - sa.Column('actor_type', sa.String(50), nullable=True), - sa.Column('actor_id', sa.String(255), nullable=True), - sa.Column('actor_name', sa.String(255), nullable=True), - sa.Column('subject_type', sa.String(50), nullable=True), - sa.Column('subject_id', sa.String(255), nullable=True), - sa.Column('input_summary', sa.Text(), nullable=True), - sa.Column('input_json', sa.Text(), nullable=True), - sa.Column('raw_ref', sa.String(255), nullable=True), - sa.Column('run_id', sa.String(255), nullable=True), - sa.Column('runner_id', sa.String(255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), - sa.Column('metadata_json', sa.Text(), nullable=True), - ) + if not _table_exists('event_log'): + op.create_table( + 'event_log', + sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), + sa.Column('event_id', sa.String(255), nullable=False, unique=True), + sa.Column('event_type', sa.String(100), nullable=False), + sa.Column('event_time', sa.DateTime(), nullable=True), + sa.Column('source', sa.String(50), nullable=False), + sa.Column('bot_id', sa.String(255), nullable=True), + sa.Column('workspace_id', sa.String(255), nullable=True), + sa.Column('conversation_id', sa.String(255), nullable=True), + sa.Column('thread_id', sa.String(255), nullable=True), + sa.Column('actor_type', sa.String(50), nullable=True), + sa.Column('actor_id', sa.String(255), nullable=True), + sa.Column('actor_name', sa.String(255), nullable=True), + sa.Column('subject_type', sa.String(50), nullable=True), + sa.Column('subject_id', sa.String(255), nullable=True), + sa.Column('input_summary', sa.Text(), nullable=True), + sa.Column('input_json', sa.Text(), nullable=True), + sa.Column('raw_ref', sa.String(255), nullable=True), + sa.Column('run_id', sa.String(255), nullable=True), + sa.Column('runner_id', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), + sa.Column('metadata_json', sa.Text(), nullable=True), + ) # Create indexes for event_log - with op.batch_alter_table('event_log', schema=None) as batch_op: - batch_op.create_index('ix_event_log_event_id', ['event_id'], unique=True) - batch_op.create_index('ix_event_log_event_type', ['event_type'], unique=False) - batch_op.create_index('ix_event_log_bot_id', ['bot_id'], unique=False) - batch_op.create_index('ix_event_log_conversation_id', ['conversation_id'], unique=False) - batch_op.create_index('ix_event_log_run_id', ['run_id'], unique=False) + _create_index_if_missing('event_log', 'ix_event_log_event_id', ['event_id'], unique=True) + _create_index_if_missing('event_log', 'ix_event_log_event_type', ['event_type']) + _create_index_if_missing('event_log', 'ix_event_log_bot_id', ['bot_id']) + _create_index_if_missing('event_log', 'ix_event_log_conversation_id', ['conversation_id']) + _create_index_if_missing('event_log', 'ix_event_log_run_id', ['run_id']) # Create transcript table - op.create_table( - 'transcript', - sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), - sa.Column('transcript_id', sa.String(255), nullable=False, unique=True), - sa.Column('event_id', sa.String(255), nullable=False), - sa.Column('conversation_id', sa.String(255), nullable=False), - sa.Column('thread_id', sa.String(255), nullable=True), - sa.Column('role', sa.String(50), nullable=False), - sa.Column('item_type', sa.String(50), nullable=False, server_default='message'), - sa.Column('content', sa.Text(), nullable=True), - sa.Column('content_json', sa.Text(), nullable=True), - sa.Column('artifact_refs_json', sa.Text(), nullable=True), - sa.Column('seq', sa.Integer(), nullable=False), - sa.Column('run_id', sa.String(255), nullable=True), - sa.Column('runner_id', sa.String(255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), - sa.Column('metadata_json', sa.Text(), nullable=True), - ) + if not _table_exists('transcript'): + op.create_table( + 'transcript', + sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), + sa.Column('transcript_id', sa.String(255), nullable=False, unique=True), + sa.Column('event_id', sa.String(255), nullable=False), + sa.Column('conversation_id', sa.String(255), nullable=False), + sa.Column('thread_id', sa.String(255), nullable=True), + sa.Column('role', sa.String(50), nullable=False), + sa.Column('item_type', sa.String(50), nullable=False, server_default='message'), + sa.Column('content', sa.Text(), nullable=True), + sa.Column('content_json', sa.Text(), nullable=True), + sa.Column('artifact_refs_json', sa.Text(), nullable=True), + sa.Column('seq', sa.Integer(), nullable=False), + sa.Column('run_id', sa.String(255), nullable=True), + sa.Column('runner_id', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), + sa.Column('metadata_json', sa.Text(), nullable=True), + ) # Create indexes for transcript - with op.batch_alter_table('transcript', schema=None) as batch_op: - batch_op.create_index('ix_transcript_transcript_id', ['transcript_id'], unique=True) - batch_op.create_index('ix_transcript_event_id', ['event_id'], unique=False) - batch_op.create_index('ix_transcript_conversation_id', ['conversation_id'], unique=False) - batch_op.create_index('ix_transcript_conversation_seq', ['conversation_id', 'seq'], unique=False) - batch_op.create_index('ix_transcript_conversation_created', ['conversation_id', 'created_at'], unique=False) - batch_op.create_index('ix_transcript_run_id', ['run_id'], unique=False) + _create_index_if_missing('transcript', 'ix_transcript_transcript_id', ['transcript_id'], unique=True) + _create_index_if_missing('transcript', 'ix_transcript_event_id', ['event_id']) + _create_index_if_missing('transcript', 'ix_transcript_conversation_id', ['conversation_id']) + _create_index_if_missing('transcript', 'ix_transcript_conversation_seq', ['conversation_id', 'seq']) + _create_index_if_missing('transcript', 'ix_transcript_conversation_created', ['conversation_id', 'created_at']) + _create_index_if_missing('transcript', 'ix_transcript_run_id', ['run_id']) def downgrade() -> None: # Drop transcript table - with op.batch_alter_table('transcript', schema=None) as batch_op: - batch_op.drop_index('ix_transcript_run_id') - batch_op.drop_index('ix_transcript_conversation_created') - batch_op.drop_index('ix_transcript_conversation_seq') - batch_op.drop_index('ix_transcript_conversation_id') - batch_op.drop_index('ix_transcript_event_id') - batch_op.drop_index('ix_transcript_transcript_id') + _drop_index_if_exists('transcript', 'ix_transcript_run_id') + _drop_index_if_exists('transcript', 'ix_transcript_conversation_created') + _drop_index_if_exists('transcript', 'ix_transcript_conversation_seq') + _drop_index_if_exists('transcript', 'ix_transcript_conversation_id') + _drop_index_if_exists('transcript', 'ix_transcript_event_id') + _drop_index_if_exists('transcript', 'ix_transcript_transcript_id') - op.drop_table('transcript') + if _table_exists('transcript'): + op.drop_table('transcript') # Drop event_log table - with op.batch_alter_table('event_log', schema=None) as batch_op: - batch_op.drop_index('ix_event_log_run_id') - batch_op.drop_index('ix_event_log_conversation_id') - batch_op.drop_index('ix_event_log_bot_id') - batch_op.drop_index('ix_event_log_event_type') - batch_op.drop_index('ix_event_log_event_id') + _drop_index_if_exists('event_log', 'ix_event_log_run_id') + _drop_index_if_exists('event_log', 'ix_event_log_conversation_id') + _drop_index_if_exists('event_log', 'ix_event_log_bot_id') + _drop_index_if_exists('event_log', 'ix_event_log_event_type') + _drop_index_if_exists('event_log', 'ix_event_log_event_id') - op.drop_table('event_log') + if _table_exists('event_log'): + op.drop_table('event_log') diff --git a/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py b/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py index 06551664..dfe36c06 100644 --- a/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py +++ b/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py @@ -16,53 +16,79 @@ branch_labels = None depends_on = None +def _table_exists(table_name: str) -> bool: + return table_name in sa.inspect(op.get_bind()).get_table_names() + + +def _index_exists(table_name: str, index_name: str) -> bool: + return index_name in {index['name'] for index in sa.inspect(op.get_bind()).get_indexes(table_name)} + + +def _create_index_if_missing(table_name: str, index_name: str, columns: list[str], *, unique: bool = False) -> None: + if not _table_exists(table_name) or _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.create_index(index_name, columns, unique=unique) + + +def _drop_index_if_exists(table_name: str, index_name: str) -> None: + if not _table_exists(table_name) or not _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.drop_index(index_name) + + def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.create_table('agent_runner_state', - sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - sa.Column('runner_id', sa.String(length=255), nullable=False), - sa.Column('binding_identity', sa.String(length=255), nullable=False), - sa.Column('scope', sa.String(length=50), nullable=False), - sa.Column('scope_key', sa.String(length=512), nullable=False), - sa.Column('state_key', sa.String(length=255), nullable=False), - sa.Column('value_json', sa.Text(), nullable=True), - sa.Column('bot_id', sa.String(length=255), nullable=True), - sa.Column('workspace_id', sa.String(length=255), nullable=True), - sa.Column('conversation_id', sa.String(length=255), nullable=True), - sa.Column('thread_id', sa.String(length=255), nullable=True), - sa.Column('actor_type', sa.String(length=50), nullable=True), - sa.Column('actor_id', sa.String(length=255), nullable=True), - sa.Column('subject_type', sa.String(length=50), nullable=True), - sa.Column('subject_id', sa.String(length=255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=False), - sa.Column('updated_at', sa.DateTime(), nullable=False), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('scope_key', 'state_key', name='uq_agent_runner_state_scope_key_state_key') + if not _table_exists('agent_runner_state'): + op.create_table('agent_runner_state', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('runner_id', sa.String(length=255), nullable=False), + sa.Column('binding_identity', sa.String(length=255), nullable=False), + sa.Column('scope', sa.String(length=50), nullable=False), + sa.Column('scope_key', sa.String(length=512), nullable=False), + sa.Column('state_key', sa.String(length=255), nullable=False), + sa.Column('value_json', sa.Text(), nullable=True), + sa.Column('bot_id', sa.String(length=255), nullable=True), + sa.Column('workspace_id', sa.String(length=255), nullable=True), + sa.Column('conversation_id', sa.String(length=255), nullable=True), + sa.Column('thread_id', sa.String(length=255), nullable=True), + sa.Column('actor_type', sa.String(length=50), nullable=True), + sa.Column('actor_id', sa.String(length=255), nullable=True), + sa.Column('subject_type', sa.String(length=50), nullable=True), + sa.Column('subject_id', sa.String(length=255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('scope_key', 'state_key', name='uq_agent_runner_state_scope_key_state_key') + ) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_actor_id', ['actor_id']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_binding_identity', ['binding_identity']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_bot_id', ['bot_id']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_conversation_id', ['conversation_id']) + _create_index_if_missing( + 'agent_runner_state', + 'ix_agent_runner_state_runner_binding', + ['runner_id', 'binding_identity'], ) - with op.batch_alter_table('agent_runner_state', schema=None) as batch_op: - batch_op.create_index(batch_op.f('ix_agent_runner_state_actor_id'), ['actor_id'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_binding_identity'), ['binding_identity'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_bot_id'), ['bot_id'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_conversation_id'), ['conversation_id'], unique=False) - batch_op.create_index('ix_agent_runner_state_runner_binding', ['runner_id', 'binding_identity'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_runner_id'), ['runner_id'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_scope'), ['scope'], unique=False) - batch_op.create_index(batch_op.f('ix_agent_runner_state_scope_key'), ['scope_key'], unique=False) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_runner_id', ['runner_id']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope', ['scope']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope_key', ['scope_key']) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('agent_runner_state', schema=None) as batch_op: - batch_op.drop_index(batch_op.f('ix_agent_runner_state_scope_key')) - batch_op.drop_index(batch_op.f('ix_agent_runner_state_scope')) - batch_op.drop_index(batch_op.f('ix_agent_runner_state_runner_id')) - batch_op.drop_index('ix_agent_runner_state_runner_binding') - batch_op.drop_index(batch_op.f('ix_agent_runner_state_conversation_id')) - batch_op.drop_index(batch_op.f('ix_agent_runner_state_bot_id')) - batch_op.drop_index(batch_op.f('ix_agent_runner_state_binding_identity')) - batch_op.drop_index(batch_op.f('ix_agent_runner_state_actor_id')) + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope_key') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_runner_id') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_runner_binding') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_conversation_id') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_bot_id') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_binding_identity') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_actor_id') - op.drop_table('agent_runner_state') + if _table_exists('agent_runner_state'): + op.drop_table('agent_runner_state') # ### end Alembic commands ### diff --git a/src/langbot/pkg/persistence/alembic/versions/a1b2c3d4e5f6_add_agent_artifact_table.py b/src/langbot/pkg/persistence/alembic/versions/a1b2c3d4e5f6_add_agent_artifact_table.py index 244d3e45..527083a5 100644 --- a/src/langbot/pkg/persistence/alembic/versions/a1b2c3d4e5f6_add_agent_artifact_table.py +++ b/src/langbot/pkg/persistence/alembic/versions/a1b2c3d4e5f6_add_agent_artifact_table.py @@ -14,42 +14,64 @@ branch_labels = None depends_on = None +def _table_exists(table_name: str) -> bool: + return table_name in sa.inspect(op.get_bind()).get_table_names() + + +def _index_exists(table_name: str, index_name: str) -> bool: + return index_name in {index['name'] for index in sa.inspect(op.get_bind()).get_indexes(table_name)} + + +def _create_index_if_missing(table_name: str, index_name: str, columns: list[str], *, unique: bool = False) -> None: + if not _table_exists(table_name) or _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.create_index(index_name, columns, unique=unique) + + +def _drop_index_if_exists(table_name: str, index_name: str) -> None: + if not _table_exists(table_name) or not _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.drop_index(index_name) + + def upgrade() -> None: # Create agent_artifact table - op.create_table( - 'agent_artifact', - sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), - sa.Column('artifact_id', sa.String(255), nullable=False, unique=True), - sa.Column('artifact_type', sa.String(50), nullable=False), - sa.Column('mime_type', sa.String(255), nullable=True), - sa.Column('name', sa.String(255), nullable=True), - sa.Column('size_bytes', sa.BigInteger(), nullable=True), - sa.Column('sha256', sa.String(64), nullable=True), - sa.Column('source', sa.String(50), nullable=False), - sa.Column('storage_key', sa.String(255), nullable=True), - sa.Column('storage_type', sa.String(50), nullable=False, server_default='binary_storage'), - sa.Column('conversation_id', sa.String(255), nullable=True), - sa.Column('run_id', sa.String(255), nullable=True), - sa.Column('runner_id', sa.String(255), nullable=True), - sa.Column('bot_id', sa.String(255), nullable=True), - sa.Column('workspace_id', sa.String(255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), - sa.Column('expires_at', sa.DateTime(), nullable=True), - sa.Column('metadata_json', sa.Text(), nullable=True), - ) + if not _table_exists('agent_artifact'): + op.create_table( + 'agent_artifact', + sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), + sa.Column('artifact_id', sa.String(255), nullable=False, unique=True), + sa.Column('artifact_type', sa.String(50), nullable=False), + sa.Column('mime_type', sa.String(255), nullable=True), + sa.Column('name', sa.String(255), nullable=True), + sa.Column('size_bytes', sa.BigInteger(), nullable=True), + sa.Column('sha256', sa.String(64), nullable=True), + sa.Column('source', sa.String(50), nullable=False), + sa.Column('storage_key', sa.String(255), nullable=True), + sa.Column('storage_type', sa.String(50), nullable=False, server_default='binary_storage'), + sa.Column('conversation_id', sa.String(255), nullable=True), + sa.Column('run_id', sa.String(255), nullable=True), + sa.Column('runner_id', sa.String(255), nullable=True), + sa.Column('bot_id', sa.String(255), nullable=True), + sa.Column('workspace_id', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), + sa.Column('expires_at', sa.DateTime(), nullable=True), + sa.Column('metadata_json', sa.Text(), nullable=True), + ) # Create indexes for agent_artifact - with op.batch_alter_table('agent_artifact', schema=None) as batch_op: - batch_op.create_index('ix_agent_artifact_artifact_id', ['artifact_id'], unique=True) - batch_op.create_index('ix_agent_artifact_conversation_id', ['conversation_id'], unique=False) - batch_op.create_index('ix_agent_artifact_run_id', ['run_id'], unique=False) + _create_index_if_missing('agent_artifact', 'ix_agent_artifact_artifact_id', ['artifact_id'], unique=True) + _create_index_if_missing('agent_artifact', 'ix_agent_artifact_conversation_id', ['conversation_id']) + _create_index_if_missing('agent_artifact', 'ix_agent_artifact_run_id', ['run_id']) def downgrade() -> None: # Drop agent_artifact table - with op.batch_alter_table('agent_artifact', schema=None) as batch_op: - batch_op.drop_index('ix_agent_artifact_run_id') - batch_op.drop_index('ix_agent_artifact_conversation_id') - batch_op.drop_index('ix_agent_artifact_artifact_id') + _drop_index_if_exists('agent_artifact', 'ix_agent_artifact_run_id') + _drop_index_if_exists('agent_artifact', 'ix_agent_artifact_conversation_id') + _drop_index_if_exists('agent_artifact', 'ix_agent_artifact_artifact_id') - op.drop_table('agent_artifact') + if _table_exists('agent_artifact'): + op.drop_table('agent_artifact') diff --git a/tests/integration/persistence/test_migrations.py b/tests/integration/persistence/test_migrations.py index 944b4524..1eadfcc3 100644 --- a/tests/integration/persistence/test_migrations.py +++ b/tests/integration/persistence/test_migrations.py @@ -10,10 +10,12 @@ Run: uv run pytest tests/integration/persistence/test_migrations.py -q from __future__ import annotations import pytest +from alembic.script import ScriptDirectory from sqlalchemy.ext.asyncio import create_async_engine from langbot.pkg.entity.persistence.base import Base from langbot.pkg.persistence.alembic_runner import ( + _ALEMBIC_DIR, run_alembic_upgrade, run_alembic_stamp, get_alembic_current, @@ -38,6 +40,19 @@ async def sqlite_engine(sqlite_db_url): await engine.dispose() +def alembic_head_revision() -> str: + """Return the repository's current Alembic head revision.""" + return ScriptDirectory.from_config(_alembic_script_config()).get_current_head() + + +def _alembic_script_config(): + from alembic.config import Config + + cfg = Config() + cfg.set_main_option('script_location', _ALEMBIC_DIR) + return cfg + + class TestSQLiteMigrationBaseline: """Tests for baseline stamp workflow.""" @@ -103,8 +118,7 @@ class TestSQLiteMigrationUpgrade: # Verify revision rev = await get_alembic_current(sqlite_engine) assert rev is not None, "Expected a revision after upgrade" - # Head should be the latest migration - assert rev.startswith('0003'), f"Expected head to be 0003_*, got {rev}" + assert rev == alembic_head_revision() @pytest.mark.asyncio async def test_upgrade_idempotent(self, sqlite_engine): @@ -248,4 +262,4 @@ class TestSQLiteMigrationGetCurrent: await run_alembic_stamp(sqlite_engine, '0001_baseline') rev = await get_alembic_current(sqlite_engine) - assert rev == '0001_baseline' \ No newline at end of file + assert rev == '0001_baseline' diff --git a/tests/integration/pipeline/test_full_flow.py b/tests/integration/pipeline/test_full_flow.py index 08acce4c..8c6ad383 100644 --- a/tests/integration/pipeline/test_full_flow.py +++ b/tests/integration/pipeline/test_full_flow.py @@ -14,14 +14,14 @@ from __future__ import annotations import pytest import asyncio from unittest.mock import AsyncMock, Mock -import sys from tests.factories import FakeApp, text_query, mock_platform_adapter -from tests.factories.provider import FakeProvider from tests.factories.platform import FakePlatform +from tests.factories.message import text_chain pytestmark = pytest.mark.integration +DEFAULT_RUNNER_ID = 'plugin:langbot/local-agent/default' # ============== FIXTURE FOR SYS.MODULES ISOLATION ============== @@ -47,10 +47,6 @@ def mock_circular_import_chain(): # Mock core.app - Application class is referenced but not instantiated mock_core_app = Mock() - # Mock provider.runner with preregistered_runners list - mock_runner = Mock() - mock_runner.preregistered_runners = [] # Will be populated in tests - # Mock utils.importutil - prevents auto-import of runners mock_importutil = Mock() mock_importutil.import_modules_in_pkg = lambda pkg: None @@ -74,7 +70,7 @@ def mock_circular_import_chain(): mocks={ 'langbot.pkg.core.entities': mock_core_entities, 'langbot.pkg.core.app': mock_core_app, - 'langbot.pkg.provider.runner': mock_runner, + 'langbot.pkg.provider.runner': Mock(preregistered_runners=[]), 'langbot.pkg.utils.importutil': mock_importutil, 'langbot.pkg.pipeline.controller': Mock(), 'langbot.pkg.pipeline.pipelinemgr': Mock(), @@ -104,48 +100,23 @@ def mock_circular_import_chain(): # ============== FAKE RUNNER ============== class FakeRunner: - """Minimal fake runner class for pipeline integration tests. - - Note: preregistered_runners expects a CLASS, not an instance. - The handler calls runner_cls(self.ap, query.pipeline_config) to instantiate. - """ + """Minimal fake runner behavior for the orchestrator-backed pipeline tests.""" name = 'local-agent' - def __init__(self, app=None, config=None): - self.app = app - self.config = config or {} - self._provider = FakeProvider() - # Instance-level configuration set via class attribute - self._response_text = "fake response" - self._raise_error = None + def __init__(self, response_text: str = "fake response", error: Exception | None = None): + self._response_text = response_text + self._raise_error = error @classmethod - def returns(cls, text: str): - """Create a runner class configured to return specific text.""" - # We create a subclass with configured response - class ConfiguredRunner(cls): - name = cls.name - _response_text = text - _raise_error = None - - def __init__(self, app=None, config=None): - super().__init__(app, config) - self._response_text = text - return ConfiguredRunner + def returns(cls, text: str) -> "FakeRunner": + """Create a fake runner configured to return specific text.""" + return cls(response_text=text) @classmethod - def raises(cls, error: Exception): + def raises(cls, error: Exception) -> "FakeRunner": """Create a runner class configured to raise an error.""" - class ConfiguredRunner(cls): - name = cls.name - _response_text = None - _raise_error = error - - def __init__(self, app=None, config=None): - super().__init__(app, config) - self._raise_error = error - return ConfiguredRunner + return cls(error=error) async def run(self, query): """Run the fake provider and yield messages.""" @@ -159,6 +130,22 @@ class FakeRunner: yield Message(role='assistant', content=self._response_text) +class FakeAgentRunOrchestrator: + """Adapter that exposes FakeRunner through the current AgentRunOrchestrator surface.""" + + def __init__(self, runner: FakeRunner | None = None): + self.runner = runner or FakeRunner() + self.queries = [] + + async def run_from_query(self, query): + self.queries.append(query) + async for result in self.runner.run(query): + yield result + + def resolve_runner_id_for_telemetry(self, query): + return DEFAULT_RUNNER_ID + + # ============== PIPELINE APP FIXTURE ============== @pytest.fixture @@ -222,6 +209,7 @@ def pipeline_app(): # Survey mock app.survey = None + app.agent_run_orchestrator = FakeAgentRunOrchestrator() return app @@ -235,11 +223,10 @@ def fake_platform_adapter(): @pytest.fixture -def set_fake_runner(): - """Factory fixture to set a fake runner CLASS in preregistered_runners.""" - def _set_runner(runner_cls): - # preregistered_runners expects a list of runner classes - sys.modules['langbot.pkg.provider.runner'].preregistered_runners = [runner_cls] +def set_fake_runner(pipeline_app): + """Factory fixture to set fake runner behavior on the orchestrator surface.""" + def _set_runner(runner: FakeRunner): + pipeline_app.agent_run_orchestrator.runner = runner return _set_runner @@ -249,11 +236,13 @@ def create_minimal_pipeline_config(): """Create minimal pipeline configuration for tests.""" return { 'ai': { - 'runner': {'runner': 'local-agent', 'expire-time': None}, - 'local-agent': { - 'model': {'primary': 'test-model-uuid', 'fallbacks': []}, - 'prompt': 'default', - 'knowledge-bases': [], + 'runner': {'id': DEFAULT_RUNNER_ID, 'expire-time': None}, + 'runner_config': { + DEFAULT_RUNNER_ID: { + 'model': {'primary': 'test-model-uuid', 'fallbacks': []}, + 'prompt': [{'role': 'system', 'content': 'default'}], + 'knowledge-bases': [], + }, }, }, 'output': { @@ -396,7 +385,7 @@ class TestProcessorStage: adapter, platform = fake_platform_adapter # Set fake runner that returns pong - fake_runner = FakeRunner().returns("LANGBOT_FAKE_PONG") + fake_runner = FakeRunner.returns("LANGBOT_FAKE_PONG") set_fake_runner(fake_runner) # Create query @@ -502,7 +491,7 @@ class TestRunnerExceptionFlow: adapter, platform = fake_platform_adapter # Set fake runner that raises exception - fake_runner = FakeRunner().raises(ValueError("API Error: rate limit exceeded")) + fake_runner = FakeRunner.raises(ValueError("API Error: rate limit exceeded")) set_fake_runner(fake_runner) # Create query with exception handling config @@ -541,7 +530,7 @@ class TestRunnerExceptionFlow: adapter, platform = fake_platform_adapter # Set fake runner that raises specific exception - fake_runner = FakeRunner().raises(RuntimeError("Custom runtime error")) + fake_runner = FakeRunner.raises(RuntimeError("Custom runtime error")) set_fake_runner(fake_runner) # Create query with show-error mode @@ -578,7 +567,7 @@ class TestRunnerExceptionFlow: adapter, platform = fake_platform_adapter # Set fake runner that raises exception - fake_runner = FakeRunner().raises(Exception("Hidden error")) + fake_runner = FakeRunner.raises(Exception("Hidden error")) set_fake_runner(fake_runner) # Create query with hide mode @@ -666,7 +655,7 @@ class TestStageChainIntegration: adapter, platform = fake_platform_adapter # Set fake runner - fake_runner = FakeRunner().returns("LANGBOT_FAKE_PONG") + fake_runner = FakeRunner.returns("LANGBOT_FAKE_PONG") set_fake_runner(fake_runner) # Create query @@ -710,7 +699,6 @@ class TestStageChainIntegration: assert len(results) >= 1 # Build resp_message_chain from resp_messages - from tests.factories.message import text_chain for resp_msg in query.resp_messages: if resp_msg.content: query.resp_message_chain.append(text_chain(resp_msg.content)) @@ -775,4 +763,4 @@ class TestStageChainIntegration: assert results[0].result_type == entities.ResultType.INTERRUPT # Chain stops here - no resp_messages - assert len(query.resp_messages) == 0 \ No newline at end of file + assert len(query.resp_messages) == 0 diff --git a/tests/unit_tests/agent/test_orchestrator_artifact.py b/tests/unit_tests/agent/test_orchestrator_artifact.py index af46d468..bf0deefb 100644 --- a/tests/unit_tests/agent/test_orchestrator_artifact.py +++ b/tests/unit_tests/agent/test_orchestrator_artifact.py @@ -8,8 +8,7 @@ from langbot.pkg.agent.runner.orchestrator import ( AgentRunOrchestrator, MAX_ARTIFACT_INLINE_BYTES, ) -from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor -from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding +from langbot.pkg.agent.runner.host_models import AgentEventEnvelope from langbot.pkg.agent.runner.errors import RunnerProtocolError from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext diff --git a/tests/unit_tests/agent/test_state_api_auth.py b/tests/unit_tests/agent/test_state_api_auth.py index 324f9f2c..4b92e070 100644 --- a/tests/unit_tests/agent/test_state_api_auth.py +++ b/tests/unit_tests/agent/test_state_api_auth.py @@ -20,14 +20,12 @@ Authorization rules: from __future__ import annotations import pytest -import asyncio -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, patch from sqlalchemy.ext.asyncio import create_async_engine -from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry, get_session_registry +from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry from langbot.pkg.agent.runner.persistent_state_store import PersistentStateStore, reset_persistent_state_store from langbot.pkg.plugin.handler import RuntimeConnectionHandler -from langbot_plugin.runtime.io.connection import Connection from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction # Import shared test fixtures @@ -72,7 +70,7 @@ async def persistent_store(db_engine): # Create the table from langbot.pkg.entity.persistence.agent_runner_state import AgentRunnerState - from sqlalchemy import text + async with db_engine.begin() as conn: await conn.run_sync(AgentRunnerState.__table__.create, checkfirst=True)