Back to questions
Middle
7
/* Нам нужно передать данные из некоторого источника некоторому потребителю. При этом источник отдает данные небольшими пачками (~ десятки записей), а потребитель оптимальнее работает с крупными батчами Реальный пример - поставка данных из очередей типа Kafka в базу Clickhouse. Источник: - Условно бесконечный. - Источник никогда не возвращает более MaxItems записей за один вызов Next. - В рамках одной "сессии" (одного вызова функции Pipe) источник каждый раз возвращает новые данные на каждый вызов Next. - Однако, после перезапуска источник начнет с прошлой "подтвержденной" позиции, задаваемой cookie. Поэтому *каждое* значение cookie, которое вернул вызов Next, после сохранения данных в приемнике, должно быть фиксировано вызовом Commit, причем строго в той же последовательности, в которой их вернул Next Приемник: - Не может обработать более MaxItems за один раз. Базовый уровень: Требуется реализовать функцию func Pipe(p Producer, c Consumer) error которая читает данные из источника, группирует их в буфер размером не более MaxItems и сохраняет в приемник, после чего фиксирует прогресс в источнике. */ const MaxItems = 9999 type Producer interface { // Next returns: // - batch of items to be processed // - cookie to be commited when processing is done // - error Next() (items []any, cookie int, err error) // Commit is used to mark data batch as processed Commit(cookie int) error } type Consumer interface { Process(items []any) error } func Pipe(p Producer, c Consumer) error { var buf []any var cookies []int for { items, cookie, err := p.Next() if err != nil { return err } buf = append(buf, items...) cookies = append(cookies, cookie) if len(buf) >= MaxItems { if err := c.Process(buf); err != nil { return err } for _, c := range cookies { if err := p.Commit(c); err != nil { return err } } buf = buf[:0] cookies = nil } } if len(buf) > 0 { if err := c.Process(buf); err != nil { return err } for _, c := range cookies { if err := p.Commit(c); err != nil { return err } } } return nil }
Companies where asked
Яндекс
Answer from AI
sobes.tech AI
Ваша задача — реализовать функцию Pipe, которая читает данные из источника (Producer) небольшими пачками, буферизует их до размера не более MaxItems, передает в Consumer и затем фиксирует прогресс в Producer через Commit в правильном порядке.
Основные моменты:
- Источник возвращает данные пачками, максимум MaxItems за вызов Next.
- Нужно аккумулировать данные в буфере, не превышая MaxItems.
- После передачи данных в Consumer вызываем Commit для всех cookie в порядке получения.
- Источник бесконечен, поэтому цикл бесконечный.
В представленном коде есть ошибка: бесконечный цикл не прерывается, и код после цикла недостижим. Нужно предусмотреть условие выхода (например, когда Next возвращает пустой срез и нет ошибки).
Исправленный пример реализации:
func Pipe(p Producer, c Consumer) error {
var buf []any
var cookies []int
for {
items, cookie, err := p.Next()
if err != nil {
return err
}
// Если источник вернул пустой срез, считаем, что данных больше нет
if len(items) == 0 {
break
}
buf = append(buf, items...)
cookies = append(cookies, cookie)
// Если буфер достиг максимума, отправляем на обработку
if len(buf) >= MaxItems {
if err := c.Process(buf); err != nil {
return err
}
for _, c := range cookies {
if err := p.Commit(c); err != nil {
return err
}
}
buf = buf[:0]
cookies = nil
}
}
// Обработка оставшихся данных
if len(buf) > 0 {
if err := c.Process(buf); err != nil {
return err
}
for _, c := range cookies {
if err := p.Commit(c); err != nil {
return err
}
}
}
return nil
}
Таким образом, мы аккумулируем данные, не превышая MaxItems, и после обработки фиксируем cookie в порядке получения, что соответствует требованиям.