Назад к задачамПолучайте помощь с лайвкодингом в реальном времени с Sobes Copilot
Junior — Senior
5
Динамический асинхронный диспетчер задач
Условие задачи
Необходимо построить асинхронный диспетчер, который позволяет в режиме реального времени добавлять новые корутины с указанием задержки их выполнения. Диспетчер должен правильно управлять запуском задач, фиксировать их окончание и предоставлять механизм ожидания завершения как уже запущенных, так и только что добавленных задач.
import asyncio
import random
from typing import Callable, Coroutine, Any
class AsyncTaskScheduler:
async def add_task(self, task_func: Callable[[], Coroutine[Any, Any, Any]]) -> asyncio.Task:
"""
Добавляет новую задачу в планировщик.
:param task_func: Короутина, которая будет выполнена
:return: Объект asyncio.Task
"""
pass
async def wait_for_all_tasks(self):
"""
Ожидает завершения всех текущих и добавляемых задач.
"""
pass
async def example_task(task_id: int, duration: float):
"""
Пример задачи с переменной длительностью выполнения.
:param task_id: Уникальный идентификатор задачи
:param duration: Время выполнения задачи
"""
print(f"Task {task_id} started")
await asyncio.sleep(duration)
print(f"Task {task_id} completed after {duration} seconds")
return task_id
async def main():
pass
if __name__ == "__main__":
asyncio.run(main())