// Написать асинхронный обработчик задач как библиотеку // Клиент передаёт на вход некоторый объект (Task) с данными для выполнения задачи, // в нашем примере будем использовать пустую структуру. // Обработчик одновременно может обрабатывать не более N задач, // и не более X задач могут быть поставлены в очередь на обработку. // Если нет места в очереди, сразу возвращаем клиенту ошибку. // Задача берется в обработку если имеются на это свободные обработчики. // Иммитируем длительность обработки через time.Sleep(5*time.Second). // Как только очередная задача выполнилась - берём следующую задачу из очереди. // Если в очереди пусто, ожидаем новых задач от клиентов. // Со звездочкой: дополнить структуру Task и получать статусы задач, через отдельный запрос package scheduler type Scheduler interface { AddTask(t Task) error Close() } type Task interface { Do() }
Ответ от нейросети
sobes.tech AI
go package scheduler
import ( "errors" "sync" "time" )
type Task interface { Do() }
type taskWrapper struct { task Task status string // "queued", "running", "done", "error" }
type Scheduler interface { AddTask(t Task) error Close() GetStatus(id int) (string, error) }
type scheduler struct { maxWorkers int maxQueue int queue chan *taskWrapper wg sync.WaitGroup closed bool mu sync.Mutex tasks map[int]*taskWrapper nextID int }
func NewScheduler(maxWorkers, maxQueue int) Scheduler { s := &scheduler{ maxWorkers: maxWorkers, maxQueue: maxQueue, queue: make(chan *taskWrapper, maxQueue), tasks: make(map[int]*taskWrapper), }
for i := 0; i < maxWorkers; i++ {
go s.worker()
}
return s
}
func (s *scheduler) AddTask(t Task) error { s.mu.Lock() defer s.mu.Unlock()
if s.closed {
return errors.New("scheduler is closed")
}
if len(s.queue) >= s.maxQueue {
return errors.New("queue is full")
}
id := s.nextID
s.nextID++
tw := &taskWrapper{task: t, status: "queued"}
s.tasks[id] = tw
s.queue <- tw
return nil
}
func (s *scheduler) worker() { for tw := range s.queue { s.mu.Lock() tw.status = "running" s.mu.Unlock()
// Имитация длительной обработки
tw.task.Do()
s.mu.Lock()
tw.status = "done"
s.mu.Unlock()
}
}
func (s *scheduler) Close() { s.mu.Lock() if s.closed { s.mu.Unlock() return } close(s.queue) s.closed = true s.mu.Unlock() }
func (s *scheduler) GetStatus(id int) (string, error) { s.mu.Lock() defer s.mu.Unlock()
tw, ok := s.tasks[id]
if !ok {
return "", errors.New("task not found")
}
return tw.status, nil
}
// Пример реализации Task // type MyTask struct{} // func (t MyTask) Do() { // time.Sleep(5 * time.Second) // }
В этом примере:
- Scheduler ограничивает количество одновременно выполняемых задач (maxWorkers).
- Очередь задач ограничена по размеру (maxQueue).
- Если очередь полна, AddTask возвращает ошибку.
- Каждая задача имеет статус, который можно получить через GetStatus.
- Обработка задачи симулируется вызовом Do(), где можно использовать time.Sleep.
- Scheduler можно закрыть, после чего новые задачи не принимаются.