Привет, Хабр!
Есть классическая боль очередей: скейлинг по факту отставания. Пока backlog вырос, пока HPA дотянулся, пока новые pod«ы прогрелись — SLO уже упал. Решение напрашивается: считать не сколько наваливается прямо сейчас, а сколько нужно серверов, чтобы вероятность ждать больше T была ниже целевого порога. Ровно это умеет Erlang‑C пришедший из жизни колл‑центров. Берём — интенсивность входа,
— среднюю производительность одного воркера, целевой сервис‑левел по ожиданию в очереди, и получаем требуемое число агентов c. Дальше превращаем это в desired replicas и отдаём в KEDA через External Scaler поверх gRPC. Получается предиктивный автоскейлинг, привязанный к SLO, а не к догоняющим метрикам.
Erlang-C как инструмент для SLO по ожиданию
Работаем в стандартной модели M/M/c: пуассоновский вход, экспоненциальные сервис‑таймы, c независимых серверов, бесконечная очередь. Тогда вероятность того, что пришедший попадёт ждать, выражается формулой Erlang‑C. Пусть — предложенная нагрузка в ерлангах. Тогда
Это вероятность того, что клиент будет ждать начала обслуживания. Это — классика модели M/M/c.
Дальше важная связь с SLO: распределение времени ожидания условно на событие «пришлось ждать» — экспоненциальное с параметром . Значит неусловная вероятность уложиться в порог
такова:
Сервис‑левел по ожиданию до — это и есть формула выше
Цель: найти минимальное cc, при котором при заданных TT и целевом уровне
. Условия устойчивости обязательны:
. Оговорка адекватности: Erlang‑C переоценивает ожидание, если клиенты отваливаются из очереди — в таком случае модель Erlang‑A точнее, а C будет консервативной.
Как превратить это в контроллер мощностей
Шаги простые и повторяемые:
Оценить или спрогнозировать λ на горизонте, который соответствует времени прогрева/холод‑старту и желаемому SLO‑окну. Это можно сделать PromQL функциями
predict_linearили сглаживаниемdouble_exponential_smoothing. В актуальной документации Prometheusholt_wintersпереименован вdouble_exponential_smoothing, аpredict_linear— линейная регрессия на окне.Оценить μ из метрик сервиса. Надёжно брать средний сервис‑тайм за окно и инвертировать:
.
По
посчитать минимальное cc бинарным поиском по формуле выше.
Отдать cc как метрику, из которой HPA без лишних трюков получит desired replicas. В KEDA у
ScaledObjectдля external‑метрик тип по умолчаниюAverageValue, и HPA тогда целится в «глобальная метрика / targetAverageValue». Если положить targetAverageValue = 1, то desired replicas будет равен значению метрики.
Реализация KEDA External Scaler на Go
KEDA ходит к внешнему gRPC‑серверу и вызывает четыре метода: IsActive, StreamIsActive, GetMetricSpec, GetMetrics. Сигнатуры описаны в externalscaler.proto, а в документации KEDA показано, как именно KEDA их использует. Мы реализуем «pull»‑вариант без стрима.
Ниже заготовка. Она:
читает из
ScaledObject.metadataPromQL‑запросы для λ и среднего сервис‑тайма;прогнозирует λ на заданный горизонт;
считает cc по Erlang‑C с бинарным поиском;
фиксирует анти‑дребезг: scale down медленнее scale up, гистерезис по допуску ε;
публикует прометеевые метрики самого скейлера;
поддерживает TLS для gRPC по флагам.
// cmd/scaler/main.go package main import ( "context" "crypto/tls" "encoding/json" "errors" "flag" "fmt" "log" "math" "net" "net/http" "os" "strconv" "time" pb "github.com/kedacore/keda/pkg/scalers/externalscaler" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) type server struct { pb.UnimplementedExternalScalerServer httpc *http.Client logger *log.Logger now func() time.Time // simple state for hysteresis lastC int lastChange time.Time scaleDownHold time.Duration scaleUpMinStep int epsilonSL float64 minC, maxC int } func main() { addr := flag.String("listen", ":9090", "gRPC listen addr") scaleDownHold := flag.Duration("scale-down-hold", 2*time.Minute, "hold time before scaling down") scaleUpMinStep := flag.Int("scale-up-min-step", 1, "minimal scale up step") useTLS := flag.Bool("tls", false, "enable TLS") certFile := flag.String("tls-cert", "", "TLS cert") keyFile := flag.String("tls-key", "", "TLS key") flag.Parse() s := &server{ httpc: &http.Client{ Timeout: 5 * time.Second, }, logger: log.New(os.Stdout, "erlangc-scaler ", log.LstdFlags|log.Lmsgprefix), now: time.Now, scaleDownHold: *scaleDownHold, scaleUpMinStep: *scaleUpMinStep, epsilonSL: 0.01, minC: 1, maxC: 10000, } var srv *grpc.Server if *useTLS { creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile) if err != nil { log.Fatalf("tls: %v", err) } srv = grpc.NewServer(grpc.Creds(creds)) } else { srv = grpc.NewServer() } pb.RegisterExternalScalerServer(srv, s) lis, err := net.Listen("tcp", *addr) if err != nil { log.Fatal(err) } log.Printf("listening on %s", *addr) if err := srv.Serve(lis); err != nil { log.Fatal(err) } } func (s *server) IsActive(ctx context.Context, in *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) { // активируемся, если предсказанный λ > 0 и требуемое c >= minC res, _, err := s.compute(ctx, in) active := false if err == nil && res >= int32(s.minC) { active = true } return &pb.IsActiveResponse{Result: active}, nil } func (s *server) GetMetricSpec(ctx context.Context, in *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) { // HPA должен интерпретировать метрику как AverageValue = 1, // значит desiredReplicas = metricValue return &pb.GetMetricSpecResponse{ MetricSpecs: []*pb.MetricSpec{ { MetricName: "erlangc_required_replicas", TargetSize: 1, // AverageValue=1 }, }, }, nil } func (s *server) GetMetrics(ctx context.Context, in *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) { cReq, sl, err := s.compute(ctx, in.ScaledObjectRef) if err != nil { return nil, err } // гистерезис: не даём дёргаться вниз чаще, чем раз в scaleDownHold now := s.now() cOut := int(cReq) if s.lastC > 0 { if cOut < s.lastC { if now.Sub(s.lastChange) < s.scaleDownHold { cOut = s.lastC } } else if cOut > s.lastC && cOut-s.lastC < s.scaleUpMinStep { cOut = s.lastC + s.scaleUpMinStep } } if cOut != s.lastC { s.lastC = cOut s.lastChange = now } s.logger.Printf("c*=%d, predictedSL=%.3f", cOut, sl) return &pb.GetMetricsResponse{ MetricValues: []*pb.MetricValue{{ MetricName: "erlangc_required_replicas", MetricValue: int64(cOut), }}, }, nil } func (s *server) compute(ctx context.Context, in *pb.ScaledObjectRef) (int32, float64, error) { md := in.ScalerMetadata promURL := md["prometheusURL"] if promURL == "" { return 0, 0, errors.New("prometheusURL is required") } arrQ := md["arrivalRateQuery"] // should return λ [1/s] serQ := md["serviceTimeQuery"] // should return average service time [s] if arrQ == "" || serQ == "" { return 0, 0, errors.New("arrivalRateQuery and serviceTimeQuery are required") } targetSL, _ := strconv.ParseFloat(md["targetSL"], 64) // e.g. 0.95 if targetSL <= 0 || targetSL >= 1 { targetSL = 0.95 } T, _ := strconv.ParseFloat(md["waitThresholdSeconds"], 64) // e.g. 1.0 if T <= 0 { T = 1.0 } lmbd, err := s.instantQuery(ctx, promURL, arrQ) if err != nil { return 0, 0, fmt.Errorf("arrival rate query: %w", err) } svc, err := s.instantQuery(ctx, promURL, serQ) if err != nil { return 0, 0, fmt.Errorf("service time query: %w", err) } if svc <= 0 || lmbd < 0 { return 0, 0, errors.New("invalid metrics") } mu := 1.0 / svc // нижняя граница: хотя бы ceil(λ/μ) lb := int(math.Ceil(lmbd / mu)) if lb < s.minC { lb = s.minC } ub := lb // расширяем верхнюю границу, пока SL не выполнится или пока не упрёмся for ; ub <= s.maxC; ub *= 2 { sl := serviceLevel(lmbd, mu, float64(ub), T) if sl >= targetSL { break } if ub == 0 { ub = 1 } } if ub > s.maxC { return int32(s.maxC), serviceLevel(lmbd, mu, float64(s.maxC), T), nil } // бинарный поиск best := ub for lb <= ub { m := (lb + ub) / 2 sl := serviceLevel(lmbd, mu, float64(m), T) if sl >= targetSL { best = m ub = m - 1 } else { lb = m + 1 } } return int32(best), serviceLevel(lmbd, mu, float64(best), T), nil } func (s *server) instantQuery(ctx context.Context, base, q string) (float64, error) { req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/api/v1/query?query=%s", base, urlQueryEscape(q)), nil) if err != nil { return 0, err } resp, err := s.httpc.Do(req) if err != nil { return 0, err } defer resp.Body.Close() var out struct { Status string `json:"status"` Data struct { ResultType string `json:"resultType"` Result []struct { Value [2]any `json:"value"` } `json:"result"` } `json:"data"` } if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return 0, err } if out.Status != "success" || len(out.Data.Result) == 0 { return 0, errors.New("no data") } valStr, _ := out.Data.Result[0].Value[1].(string) return strconv.ParseFloat(valStr, 64) } // вероятность ждать func erlangC(a float64, c int) float64 { if c <= 0 || a <= 0 { return 0 } // устойчивое вычисление через рекуррентные коэффициенты term := 1.0 sum := 1.0 for k := 1; k <= c-1; k++ { term *= a / float64(k) sum += term } termC := term * a / float64(c) // a^c / c! if a >= float64(c) { // перегрузка — формально неустойчиво, возвращаем 1 return 1.0 } num := termC * float64(c) / (float64(c) - a) return num / (sum + num) } func serviceLevel(lambda, mu, c float64, T float64) float64 { if c <= 0 { return 0 } Pw := erlangC(lambda/mu, int(c)) gap := c*mu - lambda if gap <= 0 { return 0 } return 1 - Pw*math.Exp(-gap*T) } // простая экранизация без внешних зависимостей func urlQueryEscape(q string) string { r := "" for i := 0; i < len(q); i++ { ch := q[i] switch ch { case ' ': r += "%20" case '"': r += "%22" case '+': r += "%2B" case '%': r += "%25" case '&': r += "%26" default: r += string(ch) } } return r }
Манифесты: деплой скейлера и ScaledObject
Разворачиваем Deployment с нашим gRPC‑сервисом. Если нужен TLS, добавляем секрет с ключом и сертификатом и включаем флаги. ScaledObject настраиваем так, чтобы HPA воспринимал возвращаемую метрику как AverageValue = 1. В metadata кладём PromQL‑запросы для λ и среднего сервис‑тайма.
apiVersion: apps/v1 kind: Deployment metadata: name: erlangc-scaler spec: replicas: 1 selector: matchLabels: { app: erlangc-scaler } template: metadata: labels: { app: erlangc-scaler } spec: containers: - name: scaler image: ghcr.io/org/erlangc-scaler:1.0.0 args: - --listen=:9090 - --scale-down-hold=120s - --scale-up-min-step=2 ports: - name: grpc containerPort: 9090 readinessProbe: tcpSocket: { port: 9090 } initialDelaySeconds: 2 periodSeconds: 5 livenessProbe: tcpSocket: { port: 9090 } initialDelaySeconds: 10 periodSeconds: 10 --- apiVersion: v1 kind: Service metadata: name: erlangc-scaler spec: selector: app: erlangc-scaler ports: - name: grpc port: 9090 targetPort: 9090 --- apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: queue-worker-erlangc spec: scaleTargetRef: name: queue-worker pollingInterval: 30 cooldownPeriod: 300 minReplicaCount: 0 maxReplicaCount: 200 advanced: horizontalPodAutoscalerConfig: behavior: # быстро вверх, аккуратно вниз scaleUp: stabilizationWindowSeconds: 0 policies: - type: Percent value: 100 periodSeconds: 15 scaleDown: stabilizationWindowSeconds: 300 policies: - type: Percent value: 50 periodSeconds: 60 triggers: - type: external metricType: AverageValue metadata: scalerAddress: erlangc-scaler.default.svc.cluster.local:9090 prometheusURL: http://prometheus-server.monitoring.svc.cluster.local # прогноз λ на 60 с вперёд по регрессии на последнем часу arrivalRateQuery: sum(predict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60)) # средний сервис-тайм по сумме/счётчику serviceTimeQuery: (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m]))) waitThresholdSeconds: "1" targetSL: "0.95"
Семантика external‑метрик и HPA‑формулы подтверждена в официальных доках Kubernetes и KEDA. Обратите внимание на behavior.scaleDown.stabilizationWindowSeconds — дефолтное значение 300 с подавляет флаппинг при схлопывании, это нормальная практика.
Генератор нагрузки и эталонный воркер
Чтобы воспроизвести поведение M/M/c, возьмём Redis Lists как очередь, генератор с пуассоновским входом и воркер, который моделирует экспоненциальный сервис‑тайм с заданным средним. Нагрузчик просто пушит JSON‑сообщения в лист, воркер блокирующе читает и обрабатывает.
// cmd/loadgen/main.go package main import ( "context" "flag" "log" "math/rand" "time" "github.com/redis/go-redis/v9" ) func main() { addr := flag.String("redis", "redis:6379", "redis addr") queue := flag.String("queue", "jobs", "queue name") lambda := flag.Float64("lambda", 10, "arrival rate per second") flag.Parse() rdb := redis.NewClient(&redis.Options{Addr: *addr}) ctx := context.Background() src := rand.New(rand.NewSource(time.Now().UnixNano())) for { // экспоненциальные межприходы u := src.Float64() wait := -mathLog(u) / *lambda time.Sleep(time.Duration(wait * float64(time.Second))) err := rdb.LPush(ctx, *queue, time.Now().UnixNano()).Err() if err != nil { log.Printf("LPUSH: %v", err) } } } func mathLog(u float64) float64 { return -1 * (math.Log(1-u)) }
// cmd/worker/main.go package main import ( "context" "flag" "log" "math" "math/rand" "time" "github.com/redis/go-redis/v9" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "net/http" ) var ( jobDur = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "job_duration_seconds", Help: "service time per job", Buckets: prometheus.DefBuckets, }) processed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "queue_processed_total", Help: "jobs processed", }) ) func main() { addr := flag.String("redis", "redis:6379", "redis addr") queue := flag.String("queue", "jobs", "queue name") mean := flag.Float64("mean", 0.2, "mean service time in seconds") flag.Parse() prometheus.MustRegister(jobDur, processed) go func() { http.Handle("/metrics", promhttp.Handler()) _ = http.ListenAndServe(":8080", nil) }() rdb := redis.NewClient(&redis.Options{Addr: *addr}) ctx := context.Background() src := rand.New(rand.NewSource(time.Now().UnixNano())) for { res, err := rdb.BRPop(ctx, 0, *queue).Result() if err != nil { log.Printf("BRPOP: %v", err) time.Sleep(time.Second) continue } start := time.Now() // экспоненциальный сервис-тайм u := src.Float64() t := -math.Log(1-u) * (*mean) time.Sleep(time.Duration(t * float64(time.Second))) jobDur.Observe(time.Since(start).Seconds()) processed.Inc() } }
С такими метриками Prometheus‑запросы в ScaledObject становятся простыми: arrivalRateQuery = sum(rate(queue_in_total[5m])) или используйте queue_enqueued_total в своём стеке. serviceTimeQuery = sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])). Подчеркну: predict_linear подходит для gauges, для скоростей используйте подход через субквест [...].
Дэшборды: что мониторить
Хочется видеть одновременно прогноз , оценку
, расчётное
, фактические реплики и сервис‑левел. Один из минимальных дашбордов можно собрать так:
Панель 1:
sum(rate(queue_in_total[5m]))иpredict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60)— текущая и прогнозная интенсивность.Панель 2:
1 / (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])))—.
Панель 3: внешняя метрика
erlangc_required_replicas.Панель 4: расчётный сервис‑левел панелью Stat, вычисляем в скейлере и экспортируем или прикидываем в PromQL аналитику, если метрики доступны.
Панель 5:
kube_deployment_status_replicasпо целевому Deployment.
Минимальный JSON‑фрагмент панели Stat для :
{ "type": "stat", "title": "Required replicas (Erlang-C)", "targets": [ { "expr": "erlangc_required_replicas", "legendFormat": "c*" } ], "options": { "reduceOptions": { "calcs": ["lastNotNull"], "values": false } } }
Устойчивость
Даже идеальная модель может заставить HPA дёргаться, если входные оценки шумные. Контур стабилизации в двух местах:
На стороне HPA через
behavior: стабилизационное окно на scale down по умолчанию 300 секунд и ограничение скорости изменений. Это штатная возможность autoscaling/v2.В самом скейлере: держим минимальный шаг апскейла, откладываем даунскейл на
scaleDownHold, накладываем допуск ε на целевой SLO. Это снижает чувствительность к лёгким промахам в λ/μ.
При желании можно добавить EMA к и
или перейти на PromQL
double_exponential_smoothing, но помните, что это сглаживание, а не точное предсказание.
Что в итоге получается
Мы переводим SLO по ожиданию в очереди в конкретное число реплик через Erlang‑C, используя текущие и прогнозные метрики. Это прозрачная математика плюс аккуратный контур стабилизации. Профит — меньше промахов по SLO, меньше догоняющих апскейлов при всплесках, контролируемая стоимость. А главное — логика автоскейлинга перестаёт зависеть от конкретного брокера, потому что мы подаём в скейлер не размер очереди, а первичные параметры потока и сервиса.
В итоге предиктивный автоскейлинг через Erlang‑C и KEDA позволяет проектировать систему не «по факту перегрузки», а исходя из формализованных требований к уровню сервиса. Это уже уровень архитектурных решений, где математика, распределённые системы и практики эксплуатации должны работать вместе.
Если вы хотите глубже разобраться в том, как строить подобные механизмы в продакшн‑среде, рекомендуем обратить внимание на курс Highload Architect. Там вы сможете изучить подходы к проектированию систем, где подобные методы масштабирования — не исключение, а правило. Чтобы узнать, подойдет ли вам программа курса, пройдите вступительный тест.
Рост в IT быстрее с Подпиской — дает доступ к 3-м курсам в месяц по цене одного. Подробнее
