Если мы захотим сделать несколько вычитывающих воркеров из outbox-таблицы, как их синхронизировать, чтобы они не брали одни и те же данные?
Ответ от нейросети
sobes.tech AI
Для синхронизации нескольких воркеров, читающих из одной outbox-таблицы, чтобы избежать обработки одних и тех же данных, обычно используют следующие подходы:
-
Флаг состояния или статус записи — добавляют в таблицу поле, например,
processedилиin_progress. Воркеры при выборе записей фильтруют только те, у которых статус "новый". При начале обработки воркер обновляет статус на "в обработке" с помощью атомарной операции. -
Оптимистичная блокировка с помощью версии или timestamp — воркер читает запись с версией, пытается обновить статус, проверяя, что версия не изменилась. Если обновление прошло успешно, воркер обрабатывает запись.
-
Использование SQL-запроса с блокировкой строк (SELECT ... FOR UPDATE SKIP LOCKED) — в PostgreSQL и некоторых других СУБД можно выбрать записи с блокировкой, пропуская уже заблокированные другими транзакциями. Это позволяет нескольким воркерам параллельно брать непересекающиеся записи.
Пример на Go с использованием PostgreSQL и FOR UPDATE SKIP LOCKED:
func fetchNextOutboxItem(db *sql.DB) (*OutboxItem, error) {
tx, err := db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
row := tx.QueryRow(`
SELECT id, payload FROM outbox
WHERE processed = false
FOR UPDATE SKIP LOCKED
LIMIT 1
`)
var item OutboxItem
if err := row.Scan(&item.ID, &item.Payload); err != nil {
return nil, err
}
// Отметить как в обработке или processed
_, err = tx.Exec(`UPDATE outbox SET processed = true WHERE id = $1`, item.ID)
if err != nil {
return nil, err
}
err = tx.Commit()
if err != nil {
return nil, err
}
return &item, nil
}
Таким образом, блокировка строк и атомарное обновление статуса позволяют избежать гонок и дублирующей обработки.