Sobes.tech
Back to questions
Senior
7

/ * Нам нужно передать данные из некоторого источника некоторому потребителю. При этом источник отдает данные небольшими пачками (~ десятки записей), а потребитель оптимальнее работает с крупными батчами (~тысячи записей). Реальный пример - поставка данных из очередей типа Kafka в базу Clickhouse. Источник: - Условно бесконечный. - Источник никогда не возвращает более MaxItems записей за один вызов Next. - В рамках одной "сессии" (одного вызова функции Pipe) источник каждый раз возвращает новые данные на каждый вызов Next. - Однако, после перезапуска источник начнет с прошлой "подтвержденной" позиции, задаваемой cookie. Поэтому *каждое* значение cookie, которое вернул вызов Next, после сохранения данных в приемнике, должно быть фиксировано вызовом Commit, причем строго в той же последовательности, в которой их вернул Next Приемник: - Не может обработать более MaxItems за один раз. Базовый уровень: Требуется реализовать функцию func Pipe(p Producer, c Consumer) error которая читает данные из источника, группирует их в буфер размером не более MaxItems и сохраняет в приёмник, после чего фиксирует прогресс в источнике. Усложнение: Методы Next, Process и Commit связаны с сетевыми вызовами и могут работать довольно долго. Для ускорения процесса передачи нужно распараллелить процессы чтения, записи и подтверждения прогресса. Так, чтобы при вызове Process или Commit продолжалось чтение из источника и формирование нового буфера. * / const MaxItems = 9999 type Producer interface { // Next returns: // - batch of items to be processed // - cookie to be commited when processing is done // - error Next() (items []any, cookie int, err error) // Commit is used to mark data batch as processed Commit(cookie int) error } type Consumer interface { Process(items []any) error } func Pipe(p Producer, c Consumer) error { // TODO }

Companies where asked
Яндекс

Answer from AI

sobes.tech AI

Для реализации функции Pipe с параллельной обработкой чтения, записи и подтверждения прогресса можно использовать каналы и горутины. Идея:

  • Одна горутина читает данные из источника (Producer.Next), формируя батчи не более MaxItems.
  • Вторая горутина обрабатывает батчи (Consumer.Process).
  • Третья горутина подтверждает прогресс (Producer.Commit) строго в порядке получения cookie.

Для сохранения порядка подтверждения можно использовать очередь или канал с буфером, где cookie будут храниться в порядке поступления.

Пример реализации:

func Pipe(p Producer, c Consumer) error {
    type batch struct {
        items  []any
        cookie int
    }

    batches := make(chan batch, 10)      // Канал для передачи батчей на обработку
    commitCh := make(chan int, 10)       // Канал для передачи cookie на подтверждение
    errCh := make(chan error, 3)         // Канал для ошибок
    done := make(chan struct{})           // Канал для остановки

    // Чтение данных
    go func() {
        defer close(batches)
        for {
            items, cookie, err := p.Next()
            if err != nil {
                errCh <- err
                return
            }
            if len(items) == 0 {
                // Источник пуст, можно завершать
                return
            }

            // Формируем батчи не более MaxItems
            for start := 0; start < len(items); start += MaxItems {
                end := start + MaxItems
                if end > len(items) {
                    end = len(items)
                }
                batches <- batch{items: items[start:end], cookie: cookie}
            }
        }
    }()

    // Обработка данных
    go func() {
        defer close(commitCh)
        for b := range batches {
            if err := c.Process(b.items); err != nil {
                errCh <- err
                return
            }
            commitCh <- b.cookie
        }
    }()

    // Подтверждение
    go func() {
        for cookie := range commitCh {
            if err := p.Commit(cookie); err != nil {
                errCh <- err
                return
            }
        }
        close(done)
    }()

    select {
    case err := <-errCh:
        return err
    case <-done:
        return nil
    }
}

В этом примере:

  • Чтение и формирование батчей происходит в первой горутине.
  • Обработка батчей — во второй.
  • Подтверждение cookie — в третьей.

Каналы обеспечивают поток данных и порядок подтверждения. Ошибки передаются через errCh, что позволяет прервать работу при возникновении ошибки.

Такой подход позволяет параллельно читать, обрабатывать и подтверждать данные, повышая общую производительность.