Пару слов обо мне
Меня зовут Дмитрий, я являюсь PHP разработчиком. Работаю с Битрикс24, Laravel и Go.
Проблема Битрикса
Как известно, рекомендуемое окружение для Битрикса – их собственная разработка BitrixVM на базе CentOS. Иногда такое окружение не устраивает заказчиков, поэтому выбирают Docker или сервер с установленным LEMP стеком.
При переходе на окружение отличное от BitrixVM, существует две основные проблемы – отсутствие сервера очередей Push&Pull и сервера конвертации файлов.
Первая проблема решаема: на просторах гитхаба уже существует рабочее решение для развертывания локального сервера в Docker. А также можно использовать облачный сервер, так как для его работы не требуется, чтобы портал был доступен извне.
Со второй проблемой облако уже не всегда возможно использовать по ряду причин:
необходимость доступности сайта извне (не подходит для полностью закрытых окружений);
опасения заказчиков по поводу передачи конфиденциальных документов на облачный сервер.
Единственное решение – установка BitrixVM на отдельном сервере/в докере, с развертыванием бэкапа внутри и использование штатного сервера, встроенного в окружение, что далеко не всегда удобно.
С этим сталкивался и я на закрытых окружениях крупных российских заказчиков, которые далеко не всегда соглашались открывать доступ к порталу из-за соображений безопасности.
После нескольких таких кейсов было решено написать свой полноценный аналог, который бы отвечал нескольким критериям:
Работал в докере и запускался в несколько кликов;
Была возможность легко задать количество воркеров под различную нагрузку.
В качестве языка программирования было решено выбрать Go.
Реализация
Для начала нам необходимо понять, что представляет собой штатный сервер конвертации: по сути, это не более чем:
API сервер с одним эндпоинтом;
RabbitMQ сервер;
Consumer для обработки заданий из очереди, со следующим установленным ПО: LibreOffice, MPEG, ImageMagic.
Посмотреть реализацию на PHP можно в исходниках модуля Битрикса. Для упрощения я покажу только основной код, остальное всегда можно будет посмотреть в репозитории.
API сервер
Для API мы будем использовать библиотеку go-chi/chi.
Реализуем единственный обработчик – convert, с преобразованием данных в структуру ConvertTask и последующим добавлением в одну из очередей:
main_preview – используется в предпросмотре файлов;
documentgenerator_create – используется в генераторе документов CRM.
// internal/http-server/handlers/convert/convert.go func New(ctx context.Context, log *slog.Logger, rabbit *rabbitmq.Rabbit) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { const op = "handlers.convert.New" reqId := middleware.GetReqID(r.Context()) log = log.With( slog.String("op", op), slog.String("request_id", reqId), ) err := r.ParseForm() if err != nil { log.Error("failed to decode request body", sl.Err(err)) render.JSON(w, r, resp.Error("failed to decode request body", 152)) } task, err := prepareOptions(r.Form, reqId) if err != nil { log.Error("failed to prepare options", sl.Err(err)) render.JSON(w, r, resp.Error("failed to parse task", 0)) return } if task.Queue == "" { task.Queue = rabbit.DefaultQueue() log.Warn("not found queue. Set default", slog.String("default_queue", task.Queue)) } taskMsg, err := json.Marshal(task) if err != nil { log.Error("Error parse request", sl.Err(err)) render.JSON(w, r, resp.Error("Error parse request", 0)) return } err = rabbit.Publish(task.Queue, taskMsg) if err != nil { log.Error("error publish task", slog.String("queue", task.Queue), sl.Err(err)) render.JSON(w, r, resp.Error("error publish task", 0)) } render.JSON(w, r, resp.Success()) } }
FileUploader
Создадим структуру FileUploader и реализуем следующие базовые методы:
Download – скачивание конвертируемого файла из Б24;
uploadFile – загрузка готового файла в Б24;
Complete – отправка запроса о завершении конвертации;
getUploadInfo – получение информации о загружаемом файле.
Для каждого запроса заводим структуры. Оборачиваем запросы в функцию retry.Do из библиотеки avast/retry-go. Устанавливаем 3 попытки на подключение (на случай сетевых проблем).
// internal/lib/fileuploader/fileuploader.go Пример метода получения информации о файле func (f *FileUploader) getUploadInfo(file string, key string) (*uploadInfoResp, error) { fileInfo, err := os.Stat(file) if err != nil { return nil, fmt.Errorf("error get file info [%s]: [%w]", file, err) } uploadReq := uploadInfoRequest{ FileId: key, FileSize: fileInfo.Size(), Upload: "where", } v, err := query.Values(uploadReq) if err != nil { return nil, fmt.Errorf("error convert struct request to query: [%w]", err) } res, err := http.PostForm(f.url, v) if err != nil { return nil, fmt.Errorf("error get upload info from [%s]: [%w]", f.url, err) } var uploadInfoRes uploadInfoResp body, err := io.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("wrong response upload info request to url [%s]: [%w]", f.url, err) } if err = json.Unmarshal(body, &uploadInfoRes); err != nil { return nil, fmt.Errorf("error unmarshal upload info request to url [%s]: [%w]", f.url, err) } return &uploadInfoRes, nil }
Клиент RabbitMQ
Создаем структуру RabbitMQ и реализуем базовые методы:
Connect - подключение к rabbitMQ;
Reconnect - реконнект в случае разрыва соединения, запускаем в отдельной горутине;
InitQueue - инициализация основной очереди и Dead Letter;
Consume - чтение сообщений из очереди;
Publish - отправка сообщения в очередь.
// internal/lib/rabbitmq/rabbitmq.go package rabbitmq import ( "bitrix-converter/internal/config" "bitrix-converter/internal/lib/logger/sl" "context" "fmt" amqp "github.com/rabbitmq/amqp091-go" "log/slog" "time" ) type Rabbit struct { conn *amqp.Connection log *slog.Logger cfg config.RabbitConfig } func New(log *slog.Logger, cfg config.RabbitConfig) *Rabbit { return &Rabbit{ log: log, cfg: cfg, } } func (r *Rabbit) DefaultQueue() string { return r.cfg.DefaultQueue } func (r *Rabbit) Connect() error { url := fmt.Sprintf("amqp://%s:%s@%s:%s", r.cfg.User, r.cfg.Password, r.cfg.Host, r.cfg.Port) conn, err := amqp.Dial(url) if err != nil { return fmt.Errorf("failed to connect to RabbitMQ: [%w]", err) } r.conn = conn return nil } func (r *Rabbit) Channel() (*amqp.Channel, error) { return r.conn.Channel() } func (r *Rabbit) Reconnect() { for { _, ok := <-r.conn.NotifyClose(make(chan *amqp.Error)) if !ok { r.log.Error("failed notifying rabbitMQ channel. Reconnecting...") } r.log.Error("rabbitmq connection closed unexpectedly. Reconnecting...") for { err := r.Connect() if err == nil { r.log.Info("rabbitMQ reconnect success") break } r.log.Error("rabbitmq reconnect failed. Retry after 10 seconds", sl.Err(err)) time.Sleep(10 * time.Second) } } } func (r *Rabbit) InitQueue(ch *amqp.Channel, queue string) error { dlQueue := queue + "_dead" _, err := ch.QueueDeclare( dlQueue, true, false, false, false, amqp.Table{}, ) if err != nil { return fmt.Errorf("failed to declare dead letter queue: [%w]", err) } _, err = ch.QueueDeclare( queue, true, false, false, false, amqp.Table{ "x-dead-letter-exchange": "", "x-dead-letter-routing-key": dlQueue, "x-message-ttl": 60480000, }, ) if err != nil { return fmt.Errorf("failed to declare queue: [%w]", err) } return nil } func (r *Rabbit) Consume(ch *amqp.Channel, queue string) (msgs <-chan amqp.Delivery, err error) { msgs, err = ch.Consume( queue, "", false, false, false, false, nil, ) if err != nil { return nil, fmt.Errorf("failed to consume queue: [%w]", err) } return msgs, nil } func (r *Rabbit) Publish(queue string, message []byte) error { ch, err := r.Channel() if err != nil { return fmt.Errorf("failed to open channel: [%w]", err) } defer ch.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err = ch.PublishWithContext( ctx, "", queue, false, false, amqp.Publishing{ ContentType: "text/plain", Body: message, }, ) if err != nil { return fmt.Errorf("failed to publish message: [%w]", err) } return nil } func (r *Rabbit) Connection() *amqp.Connection { return r.conn }
Command
Создадим BaseCommand с общим методом Execute и две реализации: DocumentCommand (LibreOffice) и VideoCommand (MPEG).
// internal/lib/command/command.go func (bs *BaseCommand) Execute() error { if err := bs.validate(); err != nil { return fmt.Errorf("failed validate transform task: [%w]", err) } directory := bs.DownloadDir() err := os.MkdirAll(directory, 0755) if err != nil { return fmt.Errorf("error creating directory [%s]: [%w]", directory, err) } filePath := bs.genTmpFilePath(directory) err = retry.Do( func() error { return bs.uploader.Download(bs.task.File, filePath, bs.MaxSize()) }, retry.Attempts(3), retry.OnRetry(func(n uint, err error) { time.Sleep(1 * time.Second) }), ) bs.uploader.AddFileToDelete(filePath) defer bs.uploader.DeleteFiles() if err != nil { return fmt.Errorf("error download file [%s]: [%w]", bs.task.File, err) } bs.file = filePath for _, format := range bs.task.Formats { if _, ok := bs.files[format]; ok { continue } pre, err := bs.preConvert(format, filePath) if err != nil { return err } if pre { continue } convertedFile, err := bs.transform(format, filePath) bs.uploader.AddFileToDelete(convertedFile) if err != nil { return fmt.Errorf("error transform file [%s] to [%s]: [%w]", bs.task.File, format, err) } bs.files[format] = convertedFile } bs.uploader.SetFiles(bs.files) err = bs.uploader.UploadFiles() if err != nil { return fmt.Errorf("error uploading files: [%w]", err) } err = bs.uploader.Complete() if err != nil { return fmt.Errorf("failed complete: [%w]", err) } return nil }
Consumer
В текущей реализации запускаем 3 горутины на каждую очередь, в одном контейнере.
Настроим graceful завершение (с таймаутом в 5 минут) и реконнект к rabbitmq (в случае разрыва соединения обработчики будут подключаться заново и после восстановления соединения продолжат работу в штатном режиме)
// cmd/consumer/main.go func main() { cfg := config.MustLoad() logger := sl.SetupLogger(cfg.Env) rabbit := rabbitmq.New(logger, cfg.Rabbit) conErr := rabbit.Connect() if conErr != nil { log.Fatalf("failed connect to RabbitMQ with start %v", conErr) } go rabbit.Reconnect() cancelCtx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} for i := 0; i <= 3; i++ { for _, queue := range queues { go func() { uniqId := fmt.Sprintf("%s_%d", queue, i) logger.Info("start consumer", slog.String("queue", queue)) wg.Add(1) done: for { time.Sleep(10 * time.Second) ch, err := rabbit.Channel() if err != nil { logger.Error("failed to open channel. Retry", slog.String("queue", queue), sl.Err(err)) continue } defer ch.Close() err = rabbit.InitQueue(ch, queue) if err != nil { logger.Error("failed init queue. Retry", slog.String("queue", queue), sl.Err(err)) continue } logger.Info("success init queue", slog.String("queue", queue)) msgs, err := rabbit.Consume(ch, queue) if err != nil { logger.Error("failed consume. Retry", slog.String("queue", queue), sl.Err(err)) continue } logger.Info("success consume. Waiting messages", slog.String("queue", queue)) closed: for { select { case <-cancelCtx.Done(): break done default: } select { case d, ok := <-msgs: if !ok { logger.Info("channel closed", slog.String("queue", queue)) break closed } handleMessage(d, logger, cfg, uniqId) default: } time.Sleep(1 * time.Second) } } wg.Done() }() } } waitCh := make(chan struct{}) ch := make(chan os.Signal) signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGKILL) <-ch logger.Info("receive a shutdown signal") go func() { logger.Info("cancel, wait consumer") cancel() wg.Wait() close(waitCh) }() select { case <-waitCh: logger.Info("graceful shutdown") case <-time.After(5 * time.Minute): logger.Info("shutdown before 5 minutes timeout") } }
Логирование
Для логирования используем библиотеку slog, для группировки логов используем параметр request_id.
Dead Letter Queue
Для каждой очереди инициализируем одноимённую очередь с префиксом dead.
В случае ошибки конвертации делаем reject, и задание попадает в dead очередь, для последующего анализа. По умолчанию время жизни сообщения - неделя.
Где посмотреть?
Сервер разместил на GitHub, текущая версия 1.0.0. В дальнейшем возможно будет обновляться. Инструкция по развертыванию прилагается.
Итого
Надеюсь, это решение упростит отказ от CentOS там, где это необходимо. При возникновении проблем создавайте issue на GitHub.
