В микросервисной архитектуре каждый сервис имеет свою базу данных. Как работает паттерн Outbox в этом контексте, если Kafka ещё не получила сообщение?
Ответ от нейросети
sobes.tech AI
Паттерн Outbox используется для обеспечения атомарности между записью данных в базу микросервиса и публикацией событий в Kafka, когда у каждого сервиса своя база данных.
Идея в том, что при изменении состояния сервиса в одной транзакции записывается не только основная бизнес-данные, но и событие (сообщение) в специальную таблицу outbox внутри той же базы данных. Таким образом, запись события и бизнес-операция атомарны — либо обе успешны, либо обе откатываются.
Отдельный процесс (или поток) периодически читает новые записи из таблицы outbox и отправляет их в Kafka. Если отправка не удалась, запись остаётся в таблице, и попытка повторяется позже. После успешной отправки запись помечается как отправленная или удаляется.
Таким образом, даже если Kafka временно недоступна, данные не потеряются, а сервис не нарушит консистентность, так как событие хранится локально и будет отправлено позже.
Пример упрощённой схемы:
// В транзакции
func SaveOrderAndOutbox(db *sql.DB, order Order, event Event) error {
tx, _ := db.Begin()
defer tx.Rollback()
// Сохраняем заказ
_, err := tx.Exec("INSERT INTO orders (...) VALUES (...)", ...)
if err != nil {
return err
}
// Записываем событие в outbox
_, err = tx.Exec("INSERT INTO outbox (event_type, payload, processed) VALUES (?, ?, false)", event.Type, event.Payload)
if err != nil {
return err
}
return tx.Commit()
}
// Отдельный воркер читает из outbox и отправляет в Kafka
func OutboxWorker(db *sql.DB, kafkaProducer KafkaProducer) {
for {
rows, _ := db.Query("SELECT id, event_type, payload FROM outbox WHERE processed = false")
for rows.Next() {
var id int
var eventType, payload string
rows.Scan(&id, &eventType, &payload)
err := kafkaProducer.Send(eventType, payload)
if err == nil {
db.Exec("UPDATE outbox SET processed = true WHERE id = ?", id)
}
}
time.Sleep(time.Second * 5)
}
}
Так обеспечивается надежная доставка сообщений и согласованность данных между сервисами.