Sobes.tech
Back to questions
Senior
7

// Клиент передаёт на вход некоторый объект (Task) с данными для выполнения задачи, // в нашем примере будем использовать пустую структуру. // // Обработчик одновременно может обрабатывать не более N задач, // и не более X задач могут быть поставлены в очередь на обработку. // Если нет места в очереди, сразу возвращаем клиенту ошибку. // // Задача берется в обработку если имеются на это свободные обработчики. // Имитируем длительность обработки через time.Sleep(5*time.Second). // Как только очередная задача выполнилась – берём следующую задачу из очереди. // Если в очереди пусто, ожидаем новых задач от клиентов. // // Со звездочкой: дополнить структуру Task и получать статусы задач, через отдельный запрос package scheduler type Scheduler interface { AddTask(t Task) error Close() } type sheduler struct { queue chan Task wg sync.WaitGroup } func New(workers, queue, int) Scheduler { } type Task interface { Do() }

Companies where asked
Cloud.ru

Answer from AI

sobes.tech AI

Ниже пример реализации Scheduler в Go, который ограничивает количество одновременно обрабатываемых задач (workers) и размер очереди (queue). Если очередь заполнена, возвращается ошибка. Задачи обрабатываются параллельно, после завершения задачи берётся следующая из очереди.

package scheduler

import (
	"errors"
	"sync"
	"time"
)

type Task interface {
	Do()
}

type scheduler struct {
	queue   chan Task
	workers int
	wg      sync.WaitGroup
	quit    chan struct{}
}

func New(workers, queue int) Scheduler {
	s := &scheduler{
		queue:   make(chan Task, queue),
		workers: workers,
		quit:    make(chan struct{}),
	}
	for i := 0; i < workers; i++ {
		go s.worker()
	}
	return s
}

func (s *scheduler) worker() {
	for {
		select {
		case task := <-s.queue:
			s.wg.Add(1)
			task.Do()
			s.wg.Done()
		case <-s.quit:
			return
		}
	}
}

func (s *scheduler) AddTask(t Task) error {
	select {
	case s.queue <- t:
		return nil
	default:
		return errors.New("queue is full")
	}
}

func (s *scheduler) Close() {
	close(s.quit)
	// Ждём завершения всех задач
	s.wg.Wait()
}

// Пример реализации Task
// type MyTask struct{}
// func (t MyTask) Do() {
//     time.Sleep(5 * time.Second) // имитация работы
// }

Пояснения:

  • queue — буферизированный канал для задач, размером queue.
  • Если канал заполнен, AddTask возвращает ошибку.
  • workers — количество горутин-обработчиков.
  • Каждая горутина читает задачи из канала и выполняет Do().
  • Close останавливает обработчики и ждёт завершения всех задач.

Для реализации статусов задач можно расширить Task структурой с ID и статусом, а также добавить мапу для хранения статусов с синхронизацией.