Сразу скажу, хаба для F# на хабре нет, поэтому пишу в C#.
Для тех кто не знаком с F#, но знаком с C#, рекомендую наисвежайшую статью от Microsoft.
Она поможет Вам испытывать меньше WTF моментов при прочтении, т.к. моя статья не туториал к синтаксису.
Есть сервис, написанный на Akka.NET, он вываливает в разные текстовые логи кучу инфы. Отдел эксплуатации грепает эти логи, жарит по ним регекспами, чтобы узнать о кол-ве ошибок (бизнесовых и не очень), о кол-ве входящих в сервис сообщений и кол-ве исходящих. Далее эта информация заливается в ElasticDB, InfluxDB и показывается в Grafana и Kibana в разных срезах и агрегациях.
Звучит сложно, да и парсить текстовые логи сервиса, который генерит несколько десятков ГБ текстового мусора в день — занятие неблагодарное. Поэтому встала задача — сервис должен быть способен поднять ендпоинт, который можно дёрнуть и получить сразу всю инфу о нём.
Решать задачу будем так:
В App.Metrics есть 6 типов представлений:
В первой итерации нам вполне хватит счётчиков, таймеров и… метров :)
В начале опишем типы и интерфейсы (все приводить не буду, можно посмотреть в репозитории, ссылка в конце).
Так же условимся, что все наши сообщения для метрик будут приходить к особому актору (его мы определим позже) через EventStream (шина сообщений в самой Akka.Net).
Например таймер, который должен уметь замерить некоторое количество времени для какого-то объекта:
Или счётчик, который должен уметь увеличиваться/уменьшаться как с указанием количества, так и без:
И пара примеров команд для шины:
Самое главное — определим возможные сообщения, которые могут ходить по шине, и на которые будет реагировать наш метрик-актор. Для этого воспользуемся Discriminated Union:
Теперь надо реализовать интерфейсы и на этом закончить первый пункт. Реализовывать мы их будем в функциональном стиле, т.е. через функции.
Пример создания метра:
Для людей из мира C# даю аналог:
Пусть вас не смущает что аналог не компилируется, это нормально, т.к. приватный класс в теле метода смущает компилятор. А вот в F# вы можете вернуть анонимный класс через интерфейс.
Основное на что надо обратить внимание — мы кидаем в шину сообщение что надо подвинуть измеритель, который определяется через MeterId.
Аналогично поступаем с IMetricsAdapter, но т.к. методов у него много приведу один:
При запросе на создание таймера мы отправляем в шину сообщение о создании, а вызывающему возвращаем результат метода createMeter с аргументами evtStream и cmd.MeterId.
Результат её, как видно выше — IMetricsMeter.
После этого создадим расширение для ActorSystem, чтобы можно было вызывать наш IMetricsAdapter откуда угодно:
Нам понадобятся два актора:
Сразу сообразим ApiController, он тривиален:
Далее объявим функцию актора, который будет считывать все MetricsMessage из EventStream и что-то с ними делать. В функцию внедрим зависимость IMetrics через аргументы, внутри создадим кэши для всех метрик через обычные Dictionary.
Почему не ConcurrentDictionary, спросите Вы? А потому что актор обрабатывает сообщения по очереди. Чтобы словить внутри актора race condition, надо целенаправленно стрелять себе в ногу.
Краткий смысл — объявили внутреннее состояние в виде словарей разных метрик, объявили функцию обработки сообщения MetricsMessage, подписались на MetricsMessage и вернули рекурсивную функцию обработки сообщения из мейлбокса.
Сообщения для работы с метриками обрабатывается так:
Так же нам понадобится актор, который поднимает Owin хост с контроллером выше.
Для этого напишем функцию, которая принимает зависимость в виде конфига и IDependencyResolver. Чтобы не завалиться на старте, актор сам себе посылает сообщение, которое инициирует возможный Dispose() старого API и создание нового. И опять таки, т.к. актор внутри себя синхронен, мы можем использовать mutable state.
Так же мы кидаем метод api.Dispose в отложенные задачи при окончательной остановке актора с помощью mailbox.Defer. А для начального состояния переменной api используем заглушку через object expression, которое конструирует пустой IDisposable объект.
Смысл задачи — сделать обёртку для логгера из Akka.Net (он представлен через интерфейс ILoggingAdapter), которую можно будет использовать для замера времени операции и типизированного заноса инфы (не просто стринги, а внятные бизнесовые случаи).
Вся типизация логгера заключена в одном union.
А сам логгер будет работать по такому интерфейсу:
Создавать его будем через обычный класс:
Возможно Вы спросите, почему обычный Dictionary? Как было сказано выше, данный LogBuilder предназначен для использования внутри актора при обработке одной операции. Нет смысла использовать конкурентную структуру данных.
Приведу пример методов реализации интерфейса:
Самое интересное это логика работы OnOperationCompleted():
Самая магическая часть, которая позволит нам писать простые функции обработки сообщений без бойлерплейта, который несёт за собой тотальное логирование.
Чего мы хотим добиться? Для начала мы хотим залогировать:
Сделать всё выше перечисленное нам помогут Linq.Expressions. Как сделать это через QuotationExpressions из F# я не знаю, т.к. не нашёл простого способа скомпилировать их. Буду рад, если кто предложит варианты.
И так, для начала объявим пару вспомогательных типов и один метод:
Expr — это выражение, которое содержит Action от мейлбокса (на случай если надо наплодить детей, остановить себя или детей и вообще), обрабатываемого сообщения и логгера (если надо делать с ним какие-то особые действия).
Wrap.Handler(Expr) — позволит нам писать в него обычные F# выражения вида «fun mb msg log -> ()», а на выходе получать Linq.Expressions.
toExprName — метод, который получает название метода, если выражение является вызовом метода (MethodCallExpression) или просто пытается привести наше выражение к строке.
Для выражения вида «fun mb msg log -> handleMsg msg» — toExprName вернёт «handleMsg».
Теперь напишем обёртку для создания функциональных акторов. Начало объявления выглядит так:
На вход мы будем подавать только handler, т.к. mailbox потом докинет сама Akka (partial application).
С помощью написанного нами расширения к ActorSystem получим экземпляр IMetricsAdapter в значение metrics. Так же получим логгер Akka в значение logger.
Затем мы создадим для данного актора все необходимые метрики и тут же ими воспользуемся:
Как видите, мы увеличиваем значение instanceCounter и закладываем уменьшение этого счётчика на остановке актора.
Нам понадобятся ещё пара методов, которые будут заполнять известные нам параметры в логгер и дёргать нужные метрики.
В этом куске кода мы кидаем в логгер название операции, вызываем завершение её логирования, закидываем в метрику таймеров время операции, а в метрику мессаджей — тип сообщения:
В обработке исключений внутри актора нам поможет следующий метод:
Осталось немного чтобы всё заработало. Свяжем всё вместе через обёртку над обработчиком:
wrapHandler обладает сложной сигнатурой. На языке C# это выглядело бы так:
При этом на все остальные типы никаких ограничений нет.
По смыслу wrapHandler должен на выходе дать функцию, которая получает TMsg и выдаёт TResults. Порядок действий в этой функции будет следующий:
Для преобразования Expression в Action и подачи в каждое действие актора нового экземпляра логгера сделаем ещё одну вспомогательную функцию:
В ней мы как раз получаем наш Expression, компилим его и подаём в wrapHandler выше, вместе с мейлбоксом и функцией на получение нового LogBuilder().
Сигнатура данного метода так же непростая. На C# это выглядело бы так:
Ограничений на TMsg всё ещё нет.
Осталось только создать рекурсивную функцию :)
Вот это выражение «wrapExpr handler mailbox akkaLogger», как видно из объяснения выше, возвращает Action, т.е. метод, в который можно подать любой тип на вход и получить unit (void в c#).
Дописав в конце выражения «msg» мы кидаем в эту функцию аргумент msg и выполняем наше действие над полученным сообщением.
На этом мы закончили с кодированием нашей задачи и перейдём к примерам!
Чтобы это всё работало необязательно писать много кода.
Можно вообще писать исключительно обработчики сообщений без знания о том, что нам нужны мейлбоксы, логгеры или обработка ошибок.
Простой случай может выглядеть так:
А чтобы завернуть эту функцию в loggerActor и получить все плюшки ради которых мы так старались можно написать так:
Если у вас сложная логика и нужен доступ к мейлбоксу и логеру:
EntryPoint самой программы, создание ActorSystem, поднятие метрик и акторов можно посмотреть под спойлером, там ничего примечательного нет.
Самое главное — метрики!
Если во время работы зайти по ссылке localhost:10001/metrics, увидим достаточно большой json, в котором будет много информации. Приведу кусок для функции waitProcess:
Из него можно узнать, что:
В консоли будет примерно следующее.
В данной статье много кода и, скорее всего, мало пояснений (отвечу в коментах, если что непонятно), но это потому что статья призвана показать решение нескольких рутинных задач из реального проекта.
Возможно кому-то пригодится, тем более этот код был изначально написан для акторов на C#, так что при желании можно всё это перенести (дам хинт, можно сделать свою версию Receive() с теми же экспрешнами внутри).
Рекомендую изучить F# тем кто занимается моделированием сложных доменных моделей, т.к. его система типов намного богаче, отсутствие null и проектирование в типах позволяет сделать модель устойчивой к ошибкам программиста.
Репозиторий с примером лежит тут.
Спасибо за внимание!
Для тех кто не знаком с F#, но знаком с C#, рекомендую наисвежайшую статью от Microsoft.
Она поможет Вам испытывать меньше WTF моментов при прочтении, т.к. моя статья не туториал к синтаксису.
Контекст задачи
Есть сервис, написанный на Akka.NET, он вываливает в разные текстовые логи кучу инфы. Отдел эксплуатации грепает эти логи, жарит по ним регекспами, чтобы узнать о кол-ве ошибок (бизнесовых и не очень), о кол-ве входящих в сервис сообщений и кол-ве исходящих. Далее эта информация заливается в ElasticDB, InfluxDB и показывается в Grafana и Kibana в разных срезах и агрегациях.
Звучит сложно, да и парсить текстовые логи сервиса, который генерит несколько десятков ГБ текстового мусора в день — занятие неблагодарное. Поэтому встала задача — сервис должен быть способен поднять ендпоинт, который можно дёрнуть и получить сразу всю инфу о нём.
Решать задачу будем так:
- Напишем доменную модель для метрик
- Замапим доменную модель метрик на реализацию App.Metrics и поднимем апишечку
- Сделаем структурированный доменный логгер, который натянем на внутренний логгер Akka
- Сделаем обёртку для функциональных акторов, которая спрячет работу с метриками и логгером
- Соберём всё вместе и запустим
Доменная модель для метрик
В App.Metrics есть 6 типов представлений:
- Counters
- Apdex
- Gauges
- Histograms
- Meters
- Timers
В первой итерации нам вполне хватит счётчиков, таймеров и… метров :)
В начале опишем типы и интерфейсы (все приводить не буду, можно посмотреть в репозитории, ссылка в конце).
Так же условимся, что все наши сообщения для метрик будут приходить к особому актору (его мы определим позже) через EventStream (шина сообщений в самой Akka.Net).
Например таймер, который должен уметь замерить некоторое количество времени для какого-то объекта:
type IMetricsTimer =
abstract member Measure : Amount -> unit
abstract member Measure : Amount * Item -> unit
Или счётчик, который должен уметь увеличиваться/уменьшаться как с указанием количества, так и без:
type IMetricsCounter =
abstract member Decrement : unit -> unit
abstract member Decrement : Amount -> unit
abstract member Decrement : Amount * Item -> unit
abstract member Increment : unit -> unit
abstract member Increment : Amount -> unit
abstract member Increment : Amount * Item -> unit
И пара примеров команд для шины:
type DecrementCounterCommand =
{ CounterId : CounterId
DecrementAmount : Amount
Item : Item }
type CreateCounterCommand =
{ CounterId : CounterId
Context : ContextName
Name : MetricName
MeasurementUnit : MeasurementUnit
ReportItemPercentages : bool
ReportSetItems : bool
ResetOnReporting : bool }
Самое главное — определим возможные сообщения, которые могут ходить по шине, и на которые будет реагировать наш метрик-актор. Для этого воспользуемся Discriminated Union:
type MetricsMessage =
| DecrementCounter of DecrementCounterCommand
| IncrementCounter of IncrementCounterCommand
| MarkMeter of MarkMeterCommand
| MeasureTime of MeasureTimeCommand
| CreateCounter of CreateCounterCommand
| CreateMeter of CreateMeterCommand
| CreateTimer of CreateTimerCommand
Теперь надо реализовать интерфейсы и на этом закончить первый пункт. Реализовывать мы их будем в функциональном стиле, т.е. через функции.
Пример создания метра:
let private createMeter (evtStream: EventStream) meterId =
{ new IMetricsMeter with
member this.Mark amount =
this.Mark (amount, Item None)
member this.Mark item =
this.Mark (Amount 1L, item)
member this.Mark (amount, item) =
evtStream.Publish <| MarkMeter { MeterId = meterId; Amount = amount; Item = item }
Для людей из мира C# даю аналог:
private IMetricsMeter createMeter(EventStream evtStream, MeterId meterId)
{
private class TempClass : IMetricsMeter
{
public void Mark(long amount)
{
Mark(amount, "");
}
public void Mark(string item)
{
Mark(1, item);
}
public void Mark(long amount, string item)
{
evtStream.Publish(new MarkMeter {...});//omitted
}
}
return new TempClass();
}
Пусть вас не смущает что аналог не компилируется, это нормально, т.к. приватный класс в теле метода смущает компилятор. А вот в F# вы можете вернуть анонимный класс через интерфейс.
Основное на что надо обратить внимание — мы кидаем в шину сообщение что надо подвинуть измеритель, который определяется через MeterId.
Аналогично поступаем с IMetricsAdapter, но т.к. методов у него много приведу один:
member this.CreateMeter (name, measureUnit, rateUnit) =
let cmd =
{ MeterId = MeterId (toId name)
Context = context
Name = name
MeasurementUnit = measureUnit
RateUnit = rateUnit }
evtStream.Publish <| CreateMeter cmd
createMeter evtStream cmd.MeterId
При запросе на создание таймера мы отправляем в шину сообщение о создании, а вызывающему возвращаем результат метода createMeter с аргументами evtStream и cmd.MeterId.
Результат её, как видно выше — IMetricsMeter.
После этого создадим расширение для ActorSystem, чтобы можно было вызывать наш IMetricsAdapter откуда угодно:
type IActorContext with
member x.GetMetricsProducer context =
createAdapter x.System.EventStream context
Акторы для метрик и апишечка
Нам понадобятся два актора:
- Первый будет слушать шину на наличие в ней MetricsMessage и создавать/писать в метрики.
- Второй актор будет держать WebApi с одним методом, который будет отгружать по GET запросу всю собранную инфу.
Сразу сообразим ApiController, он тривиален:
type public MetricController(metrics: IMetrics) =
inherit ApiController()
[<HttpGet>]
[<Route("metrics")>]
member __.GetMetrics() =
__.Ok(metrics.Snapshot.Get())
Далее объявим функцию актора, который будет считывать все MetricsMessage из EventStream и что-то с ними делать. В функцию внедрим зависимость IMetrics через аргументы, внутри создадим кэши для всех метрик через обычные Dictionary.
Почему не ConcurrentDictionary, спросите Вы? А потому что актор обрабатывает сообщения по очереди. Чтобы словить внутри актора race condition, надо целенаправленно стрелять себе в ногу.
let createRecorder (metrics: IMetrics) (mailbox: Actor<_>) =
let self = mailbox.Self
let counters = new Dictionary<CounterId, ICounter>()
let meters = new Dictionary<MeterId, IMeter>()
let timers = new Dictionary<TimerId, ITimer * TimeUnit>()
//Часть кода для мапинга пропущена...
let handle = function
| DecrementCounter evt ->
match counters.TryGetValue evt.CounterId with
| (false, _) -> ()
| (true, c) ->
let (Amount am) = evt.DecrementAmount
match evt.Item with
| Item (Some i) -> c.Decrement (i, am)
| Item None -> c.Decrement (am)
| CreateMeter cmd ->
match meters.TryGetValue cmd.MeterId with
| (false, _) ->
let (ContextName ctxName) = cmd.Context
let (MetricName name) = cmd.Name
let options = new MeterOptions(
Context = ctxName,
MeasurementUnit = toUnit cmd.MeasurementUnit,
Name = name,
RateUnit = toTimeUnit cmd.RateUnit)
let m = metrics.Provider.Meter.Instance options
meters.Add(cmd.MeterId, m)
| _ -> ()
//Остальные случае в этом match пропущены
subscribe typedefof<MetricsMessage> self mailbox.Context.System.EventStream |> ignore
let rec loop() = actor {
let! msg = mailbox.Receive()
handle msg
return! loop()
}
loop()
Краткий смысл — объявили внутреннее состояние в виде словарей разных метрик, объявили функцию обработки сообщения MetricsMessage, подписались на MetricsMessage и вернули рекурсивную функцию обработки сообщения из мейлбокса.
Сообщения для работы с метриками обрабатывается так:
- Смотрим какое именно сообщение (через паттерн матчинг)
- Ищем в соответствующем словаре метрику с этим Id (для этого есть прекрасный паттерн через пару (bool, obj), который возвращает TryGetValue в F#
- Если это запрос на создание метрики и её нет — создаём, добавляем в словарь
- Если это запрос на использование метрики и она есть — используем
Так же нам понадобится актор, который поднимает Owin хост с контроллером выше.
Для этого напишем функцию, которая принимает зависимость в виде конфига и IDependencyResolver. Чтобы не завалиться на старте, актор сам себе посылает сообщение, которое инициирует возможный Dispose() старого API и создание нового. И опять таки, т.к. актор внутри себя синхронен, мы можем использовать mutable state.
type IMetricApiConfig =
abstract member Host: string
abstract member Port: int
type ApiMessage = ReStartApiMessage
let createReader (config: IMetricApiConfig) resolver (mailbox: Actor<_>) =
let startUp (app: IAppBuilder) =
let httpConfig = new HttpConfiguration(DependencyResolver = resolver)
httpConfig.Formatters.JsonFormatter.SerializerSettings.Converters.Add(new MetricDataConverter())
httpConfig.Formatters.JsonFormatter.Indent <- true
httpConfig.MapHttpAttributeRoutes()
httpConfig.EnsureInitialized()
app.UseWebApi(httpConfig) |> ignore
let uri = sprintf "http://%s:%d" config.Host config.Port
let mutable api = {new IDisposable with member this.Dispose() = ()}
let handleMsg (ReStartApiMessage) =
api.Dispose()
api <- WebApp.Start(uri, startUp)
mailbox.Defer api.Dispose
mailbox.Self <! ReStartApiMessage
let rec loop() = actor {
let! msg = mailbox.Receive()
handleMsg msg
return! loop()
}
loop()
Так же мы кидаем метод api.Dispose в отложенные задачи при окончательной остановке актора с помощью mailbox.Defer. А для начального состояния переменной api используем заглушку через object expression, которое конструирует пустой IDisposable объект.
Делаем структурированный логгер
Смысл задачи — сделать обёртку для логгера из Akka.Net (он представлен через интерфейс ILoggingAdapter), которую можно будет использовать для замера времени операции и типизированного заноса инфы (не просто стринги, а внятные бизнесовые случаи).
Вся типизация логгера заключена в одном union.
type Fragment =
| OperationName of string
| OperationDuration of TimeSpan
| TotalDuration of TimeSpan
| ReceivedOn of DateTimeOffset
| MessageType of Type
| Exception of exn
А сам логгер будет работать по такому интерфейсу:
type ILogBuilder =
abstract OnOperationBegin: unit -> unit
abstract OnOperationCompleted: unit -> unit
abstract Set: LogLevel -> unit
abstract Set: Fragment -> unit
abstract Fail: exn -> unit
abstract Supress: unit -> unit
abstract TryGet: Fragment -> Fragment option
Создавать его будем через обычный класс:
type LogBuilder(logger: ILoggingAdapter) =
let logFragments = new Dictionary<System.Type, Fragment>()
let stopwatch = new Stopwatch()
let mutable logLevel = LogLevel.DebugLevel
interface ILogBuilder with
//Реализация интерфейса
Возможно Вы спросите, почему обычный Dictionary? Как было сказано выше, данный LogBuilder предназначен для использования внутри актора при обработке одной операции. Нет смысла использовать конкурентную структуру данных.
Приведу пример методов реализации интерфейса:
let set fragment =
logFragments.[fragment.GetType()] <- fragment
member x.OnOperationBegin() =
stopwatch.Start()
member this.Fail e =
logLevel <- LogLevel.ErrorLevel
set <| Exception e
member this.OnOperationCompleted() =
stopwatch.Stop()
set <| OperationDuration stopwatch.Elapsed
match tryGet <| ReceivedOn DateTimeOffset.MinValue with
| Some (ReceivedOn date) -> set <| TotalDuration (DateTimeOffset.UtcNow - date)
| _ -> ()
match status with
| Active ->
match (logLevel) with
| LogLevel.DebugLevel -> logger.Debug(message())
| LogLevel.InfoLevel -> logger.Info(message())
| LogLevel.WarningLevel -> logger.Warning(message())
| LogLevel.ErrorLevel -> logger.Error(message())
| x -> failwith(sprintf "Log level %s is not supported" <| string x)
| Supressed -> ()
Самое интересное это логика работы OnOperationCompleted():
- Останавливаем таймер и пишем прошедшее время в логгер через фрагмент OperationDuration
- Если у нас есть в логе фрагмент ReceivedOn (который в моей модели означает время прихода сообщения в сервис ВООБЩЕ), то пишем в лог общее время нахождения сообщения в сервисе через TotalDuration
- Если логгер не был выключен (через метод Supress()), то пишем инфу в Akka логгер через метод message(), который я не привёл, но он просто как-то собирает все Fragments в строку с учётом типов сообщений
Создаём обёртку для функциональных акторов
Самая магическая часть, которая позволит нам писать простые функции обработки сообщений без бойлерплейта, который несёт за собой тотальное логирование.
Чего мы хотим добиться? Для начала мы хотим залогировать:
- Что мы делаем. В нашем случае — это название операции, т.к. функциональные акторы имеют один тип FuncActor
- Тип обрабатываемого сообщения
- Сколько таких функций (по сути акторов) живёт в системе
- Показать время, которое ушло на выполнение операции
- Показать суммарное время, которое прошло с момента получения данного сообщения на входе в сервис
- Залогировать ошибку, если она возникла, особым образом
- иметь возможность писать простые функции обработки сообщений, не думая обо всём выше
Сделать всё выше перечисленное нам помогут Linq.Expressions. Как сделать это через QuotationExpressions из F# я не знаю, т.к. не нашёл простого способа скомпилировать их. Буду рад, если кто предложит варианты.
И так, для начала объявим пару вспомогательных типов и один метод:
type Expr<'T,'TLog when 'TLog :> ILogBuilder> = Expression<System.Action<Actor<'T>, 'T, 'TLog>>
type Wrap =
static member Handler(e: Expression<System.Action<Actor<'T>, 'T, #ILogBuilder>>) = e
let toExprName (expr: Expr<_,_>) =
match expr.Body with
| :? MethodCallExpression as methodCall -> methodCall.Method.Name
| x -> x.ToString()
Expr — это выражение, которое содержит Action от мейлбокса (на случай если надо наплодить детей, остановить себя или детей и вообще), обрабатываемого сообщения и логгера (если надо делать с ним какие-то особые действия).
Wrap.Handler(Expr) — позволит нам писать в него обычные F# выражения вида «fun mb msg log -> ()», а на выходе получать Linq.Expressions.
toExprName — метод, который получает название метода, если выражение является вызовом метода (MethodCallExpression) или просто пытается привести наше выражение к строке.
Для выражения вида «fun mb msg log -> handleMsg msg» — toExprName вернёт «handleMsg».
Теперь напишем обёртку для создания функциональных акторов. Начало объявления выглядит так:
let loggerActor<'TMsg> (handler: Expr<'TMsg,_>) (mailbox: Actor<'TMsg>) =
let exprName = handler |> toExprName
let metrics = mailbox.Context.GetMetricsProducer (ContextName exprName)
let logger = mailbox.Log.Value
На вход мы будем подавать только handler, т.к. mailbox потом докинет сама Akka (partial application).
С помощью написанного нами расширения к ActorSystem получим экземпляр IMetricsAdapter в значение metrics. Так же получим логгер Akka в значение logger.
Затем мы создадим для данного актора все необходимые метрики и тут же ими воспользуемся:
let errorMeter = metrics.CreateMeter (MetricName "Error Rate", Errors)
let instanceCounter = metrics.CreateCounter (MetricName "Instances Counter", Items)
let messagesMeter = metrics.CreateMeter (MetricName "Message Processing Rate", Items)
let operationsTimer = metrics.CreateTimer (MetricName "Operation Durations", Requests, MilliSeconds, MilliSeconds)
instanceCounter.Increment()
mailbox.Defer instanceCounter.Decrement
Как видите, мы увеличиваем значение instanceCounter и закладываем уменьшение этого счётчика на остановке актора.
Нам понадобятся ещё пара методов, которые будут заполнять известные нам параметры в логгер и дёргать нужные метрики.
В этом куске кода мы кидаем в логгер название операции, вызываем завершение её логирования, закидываем в метрику таймеров время операции, а в метрику мессаджей — тип сообщения:
let completeOperation (msgType: Type) (logger: #ILogBuilder) =
logger.Set (OperationName exprName)
logger.OnOperationCompleted()
match logger.TryGet(OperationDuration TimeSpan.Zero) with
| Some(OperationDuration dur) ->
operationsTimer.Measure(Amount (int64 dur.TotalMilliseconds), Item (Some exprName))
| _ -> ()
messagesMeter.Mark(Item (Some msgType.Name))
В обработке исключений внутри актора нам поможет следующий метод:
let registerExn (msgType: Type) e (logger: #ILogBuilder) =
errorMeter.Mark(Item (Some msgType.Name))
logger.Fail e
Осталось немного чтобы всё заработало. Свяжем всё вместе через обёртку над обработчиком:
let wrapHandler handler mb (logBuilder: unit -> #ILogBuilder) =
let innherHandler mb msg =
let logger = logBuilder()
let msgType = msg.GetType()
logger.Set (MessageType msgType)
try
try
logger.OnOperationBegin()
handler mb msg logger
with
| e -> registerExn msgType e logger; reraise()
finally
completeOperation msgType logger
innherHandler mb
wrapHandler обладает сложной сигнатурой. На языке C# это выглядело бы так:
Func<TMsg, TResult> wrapHandler<Tmsg, TResult, TLogBuilder, TMailbox>(
Func<TMailbox, TMsg, TLogBuilder, TResult> handler,
TMailbox mb,
Func<TLogBuilder> logBuilder)
where TLogBuilder: ILogBuilder
При этом на все остальные типы никаких ограничений нет.
По смыслу wrapHandler должен на выходе дать функцию, которая получает TMsg и выдаёт TResults. Порядок действий в этой функции будет следующий:
- Начинаем логирование операции
- Выполняем операцию
- В случае возникновения необработанного в handler исключения, логируем его и пробрасываем выше (родителю данного актора)
- Завершаем логирование
Для преобразования Expression в Action и подачи в каждое действие актора нового экземпляра логгера сделаем ещё одну вспомогательную функцию:
let wrapExpr (expr: Expr<_,_>) mailbox logger =
let action = expr.Compile()
wrapHandler
(fun mb msg log -> action.Invoke(mailbox, msg, log))
mailbox
(fun () -> new LogBuilder(logger))
В ней мы как раз получаем наш Expression, компилим его и подаём в wrapHandler выше, вместе с мейлбоксом и функцией на получение нового LogBuilder().
Сигнатура данного метода так же непростая. На C# это выглядело бы так:
Action<TMsg> wrapExpr<TMsg>(
Expr<TMsg, LogBuilder> expr,
Actor<TMsg> mb,
ILoggingAdapterlogger)
Ограничений на TMsg всё ещё нет.
Осталось только создать рекурсивную функцию :)
let rec loop() =
actor {
let! msg = mailbox.Receive()
wrapExpr handler mailbox akkaLogger msg
return! loop()
}
loop()
Вот это выражение «wrapExpr handler mailbox akkaLogger», как видно из объяснения выше, возвращает Action, т.е. метод, в который можно подать любой тип на вход и получить unit (void в c#).
Дописав в конце выражения «msg» мы кидаем в эту функцию аргумент msg и выполняем наше действие над полученным сообщением.
На этом мы закончили с кодированием нашей задачи и перейдём к примерам!
Как всё это запустить?
Чтобы это всё работало необязательно писать много кода.
Можно вообще писать исключительно обработчики сообщений без знания о том, что нам нужны мейлбоксы, логгеры или обработка ошибок.
Простой случай может выглядеть так:
type ActorMessages =
| Wait of int
| Stop
let waitProcess = function
| Wait d -> Async.Sleep d |> Async.RunSynchronously
| Stop -> ()
А чтобы завернуть эту функцию в loggerActor и получить все плюшки ради которых мы так старались можно написать так:
let spawnWaitWorker() =
loggerActor <| Wrap.Handler(fun mb msg log -> waitProcess msg)
let waitWorker = spawn system "worker-wait" <| spawnWaitWorker()
waitWorker <! Wait 1000 //Будет залогировано действие длительностью ~1000мс
waitWorker <! Wait 500
Если у вас сложная логика и нужен доступ к мейлбоксу и логеру:
let failOrStopProcess (mailbox: Actor<_>) msg (log: ILogBuilder) =
try
match msg with
| Wait d -> failwith "can't wait!"
| Stop -> mailbox.Context.Stop mailbox.Self
with
| e -> log.Fail e
let spawnFailOrStopWorker() =
loggerActor <| Wrap.Handler(fun mb msg log -> failOrStopProcess mb msg log)
let failOrStopWorker = spawn system "worker-vocal" <| spawnFailOrStopWorker()
failOrStopWorker <! Wait 1000 //Будет залогирована ошибка "can't wait!"
failOrStopWorker <! Wait 500 //Будет залогирована ошибка "can't wait!"
failOrStopWorker <! Stop
failOrStopWorker <! Wait 500 //Данное сообщение уже уйдёт в DeadLetters
EntryPoint самой программы, создание ActorSystem, поднятие метрик и акторов можно посмотреть под спойлером, там ничего примечательного нет.
Program.fs
open Akka.FSharp
open SimpleInjector
open App.Metrics;
open Microsoft.Extensions.DependencyInjection
open SimpleInjector.Integration.WebApi
open System.Reflection
open System
open Metrics.MetricActors
open ExampleActors
let createSystem =
let configStr = System.IO.File.ReadAllText("system.json")
System.create "system-for-metrics" (Configuration.parse(configStr))
let createMetricActors system container =
let dependencyResolver = new SimpleInjectorWebApiDependencyResolver(container)
let apiConfig =
{ new IMetricApiConfig with
member x.Host = "localhost"
member x.Port = 10001 }
let metricsReaderSpawner = createReader apiConfig dependencyResolver
let metricsReader = spawn system "metrics-reader" metricsReaderSpawner
let metricsRecorderSpawner = createRecorder (container.GetInstance<IMetrics>())
let metricsRecorder = spawn system "metrics-recorder" metricsRecorderSpawner
()
type Container with
member x.AddMetrics() =
let serviceCollection = new ServiceCollection()
let entryAssemblyName = Assembly.GetEntryAssembly().GetName()
let metricsHostBuilder = serviceCollection.AddMetrics(entryAssemblyName)
serviceCollection.AddLogging() |> ignore
let provider = serviceCollection.BuildServiceProvider()
x.Register(fun () -> provider.GetRequiredService<IMetrics>())
[<EntryPoint>]
let main argv =
let container = new Container()
let system = createSystem
container.RegisterSingleton system
container.AddMetrics()
container.Verify()
createMetricActors system container
let waitWorker1 = spawn system "worker-wait1" <| spawnWaitWorker()
let waitWorker2 = spawn system "worker-wait2" <| spawnWaitWorker()
let waitWorker3 = spawn system "worker-wait3" <| spawnWaitWorker()
let waitWorker4 = spawn system "worker-wait4" <| spawnWaitWorker()
let failWorker = spawn system "worker-fail" <| spawnFailWorker()
let waitOrStopWorker = spawn system "worker-silent" <| spawnWaitOrStopWorker()
let failOrStopWorker = spawn system "worker-vocal" <| spawnFailOrStopWorker()
waitWorker1 <! Wait 1000
waitWorker2 <! Wait 500
waitWorker3 <! Wait 5000
waitWorker4 <! Wait 8000
failWorker <! Wait 5000
waitOrStopWorker <! Wait 1000
waitOrStopWorker <! Wait 500
waitOrStopWorker <! Stop
waitOrStopWorker <! Wait 500
failOrStopWorker <! Wait 1000
failOrStopWorker <! Wait 500
failOrStopWorker <! Stop
failOrStopWorker <! Wait 500
Console.ReadKey() |> ignore
0
Самое главное — метрики!
Если во время работы зайти по ссылке localhost:10001/metrics, увидим достаточно большой json, в котором будет много информации. Приведу кусок для функции waitProcess:
Скрытый текст
{
"Context": "waitProcess",
"Counters": [
{
"Name": "Instances Counter",
"Unit": "items",
"Count": 4
}
],
"Meters": [
{
"Name": "Message Processing Rate",
"Unit": "items",
"Count": 4,
"FifteenMinuteRate": 35.668327519112893,
"FiveMinuteRate": 35.01484385742755,
"Items": [
{
"Count": 4,
"FifteenMinuteRate": 0.0,
"FiveMinuteRate": 0.0,
"Item": "Wait",
"MeanRate": 13.082620551464204,
"OneMinuteRate": 0.0,
"Percent": 100.0
}
],
"MeanRate": 13.082613248856632,
"OneMinuteRate": 31.356094372926623,
"RateUnit": "min"
}
],
"Timers": [
{
"Name": "Operation Durations",
"Unit": "req",
"ActiveSessions": 0,
"Count": 4,
"DurationUnit": "ms",
"Histogram": {
"LastUserValue": "waitProcess",
"LastValue": 8001.0,
"Max": 8001.0,
"MaxUserValue": "waitProcess",
"Mean": 3927.1639786164278,
"Median": 5021.0,
"Min": 1078.0,
"MinUserValue": "waitProcess",
"Percentile75": 8001.0,
"Percentile95": 8001.0,
"Percentile98": 8001.0,
"Percentile99": 8001.0,
"Percentile999": 8001.0,
"SampleSize": 4,
"StdDev": 2932.0567172627871,
"Sum": 15190.0
},
"Rate": {
"FifteenMinuteRate": 0.00059447212531854826,
"FiveMinuteRate": 0.00058358073095712587,
"MeanRate": 0.00021824579927905906,
"OneMinuteRate": 0.00052260157288211038
}
}
]
}
Из него можно узнать, что:
- У нас сейчас активно 4 инстанса workProcess
- Они обработали 4 сообщения типа Wait
- Медианное время обработки сообщений 5021 мс
В консоли будет примерно следующее.
Заключение
В данной статье много кода и, скорее всего, мало пояснений (отвечу в коментах, если что непонятно), но это потому что статья призвана показать решение нескольких рутинных задач из реального проекта.
Возможно кому-то пригодится, тем более этот код был изначально написан для акторов на C#, так что при желании можно всё это перенести (дам хинт, можно сделать свою версию Receive() с теми же экспрешнами внутри).
Рекомендую изучить F# тем кто занимается моделированием сложных доменных моделей, т.к. его система типов намного богаче, отсутствие null и проектирование в типах позволяет сделать модель устойчивой к ошибкам программиста.
Репозиторий с примером лежит тут.
Спасибо за внимание!