Back to questions
Senior
7
// Клиент передаёт на вход некоторый объект (Task) с данными для выполнения задачи, // в нашем примере будем использовать пустую структуру. // // Обработчик одновременно может обрабатывать не более N задач, // и не более X задач могут быть поставлены в очередь на обработку. // Если нет места в очереди, сразу возвращаем клиенту ошибку. // // Задача берется в обработку если имеются на это свободные обработчики. // Имитируем длительность обработки через time.Sleep(5*time.Second). // Как только очередная задача выполнилась – берём следующую задачу из очереди. // Если в очереди пусто, ожидаем новых задач от клиентов. // // Со звездочкой: дополнить структуру Task и получать статусы задач, через отдельный запрос package scheduler type Scheduler interface { AddTask(t Task) error Close() } type sheduler struct { queue chan Task wg sync.WaitGroup } func New(workers, queue, int) Scheduler { } type Task interface { Do() }
Companies where asked
Cloud.ru
Answer from AI
sobes.tech AI
Ниже пример реализации Scheduler в Go, который ограничивает количество одновременно обрабатываемых задач (workers) и размер очереди (queue). Если очередь заполнена, возвращается ошибка. Задачи обрабатываются параллельно, после завершения задачи берётся следующая из очереди.
package scheduler
import (
"errors"
"sync"
"time"
)
type Task interface {
Do()
}
type scheduler struct {
queue chan Task
workers int
wg sync.WaitGroup
quit chan struct{}
}
func New(workers, queue int) Scheduler {
s := &scheduler{
queue: make(chan Task, queue),
workers: workers,
quit: make(chan struct{}),
}
for i := 0; i < workers; i++ {
go s.worker()
}
return s
}
func (s *scheduler) worker() {
for {
select {
case task := <-s.queue:
s.wg.Add(1)
task.Do()
s.wg.Done()
case <-s.quit:
return
}
}
}
func (s *scheduler) AddTask(t Task) error {
select {
case s.queue <- t:
return nil
default:
return errors.New("queue is full")
}
}
func (s *scheduler) Close() {
close(s.quit)
// Ждём завершения всех задач
s.wg.Wait()
}
// Пример реализации Task
// type MyTask struct{}
// func (t MyTask) Do() {
// time.Sleep(5 * time.Second) // имитация работы
// }
Пояснения:
queue— буферизированный канал для задач, размеромqueue.- Если канал заполнен,
AddTaskвозвращает ошибку. workers— количество горутин-обработчиков.- Каждая горутина читает задачи из канала и выполняет
Do(). Closeостанавливает обработчики и ждёт завершения всех задач.
Для реализации статусов задач можно расширить Task структурой с ID и статусом, а также добавить мапу для хранения статусов с синхронизацией.