Sobes.tech
Back to questions
Senior
6

// Написать асинхронный обработчик задач как библиотеку // Клиент передаёт на вход некоторый объект (Task) с данными для выполнения задачи, // в нашем примере будем использовать пустую структуру. // Обработчик одновременно может обрабатывать не более N задач, // и не более X задач могут быть поставлены в очередь на обработку. // Если нет места в очереди, сразу возвращаем клиенту ошибку. // Задача берется в обработку если имеются на это свободные обработчики. // Иммитируем длительность обработки через time.Sleep(5*time.Second). // Как только очередная задача выполнилась - берём следующую задачу из очереди. // Если в очереди пусто, ожидаем новых задач от клиентов. // Со звездочкой: дополнить структуру Task и получать статусы задач через отдельный запрос

Companies where asked
Cloud.ru

Answer from AI

sobes.tech AI

go package taskhandler

import ( "errors" "sync" "time" )

type Task struct { ID int Status string // "queued", "processing", "done", "error" }

type Handler struct { maxWorkers int maxQueue int queue chan *Task wg sync.WaitGroup mu sync.Mutex tasks map[int]*Task workerSem chan struct{} idCounter int }

func NewHandler(maxWorkers, maxQueue int) *Handler { h := &Handler{ maxWorkers: maxWorkers, maxQueue: maxQueue, queue: make(chan *Task, maxQueue), tasks: make(map[int]*Task), workerSem: make(chan struct{}, maxWorkers), } go h.dispatch() return h }

func (h *Handler) Submit(task *Task) (int, error) { h.mu.Lock() defer h.mu.Unlock()

if len(h.queue) >= h.maxQueue {
	return 0, errors.New("queue is full")
}
h.idCounter++
task.ID = h.idCounter
task.Status = "queued"
h.tasks[task.ID] = task
h.queue <- task
return task.ID, nil

}

func (h *Handler) dispatch() { for task := range h.queue { h.workerSem <- struct{}{} // acquire worker slot h.wg.Add(1) go h.process(task) } }

func (h *Handler) process(task *Task) { defer func() { <-h.workerSem // release worker slot h.wg.Done() }()

// Обновляем статус
h.mu.Lock()
task.Status = "processing"
h.mu.Unlock()

// Имитация длительной обработки
time.Sleep(5 * time.Second)

// Завершение задачи
h.mu.Lock()
task.Status = "done"
h.mu.Unlock()

}

func (h *Handler) Status(taskID int) (string, error) { h.mu.Lock() defer h.mu.Unlock() if task, ok := h.tasks[taskID]; ok { return task.Status, nil } return "", errors.New("task not found") }

func (h *Handler) Shutdown() { close(h.queue) h.wg.Wait() }


Этот код реализует асинхронный обработчик задач с ограничением на количество одновременно обрабатываемых задач (maxWorkers) и размер очереди (maxQueue). Если очередь заполнена, Submit возвращает ошибку. Статусы задач можно получить через метод Status по ID задачи.