Формулировка задачи и цель
Заявка представляет собой большой JSON документ с несколькими дочерними документами. В разное время разные участники процесса обработки заявки обновляют этот документ.
Например, по инициативе клиента. Клиент может поменять залоговый авто, недвижимость, изменить сумму или срок кредита и т.д. Некоторым системам важно знать, что произошло событие по заявке, чтобы выполнить свою логику.
Сегодня Product Storage периодически используют в формате - одна система что-то сделала с заявкой, а другая ждет когда это произойдет.
Ожидание происходит в следующих форматах:
- регулярном пинговании заявки и ожидании целевого состояния (уже приехали? а сейчас? а сейчас? а сейчас? О, поехали)
- ожидании событий от status informer’a
- подписка на топики sdp, в которые пишет Kafka-коннектор из _aud таблиц
Подходы далеки от совершенства:
- пингование - лишняя нагрузка, получается что 3к рпс в Product Storage это пузырь, к тому же увеличивается время ожидания
- status informer - ограничен в событиях и невозможнна конфигурация
- на топиках sdp мы допускаем пропуск событий, клиенты обрабатывают вообще все возможные события (десериализуют содержимое каждого сообщения!!), а наши массовые операции могут их задевать
Необходимо спроектировать механизм, который позволял бы следующее:
- Задавать список событий (по типам), на которые пользователи могли бы подписываться
- Давать возможность пользователям задавать критерии отправки сообщения (например, статус сменился на такой-то, или добавилось поле такое-то)
- Описать подход к добавлению и изменению новых событий
- Определить формат оповещения пользователей (Kafka topic или что еще?)
- Определить зону ответственности, чего сервис не должен делать (вдруг на нем можно построить целый продуктовый процесс)
Окружение
В качестве окружения выступает Product Storage, где происходит изменение данных. Платформенные и продуктовые сервисы являются инициаторами и потребителями данных.
тут диаграмма контекста <<<<←
Пользователи
Сегодня существует топик top-storage.export.product-instance. Изначально топик не предполагался к использованию сторонними клиентами, но так как не было других вариантов - выдавали доступ. В этот топик льется довольно большой поток, любые события связанные с продуктами любого типа.
Сейчас есть возможность сделать все хорошо и снять бОльшую часть нагрузки + мы перестанем задевать массовыми операциями потребителей, если раньше задевали.
Из текущих пользователей топика:
- Домен Согласий - общался с @e.buylin, из топика регистрируют согласия на поход в БКИ по продуктам с productSourceInfo.utmBkiAgreement = true, https://wiki.tcsbank.ru/pages/viewpage.action?pageId=2770547752
- AFS- антифрод - https://time.tbank.ru/tinkoff/pl/z34dsq5wrfyddmh5mz9su767zc
Остальные пользователи
- balance-transfer.cc-doctor - k.saydashev - не использовался, доступ отобран
- top-varys.unicorn - не используется, доступ отобран
- top-pp-cross-domain.afs-online-uploader - устарел, та же команда что и пользователь выше, доступ отобран
- top-storage -командный клиент, использовался командой для всех дел с Kafka. Основное назначение было у экспортера, но сейчас более не используется.
На других топиках актуальный пользователь только top-storage, в связи с чем, ожидается, что топики будут удалены в скором времени.
- realty-info
- product-car
- product-task
- participant-role
Не все использовали Kafka, некоторые выжидали обновление продукта через REST. Следующие потенциальные пользователи были найдены через Gitlab :
- https://gitlab.tcsbank.ru/top-mortgage/credit-mortgage-product-processes-kitqat/-/blob/e3ce42e569f94524b23ff79ab1f2a110a4dfce93/src/main/kotlin/ru/tbank/kitqat/services/AwaitilityService.kt#L35
- https://gitlab.tcsbank.ru/top-mortgage/mortgage-origination-autotest/-/blob/0f22b7cd214db5ad20ec67bcd0e3c21dcaca42c7/mortgage-origination-autotest/src/main/kotlin/ru/tinkoff/qa/mortgage/origination/storage/methods/productControllerV2/ProductList.kt
- top-debit-cards / top-deposit-products-automation: junior-application-process/src/main/kotlin/ru/tinkoff/qa/debit/juniorprocess/process/JuniorAccountProduct.kt
- https://gitlab.tcsbank.ru/finsaving/finsaving-products-automation/-/blob/cbfc4f1ee66ee10362623a3dd8b5c81f7076f8f1/saving-account-process/src/main/kotlin/ru/tinkoff/qa/debit/savingaccount/helper/SavingAccountHelper.kt
- top-salestech-automation / salestech-automation: plugin-x-sell/src/main/kotlin/ru/tbank/salestech/xsell/pap/steps/PapSteps.kt
- top-mortgage / credit-mortgage-product-processes-kitqat: src/main/kotlin/ru/tbank/kitqat/api/handlers/MortgagePrimaryApiHandler.kt
- top-tfamily / junior-application-process: junior-application-process/src/main/kotlin/ru/tinkoff/bpm/juniorapplicationprocess/service/parentdebitcard/ParentDebitCardService.kt
К тому же, изменения в сторедже могут быть интересны:
- Online-decision-adapter https://wiki.tcsbank.ru/pages/viewpage.action?pageId=2219924417
- Status provider
- и прочим системам
Нефункциональные требования
- Задавать список событий (по типам), на которые пользователи могли бы подписываться
- Давать возможность пользователям задавать критерии отправки сообщения (например, статус сменился на такой-то, или добавилось поле такое-то)
- Описать подход к добавлению и изменению новых событий
- Определить формат оповещения пользователей (Kafka topic или что еще?)
В команде Ecosystem Multiorigination сущетсвует необходимость получать события по продуктам, которых еще не существует на момент возникновения необходимости. Они знают только integrationId, с которым в будущем должен появится продукт и хотят получать события по нему с некоторым дополнительным условием.
Функциональные требования
- Гарантированная доставка события клиенту хотя бы раз (at least once)
- Правила фильтрации сообщений работают на основании полей из тела
- Есть возможность создать правило для сообщений из топиков в рантайме
- Правила используют выражения, которые можно передать через REST в удобном формате
- Наличие приоритета правил и детерменированного поведения при одинаковом приоритете.
- Если несколько правил соответствуют событию — выполнить их в порядке priority/order.
- Наличие аудита - кто создал/удалил правило, когда; лог срабатываний через трейсинг.
Предисловие В любом из вариантов потребуются доработки в сторедже, которые также позволят в дальнейшем снизить нагрузку. Так, потребуется новый, единый на все сущности топик, в котором мы будем писать обогащенное событие, возникшее в рамках продукта. Например сейчас, при изменении product-task или participant-role мы в любом случае будем проверять продукт, а для этого ходить в базу и читать его. Таким образом в событие в кафку без затрат мы можем положить:
- JsonPatch
- Сущность, к которой был применен патч
- ProductInstance
- тип события
- тип сущности, к которой применяем патч
Детальный дизайн
1 Вариант
Кратко: Новый микросервис слушает указанные в конфиге Kafka-топики, батчами берёт сообщения, извлекает из них id по набору путей из конфига, по собранным ключам загружает правила из Postgres (у них TTL = 14 дней, можно применить LRU кэш), выполняет CEL-правила и при совпадении публикует новое сообщение в целевой топик другой команды с заданным в правиле шаблоном. Правила существуют статические (в коде) и динамические (создаются через REST). У каждого динамического правила есть приоритет и порядок применения.
****CEL - компилируемый язык выражений, безопасный для выполнения в любой среде
Подробно: Ключевая особенность решения - в динамическом изменении правил, в рантайме можно будет задать правило, которое сразу же начнет применятся к событиям в топике. В частности такая потребность есть у команды Ecosystem Multiorigination, чтобы мы уведомляли их только по integrationId, который они нам заранее сообщат. Возникает ситуация, что через REST они нам должны передать условия срабатывания правила, в результате которого мы должны направить им сообщение. При этом, нет способов как-то определить integrationId так, чтобы по нему было понятно, что инициатором создания продукта является проектируемый оркестратор.
Способов задать правило может быть много:
- https://jsoneditoronline.org/indepth/query/10-best-json-query-languages/
- https://jsonlogic.com
- https://cel.dev
- wasm Из представленных решений наиболее подходящим считаю CEL. Так как:
- на момент добавления правила мы можем его провалидировать и скомпилировать, т.е. предварительно подготовить
- скомпилированный байт код условий работает практически с такой же скоростью
- C-подобный синтаксис, наиболее удобно описывать условия
- потокобезопасный
- не может уйти в рекурсию, переполнить стэк или отпасть по таймауту по определению
Справка по результату прогона на одном потоке при применении одинакового условия: avg CEL extraction from bytes time: 0.0193 ms avg cel runtime time: 0.0022 ms avg java runtime time: 0.0002 ms
Чтобы применить правило, мы должны десериализовать сообщение, далее из пришедшего сообщения достать нужные поля, чтобы найти правила по значению этого поля. По этим значениям полей выполняем поиск внутри единой таблицы postgres. Таблица будет со следующим DDL:
CREATE TABLE rules (
id BIGSERIAL PRIMARY KEY,
source_topic TEXT NOT NULL,
key_path TEXT, -- путь
key_value TEXT, -- ожидаемое значение поля
expression BYTEA NOT NULL, -- скомпилированное условие
target_topic TEXT NOT NULL,
message_template JSONB NULL, -- JSON со строками/значениями; строки — CEL-выражения
priority INTEGER NOT NULL DEFAULT 0,
created_by TEXT,
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE INDEX idx_rules_lookup
ON rules (source_topic, key_path, key_value, priority DESC);Предполагается, что поля, по которым нужно фильтровать сообщения будут определятся через конфигурацию, например:
kafka:
consumers:
product-instance:
topic:
key_paths:
- product.integrationId
Таким образом, регистрация правила будет происходить по отдельному URL, куда будет передаваться параметрами к запросу: - топик - ключ поля из конфигурации, по которому будет срабатывать правило и тело запроса:
{
"source_topic": "product-instance",
"expression": "product.productTasks.contains(a, a.status == DONE)",
"target_topic": "done-products",
"message_template": { // cel - шаблон, весь этот объект это исполняемая программа
"channel": "product.productSourceInfo.channel",
"appliedAt": "timestamp.now()"
},
"priority": 10
}
При регистрации правила его необходимо провалидировать и скомпилировать (самое условие и шаблон), при валидации передавать случайно сгенерированные переменные. После успешной компиляции - CEL программы преобразовать в последовательность байт и сохранить в БД. Пользователю отдать информацию о успешной регистрации правила.
Затем, мы слушаем топик, получаем батч целиком, десериализуем каждое сообщение (можно многопоточно), достаем содержимое полей из конфига и делаем запрос:
SELECT id, expression, target_topic, message_template
FROM rules
WHERE source_topic = 'product-instance'
AND key_path = 'product.contactCrmId'
AND key_value in ('12345', "213")
ORDER BY priority DESC;Из полученных записей восстанавливаем сначала только выражения и проверяем, сработало ли выражение или нет. В случае, если выражение возвращает true, восстанавливаем шаблон в программу, выполняем ее с передачей внутрь переменных и пишем в target_topic.
Зачем нужны статические правила? Есть правила, которые могут выполнятся постоянно, без необходимости каждый раз регистрировать новое правило., они наиболее актуальны текущим потребителям - afs (антифрод) и доменсогласий. Такие правила можно описывать индивидуально через конфигурацию или кодом. Считаю, что лучше описывать кодом через удобный интрефейс, который нужно реализовать, так как так или иначе необходимо вносить через MR, а время исполнения нативного года меньше.
- Кеширование — кешируем
Programв памяти (ключ: rule_id + expr_type). Не компилируем заново при каждом событии. - Мониторинг и откат ошибок — лог ошибок выражений, fallback-сообщения или пустые поля, метрики latency/failed_evals.
Рекомендации пользователям
- при использовании динамических - сначала регистрировать правило, затем выполнять логику. В противном случае может возникнуть гонка, событие наступит до того, как вы успеете зарегистрировать правило.
Что продумать: идемпотентность
пришло 50 сообщений
какое-то время на парсинг и достать нужные id затем сходить в бд за нужными правилами - 8 ms восстановить программы из правил - 0.02 ms * 50 = 1 ms применить правила к 0.0022 * 50 = 0.11 ms запродюсить -
Плюсы:
- правила можно гибко задавать на лету, безрелизный цикл
- решение позволяет генерировать уникальные правила в зависимости от логики процесса (но таких требований) Минусы:
- сложность реализации, еще один сервис на поддержке
- можно легко допустить гонку событий, сначала произойдет событие, а затем клиент создаст правило (только в случае с динамическим)
- возникает некоторая задержка из-за десериализации и подбора правил по ключам из json
- в сумме больше нагрузка на KaaS, больше топиков на поддержке
Вариант 2
Сейчас есть топики, которые были созданы для эпика по хранению истории изменений, в них мы гарантировано доставим событие.
Есть идея обогащать сообщения в Kafka дополнительной информацией, конфигурируемой из репозитория.
Например, сервису нужно знать события по фильтру:
productBefore.pid "MY_PID" && (productBefore.productTasks.type MEETING && productBefore.productTasks.status != DONE) && (productAfter.productTasks.type MEETING && productAfter.productTasks.status DONE)
Это условие мы можем посчитать на Product Storage и положить хэдер: meeting_completed: true
В свою очередь клиент будет пропускать на топике все записи, не содержащие ему необходимый хэдер через фильтр. Как только хэдер появился - десериализовывать и выполнять свою логику.
Плюсы:
- минимум затрат, намного дешевле в реализации
- меньше связность
- каждому клиенту нужно будет вычитать все сообщения, больше нагрузка на сеть (600 Кб/с, так как 300 messages per second)
- задержки практически нет, правила посчитать ничего не стоит Минусы:
- менее гибко
- новые конфигурации проходят через иннерсорс в конфиг, а затем через релиз
- механизм может стать сложным при большом количестве правил
- менее безопасно, все могут читать содержимое сообщений (насколько актуально?)
Вариант 3 Гибрид - сделать новый сервис, который будет слушать новый топик.
В него мы будем писать:
продукт
Уровень критичности - BC+ аналогично стореджу
Вариант 4
Cпециализированный Complex Event Processing (CEP): правила — шаблоны/паттерны времени/последовательности (e.g., «если A затем B в течение 5 минут»). Использовать Flink CEP или Esper — мощный stateful engine.