Всем привет! Меня зовут Александр. Последний год тружусь фрилансером на проекте по созданию очередного маркетплейса. Мои задачи помимо всего прочего включают в себя разработку скоринговой системы продуктов, которая бы позволяла формировать выборку товаров по популярности на основе нескольких показателей. Одним из показателей являются оценки пользователей по шкале от 1 до 5. Думаю, что не станет открытием Америки то, что расчет рейтинга товара по среднему арифметическому всех оценок приводит к не совсем корректным результатам выборки. Например, при таком подходе товар "А" с одной оценкой 5 в рейтинге будет выше товара "Б", у которого сотня оценок 5 и одна оценка 4. Решение данной проблемы давно найдено - для расчета можно применить доверительный интервал биномиального распределения по методу Уилсона (Wilson Score Confidence Interval). Формула выглядит следующим образом:

Где:
p - это оценка пропорции успехов в выборке (например, количество положительных отзывов деленное на общее количество отзывов).
n - общее количество наблюдений (например, общее количество отзывов).
z - Z-счет, соответствующий выбранному уровню уверенности. Например, для уровня уверенности 95%, z примерно равно 1.96.
В статье я постараюсь показать один из способов применения этого подхода к расчету рейтинга товара, используя такие технологии как Kafka Streams и OpenSearch. Как и в предыдущих постах, я подготовил небольшой демо проект, который выложил на Github.
В качестве поискового движка я намеренно выбрал OpenSearch, а не ElasticSearch, так как именно он используется на проекте, где я сейчас тружусь. Кроме того, начиная с 20 июля 2023 г��да Yandex Cloud более не предоставляет возможности новым пользователям создавать кластеры ElasticSearch, а с 11 апреля 2024 года и вовсе прекратит предоставлять услуги по доступу к ElasticSearch.
Архитектура проекта
Проект состоит из 2-х независимых микросервисов:
backend-service
review-aggregator-service
А также инфрастукрутных компонентов:
PostgreSQL
Apache Kafka + Zookeeper
Confluent Schema Registry
Kafka Connect
Debezium Postgres Connector
Кластер OpenSearch из двух нод
Backend Service
Предоставляет REST API для:
регистрации пользователей
создания товаров
создания отзывов к товарам
получения списка пользователей
получение списка товаров
получение списка отзывов к товарам
поучение отсортированного по рейтингу списка товаров
Все сущности: AppUser, Product, ProductReview при создании сохраняются в Postgres. Схема данных в Postgres выглядит следующим образом:
CREATE TABLE IF NOT EXISTS app_users(
ID BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
USERNAME TEXT NOT NULL,
FIRST_NAME TEXT NOT NULL,
LAST_NAME TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS products(
ID BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
NAME TEXT NOT NULL,
DESCRIPTION TEXT,
PRICE NUMERIC(20, 5) NOT NULL
);
CREATE TABLE IF NOT EXISTS product_reviews(
ID BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
USER_ID BIGINT NOT NULL,
PRODUCT_ID BIGINT NOT NULL,
RATING INTEGER NOT NULL,
CREATED_AT TIMESTAMP NOT NULL,
COMMENT TEXT,
CONSTRAINT product_reviews_app_users_fk FOREIGN KEY (USER_ID) REFERENCES app_users(ID) ON DELETE NO ACTION,
CONSTRAINT product_reviews_products_fk FOREIGN KEY (PRODUCT_ID) REFERENCES products(ID) ON DELETE NO ACTION
);
CREATE UNIQUE INDEX IF NOT EXISTS product_reviews_user_id_product_id_idx ON product_reviews(USER_ID, PRODUCT_ID);Таблица product_reviews, куда сохраняются данные об отзывах обрабатывается Debezium Postgres Connector, и данные из нее отправляются в топик Kafka. За работу с Kafka отвечает Review Aggregator Service. Подробности работы Debezium Postgres Connector можно почитать в моей статье.
Backend Service получает список отсортированных по рейтингу тваров из кластера OpenSearch, используя org.opensearch.data.client.orhlc.OpenSearchRestTemplate:
Так как акцент в статье не на полнотекстовом поиске, а на ранжировании согласно оценкам пользователей, модель данных в OpenSearch очень упрощенная и выглядит следующим образом:
@Document(indexName = "products")
class OpenSearchProduct(
@Id
var id: String? = null,
@Field(type = FieldType.Text, analyzer = "russian")
var name: String? = null,
@Field(type = FieldType.Text, analyzer = "russian")
var description: String? = null,
@Field(type = FieldType.Double)
var price: BigDecimal? = null,
@Field(name = "wilson-score", type = FieldType.Double)
var wilsonScore: Double = 0.0,
@Field(type = FieldType.Object)
var ratings: Map<String, Int> = hashMapOf(
"1" to 0,
"2" to 0,
"3" to 0,
"4" to 0,
"5" to 0,
),
@Field(type = FieldType.Long, name = "rating_update_idempotency_key")
var ratingIdempotencyKey: Long? = null
)Тут стоит обратить внимание на несколько моментов:
ratingsимеет такую структуру, чтобы была возможность обновлять каждую оценку по отдельности. Это необходимо для корректного подсчета рейтинга.wilson-scoreрассчитывается скриптом через scripted updates
при каждом обновлении рейтинга товара, что дает нам возможность ускорить запрос на выборку товаров, так как поле для сортировки уже подготовлено.ratingIdempotencyKeyнеобходим, чтобы из��ежать повторной обработки запросов на обновление рейтинга из-за возможных сбоев в работе Kafka, так как при взаимодействии с внешними системами Kafka не дает гарантииexactly_once, а толькоat_least_once.
Review Aggregator Service
Собственно говоря, именно этот сервис отвечает за агрегацию данных о пользовательских отзывах к товарам и обновление записей в кластере OpenSearch.
Агрегация осуществляется с помощью Kafka Streams. Далее приведу код с небольшими пояснениями, более подробно работа Kafka Streams разобрана в моей статье "Микросервисы на основе событий с Kafka Streams и Spring Boot".
@Service
class KafkaReviewAggregatorService(
private val topicProps: TopicProps,
private val kafkaProps: KafkaProps
) {
@Autowired
fun processStreams(builder: StreamsBuilder) {
val reviewValueSerde = SpecificAvroSerde<AvroReview>()
configureSerde(reviewValueSerde, false)
val reviewKeySerde = Serdes.String()
val productRatingValueSerde = SpecificAvroSerde<AvroProductRating>()
configureSerde(productRatingValueSerde, false)
val productRatingKeySerde = Serdes.String()
val productReviews: KStream<String, AvroReview> = builder.stream(
topicProps.reviewTopic,
Consumed.with(reviewKeySerde, reviewValueSerde)
)
val aggregatedStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(topicProps.aggregatedReviewsStore),
Serdes.String(),
productRatingValueSerde
)
builder.addStateStore(aggregatedStore)
productReviews
.peek { key, value ->
log.info("Processing AvroReview: {}. Message Key: {}", value, key)
}
.mapValues(::toAvroProductRating)
.process(
ProcessorSupplier {
AvroReviewPunctuator(topicProps)
},
topicProps.aggregatedReviewsStore
)
.peek { key, value ->
log.info("Processing AvroProductRating: {}. Message Key: {}", value, key)
}
.to(
topicProps.aggregatedReviewsTopic,
Produced.with(productRatingKeySerde, productRatingValueSerde)
)
}
private fun configureSerde(serde: Serde<*>, isKey: Boolean) {
if (serde is SpecificAvroSerde) {
val config = hashMapOf<String, Any>(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to kafkaProps.schemaRegistryUrl
)
serde.configure(config, isKey)
}
}
private fun toAvroProductRating(record: AvroReview): AvroProductRating =
AvroProductRating(
record.productId,
hashMapOf(
convertDigitToString(record.rating) to 1
),
record.`lsn$1`
)
private fun convertDigitToString(digit: Int): String {
return when(digit) {
1 -> "one"
2 -> "two"
3 -> "three"
4 -> "four"
5 -> "five"
else -> throw RuntimeException("Unsupported Rating: $digit")
}
}
}Тут все достаточно просто:
Создаем KStream на основе топика, куда отравляются данные об отзывах
val productReviews: KStream<String, AvroReview> = builder.stream(
topicProps.reviewTopic,
Consumed.with(reviewKeySerde, reviewValueSerde)
)Создаем KeyValue хранилище, которое будет использоваться в агрегации данных.
val aggregatedStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(topicProps.aggregatedReviewsStore),
Serdes.String(),
productRatingValueSerde
)
builder.addStateStore(aggregatedStore)Мапим значения в стриме в удобную для агрегации сущность
productReviews
.peek { key, value ->
log.info("Processing AvroReview: {}. Message Key: {}", value, key)
}
.mapValues(::toAvroProductRating) AvroProductRating генерируется на основе .avsc файла с помощью Gradle плагина:
{
"type": "record",
"name": "AvroProductRating",
"namespace": "ru.aasmc.avro",
"fields": [
{
"name": "productId",
"type": "long"
},
{
"name": "ratings",
"type": {
"type": "map",
"values": "int"
}
},
{
"name": "idempotencyKey",
"type": [
"null",
"long"
],
"default": null
}
]
}Агрегируем данные в кастомном процесоре (более подробно будет дальше по тексту):
.process(
ProcessorSupplier {
AvroReviewPunctuator(topicProps)
},
topicProps.aggregatedReviewsStore
)Отправляем полученные данные в новый топик Kafka:
.to(
topicProps.aggregatedReviewsTopic,
Produced.with(productRatingKeySerde, productRatingValueSerde)
)При работе с Kafka Streams можно использовать механизмы агрегации данных предоставляемые фреймворком, однако тут есть один несколько нюансов:
По умолчанию в результирующий топик будут попадать в том числе и промежуточные данные, а в нашем случае это увеличит нагрузку на кластер OpenSearch, даже если мы позаботимся о дедуплицировании.
При использовании оператора
suppress, который говорит Kafka Streams отправлять только конечный результат аггрегации в результирующий топик, результат будет отправлен при закрытии окна агрегации, а оно фактически закрывается при создании нового окна после поступления новых записей. Это потенциально может стать проблемой. Представим сценарий, по которому мы хотим обновлять рейтинг товара в кластере OpenSearch каждые 30 минут. Соответственно выставляем окно агрегации в 30 минут. В течение этого времени товару поставили 10 отзывов. После этого в течение суток не было ни одной оценки, а через 24 часа поставили еще 5 оценок. При использовании оператораsuppressданные в результирующий топик попадут только через 24 часа, а не через 30 минут, как мы этого хотели, из-за того, что окно агрегации закроется при открытии нового окна. Чтобы решить эту проблему, приходится писать свою логику агрегации и кастомный процессор с помощью Processor API.
Кастомный процессор выглядит так:
class AvroReviewPunctuator(
private val topicProps: TopicProps
): Processor<String, AvroProductRating, String, AvroProductRating> {
private lateinit var context: ProcessorContext<String, AvroProductRating>
private lateinit var store: KeyValueStore<String, AvroProductRating>
override fun init(context: ProcessorContext<String, AvroProductRating>) {
this.context = context
this.store = context.getStateStore(topicProps.aggregatedReviewsStore) as KeyValueStore<String, AvroProductRating>
context.schedule(topicProps.punctuationInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate)
}
private fun punctuate(to: Long) {
log.debug("Enter punctuate method.")
store.all().forEachRemaining { entry ->
log.info("Sending new aggregated value from punctuate: {}", entry.value)
context.forward(Record(entry.key, entry.value, to))
store.delete(entry.key)
}
}
override fun process(record: Record<String, AvroProductRating>) {
val key = record.key()
val newRating = record.value()
log.debug("Processing record with value: {}", newRating)
val currentAggregatedRating = store.get(key)
if (currentAggregatedRating != null) {
val currentRatings = currentAggregatedRating.ratings
newRating.ratings.forEach { (ratingKey, ratingValue) ->
currentRatings.merge(ratingKey, ratingValue, Int::plus)
}
val idempotencyKey = maxOf(newRating.idempotencyKey, currentAggregatedRating.idempotencyKey)
store.put(key, AvroProductRating(newRating.productId, currentRatings, idempotencyKey))
} else {
store.put(key, newRating)
}
}
}Метод
initотрабатывает при создании процессора. Тут мы инициализируем ранее подготовленное и добавленное в топологию KeyValue хранилище:
this.store = context.getStateStore(topicProps.aggregatedReviewsStore) as KeyValueStore<String, AvroProductRating>Настраиваем планировщик, который будет отрабатывать 1 раз за указанный интервал времени
topicProps.punctuationInterval, который мы храним в конфигурации.
context.schedule(topicProps.punctuationInterval, PunctuationType.WALL_CLOCK_TIME, this::punctuate)В методе
punctuate, происходит логика отправки данных в процессор ниже по стриму, и очистке хранящихся в KeyValue хранилище данных:
private fun punctuate(to: Long) {
log.debug("Enter punctuate method.")
store.all().forEachRemaining { entry ->
log.info("Sending new aggregated value from punctuate: {}", entry.value)
context.forward(Record(entry.key, entry.value, to))
store.delete(entry.key)
}
}Метод
process(record: Record<String, AvroProductRating>)отрабатывает на каждую запись из вышестоящего стрима. Тут мы получаем данные из KeyValue хранилища, производим слияние рейтингов и обновляем запись в хранилище. Стоит обратить внимание, что в качествеidempotencyKeyиспользуется LSN (Log Sequence Number) отправляемый Debezium Postgres Connector в Kafka при обработке таблицыproduct_reviews. Так как LSN - монотонно увеличивающееся значение, в агрегируемой сущности мы проставляем максимальное значение из старой и текущей записи. В дальнейшем LSN будет использован для обеспечения идемпотентного обновления документа в кластере OpenSearch.
override fun process(record: Record<String, AvroProductRating>) {
val key = record.key()
val newRating = record.value()
log.debug("Processing record with value: {}", newRating)
val currentAggregatedRating = store.get(key)
if (currentAggregatedRating != null) {
val currentRatings = currentAggregatedRating.ratings
newRating.ratings.forEach { (ratingKey, ratingValue) ->
currentRatings.merge(ratingKey, ratingValue, Int::plus)
}
val idempotencyKey = maxOf(newRating.idempotencyKey, currentAggregatedRating.idempotencyKey)
store.put(key, AvroProductRating(newRating.productId, currentRatings, idempotencyKey))
} else {
store.put(key, newRating)
}
}Обновление данных в OpenSearch
Kafka Consumer вычитывает записи из топика с агрегированными данными и обновляет рейтинг товара в OpenSearch с помощью painless скрипта:
private const val SCRIPT_SOURCE = """
if (ctx._source['rating_update_idempotency_key'] == null ||
ctx._source['rating_update_idempotency_key'] < params.idempotency_key) {
if (params.containsKey('one')) {
ctx._source.ratings['1'] = (ctx._source.ratings['1'] ?: 0) + params.one;
}
if (params.containsKey('two')) {
ctx._source.ratings['2'] = (ctx._source.ratings['2'] ?: 0) + params.two;
}
if (params.containsKey('three')) {
ctx._source.ratings['3'] = (ctx._source.ratings['3'] ?: 0) + params.three;
}
if (params.containsKey('four')) {
ctx._source.ratings['4'] = (ctx._source.ratings['4'] ?: 0) + params.four;
}
if (params.containsKey('five')) {
ctx._source.ratings['5'] = (ctx._source.ratings['5'] ?: 0) + params.five;
}
long s1 = ctx._source.ratings.containsKey('1') ? ctx._source.ratings['1'] : 0;
long s2 = ctx._source.ratings.containsKey('2') ? ctx._source.ratings['2'] : 0;
long s3 = ctx._source.ratings.containsKey('3') ? ctx._source.ratings['3'] : 0;
long s4 = ctx._source.ratings.containsKey('4') ? ctx._source.ratings['4'] : 0;
long s5 = ctx._source.ratings.containsKey('5') ? ctx._source.ratings['5'] : 0;
double p = (s1 * 0.0) + (s2 * 0.25) + (s3 * 0.5) + (s4 * 0.75) + (s5 * 1.0);
double n = (s1 * 1.0) + (s2 * 0.75) + (s3 * 0.5) + (s4 * 0.25) + (s5 * 0.0);
double wilsonScore = p + n > 0 ? ((p + 1.9208) / (p + n) - 1.96 * Math.sqrt((p * n) / (p + n) + 0.9604) / (p + n)) / (1 + 3.8416 / (p + n)) : 0;
ctx._source['wilson-score'] = wilsonScore;
ctx._source['rating_update_idempotency_key'] = params.idempotency_key;
}
"""
private const val IDEMPOTENCY_KEY = "idempotency_key"
@Service
class OpenSearchKafkaConsumer(
private val openSearchRestTemplate: OpenSearchRestTemplate,
private val props: OpenSearchProps
) {
@KafkaListener(topics = ["\${topicprops.aggregatedReviewsTopic}"], concurrency = "3")
fun consumeAndSend(record: AvroProductRating) {
updateProductInOpenSearch(record)
log.info("Successfully updated rating of product with id = {}", record.productId)
}
private fun updateProductInOpenSearch(record: AvroProductRating) {
val params = mutableMapOf<String, Any>()
record.ratings.forEach { (key, value) ->
params[key] = value
}
params[IDEMPOTENCY_KEY] = record.idempotencyKey
val updateQuery = UpdateQuery.builder(record.productId.toString())
.withScriptType(ScriptType.INLINE)
.withLang("painless")
.withScript(SCRIPT_SOURCE)
.withParams(params)
.build()
openSearchRestTemplate.update(updateQuery, IndexCoordinates.of(props.productIndex))
}
}Разберем скрипт:
ctx._sourceпозволяет получить доступ к полям обновляемого документа.params- параметры, передаваемые в скриптОбновление производим только в том случае, если
rating_update_idempotency_key==null- это значит что мы обновляем рейтинг впервые, или жеlsnтовара, который используется в качестве ключа идемпотентности, меньше того, что пришел в параметрах скрипта, то есть мы обновляем рейтинг на основе новых оценок пользователей, а не тех, которые Kafka Consumer отправил повторно из-за того, что он по какой-то причине не закомитил оффсет и перечитал записи.Далее обновляем значения каждой оценки в зависимости от того, что пришло в параметрах.
double p = (s1 * 0.0) + (s2 * 0.25) + (s3 * 0.5) + (s4 * 0.75) + (s5 * 1.0);Рассчитываем положительный рейтинг и нормализуем его от 0 до 1, отдавая предпочтение более высоким оценкам.double n = (s1 * 1.0) + (s2 * 0.75) + (s3 * 0.5) + (s4 * 0.25) + (s5 * 0.0);Рассчитываем отрицательный рейтинг и нормализуем его от 0 до 1, отдавая предпочтение более низким оценкам.double wilsonScore = p + n > 0 ? ((p + 1.9208) / (p + n) - 1.96 * Math.sqrt((p * n) / (p + n) + 0.9604) / (p + n)) / (1 + 3.8416 / (p + n)) : 0;Рассчитываем wilson score по упрощенной формуле. В качестве Z-счета используем 1.96, чтосоответствует 95% уровню уверенности.Сохраняем полученный результат в поле
wilson-score, а также обновляем ключ идемпотентности.
Модифицированную формулу для расчета Wilson Score Confidence Interval, а также подход с применением painless скрипта взял из статьи Better than Average: Sort by Best Rating with Elasticsearch.
Проверка работоспособности
Для того, чтобы проверить работоспособность подхода я подготовил Postman коллекцию,в которой создается несколько пользователей и товаров, а затем пользователи ставят оценки товарам:
Товар 1. "1" - 3 оценки, "2" - 2 оценки
Товар 2. "2" - 3 оценки, "3" - 2 оценки
Товар 3. "3" - 2 оценки, "4" - 2 оценки, "5" - 1 оценка
Товар 4. "4" - 3 оценки, "5" - 2 оценки
Товар 5. "4" - 1 оценка, "5" - 4 оценки
Товар 6. "4" - 1 оценка, "5" - 1 оценка
Товар 7. "4" - 1 оценка
Товар 8. "3" - 1 оценка
Товар 9. "5" - 1 оценка
Товар 10. не имеет оценок.
В результате запроса на получение отсортированного по рейтингу списка товаров получаем:
[
{
"id": 5,
"name": "Product Five",
"price": 14.0,
"description": "Description Five",
"wilsonScore": 0.5118538596631975,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 1,
"5": 4
}
},
{
"id": 4,
"name": "Product Four",
"price": 13.0,
"description": "Description Four",
"wilsonScore": 0.41770741047904003,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 3,
"5": 2
}
},
{
"id": 3,
"name": "Product Three",
"price": 12.0,
"description": "Description Three",
"wilsonScore": 0.2987857322245606,
"ratings": {
"1": 0,
"2": 0,
"3": 2,
"4": 2,
"5": 1
}
},
{
"id": 6,
"name": "Product Six",
"price": 15.0,
"description": "Description Six",
"wilsonScore": 0.26404786368925837,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 1,
"5": 1
}
},
{
"id": 9,
"name": "Product Nine",
"price": 18.0,
"description": "Description Nine",
"wilsonScore": 0.20654329147389294,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 0,
"5": 1
}
},
{
"id": 7,
"name": "Product Seven",
"price": 16.0,
"description": "Description Seven",
"wilsonScore": 0.11790609179425604,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 1,
"5": 0
}
},
{
"id": 2,
"name": "Product Two",
"price": 11.0,
"description": "Description Two",
"wilsonScore": 0.09409051165901611,
"ratings": {
"1": 0,
"2": 3,
"3": 2,
"4": 0,
"5": 0
}
},
{
"id": 8,
"name": "Product Eight",
"price": 17.0,
"description": "Description Eight",
"wilsonScore": 0.0546190651458835,
"ratings": {
"1": 0,
"2": 0,
"3": 1,
"4": 0,
"5": 0
}
},
{
"id": 1,
"name": "Product One",
"price": 10.0,
"description": "Description One",
"wilsonScore": 0.010529638390925532,
"ratings": {
"1": 3,
"2": 2,
"3": 0,
"4": 0,
"5": 0
}
},
{
"id": 10,
"name": "Product Ten",
"price": 19.0,
"description": "Description Ten",
"wilsonScore": 0.0,
"ratings": {
"1": 0,
"2": 0,
"3": 0,
"4": 0,
"5": 0
}
}
]Как видно, товары с большим количеством положительных оценок находятся вверху списка, а товары вообще без оценок - в самом низу.
Как запустить микросервисы
Поднимаем инфраструктурные контейнеры:
cd docker
docker-compose up -dЖдем, когда поднимутся все контейнеры. Дольше всех закускается Kafka Connect.
Регистрируем Avro схему агрегированных данных о рейтингах в Confluent Schema Registry с помощью Gradle плагина
com.github.imflog.kafka-schema-registry-gradle-plugin. Для этого в корневой директории проекта необходимо выполнить команду:
./gradlew registerSchemaTaskДелаем чистый билд проекта, чтобы сгенерировались Avro классы на основе схем, которые лежат в папке
review-aggregator-service/src/main/avro. Для этого в корневой директории проекта необходимо выполнить команду:
./gradlew clean build -x testОтправляем конфигурацию Debezium Postgres Connector в Kafka Connect.
cd connector
curl -s -S -XPOST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @debezium-config.jsonПосле этого можно стартовать сами микросервисы
backend-serviceиreview-aggregator-service. При запускеbackend-serviceнеобходимо добавить JVM опцию--add-opens java.base/java.math=ALL-UNNAMED. Пока не выяснил, по какой причине Spring Data OpenSearch выбрасывает ошибку при получении данных из кластера, если в модели данных на стороне клиента используется тип BigDecimal.
После этого можно эскпериментировать с эндпоинтами. В качестве отправной точки, можно воспользоваться коллекцией Postman, которая также есть в директории postman.