Назад к вопросам
Middle+
2
Архитектурная задача: как гарантированно отправить событие в Kafka при регистрации пользователя в микросервисной архитектуре (сервис пользователей + сервис нотификаций), если Kafka — асинхронный канал?
Компании, где спрашивали
Веро Трейс
Ответ от нейросети
sobes.tech AI
Для гарантированной отправки события в Kafka при регистрации пользователя в микросервисной архитектуре с сервисом пользователей и сервисом нотификаций можно использовать паттерн "Outbox".
Идея в следующем:
- В сервисе пользователей при регистрации пользователя в рамках одной транзакции базы данных сохраняется не только информация о пользователе, но и событие (например, в отдельной таблице outbox).
- Отдельный процесс или поток читает из таблицы outbox новые события и отправляет их в Kafka.
- После успешной отправки событие помечается как отправленное или удаляется из outbox.
Таким образом, обеспечивается атомарность записи данных и события, а Kafka получает гарантированно все события, даже если сам Kafka временно недоступен.
Пример упрощённого кода на Go для записи в outbox и отправки в Kafka:
// В транзакции базы данных
func RegisterUser(db *sql.DB, user User) error {
tx, _ := db.Begin()
defer tx.Rollback()
_, err := tx.Exec("INSERT INTO users (id, name) VALUES (?, ?)", user.ID, user.Name)
if err != nil {
return err
}
event := fmt.Sprintf(`{"userId":"%s","action":"registered"}`, user.ID)
_, err = tx.Exec("INSERT INTO outbox (event) VALUES (?)", event)
if err != nil {
return err
}
return tx.Commit()
}
// Отдельный воркер
func SendEvents(db *sql.DB, producer kafka.Producer) {
rows, _ := db.Query("SELECT id, event FROM outbox WHERE sent = false")
for rows.Next() {
var id int
var event string
rows.Scan(&id, &event)
err := producer.Produce(event)
if err == nil {
db.Exec("UPDATE outbox SET sent = true WHERE id = ?", id)
}
}
}
Такой подход решает проблему асинхронности Kafka и гарантирует доставку события.