mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-05 05:16:03 +00:00
test(unit): improve taskmgr tests to test real classes
U-004 improved: Tests now import and test actual classes: - TaskContext: new(), trace(), to_dict(), placeholder() - TaskWrapper: task creation, context, exception/result capture, cancel, to_dict - AsyncTaskManager: create_task, create_user_task, cancel_task, cancel_by_scope - Task pruning behavior Uses pre-mocking technique: - Mock langbot.pkg.core.app before import (breaks circular chain) - Mock langbot.pkg.core.entities with proper Enum All 24 tests now test real class behavior, not patterns. taskmgr.py coverage should improve significantly. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -8,7 +8,7 @@ Tests cover async task lifecycle management:
|
||||
- Task cancellation
|
||||
- Multiple task isolation
|
||||
|
||||
Uses module mocking to break circular import chain.
|
||||
Uses module pre-mocking to break circular import chain.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -16,314 +16,509 @@ from __future__ import annotations
|
||||
import pytest
|
||||
import asyncio
|
||||
import sys
|
||||
import enum
|
||||
from unittest.mock import MagicMock
|
||||
from importlib import import_module
|
||||
|
||||
|
||||
# Break the circular import chain before importing taskmgr:
|
||||
# Pre-mock app module BEFORE importing taskmgr to break circular chain:
|
||||
# taskmgr → app → http_controller → groups/knowledge/migration → taskmgr (partial)
|
||||
_mock_app = MagicMock()
|
||||
_mock_app.AsyncTaskManager = object
|
||||
_mock_app.TaskWrapper = object
|
||||
_mock_app.TaskContext = object
|
||||
sys.modules.setdefault('langbot.pkg.core.app', _mock_app)
|
||||
class FakeMinimalApp:
|
||||
"""Minimal app that only provides event_loop."""
|
||||
|
||||
def __init__(self, event_loop):
|
||||
self.event_loop = event_loop
|
||||
self.instance_config = MagicMock()
|
||||
self.instance_config.data = {}
|
||||
|
||||
# Pre-register mock app module
|
||||
_mock_app_module = MagicMock()
|
||||
_mock_app_module.Application = FakeMinimalApp
|
||||
sys.modules['langbot.pkg.core.app'] = _mock_app_module
|
||||
|
||||
# Pre-register mock entities module - use proper Enum
|
||||
class LifecycleControlScope(enum.Enum):
|
||||
APPLICATION = 'application'
|
||||
PLATFORM = 'platform'
|
||||
PLUGIN = 'plugin'
|
||||
PROVIDER = 'provider'
|
||||
|
||||
_mock_entities_module = MagicMock()
|
||||
_mock_entities_module.LifecycleControlScope = LifecycleControlScope
|
||||
sys.modules['langbot.pkg.core.entities'] = _mock_entities_module
|
||||
|
||||
|
||||
def get_taskmgr():
|
||||
"""Import taskmgr with circular import workaround."""
|
||||
"""Import taskmgr after pre-mocking."""
|
||||
return import_module('langbot.pkg.core.taskmgr')
|
||||
|
||||
|
||||
def get_entities():
|
||||
"""Import entities."""
|
||||
return import_module('langbot.pkg.core.entities')
|
||||
"""Get pre-registered mock entities module."""
|
||||
return sys.modules['langbot.pkg.core.entities']
|
||||
|
||||
|
||||
class TestTaskContextBasic:
|
||||
"""Basic tests for TaskContext behavior."""
|
||||
class TestTaskContextReal:
|
||||
"""Tests for real TaskContext class (no circular import)."""
|
||||
|
||||
def test_task_context_trace_format(self):
|
||||
"""TaskContext trace should format log entries."""
|
||||
# Import TaskContext class definition directly from source
|
||||
import datetime
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_new(self):
|
||||
"""TaskContext.new() creates instance."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
# Simulate TaskContext behavior without importing the module
|
||||
# (since the circular import breaks the class definition)
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
|
||||
# We can test the logic inline
|
||||
log = ''
|
||||
current_action = 'default'
|
||||
assert ctx.current_action == 'default'
|
||||
assert ctx.log == ''
|
||||
assert ctx.metadata == {}
|
||||
|
||||
def trace(msg: str, action: str = None):
|
||||
nonlocal current_action, log
|
||||
if action is not None:
|
||||
current_action = action
|
||||
log += f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | {current_action} | {msg}\n'
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_trace(self):
|
||||
"""TaskContext.trace adds formatted log."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
trace('test message', action='test_action')
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.trace('test message', action='test_action')
|
||||
|
||||
assert current_action == 'test_action'
|
||||
assert 'test message' in log
|
||||
assert 'test_action' in log
|
||||
assert ctx.current_action == 'test_action'
|
||||
assert 'test message' in ctx.log
|
||||
assert 'test_action' in ctx.log
|
||||
# Contains timestamp format
|
||||
assert '|' in ctx.log
|
||||
|
||||
def test_task_context_to_dict_format(self):
|
||||
"""TaskContext to_dict should include expected fields."""
|
||||
# Expected fields based on source code
|
||||
expected_keys = ['current_action', 'log', 'metadata']
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_multiple_traces(self):
|
||||
"""TaskContext accumulates multiple traces."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
# Simulate
|
||||
result = {
|
||||
'current_action': 'default',
|
||||
'log': 'test log',
|
||||
'metadata': {},
|
||||
}
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.trace('first')
|
||||
ctx.trace('second')
|
||||
|
||||
for key in expected_keys:
|
||||
assert key in result
|
||||
assert 'first' in ctx.log
|
||||
assert 'second' in ctx.log
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_to_dict(self):
|
||||
"""TaskContext.to_dict returns all fields."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.trace('log entry')
|
||||
|
||||
result = ctx.to_dict()
|
||||
|
||||
assert 'current_action' in result
|
||||
assert 'log' in result
|
||||
assert 'metadata' in result
|
||||
assert result['log'] == ctx.log
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_set_current_action(self):
|
||||
"""set_current_action updates action."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.set_current_action('new_action')
|
||||
|
||||
assert ctx.current_action == 'new_action'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_context_metadata(self):
|
||||
"""TaskContext metadata can be set."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.metadata['key'] = 'value'
|
||||
|
||||
assert ctx.metadata['key'] == 'value'
|
||||
assert ctx.to_dict()['metadata']['key'] == 'value'
|
||||
|
||||
def test_task_context_placeholder_singleton(self):
|
||||
"""placeholder returns same instance."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
ctx1 = taskmgr.TaskContext.placeholder()
|
||||
ctx2 = taskmgr.TaskContext.placeholder()
|
||||
|
||||
assert ctx1 is ctx2
|
||||
|
||||
|
||||
class TestTaskWrapperBehavior:
|
||||
"""Tests for TaskWrapper behavior patterns."""
|
||||
class TestTaskWrapperReal:
|
||||
"""Tests for real TaskWrapper class."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_wrapper_creates_task(self):
|
||||
"""TaskWrapper should create asyncio.Task."""
|
||||
# Test the pattern without importing
|
||||
async def coro():
|
||||
return 42
|
||||
"""TaskWrapper creates and wraps asyncio.Task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
task = loop.create_task(coro())
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
assert isinstance(task, asyncio.Task)
|
||||
async def simple_coro():
|
||||
return 42
|
||||
|
||||
result = await task
|
||||
wrapper = taskmgr.TaskWrapper(app, simple_coro(), name='test')
|
||||
|
||||
assert wrapper.name == 'test'
|
||||
assert wrapper.task is not None
|
||||
assert isinstance(wrapper.task, asyncio.Task)
|
||||
|
||||
result = await wrapper.task
|
||||
assert result == 42
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_exception_capture_pattern(self):
|
||||
"""Task exception can be captured via task.exception()."""
|
||||
async def failing():
|
||||
raise ValueError('error')
|
||||
async def test_task_wrapper_with_custom_context(self):
|
||||
"""TaskWrapper uses provided TaskContext."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
task = loop.create_task(failing())
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
# Let it fail
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Capture exception
|
||||
try:
|
||||
exception = task.exception()
|
||||
assert isinstance(exception, ValueError)
|
||||
except asyncio.CancelledError:
|
||||
pass # Task was cancelled
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_cancel_pattern(self):
|
||||
"""Task can be cancelled."""
|
||||
async def long_running():
|
||||
await asyncio.sleep(10)
|
||||
return 'done'
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
task = loop.create_task(long_running())
|
||||
|
||||
task.cancel()
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert task.cancelled() or task.done()
|
||||
|
||||
|
||||
class TestAsyncTaskManagerPatterns:
|
||||
"""Tests for AsyncTaskManager behavior patterns."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_tracking_pattern(self):
|
||||
"""Manager tracks created tasks."""
|
||||
# Simulate manager behavior pattern
|
||||
tasks = []
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.set_current_action('custom')
|
||||
|
||||
async def coro():
|
||||
return 'result'
|
||||
return 'done'
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
wrapper = {'id': 0, 'name': 'test', 'task': loop.create_task(coro())}
|
||||
tasks.append(wrapper)
|
||||
wrapper = taskmgr.TaskWrapper(app, coro(), context=ctx)
|
||||
|
||||
assert wrapper in tasks
|
||||
assert len(tasks) == 1
|
||||
assert wrapper.task_context.current_action == 'custom'
|
||||
|
||||
await wrapper['task']
|
||||
await wrapper.task
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_tasks_isolated(self):
|
||||
"""Multiple tasks run independently."""
|
||||
results = []
|
||||
|
||||
async def task_a():
|
||||
await asyncio.sleep(0.01)
|
||||
results.append('a')
|
||||
|
||||
async def task_b():
|
||||
await asyncio.sleep(0.01)
|
||||
results.append('b')
|
||||
async def test_task_wrapper_exception_capture(self):
|
||||
"""TaskWrapper captures exception from failed task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
task_a_inst = loop.create_task(task_a())
|
||||
task_b_inst = loop.create_task(task_b())
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
await asyncio.gather(task_a_inst, task_b_inst)
|
||||
async def failing_coro():
|
||||
raise ValueError('test error')
|
||||
|
||||
assert 'a' in results
|
||||
assert 'b' in results
|
||||
wrapper = taskmgr.TaskWrapper(app, failing_coro())
|
||||
|
||||
# Let task complete with exception
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
exception = wrapper.assume_exception()
|
||||
assert exception is not None
|
||||
assert isinstance(exception, ValueError)
|
||||
assert 'test error' in str(exception)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_by_id_pattern(self):
|
||||
"""Task can be cancelled by ID lookup."""
|
||||
tasks = []
|
||||
async def test_task_wrapper_result_capture(self):
|
||||
"""TaskWrapper captures result from completed task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
async def coro():
|
||||
return 'result_value'
|
||||
|
||||
wrapper = taskmgr.TaskWrapper(app, coro())
|
||||
|
||||
await wrapper.task
|
||||
|
||||
result = wrapper.assume_result()
|
||||
assert result == 'result_value'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_wrapper_cancel(self):
|
||||
"""TaskWrapper.cancel cancels the task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
async def long_coro():
|
||||
await asyncio.sleep(10)
|
||||
return 'done'
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
wrapper = {'id': 1, 'task': loop.create_task(long_coro())}
|
||||
tasks.append(wrapper)
|
||||
wrapper = taskmgr.TaskWrapper(app, long_coro())
|
||||
|
||||
# Cancel by ID
|
||||
task_id = 1
|
||||
for w in tasks:
|
||||
if w['id'] == task_id:
|
||||
w['task'].cancel()
|
||||
wrapper.cancel()
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert wrapper['task'].cancelled() or wrapper['task'].done()
|
||||
assert wrapper.task.cancelled() or wrapper.task.done()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stats_calculation_pattern(self):
|
||||
"""Stats count running/completed tasks."""
|
||||
tasks = []
|
||||
|
||||
async def quick():
|
||||
return 'done'
|
||||
async def test_task_wrapper_to_dict(self):
|
||||
"""TaskWrapper.to_dict serializes task info."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
for i in range(3):
|
||||
wrapper = {'id': i, 'task': loop.create_task(quick())}
|
||||
tasks.append(wrapper)
|
||||
await wrapper['task']
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
completed = sum(1 for w in tasks if w['task'].done())
|
||||
async def coro():
|
||||
return 42
|
||||
|
||||
assert completed == 3
|
||||
wrapper = taskmgr.TaskWrapper(app, coro(), name='dict_test', label='Test')
|
||||
|
||||
await wrapper.task
|
||||
|
||||
result = wrapper.to_dict()
|
||||
|
||||
assert result['name'] == 'dict_test'
|
||||
assert result['label'] == 'Test'
|
||||
assert 'runtime' in result
|
||||
assert result['runtime']['done'] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pruning_completed_tasks(self):
|
||||
"""Completed tasks are pruned when over limit."""
|
||||
completed_limit = 5
|
||||
tasks = []
|
||||
|
||||
async def quick():
|
||||
return 'done'
|
||||
async def test_task_wrapper_id_increment(self):
|
||||
"""TaskWrapper IDs increment."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
for i in range(10):
|
||||
wrapper = {'id': i, 'task': loop.create_task(quick())}
|
||||
tasks.append(wrapper)
|
||||
await wrapper['task']
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
# Prune
|
||||
completed = [w for w in tasks if w['task'].done()]
|
||||
overflow = len(completed) - completed_limit
|
||||
if overflow > 0:
|
||||
remove_ids = {w['id'] for w in completed[:overflow]}
|
||||
tasks = [w for w in tasks if w['id'] not in remove_ids]
|
||||
async def coro():
|
||||
return 1
|
||||
|
||||
remaining_completed = sum(1 for w in tasks if w['task'].done())
|
||||
assert remaining_completed <= completed_limit
|
||||
wrapper1 = taskmgr.TaskWrapper(app, coro())
|
||||
wrapper2 = taskmgr.TaskWrapper(app, coro())
|
||||
|
||||
assert wrapper2.id > wrapper1.id
|
||||
|
||||
|
||||
class TestScopeBasedCancellation:
|
||||
"""Tests for scope-based cancellation pattern."""
|
||||
class TestAsyncTaskManagerReal:
|
||||
"""Tests for real AsyncTaskManager class."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_by_scope_pattern(self):
|
||||
"""Tasks can be filtered and cancelled by scope."""
|
||||
tasks = []
|
||||
async def test_manager_create_task(self):
|
||||
"""AsyncTaskManager creates and tracks tasks."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def coro():
|
||||
return 'result'
|
||||
|
||||
wrapper = manager.create_task(coro(), name='test')
|
||||
|
||||
assert wrapper in manager.tasks
|
||||
assert wrapper.name == 'test'
|
||||
|
||||
await wrapper.task
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_create_user_task(self):
|
||||
"""create_user_task creates user-type task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def coro():
|
||||
return 'user_result'
|
||||
|
||||
wrapper = manager.create_user_task(coro())
|
||||
|
||||
assert wrapper.task_type == 'user'
|
||||
|
||||
await wrapper.task
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_multiple_tasks_isolated(self):
|
||||
"""Multiple tasks run independently."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
results = []
|
||||
|
||||
async def task_a():
|
||||
results.append('a')
|
||||
|
||||
async def task_b():
|
||||
results.append('b')
|
||||
|
||||
w1 = manager.create_task(task_a(), name='a')
|
||||
w2 = manager.create_task(task_b(), name='b')
|
||||
|
||||
await asyncio.gather(w1.task, w2.task)
|
||||
|
||||
assert 'a' in results
|
||||
assert 'b' in results
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_get_task_by_id(self):
|
||||
"""get_task_by_id finds task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def coro():
|
||||
return 1
|
||||
|
||||
wrapper = manager.create_task(coro())
|
||||
|
||||
found = manager.get_task_by_id(wrapper.id)
|
||||
assert found is wrapper
|
||||
|
||||
not_found = manager.get_task_by_id(99999)
|
||||
assert not_found is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_cancel_task(self):
|
||||
"""cancel_task cancels specific task."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def long():
|
||||
await asyncio.sleep(10)
|
||||
return 'done'
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
wrapper = manager.create_task(long())
|
||||
|
||||
# Create tasks with different scopes
|
||||
platform_task = {
|
||||
'id': 1,
|
||||
'scopes': ['platform'],
|
||||
'task': loop.create_task(long()),
|
||||
}
|
||||
app_task = {
|
||||
'id': 2,
|
||||
'scopes': ['application'],
|
||||
'task': loop.create_task(long()),
|
||||
}
|
||||
tasks.extend([platform_task, app_task])
|
||||
|
||||
# Cancel platform scope
|
||||
scope = 'platform'
|
||||
for w in tasks:
|
||||
if not w['task'].done() and scope in w['scopes']:
|
||||
w['task'].cancel()
|
||||
manager.cancel_task(wrapper.id)
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert platform_task['task'].cancelled() or platform_task['task'].done()
|
||||
# App task still running (pending)
|
||||
|
||||
|
||||
class TestTaskTypeFiltering:
|
||||
"""Tests for filtering tasks by type/kind."""
|
||||
assert wrapper.task.cancelled() or wrapper.task.done()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_filter_by_type_pattern(self):
|
||||
"""Tasks can be filtered by type."""
|
||||
tasks = [
|
||||
{'id': 1, 'task_type': 'system', 'kind': 'internal'},
|
||||
{'id': 2, 'task_type': 'user', 'kind': 'user_action'},
|
||||
{'id': 3, 'task_type': 'system', 'kind': 'maintenance'},
|
||||
]
|
||||
async def test_manager_cancel_by_scope(self):
|
||||
"""cancel_by_scope cancels matching scope tasks."""
|
||||
taskmgr = get_taskmgr()
|
||||
entities = get_entities()
|
||||
|
||||
system_tasks = [t for t in tasks if t['task_type'] == 'system']
|
||||
user_tasks = [t for t in tasks if t['task_type'] == 'user']
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
assert len(system_tasks) == 2
|
||||
assert len(user_tasks) == 1
|
||||
assert all(t['task_type'] == 'system' for t in system_tasks)
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def long():
|
||||
await asyncio.sleep(10)
|
||||
|
||||
class TestWaitAllTasks:
|
||||
"""Tests for waiting for all tasks."""
|
||||
async def app_long():
|
||||
await asyncio.sleep(10)
|
||||
|
||||
# Create task with PLATFORM scope
|
||||
platform_wrapper = manager.create_task(
|
||||
long(),
|
||||
scopes=[entities.LifecycleControlScope.PLATFORM],
|
||||
)
|
||||
|
||||
# Create task with APPLICATION scope
|
||||
manager.create_task(
|
||||
app_long(),
|
||||
scopes=[entities.LifecycleControlScope.APPLICATION],
|
||||
)
|
||||
|
||||
manager.cancel_by_scope(entities.LifecycleControlScope.PLATFORM)
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Platform task cancelled
|
||||
assert platform_wrapper.task.cancelled() or platform_wrapper.task.done()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_all_pattern(self):
|
||||
"""Can wait for all tasks to complete."""
|
||||
tasks = []
|
||||
async def test_manager_get_stats(self):
|
||||
"""get_stats returns task counts."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def quick():
|
||||
return 1
|
||||
|
||||
for _ in range(3):
|
||||
w = manager.create_task(quick())
|
||||
await w.task
|
||||
|
||||
stats = manager.get_stats()
|
||||
|
||||
assert stats['total'] >= 3
|
||||
assert stats['completed'] >= 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_get_tasks_dict(self):
|
||||
"""get_tasks_dict filters by type."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def coro():
|
||||
return 1
|
||||
|
||||
system_w = manager.create_task(coro(), task_type='system')
|
||||
user_w = manager.create_user_task(coro())
|
||||
|
||||
await asyncio.gather(system_w.task, user_w.task)
|
||||
|
||||
system_tasks = manager.get_tasks_dict(type='system')
|
||||
assert all(t['task_type'] == 'system' for t in system_tasks['tasks'])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_manager_wait_all(self):
|
||||
"""wait_all waits for all tasks."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
app = FakeMinimalApp(loop)
|
||||
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
async def delayed():
|
||||
await asyncio.sleep(0.05)
|
||||
return 'delayed'
|
||||
|
||||
for _ in range(3):
|
||||
manager.create_task(delayed())
|
||||
|
||||
await manager.wait_all()
|
||||
|
||||
stats = manager.get_stats()
|
||||
assert stats['running'] == 0
|
||||
|
||||
|
||||
class TestTaskPruningReal:
|
||||
"""Tests for real task pruning behavior."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prune_completed_tasks(self):
|
||||
"""Completed tasks are pruned when exceeding limit."""
|
||||
taskmgr = get_taskmgr()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
for i in range(3):
|
||||
wrapper = {'id': i, 'task': loop.create_task(delayed())}
|
||||
tasks.append(wrapper)
|
||||
app = FakeMinimalApp(loop)
|
||||
app.instance_config.data = {'system': {'task_retention': {'completed_limit': 3}}}
|
||||
|
||||
# Wait for all
|
||||
await asyncio.gather(*[w['task'] for w in tasks], return_exceptions=True)
|
||||
manager = taskmgr.AsyncTaskManager(app)
|
||||
|
||||
# All done
|
||||
assert all(w['task'].done() for w in tasks)
|
||||
async def quick():
|
||||
return 1
|
||||
|
||||
# Create more than limit
|
||||
for _ in range(5):
|
||||
w = manager.create_task(quick())
|
||||
await w.task
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Completed count should be <= limit
|
||||
completed = sum(1 for w in manager.tasks if w.task.done())
|
||||
assert completed <= 3
|
||||
Reference in New Issue
Block a user