Sobes.tech
Назад к вопросам
Middle
8

/* Нам нужно передать данные из некоторого источника некоторому потребителю. При этом источник отдает данные небольшими пачками (~ десятки записей), а потребитель оптимальнее работает с крупными батчами Реальный пример - поставка данных из очередей типа Kafka в базу Clickhouse. Источник: - Условно бесконечный. - Источник никогда не возвращает более MaxItems записей за один вызов Next. - В рамках одной "сессии" (одного вызова функции Pipe) источник каждый раз возвращает новые данные на каждый вызов Next. - Однако, после перезапуска источник начнет с прошлой "подтвержденной" позиции, задаваемой cookie. Поэтому *каждое* значение cookie, которое вернул вызов Next, после сохранения данных в приемнике, должно быть фиксировано вызовом Commit, причем строго в той же последовательности, в которой их вернул Next Приемник: - Не может обработать более MaxItems за один раз. Базовый уровень: Требуется реализовать функцию func Pipe(p Producer, c Consumer) error которая читает данные из источника, группирует их в буфер размером не более MaxItems и сохраняет в приемник, после чего фиксирует прогресс в источнике. */ const MaxItems = 9999 type Producer interface { // Next returns: // - batch of items to be processed // - cookie to be commited when processing is done // - error Next() (items []any, cookie int, err error) // Commit is used to mark data batch as processed Commit(cookie int) error } type Consumer interface { Process(items []any) error } func Pipe(p Producer, c Consumer) error { var buf []any var cookies []int for { items, cookie, err := p.Next() if err != nil { return err } buf = append(buf, items...) cookies = append(cookies, cookie) if len(buf) >= MaxItems { if err := c.Process(buf); err != nil { return err } for _, c := range cookies { if err := p.Commit(c); err != nil { return err } } buf = buf[:0] cookies = nil } } if len(buf) > 0 { if err := c.Process(buf); err != nil { return err } for _, c := range cookies { if err := p.Commit(c); err != nil { return err } } } return nil }

Компании, где спрашивали
Яндекс

Ответ от нейросети

sobes.tech AI

Ваша задача — реализовать функцию Pipe, которая читает данные из источника (Producer) небольшими пачками, буферизует их до размера не более MaxItems, передает в Consumer и затем фиксирует прогресс в Producer через Commit в правильном порядке.

Основные моменты:

  • Источник возвращает данные пачками, максимум MaxItems за вызов Next.
  • Нужно аккумулировать данные в буфере, не превышая MaxItems.
  • После передачи данных в Consumer вызываем Commit для всех cookie в порядке получения.
  • Источник бесконечен, поэтому цикл бесконечный.

В представленном коде есть ошибка: бесконечный цикл не прерывается, и код после цикла недостижим. Нужно предусмотреть условие выхода (например, когда Next возвращает пустой срез и нет ошибки).

Исправленный пример реализации:

func Pipe(p Producer, c Consumer) error {
    var buf []any
    var cookies []int

    for {
        items, cookie, err := p.Next()
        if err != nil {
            return err
        }
        // Если источник вернул пустой срез, считаем, что данных больше нет
        if len(items) == 0 {
            break
        }

        buf = append(buf, items...)
        cookies = append(cookies, cookie)

        // Если буфер достиг максимума, отправляем на обработку
        if len(buf) >= MaxItems {
            if err := c.Process(buf); err != nil {
                return err
            }
            for _, c := range cookies {
                if err := p.Commit(c); err != nil {
                    return err
                }
            }
            buf = buf[:0]
            cookies = nil
        }
    }

    // Обработка оставшихся данных
    if len(buf) > 0 {
        if err := c.Process(buf); err != nil {
            return err
        }
        for _, c := range cookies {
            if err := p.Commit(c); err != nil {
                return err
            }
        }
    }

    return nil
}

Таким образом, мы аккумулируем данные, не превышая MaxItems, и после обработки фиксируем cookie в порядке получения, что соответствует требованиям.