Назад к задачам
Junior — Senior
5

Динамический асинхронный диспетчер задач

Получайте помощь с лайвкодингом в реальном времени с Sobes Copilot
Условие задачи

Необходимо построить асинхронный диспетчер, который позволяет в режиме реального времени добавлять новые корутины с указанием задержки их выполнения. Диспетчер должен правильно управлять запуском задач, фиксировать их окончание и предоставлять механизм ожидания завершения как уже запущенных, так и только что добавленных задач.

import asyncio
import random
from typing import Callable, Coroutine, Any

class AsyncTaskScheduler:
    async def add_task(self, task_func: Callable[[], Coroutine[Any, Any, Any]]) -> asyncio.Task:
        """
        Добавляет новую задачу в планировщик.

        :param task_func: Короутина, которая будет выполнена
        :return: Объект asyncio.Task
        """
        pass

    async def wait_for_all_tasks(self):
        """
        Ожидает завершения всех текущих и добавляемых задач.
        """
        pass

async def example_task(task_id: int, duration: float):
    """
    Пример задачи с переменной длительностью выполнения.

    :param task_id: Уникальный идентификатор задачи
    :param duration: Время выполнения задачи
    """
    print(f"Task {task_id} started")
    await asyncio.sleep(duration)
    print(f"Task {task_id} completed after {duration} seconds")
    return task_id

async def main():
    pass

if __name__ == "__main__":
    asyncio.run(main())