diff --git a/tests/unit_tests/core/test_taskmgr.py b/tests/unit_tests/core/test_taskmgr.py index 6507efdb..f7be2548 100644 --- a/tests/unit_tests/core/test_taskmgr.py +++ b/tests/unit_tests/core/test_taskmgr.py @@ -8,7 +8,7 @@ Tests cover async task lifecycle management: - Task cancellation - Multiple task isolation -Uses module mocking to break circular import chain. +Uses module pre-mocking to break circular import chain. """ from __future__ import annotations @@ -16,314 +16,509 @@ from __future__ import annotations import pytest import asyncio import sys +import enum from unittest.mock import MagicMock from importlib import import_module -# Break the circular import chain before importing taskmgr: +# Pre-mock app module BEFORE importing taskmgr to break circular chain: # taskmgr → app → http_controller → groups/knowledge/migration → taskmgr (partial) -_mock_app = MagicMock() -_mock_app.AsyncTaskManager = object -_mock_app.TaskWrapper = object -_mock_app.TaskContext = object -sys.modules.setdefault('langbot.pkg.core.app', _mock_app) +class FakeMinimalApp: + """Minimal app that only provides event_loop.""" + + def __init__(self, event_loop): + self.event_loop = event_loop + self.instance_config = MagicMock() + self.instance_config.data = {} + +# Pre-register mock app module +_mock_app_module = MagicMock() +_mock_app_module.Application = FakeMinimalApp +sys.modules['langbot.pkg.core.app'] = _mock_app_module + +# Pre-register mock entities module - use proper Enum +class LifecycleControlScope(enum.Enum): + APPLICATION = 'application' + PLATFORM = 'platform' + PLUGIN = 'plugin' + PROVIDER = 'provider' + +_mock_entities_module = MagicMock() +_mock_entities_module.LifecycleControlScope = LifecycleControlScope +sys.modules['langbot.pkg.core.entities'] = _mock_entities_module def get_taskmgr(): - """Import taskmgr with circular import workaround.""" + """Import taskmgr after pre-mocking.""" return import_module('langbot.pkg.core.taskmgr') def get_entities(): - """Import entities.""" - return import_module('langbot.pkg.core.entities') + """Get pre-registered mock entities module.""" + return sys.modules['langbot.pkg.core.entities'] -class TestTaskContextBasic: - """Basic tests for TaskContext behavior.""" +class TestTaskContextReal: + """Tests for real TaskContext class (no circular import).""" - def test_task_context_trace_format(self): - """TaskContext trace should format log entries.""" - # Import TaskContext class definition directly from source - import datetime + @pytest.mark.asyncio + async def test_task_context_new(self): + """TaskContext.new() creates instance.""" + taskmgr = get_taskmgr() - # Simulate TaskContext behavior without importing the module - # (since the circular import breaks the class definition) + ctx = taskmgr.TaskContext.new() - # We can test the logic inline - log = '' - current_action = 'default' + assert ctx.current_action == 'default' + assert ctx.log == '' + assert ctx.metadata == {} - def trace(msg: str, action: str = None): - nonlocal current_action, log - if action is not None: - current_action = action - log += f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | {current_action} | {msg}\n' + @pytest.mark.asyncio + async def test_task_context_trace(self): + """TaskContext.trace adds formatted log.""" + taskmgr = get_taskmgr() - trace('test message', action='test_action') + ctx = taskmgr.TaskContext.new() + ctx.trace('test message', action='test_action') - assert current_action == 'test_action' - assert 'test message' in log - assert 'test_action' in log + assert ctx.current_action == 'test_action' + assert 'test message' in ctx.log + assert 'test_action' in ctx.log + # Contains timestamp format + assert '|' in ctx.log - def test_task_context_to_dict_format(self): - """TaskContext to_dict should include expected fields.""" - # Expected fields based on source code - expected_keys = ['current_action', 'log', 'metadata'] + @pytest.mark.asyncio + async def test_task_context_multiple_traces(self): + """TaskContext accumulates multiple traces.""" + taskmgr = get_taskmgr() - # Simulate - result = { - 'current_action': 'default', - 'log': 'test log', - 'metadata': {}, - } + ctx = taskmgr.TaskContext.new() + ctx.trace('first') + ctx.trace('second') - for key in expected_keys: - assert key in result + assert 'first' in ctx.log + assert 'second' in ctx.log + + @pytest.mark.asyncio + async def test_task_context_to_dict(self): + """TaskContext.to_dict returns all fields.""" + taskmgr = get_taskmgr() + + ctx = taskmgr.TaskContext.new() + ctx.trace('log entry') + + result = ctx.to_dict() + + assert 'current_action' in result + assert 'log' in result + assert 'metadata' in result + assert result['log'] == ctx.log + + @pytest.mark.asyncio + async def test_task_context_set_current_action(self): + """set_current_action updates action.""" + taskmgr = get_taskmgr() + + ctx = taskmgr.TaskContext.new() + ctx.set_current_action('new_action') + + assert ctx.current_action == 'new_action' + + @pytest.mark.asyncio + async def test_task_context_metadata(self): + """TaskContext metadata can be set.""" + taskmgr = get_taskmgr() + + ctx = taskmgr.TaskContext.new() + ctx.metadata['key'] = 'value' + + assert ctx.metadata['key'] == 'value' + assert ctx.to_dict()['metadata']['key'] == 'value' + + def test_task_context_placeholder_singleton(self): + """placeholder returns same instance.""" + taskmgr = get_taskmgr() + + ctx1 = taskmgr.TaskContext.placeholder() + ctx2 = taskmgr.TaskContext.placeholder() + + assert ctx1 is ctx2 -class TestTaskWrapperBehavior: - """Tests for TaskWrapper behavior patterns.""" +class TestTaskWrapperReal: + """Tests for real TaskWrapper class.""" @pytest.mark.asyncio async def test_task_wrapper_creates_task(self): - """TaskWrapper should create asyncio.Task.""" - # Test the pattern without importing - async def coro(): - return 42 + """TaskWrapper creates and wraps asyncio.Task.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - task = loop.create_task(coro()) + app = FakeMinimalApp(loop) - assert isinstance(task, asyncio.Task) + async def simple_coro(): + return 42 - result = await task + wrapper = taskmgr.TaskWrapper(app, simple_coro(), name='test') + + assert wrapper.name == 'test' + assert wrapper.task is not None + assert isinstance(wrapper.task, asyncio.Task) + + result = await wrapper.task assert result == 42 @pytest.mark.asyncio - async def test_task_exception_capture_pattern(self): - """Task exception can be captured via task.exception().""" - async def failing(): - raise ValueError('error') + async def test_task_wrapper_with_custom_context(self): + """TaskWrapper uses provided TaskContext.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - task = loop.create_task(failing()) + app = FakeMinimalApp(loop) - # Let it fail - await asyncio.sleep(0.01) - - # Capture exception - try: - exception = task.exception() - assert isinstance(exception, ValueError) - except asyncio.CancelledError: - pass # Task was cancelled - - @pytest.mark.asyncio - async def test_task_cancel_pattern(self): - """Task can be cancelled.""" - async def long_running(): - await asyncio.sleep(10) - return 'done' - - loop = asyncio.get_running_loop() - task = loop.create_task(long_running()) - - task.cancel() - - await asyncio.sleep(0.01) - - assert task.cancelled() or task.done() - - -class TestAsyncTaskManagerPatterns: - """Tests for AsyncTaskManager behavior patterns.""" - - @pytest.mark.asyncio - async def test_task_tracking_pattern(self): - """Manager tracks created tasks.""" - # Simulate manager behavior pattern - tasks = [] + ctx = taskmgr.TaskContext.new() + ctx.set_current_action('custom') async def coro(): - return 'result' + return 'done' - loop = asyncio.get_running_loop() - wrapper = {'id': 0, 'name': 'test', 'task': loop.create_task(coro())} - tasks.append(wrapper) + wrapper = taskmgr.TaskWrapper(app, coro(), context=ctx) - assert wrapper in tasks - assert len(tasks) == 1 + assert wrapper.task_context.current_action == 'custom' - await wrapper['task'] + await wrapper.task @pytest.mark.asyncio - async def test_multiple_tasks_isolated(self): - """Multiple tasks run independently.""" - results = [] - - async def task_a(): - await asyncio.sleep(0.01) - results.append('a') - - async def task_b(): - await asyncio.sleep(0.01) - results.append('b') + async def test_task_wrapper_exception_capture(self): + """TaskWrapper captures exception from failed task.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - task_a_inst = loop.create_task(task_a()) - task_b_inst = loop.create_task(task_b()) + app = FakeMinimalApp(loop) - await asyncio.gather(task_a_inst, task_b_inst) + async def failing_coro(): + raise ValueError('test error') - assert 'a' in results - assert 'b' in results + wrapper = taskmgr.TaskWrapper(app, failing_coro()) + + # Let task complete with exception + await asyncio.sleep(0.01) + + exception = wrapper.assume_exception() + assert exception is not None + assert isinstance(exception, ValueError) + assert 'test error' in str(exception) @pytest.mark.asyncio - async def test_cancel_by_id_pattern(self): - """Task can be cancelled by ID lookup.""" - tasks = [] + async def test_task_wrapper_result_capture(self): + """TaskWrapper captures result from completed task.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + async def coro(): + return 'result_value' + + wrapper = taskmgr.TaskWrapper(app, coro()) + + await wrapper.task + + result = wrapper.assume_result() + assert result == 'result_value' + + @pytest.mark.asyncio + async def test_task_wrapper_cancel(self): + """TaskWrapper.cancel cancels the task.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) async def long_coro(): await asyncio.sleep(10) return 'done' - loop = asyncio.get_running_loop() - wrapper = {'id': 1, 'task': loop.create_task(long_coro())} - tasks.append(wrapper) + wrapper = taskmgr.TaskWrapper(app, long_coro()) - # Cancel by ID - task_id = 1 - for w in tasks: - if w['id'] == task_id: - w['task'].cancel() + wrapper.cancel() await asyncio.sleep(0.01) - assert wrapper['task'].cancelled() or wrapper['task'].done() + assert wrapper.task.cancelled() or wrapper.task.done() @pytest.mark.asyncio - async def test_stats_calculation_pattern(self): - """Stats count running/completed tasks.""" - tasks = [] - - async def quick(): - return 'done' + async def test_task_wrapper_to_dict(self): + """TaskWrapper.to_dict serializes task info.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - for i in range(3): - wrapper = {'id': i, 'task': loop.create_task(quick())} - tasks.append(wrapper) - await wrapper['task'] + app = FakeMinimalApp(loop) - completed = sum(1 for w in tasks if w['task'].done()) + async def coro(): + return 42 - assert completed == 3 + wrapper = taskmgr.TaskWrapper(app, coro(), name='dict_test', label='Test') + + await wrapper.task + + result = wrapper.to_dict() + + assert result['name'] == 'dict_test' + assert result['label'] == 'Test' + assert 'runtime' in result + assert result['runtime']['done'] is True @pytest.mark.asyncio - async def test_pruning_completed_tasks(self): - """Completed tasks are pruned when over limit.""" - completed_limit = 5 - tasks = [] - - async def quick(): - return 'done' + async def test_task_wrapper_id_increment(self): + """TaskWrapper IDs increment.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - for i in range(10): - wrapper = {'id': i, 'task': loop.create_task(quick())} - tasks.append(wrapper) - await wrapper['task'] + app = FakeMinimalApp(loop) - # Prune - completed = [w for w in tasks if w['task'].done()] - overflow = len(completed) - completed_limit - if overflow > 0: - remove_ids = {w['id'] for w in completed[:overflow]} - tasks = [w for w in tasks if w['id'] not in remove_ids] + async def coro(): + return 1 - remaining_completed = sum(1 for w in tasks if w['task'].done()) - assert remaining_completed <= completed_limit + wrapper1 = taskmgr.TaskWrapper(app, coro()) + wrapper2 = taskmgr.TaskWrapper(app, coro()) + + assert wrapper2.id > wrapper1.id -class TestScopeBasedCancellation: - """Tests for scope-based cancellation pattern.""" +class TestAsyncTaskManagerReal: + """Tests for real AsyncTaskManager class.""" @pytest.mark.asyncio - async def test_cancel_by_scope_pattern(self): - """Tasks can be filtered and cancelled by scope.""" - tasks = [] + async def test_manager_create_task(self): + """AsyncTaskManager creates and tracks tasks.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + async def coro(): + return 'result' + + wrapper = manager.create_task(coro(), name='test') + + assert wrapper in manager.tasks + assert wrapper.name == 'test' + + await wrapper.task + + @pytest.mark.asyncio + async def test_manager_create_user_task(self): + """create_user_task creates user-type task.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + async def coro(): + return 'user_result' + + wrapper = manager.create_user_task(coro()) + + assert wrapper.task_type == 'user' + + await wrapper.task + + @pytest.mark.asyncio + async def test_manager_multiple_tasks_isolated(self): + """Multiple tasks run independently.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + results = [] + + async def task_a(): + results.append('a') + + async def task_b(): + results.append('b') + + w1 = manager.create_task(task_a(), name='a') + w2 = manager.create_task(task_b(), name='b') + + await asyncio.gather(w1.task, w2.task) + + assert 'a' in results + assert 'b' in results + + @pytest.mark.asyncio + async def test_manager_get_task_by_id(self): + """get_task_by_id finds task.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + async def coro(): + return 1 + + wrapper = manager.create_task(coro()) + + found = manager.get_task_by_id(wrapper.id) + assert found is wrapper + + not_found = manager.get_task_by_id(99999) + assert not_found is None + + @pytest.mark.asyncio + async def test_manager_cancel_task(self): + """cancel_task cancels specific task.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) async def long(): await asyncio.sleep(10) - return 'done' - loop = asyncio.get_running_loop() + wrapper = manager.create_task(long()) - # Create tasks with different scopes - platform_task = { - 'id': 1, - 'scopes': ['platform'], - 'task': loop.create_task(long()), - } - app_task = { - 'id': 2, - 'scopes': ['application'], - 'task': loop.create_task(long()), - } - tasks.extend([platform_task, app_task]) - - # Cancel platform scope - scope = 'platform' - for w in tasks: - if not w['task'].done() and scope in w['scopes']: - w['task'].cancel() + manager.cancel_task(wrapper.id) await asyncio.sleep(0.01) - assert platform_task['task'].cancelled() or platform_task['task'].done() - # App task still running (pending) - - -class TestTaskTypeFiltering: - """Tests for filtering tasks by type/kind.""" + assert wrapper.task.cancelled() or wrapper.task.done() @pytest.mark.asyncio - async def test_filter_by_type_pattern(self): - """Tasks can be filtered by type.""" - tasks = [ - {'id': 1, 'task_type': 'system', 'kind': 'internal'}, - {'id': 2, 'task_type': 'user', 'kind': 'user_action'}, - {'id': 3, 'task_type': 'system', 'kind': 'maintenance'}, - ] + async def test_manager_cancel_by_scope(self): + """cancel_by_scope cancels matching scope tasks.""" + taskmgr = get_taskmgr() + entities = get_entities() - system_tasks = [t for t in tasks if t['task_type'] == 'system'] - user_tasks = [t for t in tasks if t['task_type'] == 'user'] + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) - assert len(system_tasks) == 2 - assert len(user_tasks) == 1 - assert all(t['task_type'] == 'system' for t in system_tasks) + manager = taskmgr.AsyncTaskManager(app) + async def long(): + await asyncio.sleep(10) -class TestWaitAllTasks: - """Tests for waiting for all tasks.""" + async def app_long(): + await asyncio.sleep(10) + + # Create task with PLATFORM scope + platform_wrapper = manager.create_task( + long(), + scopes=[entities.LifecycleControlScope.PLATFORM], + ) + + # Create task with APPLICATION scope + manager.create_task( + app_long(), + scopes=[entities.LifecycleControlScope.APPLICATION], + ) + + manager.cancel_by_scope(entities.LifecycleControlScope.PLATFORM) + + await asyncio.sleep(0.01) + + # Platform task cancelled + assert platform_wrapper.task.cancelled() or platform_wrapper.task.done() @pytest.mark.asyncio - async def test_wait_all_pattern(self): - """Can wait for all tasks to complete.""" - tasks = [] + async def test_manager_get_stats(self): + """get_stats returns task counts.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + async def quick(): + return 1 + + for _ in range(3): + w = manager.create_task(quick()) + await w.task + + stats = manager.get_stats() + + assert stats['total'] >= 3 + assert stats['completed'] >= 3 + + @pytest.mark.asyncio + async def test_manager_get_tasks_dict(self): + """get_tasks_dict filters by type.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) + + async def coro(): + return 1 + + system_w = manager.create_task(coro(), task_type='system') + user_w = manager.create_user_task(coro()) + + await asyncio.gather(system_w.task, user_w.task) + + system_tasks = manager.get_tasks_dict(type='system') + assert all(t['task_type'] == 'system' for t in system_tasks['tasks']) + + @pytest.mark.asyncio + async def test_manager_wait_all(self): + """wait_all waits for all tasks.""" + taskmgr = get_taskmgr() + + loop = asyncio.get_running_loop() + app = FakeMinimalApp(loop) + + manager = taskmgr.AsyncTaskManager(app) async def delayed(): await asyncio.sleep(0.05) - return 'delayed' + + for _ in range(3): + manager.create_task(delayed()) + + await manager.wait_all() + + stats = manager.get_stats() + assert stats['running'] == 0 + + +class TestTaskPruningReal: + """Tests for real task pruning behavior.""" + + @pytest.mark.asyncio + async def test_prune_completed_tasks(self): + """Completed tasks are pruned when exceeding limit.""" + taskmgr = get_taskmgr() loop = asyncio.get_running_loop() - for i in range(3): - wrapper = {'id': i, 'task': loop.create_task(delayed())} - tasks.append(wrapper) + app = FakeMinimalApp(loop) + app.instance_config.data = {'system': {'task_retention': {'completed_limit': 3}}} - # Wait for all - await asyncio.gather(*[w['task'] for w in tasks], return_exceptions=True) + manager = taskmgr.AsyncTaskManager(app) - # All done - assert all(w['task'].done() for w in tasks) \ No newline at end of file + async def quick(): + return 1 + + # Create more than limit + for _ in range(5): + w = manager.create_task(quick()) + await w.task + await asyncio.sleep(0.01) + + # Completed count should be <= limit + completed = sum(1 for w in manager.tasks if w.task.done()) + assert completed <= 3 \ No newline at end of file