Sobes.tech
Назад к вопросам
Senior
4

/ * Есть два сервера PostgreSQL: * PROD - OLTP сервер, * STATS - сервер для долгих аналитических запросов. На текущем сервер в базе prod есть большая (10Tb) таблица вида: CREATE TABLE profiles( id SERIAL, data JSONB ) В таблице могут быть "дырки", т.е. некоторые `id` могут быть пропущены. Нужно написать программу для копирования таблицы profiles с PROD на STATS. Для работы с базами данных предполагается использовать следующие интерфейсы: type Row []interface{} type Database interface { // реализация интерфейса Database умеет переустанавливать подключения // вызов SaveRows идемпотентен io.Closer GetMaxID(ctx context.Context) (uint64, error) LoadRows(ctx context.Context, minID, maxID uint64) ([]Row, error) // [minID, maxID] SaveRows(ctx context.Context, rows []Row) error } func Connect(ctx context.Context, dbname string) (Database, error) // CopyTable // Если full=false то продолжить переливку данных с места прошлой ошибки // Если full=true - то перелить все данные func CopyTable(fromName string, toName string, full bool) error { // ... your code } Если передана опция `full=false`, то программа должна продолжить переливку данных с места прошлой ошибки. Если `full=true` - то должна перелить все данные. **Базовый уровень**: - последовательная переливка данных в 1 поток - восстановление после сбоев (опция `full=false`) Дополнительная информация: - при необходимости вы можете расширить интерфейс, добавив свои методы - при необходимости вы можете использовать пакет **database/sql** напрямую

Компании, где спрашивали
Яндекс

Ответ от нейросети

sobes.tech AI

Для решения задачи копирования большой таблицы profiles с сервера PROD на STATS с возможностью продолжения после сбоев и опцией полного копирования, можно реализовать следующий подход:

  1. Определить последний успешно скопированный ID на сервере STATS. Для этого можно добавить в интерфейс Database метод, например, GetMaxCopiedID(ctx context.Context) (uint64, error), который вернёт максимальный id, уже скопированный в таблицу на сервере STATS.

  2. При запуске функции CopyTable:

    • Если full=true, то начинаем копирование с id=1 (или минимального id в таблице PROD).
    • Если full=false, то запрашиваем последний скопированный id на STATS и начинаем копирование с id = lastCopiedID + 1.
  3. Для копирования данных используем батчи по диапазонам id, например, по 1000 записей за раз. В цикле:

    • Получаем максимальный id в таблице PROD через GetMaxID.
    • Загружаем строки из PROD с id в диапазоне [startID, startID+batchSize-1] через LoadRows.
    • Сохраняем их на STATS через SaveRows.
    • Обновляем startID = startID + batchSize.
  4. Если во время копирования происходит сбой, при следующем запуске с full=false процесс возобновится с последнего успешно сохранённого id.

Пример расширения интерфейса и реализации CopyTable:

// Расширение интерфейса Database для получения максимального скопированного id
func (db Database) GetMaxCopiedID(ctx context.Context) (uint64, error) {
    // Реализация зависит от структуры таблицы на STATS
    // Например, SELECT MAX(id) FROM profiles
}

func CopyTable(fromName string, toName string, full bool) error {
    ctx := context.Background()
    fromDB, err := Connect(ctx, fromName)
    if err != nil {
        return err
    }
    defer fromDB.Close()

    toDB, err := Connect(ctx, toName)
    if err != nil {
        return err
    }
    defer toDB.Close()

    var startID uint64 = 1
    if !full {
        startID, err = toDB.GetMaxCopiedID(ctx)
        if err != nil {
            return err
        }
        startID++
    }

    maxID, err := fromDB.GetMaxID(ctx)
    if err != nil {
        return err
    }

    batchSize := uint64(1000)

    for currentID := startID; currentID <= maxID; currentID += batchSize {
        endID := currentID + batchSize - 1
        if endID > maxID {
            endID = maxID
        }

        rows, err := fromDB.LoadRows(ctx, currentID, endID)
        if err != nil {
            return err
        }

        if len(rows) == 0 {
            continue
        }

        err = toDB.SaveRows(ctx, rows)
        if err != nil {
            return err
        }
    }

    return nil
}

Таким образом, программа последовательно переливает данные, восстанавливается после сбоев и поддерживает опцию полного копирования.