Sobes.tech
Назад к вопросам
Senior
4

// Написать асинхронный обработчик задач как библиотеку // Клиент передаёт на вход некоторый объект (Task) с данными для выполнения задачи, // в нашем примере будем использовать пустую структуру. // Обработчик одновременно может обрабатывать не более N задач, // и не более X задач могут быть поставлены в очередь на обработку. // Если нет места в очереди, сразу возвращаем клиенту ошибку. // Задача берется в обработку если имеются на это свободные обработчики. // Иммитируем длительность обработки через time.Sleep(5*time.Second). // Как только очередная задача выполнилась - берём следующую задачу из очереди. // Если в очереди пусто, ожидаем новых задач от клиентов. // Со звездочкой: дополнить структуру Task и получать статусы задач, через отдельный запрос package scheduler type Scheduler interface { AddTask(t Task) error Close() } type Task interface { Do() }

Компании, где спрашивали
Cloud.ru

Ответ от нейросети

sobes.tech AI

go package scheduler

import ( "errors" "sync" "time" )

type Task interface { Do() }

type taskWrapper struct { task Task status string // "queued", "running", "done", "error" }

type Scheduler interface { AddTask(t Task) error Close() GetStatus(id int) (string, error) }

type scheduler struct { maxWorkers int maxQueue int queue chan *taskWrapper wg sync.WaitGroup closed bool mu sync.Mutex tasks map[int]*taskWrapper nextID int }

func NewScheduler(maxWorkers, maxQueue int) Scheduler { s := &scheduler{ maxWorkers: maxWorkers, maxQueue: maxQueue, queue: make(chan *taskWrapper, maxQueue), tasks: make(map[int]*taskWrapper), }

for i := 0; i < maxWorkers; i++ {
	go s.worker()
}

return s

}

func (s *scheduler) AddTask(t Task) error { s.mu.Lock() defer s.mu.Unlock()

if s.closed {
	return errors.New("scheduler is closed")
}

if len(s.queue) >= s.maxQueue {
	return errors.New("queue is full")
}

id := s.nextID
s.nextID++

tw := &taskWrapper{task: t, status: "queued"}
s.tasks[id] = tw
s.queue <- tw

return nil

}

func (s *scheduler) worker() { for tw := range s.queue { s.mu.Lock() tw.status = "running" s.mu.Unlock()

	// Имитация длительной обработки
	tw.task.Do()

	s.mu.Lock()
	tw.status = "done"
	s.mu.Unlock()
}

}

func (s *scheduler) Close() { s.mu.Lock() if s.closed { s.mu.Unlock() return } close(s.queue) s.closed = true s.mu.Unlock() }

func (s *scheduler) GetStatus(id int) (string, error) { s.mu.Lock() defer s.mu.Unlock()

tw, ok := s.tasks[id]
if !ok {
	return "", errors.New("task not found")
}
return tw.status, nil

}

// Пример реализации Task // type MyTask struct{} // func (t MyTask) Do() { // time.Sleep(5 * time.Second) // }


В этом примере:
- Scheduler ограничивает количество одновременно выполняемых задач (maxWorkers).
- Очередь задач ограничена по размеру (maxQueue).
- Если очередь полна, AddTask возвращает ошибку.
- Каждая задача имеет статус, который можно получить через GetStatus.
- Обработка задачи симулируется вызовом Do(), где можно использовать time.Sleep.
- Scheduler можно закрыть, после чего новые задачи не принимаются.