// Есть интерфейс Backend с методом Execute // Нужно написать балансировщик нагрузки // Балансировщик удовлетворяет интерфейсу Backend // Задача балансировщика - принять запрос и делегировать запрос одному из доступных бэкендов // Балансировщик содержит в себе коллекцию бэкендов // Балансировщик при исполнении запроса выбирает какой то из бэкендов, по какому то алгоритму // // Алгоритм Least Connection, health check и автоматическое восстановление, делаем thread safety через atomic, проброс контекста для отрубки, пороговое значение ошибок при котором мы отключим бэкенд // Request представляет входной запрос для Backend. type Request struct { // Payload – произвольные данные запроса. Payload []byte } // Response представляет ответ от Backend. type Response struct { // Payload – произвольные данные ответа. Payload []byte } // Backend – интерфейс, определяющий метод для выполнения запроса type Backend interface { Execute(ctx context.Context, req Request) (Response, error) } type Balancer struct { backends []Backend } func NewBalancer(backends []Backend) *Balancer { return &Balancer{backends: backends} } func (b *Balancer) Execute(ctx context.Context, req Request) (Response, error) { // todo: implement balancer logic return Response{}, nil } type TestBackend struct { }
Answer from AI
sobes.tech AI
go import ( "context" "errors" "sync/atomic" "time" )
type backendWrapper struct { backend Backend activeConns int64 // atomic errorCount int64 // atomic healthy int32 // atomic (1 - healthy, 0 - unhealthy) lastCheckTime int64 // unix timestamp }
func (bw *backendWrapper) isHealthy() bool { return atomic.LoadInt32(&bw.healthy) == 1 }
func (bw *backendWrapper) markUnhealthy() { atomic.StoreInt32(&bw.healthy, 0) atomic.StoreInt64(&bw.errorCount, 0) }
func (bw *backendWrapper) markHealthy() { atomic.StoreInt32(&bw.healthy, 1) atomic.StoreInt64(&bw.errorCount, 0) }
func (bw *backendWrapper) incrementError() { atomic.AddInt64(&bw.errorCount, 1) }
func (bw *backendWrapper) resetError() { atomic.StoreInt64(&bw.errorCount, 0) }
func (bw *backendWrapper) incrementConns() { atomic.AddInt64(&bw.activeConns, 1) }
func (bw *backendWrapper) decrementConns() { atomic.AddInt64(&bw.activeConns, -1) }
const ( errorThreshold = 5 healthCheckInterval = 10 * time.Second )
type Balancer struct { backends []*backendWrapper }
func NewBalancer(backends []Backend) *Balancer { wrappers := make([]*backendWrapper, len(backends)) for i, b := range backends { wrappers[i] = &backendWrapper{ backend: b, healthy: 1, } } return &Balancer{backends: wrappers} }
func (b *Balancer) Execute(ctx context.Context, req Request) (Response, error) { // Запускаем health check в фоне b.healthCheck()
// Выбираем backend с наименьшим количеством активных соединений и который здоров
var selected *backendWrapper
minConns := int64(1<<63 - 1) // max int64
for _, bw := range b.backends {
if !bw.isHealthy() {
continue
}
conns := atomic.LoadInt64(&bw.activeConns)
if conns < minConns {
minConns = conns
selected = bw
}
}
if selected == nil {
return Response{}, errors.New("no healthy backends available")
}
selected.incrementConns()
defer selected.decrementConns()
resp, err := selected.backend.Execute(ctx, req)
if err != nil {
selected.incrementError()
if atomic.LoadInt64(&selected.errorCount) >= errorThreshold {
selected.markUnhealthy()
}
return resp, err
}
selected.resetError()
return resp, nil
}
func (b *Balancer) healthCheck() { now := time.Now().Unix() for _, bw := range b.backends { lastCheck := atomic.LoadInt64(&bw.lastCheckTime) if now-lastCheck < int64(healthCheckInterval.Seconds()) { continue } atomic.StoreInt64(&bw.lastCheckTime, now)
go func(bw *backendWrapper) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// Пробуем выполнить пустой запрос или специальный health check запрос
_, err := bw.backend.Execute(ctx, Request{Payload: []byte("healthcheck")})
if err != nil {
bw.markUnhealthy()
} else {
bw.markHealthy()
}
}(bw)
}
}
В этом примере реализован балансировщик нагрузки с алгоритмом Least Connection, поддержкой health check, автоматическим восстановлением backend'ов, thread safety через atomic операции и порогом ошибок для отключения backend'а. Контекст передаётся для возможности отмены запроса.