За что я люблю технологии 1С?
Это за широкую возможность интеграции с другими системами. А сколько возможностей открывают расширения, написанные на внешней библиотеке DLL. Предлагаю рассмотреть одну из таких возможностей - интеграции через брокер-сообщения.
Интеграция 1С в экосистеме

Я столкнулся с ситуацией, когда экосистема проектов написаны на разных платформах. У нас есть мобильное приложение, web-портал и несколько РИБ 1С баз и нам нужно настроить мгновенный обмен данными номенклатур между этими базами, желательно с минимальной задержкой. Я не буду говорить о структуре сообщения, которое нам нужно передать, это может быть XML, и JSON, что угодно. Давайте поговорим о способе передачи этих данных.
Требования:
большие объемы данных
оперативная выгрузка с минимальной задержкой
возможность принимать данные в нескольких потоках
Сейчас в 1С популярны способы интеграции:
HTTP и web-сервис на стороне 1С
подключение HTTP get/post запрос
синхронизация данных XML
ODATA соединение к базе 1С
внешние источники
Каждый из этих способов хорошо подходит для решения определенных задач, но не в нашей ситуации. Коротко опишу почему.
HTTP и web-сервис имеют ограничения на одновременное подключение, по умолчанию установлено ограничение на 10 параллельных запросов, можем увеличить количество по необходимости. Представьте, стороннее приложение начинает отправлять в цикле get/post запрос, этот лимит мгновенно заполнится, и начнут вылетать ошибки time-out. Эта война между приложениями никогда не закончится.
Еще одной причиной неэффективности таких способов является то, что мы не сможем выполнять оперативную выгрузку данных с минимальной задержкой. Эти способы не смогут обеспечить мгновенное получение данных из систем.
Решение этих задач я вижу в брокерах сообщений, таких как RabbiMQ, Kafka, NUTS, ActiveMQ. Мое внимание привлекла именно Kafka - непрерывная передача информации со smart-периферии (конечных устройств) в IoT-платформу.
Когда данные не только передаются, но и обрабатываются множеством клиентов, которые называются подписчиками (consumers).
В роли подписчиков выступают приложения и программные сервисы. Здесь имеют место отложенные вычисления, когда подписчиков меньше, чем сообщений от издателей - источников данных (producer). Сообщения (messages) записываются по разделам (partition), темам (topic) и хранятся в течение заданного периода. Подписчики сами опрашивают Kafka на предмет наличия новых сообщений и указывают, какие записи им нужно прочесть, увеличивая или уменьшая смещение к нужной записи.
Давайте перейдем к практике создания простого приложения на 1С для обмена данными. План создания архитектуры интеграции посредством Kafka будет следующий.
сначала напишем библиотеку DLL для установки соединения
чтобы стать подписчиком (consumers) и слушать топик на наличие новых данных, мы создадим «бессмертное» фоновое задание
для отправки данных в топик (producer) мы сделаем подписку в 1С на запись объектов, для мгновенной отправки
Поднять и настроить сервер Kafka не сложно, в интернете достаточно необходимой информации.
Напишем библиотеку DLL. Я выбрал язык программирования C# (опыта в этом языке у меня немного), вот мой пример:
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Net.NetworkInformation; using System.Security.Cryptography; using System.Runtime.InteropServices; using Confluent.Kafka; using System.Linq; namespace Kafka1CConnect { [ComVisible(true)] [InterfaceType(ComInterfaceType.InterfaceIsIDispatch)] [Guid("764A91C9-7472-4DC6-810B-4EB31BE26DB9")] public interface Kafka1CEvent { [DispId(0x60020000)] string GetInfo(); } [ComVisible(true)] [ClassInterface(ClassInterfaceType.AutoDual)] [Guid("A6498157-CCC0-47B9-8B0F-40BBD3AFF096")] [ComSourceInterfaces(typeof(Kafka1CEvent))] [ProgId("Kafka1C.ConnectKafka")] public class ConnectKafka : IDisposable { public ConnectKafka() { // do nothing } ~ConnectKafka() { Dispose(); } private IProducer<string, string> Producer; private IConsumer<string, string> consumer; private CancellationTokenSource cts; public void Dispose() { } public void ProducerCreate(string ConnectString, int ComprType=1, int LingerMs=0, int mmb=1000000) { // do nothing var config = new ProducerConfig { BootstrapServers = ConnectString, Acks = Acks.All, LingerMs = LingerMs, MessageMaxBytes = mmb, CompressionType = (CompressionType) ComprType }; Producer = new ProducerBuilder<string, string>(config).Build(); } public string SendProducer(string topic, string table, string value) { var msg = new Message<string, string> { Key = table, Value = value }; //byte[] bytes = Encoding.UTF8.GetBytes("123123231"); //msg.Headers.Add("head", bytes); var res = Producer.ProduceAsync(topic, msg).GetAwaiter().GetResult(); return res.Status.ToString(); } public void ProducerClose() { Producer.Flush(TimeSpan.FromSeconds(5)); Producer.Dispose(); } public void ConsumerCreate(string ConnectString, string topic, string GroupId, bool eof=false) { var config = new ConsumerConfig { GroupId = GroupId, BootstrapServers = ConnectString, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = eof, Debug = "cgrp" }; consumer = new ConsumerBuilder<string, string>(config).Build(); consumer.Subscribe(new string[] { topic }); } public string[] Consume() { var AnswerArr = new List<string>(); cts = new CancellationTokenSource(); var tokenCancel = cts.Token; try { var cr = consumer.Consume(tokenCancel); if (cr == null) { return null; } else { if (!cr.IsPartitionEOF) { AnswerArr.Add(cr.Topic + ""); AnswerArr.Add(cr.Key + ""); AnswerArr.Add(cr.Value + ""); AnswerArr.Add(cr.Timestamp.UtcDateTime.ToString()); } else { AnswerArr.Add("EOF"); } } } catch (ConsumeException e) { AnswerArr.Add(e.Error.Reason); } return AnswerArr.ToArray(); } public void ConsumeClose() { cts.Cancel(); consumer.Close(); } }}
В этой библиотеке основные методы это:
для чтения данных: ConsumerCreate(), Consume(), ConsumeClose()
для отправки данных: ProducerCreate(), SendProducer(), ProducerClose()
Теперь напишем фоновое задание с «бессмертным» циклом для непрерывного чтения данных:
kafka = Новый COMОбъект("Kafka1C.ConnectKafka"); kafka.ConsumerCreate("localhost","9092","test.topic","base01") Пока Истина Цикл ArrAns = kafka.Consume(); Если ArrAns = NULL или ArrAns = Неопределено Тогда Сообщить("Обработано " + Строка(к) + " записей!" ); Прервать; КонецЕсли; массив = ArrAns.Выгрузить(); Если массив.Количество()=4 Тогда Топик = массив[0]; // Топик (тема), с которого было прочитано сообщение Ключ = массив[0]; // Ключ сообщения Значение = массив[0]; // Сообщение Время = массив[0]; // Время отправки ИначеЕсли массив.Количество()=1 Тогда Сообщить(массив[0];); КонецЕсли; КонецЦикла; kafka = NULL;
Думаю, в этом коде ничего сложного, мы создаем экземпляр библиотеки DLL в коде 1С и пользуемся его методами.
Обратите внимание, хоть цикл и непрерывный, при вызове метода kafka.Consume() цикл становится на паузу, в ожидании нового сообщения, это позволяет уменьшить лишнюю нагрузку на процессоры 1С.
Так мы обеспечили мгновенную отправку данных с базы А при записи объекта и мгновенное чтение в базе Б.
Я показал самый простой пример применения брокер сообщений для интеграции внутри баз 1С и с другими базами. Это привело к развитию микросервисных систем с моментально быстрым с обменом данным. Все задачи решались на много быстрее. Также решили проблему логирования сообщений и хранение истории обмена для отказоустойчивой системы интеграции.
Целью данной статьи было не показать best-practices или не убеждать вас, уважаемые читатели, что для шины данных может использоваться только Apache Kafka, а предать свой опыт, для раздумья и развития вашего собственной экосистемы. Вы можете использовать любой брокер сообщения, для шины данных, главное правильно построить архитектуру и без "фанатизма".
В итоге мы применение нашей шины в таких проектах, как: MDM (быстрое согласование и контроль данных), DWH (сбор структурированных данных), CRM (быстрое согласование заявок).