Назад к вопросам
Senior
8
/ * Нам нужно передать данные из некоторого источника некоторому потребителю. При этом источник отдает данные небольшими пачками (~ десятки записей), а потребитель оптимальнее работает с крупными батчами (~тысячи записей). Реальный пример - поставка данных из очередей типа Kafka в базу Clickhouse. Источник: - Условно бесконечный. - Источник никогда не возвращает более MaxItems записей за один вызов Next. - В рамках одной "сессии" (одного вызова функции Pipe) источник каждый раз возвращает новые данные на каждый вызов Next. - Однако, после перезапуска источник начнет с прошлой "подтвержденной" позиции, задаваемой cookie. Поэтому *каждое* значение cookie, которое вернул вызов Next, после сохранения данных в приемнике, должно быть фиксировано вызовом Commit, причем строго в той же последовательности, в которой их вернул Next Приемник: - Не может обработать более MaxItems за один раз. Базовый уровень: Требуется реализовать функцию func Pipe(p Producer, c Consumer) error которая читает данные из источника, группирует их в буфер размером не более MaxItems и сохраняет в приёмник, после чего фиксирует прогресс в источнике. Усложнение: Методы Next, Process и Commit связаны с сетевыми вызовами и могут работать довольно долго. Для ускорения процесса передачи нужно распараллелить процессы чтения, записи и подтверждения прогресса. Так, чтобы при вызове Process или Commit продолжалось чтение из источника и формирование нового буфера. * / const MaxItems = 9999 type Producer interface { // Next returns: // - batch of items to be processed // - cookie to be commited when processing is done // - error Next() (items []any, cookie int, err error) // Commit is used to mark data batch as processed Commit(cookie int) error } type Consumer interface { Process(items []any) error } func Pipe(p Producer, c Consumer) error { // TODO }
Компании, где спрашивали
Яндекс
Ответ от нейросети
sobes.tech AI
Для реализации функции Pipe с параллельной обработкой чтения, записи и подтверждения прогресса можно использовать каналы и горутины. Идея:
- Одна горутина читает данные из источника (Producer.Next), формируя батчи не более MaxItems.
- Вторая горутина обрабатывает батчи (Consumer.Process).
- Третья горутина подтверждает прогресс (Producer.Commit) строго в порядке получения cookie.
Для сохранения порядка подтверждения можно использовать очередь или канал с буфером, где cookie будут храниться в порядке поступления.
Пример реализации:
func Pipe(p Producer, c Consumer) error {
type batch struct {
items []any
cookie int
}
batches := make(chan batch, 10) // Канал для передачи батчей на обработку
commitCh := make(chan int, 10) // Канал для передачи cookie на подтверждение
errCh := make(chan error, 3) // Канал для ошибок
done := make(chan struct{}) // Канал для остановки
// Чтение данных
go func() {
defer close(batches)
for {
items, cookie, err := p.Next()
if err != nil {
errCh <- err
return
}
if len(items) == 0 {
// Источник пуст, можно завершать
return
}
// Формируем батчи не более MaxItems
for start := 0; start < len(items); start += MaxItems {
end := start + MaxItems
if end > len(items) {
end = len(items)
}
batches <- batch{items: items[start:end], cookie: cookie}
}
}
}()
// Обработка данных
go func() {
defer close(commitCh)
for b := range batches {
if err := c.Process(b.items); err != nil {
errCh <- err
return
}
commitCh <- b.cookie
}
}()
// Подтверждение
go func() {
for cookie := range commitCh {
if err := p.Commit(cookie); err != nil {
errCh <- err
return
}
}
close(done)
}()
select {
case err := <-errCh:
return err
case <-done:
return nil
}
}
В этом примере:
- Чтение и формирование батчей происходит в первой горутине.
- Обработка батчей — во второй.
- Подтверждение cookie — в третьей.
Каналы обеспечивают поток данных и порядок подтверждения. Ошибки передаются через errCh, что позволяет прервать работу при возникновении ошибки.
Такой подход позволяет параллельно читать, обрабатывать и подтверждать данные, повышая общую производительность.