Sobes.tech
Назад к вопросам
Senior
3

В микросервисной архитектуре каждый сервис имеет свою базу данных. Как работает паттерн Outbox в этом контексте, если Kafka ещё не получила сообщение?

Компании, где спрашивали
ИП Ганус Александр Андреевич

Ответ от нейросети

sobes.tech AI

Паттерн Outbox используется для обеспечения атомарности между записью данных в базу микросервиса и публикацией событий в Kafka, когда у каждого сервиса своя база данных.

Идея в том, что при изменении состояния сервиса в одной транзакции записывается не только основная бизнес-данные, но и событие (сообщение) в специальную таблицу outbox внутри той же базы данных. Таким образом, запись события и бизнес-операция атомарны — либо обе успешны, либо обе откатываются.

Отдельный процесс (или поток) периодически читает новые записи из таблицы outbox и отправляет их в Kafka. Если отправка не удалась, запись остаётся в таблице, и попытка повторяется позже. После успешной отправки запись помечается как отправленная или удаляется.

Таким образом, даже если Kafka временно недоступна, данные не потеряются, а сервис не нарушит консистентность, так как событие хранится локально и будет отправлено позже.

Пример упрощённой схемы:

// В транзакции
func SaveOrderAndOutbox(db *sql.DB, order Order, event Event) error {
    tx, _ := db.Begin()
    defer tx.Rollback()

    // Сохраняем заказ
    _, err := tx.Exec("INSERT INTO orders (...) VALUES (...)", ...)
    if err != nil {
        return err
    }

    // Записываем событие в outbox
    _, err = tx.Exec("INSERT INTO outbox (event_type, payload, processed) VALUES (?, ?, false)", event.Type, event.Payload)
    if err != nil {
        return err
    }

    return tx.Commit()
}

// Отдельный воркер читает из outbox и отправляет в Kafka
func OutboxWorker(db *sql.DB, kafkaProducer KafkaProducer) {
    for {
        rows, _ := db.Query("SELECT id, event_type, payload FROM outbox WHERE processed = false")
        for rows.Next() {
            var id int
            var eventType, payload string
            rows.Scan(&id, &eventType, &payload)

            err := kafkaProducer.Send(eventType, payload)
            if err == nil {
                db.Exec("UPDATE outbox SET processed = true WHERE id = ?", id)
            }
        }
        time.Sleep(time.Second * 5)
    }
}

Так обеспечивается надежная доставка сообщений и согласованность данных между сервисами.