Sobes.tech
Back to questions
Senior
4

// Есть интерфейс Backend с методом Execute // Нужно написать балансировщик нагрузки // Балансировщик удовлетворяет интерфейсу Backend // Задача балансировщика - принять запрос и делегировать запрос одному из доступных бэкендов // Балансировщик содержит в себе коллекцию бэкендов // Балансировщик при исполнении запроса выбирает какой то из бэкендов, по какому то алгоритму // // Алгоритм Least Connection, health check и автоматическое восстановление, делаем thread safety через atomic, проброс контекста для отрубки, пороговое значение ошибок при котором мы отключим бэкенд // Request представляет входной запрос для Backend. type Request struct { // Payload – произвольные данные запроса. Payload []byte } // Response представляет ответ от Backend. type Response struct { // Payload – произвольные данные ответа. Payload []byte } // Backend – интерфейс, определяющий метод для выполнения запроса type Backend interface { Execute(ctx context.Context, req Request) (Response, error) } type Balancer struct { backends []Backend } func NewBalancer(backends []Backend) *Balancer { return &Balancer{backends: backends} } func (b *Balancer) Execute(ctx context.Context, req Request) (Response, error) { // todo: implement balancer logic return Response{}, nil } type TestBackend struct { }

Companies where asked
VKVK

Answer from AI

sobes.tech AI

go import ( "context" "errors" "sync/atomic" "time" )

type backendWrapper struct { backend Backend activeConns int64 // atomic errorCount int64 // atomic healthy int32 // atomic (1 - healthy, 0 - unhealthy) lastCheckTime int64 // unix timestamp }

func (bw *backendWrapper) isHealthy() bool { return atomic.LoadInt32(&bw.healthy) == 1 }

func (bw *backendWrapper) markUnhealthy() { atomic.StoreInt32(&bw.healthy, 0) atomic.StoreInt64(&bw.errorCount, 0) }

func (bw *backendWrapper) markHealthy() { atomic.StoreInt32(&bw.healthy, 1) atomic.StoreInt64(&bw.errorCount, 0) }

func (bw *backendWrapper) incrementError() { atomic.AddInt64(&bw.errorCount, 1) }

func (bw *backendWrapper) resetError() { atomic.StoreInt64(&bw.errorCount, 0) }

func (bw *backendWrapper) incrementConns() { atomic.AddInt64(&bw.activeConns, 1) }

func (bw *backendWrapper) decrementConns() { atomic.AddInt64(&bw.activeConns, -1) }

const ( errorThreshold = 5 healthCheckInterval = 10 * time.Second )

type Balancer struct { backends []*backendWrapper }

func NewBalancer(backends []Backend) *Balancer { wrappers := make([]*backendWrapper, len(backends)) for i, b := range backends { wrappers[i] = &backendWrapper{ backend: b, healthy: 1, } } return &Balancer{backends: wrappers} }

func (b *Balancer) Execute(ctx context.Context, req Request) (Response, error) { // Запускаем health check в фоне b.healthCheck()

// Выбираем backend с наименьшим количеством активных соединений и который здоров
var selected *backendWrapper
minConns := int64(1<<63 - 1) // max int64

for _, bw := range b.backends {
    if !bw.isHealthy() {
        continue
    }
    conns := atomic.LoadInt64(&bw.activeConns)
    if conns < minConns {
        minConns = conns
        selected = bw
    }
}

if selected == nil {
    return Response{}, errors.New("no healthy backends available")
}

selected.incrementConns()
defer selected.decrementConns()

resp, err := selected.backend.Execute(ctx, req)
if err != nil {
    selected.incrementError()
    if atomic.LoadInt64(&selected.errorCount) >= errorThreshold {
        selected.markUnhealthy()
    }
    return resp, err
}
selected.resetError()
return resp, nil

}

func (b *Balancer) healthCheck() { now := time.Now().Unix() for _, bw := range b.backends { lastCheck := atomic.LoadInt64(&bw.lastCheckTime) if now-lastCheck < int64(healthCheckInterval.Seconds()) { continue } atomic.StoreInt64(&bw.lastCheckTime, now)

    go func(bw *backendWrapper) {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()

        // Пробуем выполнить пустой запрос или специальный health check запрос
        _, err := bw.backend.Execute(ctx, Request{Payload: []byte("healthcheck")})
        if err != nil {
            bw.markUnhealthy()
        } else {
            bw.markHealthy()
        }
    }(bw)
}

}


В этом примере реализован балансировщик нагрузки с алгоритмом Least Connection, поддержкой health check, автоматическим восстановлением backend'ов, thread safety через atomic операции и порогом ошибок для отключения backend'а. Контекст передаётся для возможности отмены запроса.