Кастомный сервер конвертации файлов для Битрикс24
Пару слов обо мне
Меня зовут Дмитрий, я являюсь PHP разработчиком. Работаю с Битрикс24, Laravel и Go.
Проблема Битрикса
Как известно, рекомендуемое окружение для Битрикса – их собственная разработка BitrixVM на базе CentOS. Иногда такое окружение не устраивает заказчиков, поэтому выбирают Docker или сервер с установленным LEMP стеком.
При переходе на окружение отличное от BitrixVM, существует две основные проблемы – отсутствие сервера очередей Push&Pull и сервера конвертации файлов.
Первая проблема решаема: на просторах гитхаба уже существует рабочее решение для развертывания локального сервера в Docker. А также можно использовать облачный сервер, так как для его работы не требуется, чтобы портал был доступен извне.
Со второй проблемой облако уже не всегда возможно использовать по ряду причин:
необходимость доступности сайта извне (не подходит для полностью закрытых окружений);
опасения заказчиков по поводу передачи конфиденциальных документов на облачный сервер.
Единственное решение – установка BitrixVM на отдельном сервере/в докере, с развертыванием бэкапа внутри и использование штатного сервера, встроенного в окружение, что далеко не всегда удобно.
С этим сталкивался и я на закрытых окружениях крупных российских заказчиков, которые далеко не всегда соглашались открывать доступ к порталу из-за соображений безопасности.
После нескольких таких кейсов было решено написать свой полноценный аналог, который бы отвечал нескольким критериям:
Работал в докере и запускался в несколько кликов;
Была возможность легко задать количество воркеров под различную нагрузку.
В качестве языка программирования было решено выбрать Go.
Реализация
Для начала нам необходимо понять, что представляет собой штатный сервер конвертации: по сути, это не более чем:
API сервер с одним эндпоинтом;
RabbitMQ сервер;
Consumer для обработки заданий из очереди, со следующим установленным ПО: LibreOffice, MPEG, ImageMagic.
Посмотреть реализацию на PHP можно в исходниках модуля Битрикса. Для упрощения я покажу только основной код, остальное всегда можно будет посмотреть в репозитории.
API сервер
Для API мы будем использовать библиотеку go-chi/chi.
Реализуем единственный обработчик – convert, с преобразованием данных в структуру ConvertTask и последующим добавлением в одну из очередей:
main_preview – используется в предпросмотре файлов;
documentgenerator_create – используется в генераторе документов CRM.
// internal/http-server/handlers/convert/convert.go
func New(ctx context.Context, log *slog.Logger, rabbit *rabbitmq.Rabbit) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
const op = "handlers.convert.New"
reqId := middleware.GetReqID(r.Context())
log = log.With(
slog.String("op", op),
slog.String("request_id", reqId),
)
err := r.ParseForm()
if err != nil {
log.Error("failed to decode request body", sl.Err(err))
render.JSON(w, r, resp.Error("failed to decode request body", 152))
}
task, err := prepareOptions(r.Form, reqId)
if err != nil {
log.Error("failed to prepare options", sl.Err(err))
render.JSON(w, r, resp.Error("failed to parse task", 0))
return
}
if task.Queue == "" {
task.Queue = rabbit.DefaultQueue()
log.Warn("not found queue. Set default", slog.String("default_queue", task.Queue))
}
taskMsg, err := json.Marshal(task)
if err != nil {
log.Error("Error parse request", sl.Err(err))
render.JSON(w, r, resp.Error("Error parse request", 0))
return
}
err = rabbit.Publish(task.Queue, taskMsg)
if err != nil {
log.Error("error publish task", slog.String("queue", task.Queue), sl.Err(err))
render.JSON(w, r, resp.Error("error publish task", 0))
}
render.JSON(w, r, resp.Success())
}
}
FileUploader
Создадим структуру FileUploader и реализуем следующие базовые методы:
Download – скачивание конвертируемого файла из Б24;
uploadFile – загрузка готового файла в Б24;
Complete – отправка запроса о завершении конвертации;
getUploadInfo – получение информации о загружаемом файле.
Для каждого запроса заводим структуры. Оборачиваем запросы в функцию retry.Do из библиотеки avast/retry-go. Устанавливаем 3 попытки на подключение (на случай сетевых проблем).
// internal/lib/fileuploader/fileuploader.go
Пример метода получения информации о файле
func (f *FileUploader) getUploadInfo(file string, key string) (*uploadInfoResp, error) {
fileInfo, err := os.Stat(file)
if err != nil {
return nil, fmt.Errorf("error get file info [%s]: [%w]", file, err)
}
uploadReq := uploadInfoRequest{
FileId: key,
FileSize: fileInfo.Size(),
Upload: "where",
}
v, err := query.Values(uploadReq)
if err != nil {
return nil, fmt.Errorf("error convert struct request to query: [%w]", err)
}
res, err := http.PostForm(f.url, v)
if err != nil {
return nil, fmt.Errorf("error get upload info from [%s]: [%w]", f.url, err)
}
var uploadInfoRes uploadInfoResp
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("wrong response upload info request to url [%s]: [%w]", f.url, err)
}
if err = json.Unmarshal(body, &uploadInfoRes); err != nil {
return nil, fmt.Errorf("error unmarshal upload info request to url [%s]: [%w]", f.url, err)
}
return &uploadInfoRes, nil
}
Клиент RabbitMQ
Создаем структуру RabbitMQ и реализуем базовые методы:
Connect - подключение к rabbitMQ;
Reconnect - реконнект в случае разрыва соединения, запускаем в отдельной горутине;
InitQueue - инициализация основной очереди и Dead Letter;
Consume - чтение сообщений из очереди;
Publish - отправка сообщения в очередь.
// internal/lib/rabbitmq/rabbitmq.go
package rabbitmq
import (
"bitrix-converter/internal/config"
"bitrix-converter/internal/lib/logger/sl"
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log/slog"
"time"
)
type Rabbit struct {
conn *amqp.Connection
log *slog.Logger
cfg config.RabbitConfig
}
func New(log *slog.Logger, cfg config.RabbitConfig) *Rabbit {
return &Rabbit{
log: log,
cfg: cfg,
}
}
func (r *Rabbit) DefaultQueue() string {
return r.cfg.DefaultQueue
}
func (r *Rabbit) Connect() error {
url := fmt.Sprintf("amqp://%s:%s@%s:%s", r.cfg.User, r.cfg.Password, r.cfg.Host, r.cfg.Port)
conn, err := amqp.Dial(url)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: [%w]", err)
}
r.conn = conn
return nil
}
func (r *Rabbit) Channel() (*amqp.Channel, error) {
return r.conn.Channel()
}
func (r *Rabbit) Reconnect() {
for {
_, ok := <-r.conn.NotifyClose(make(chan *amqp.Error))
if !ok {
r.log.Error("failed notifying rabbitMQ channel. Reconnecting...")
}
r.log.Error("rabbitmq connection closed unexpectedly. Reconnecting...")
for {
err := r.Connect()
if err == nil {
r.log.Info("rabbitMQ reconnect success")
break
}
r.log.Error("rabbitmq reconnect failed. Retry after 10 seconds", sl.Err(err))
time.Sleep(10 * time.Second)
}
}
}
func (r *Rabbit) InitQueue(ch *amqp.Channel, queue string) error {
dlQueue := queue + "_dead"
_, err := ch.QueueDeclare(
dlQueue,
true,
false,
false,
false,
amqp.Table{},
)
if err != nil {
return fmt.Errorf("failed to declare dead letter queue: [%w]", err)
}
_, err = ch.QueueDeclare(
queue,
true,
false,
false,
false,
amqp.Table{
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": dlQueue,
"x-message-ttl": 60480000,
},
)
if err != nil {
return fmt.Errorf("failed to declare queue: [%w]", err)
}
return nil
}
func (r *Rabbit) Consume(ch *amqp.Channel, queue string) (msgs <-chan amqp.Delivery, err error) {
msgs, err = ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to consume queue: [%w]", err)
}
return msgs, nil
}
func (r *Rabbit) Publish(queue string, message []byte) error {
ch, err := r.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: [%w]", err)
}
defer ch.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(
ctx,
"",
queue,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
return fmt.Errorf("failed to publish message: [%w]", err)
}
return nil
}
func (r *Rabbit) Connection() *amqp.Connection {
return r.conn
}
Command
Создадим BaseCommand с общим методом Execute и две реализации: DocumentCommand (LibreOffice) и VideoCommand (MPEG).
// internal/lib/command/command.go
func (bs *BaseCommand) Execute() error {
if err := bs.validate(); err != nil {
return fmt.Errorf("failed validate transform task: [%w]", err)
}
directory := bs.DownloadDir()
err := os.MkdirAll(directory, 0755)
if err != nil {
return fmt.Errorf("error creating directory [%s]: [%w]", directory, err)
}
filePath := bs.genTmpFilePath(directory)
err = retry.Do(
func() error {
return bs.uploader.Download(bs.task.File, filePath, bs.MaxSize())
},
retry.Attempts(3),
retry.OnRetry(func(n uint, err error) {
time.Sleep(1 * time.Second)
}),
)
bs.uploader.AddFileToDelete(filePath)
defer bs.uploader.DeleteFiles()
if err != nil {
return fmt.Errorf("error download file [%s]: [%w]", bs.task.File, err)
}
bs.file = filePath
for _, format := range bs.task.Formats {
if _, ok := bs.files[format]; ok {
continue
}
pre, err := bs.preConvert(format, filePath)
if err != nil {
return err
}
if pre {
continue
}
convertedFile, err := bs.transform(format, filePath)
bs.uploader.AddFileToDelete(convertedFile)
if err != nil {
return fmt.Errorf("error transform file [%s] to [%s]: [%w]", bs.task.File, format, err)
}
bs.files[format] = convertedFile
}
bs.uploader.SetFiles(bs.files)
err = bs.uploader.UploadFiles()
if err != nil {
return fmt.Errorf("error uploading files: [%w]", err)
}
err = bs.uploader.Complete()
if err != nil {
return fmt.Errorf("failed complete: [%w]", err)
}
return nil
}
Consumer
В текущей реализации запускаем 3 горутины на каждую очередь, в одном контейнере.
Настроим graceful завершение (с таймаутом в 5 минут) и реконнект к rabbitmq (в случае разрыва соединения обработчики будут подключаться заново и после восстановления соединения продолжат работу в штатном режиме)
// cmd/consumer/main.go
func main() {
cfg := config.MustLoad()
logger := sl.SetupLogger(cfg.Env)
rabbit := rabbitmq.New(logger, cfg.Rabbit)
conErr := rabbit.Connect()
if conErr != nil {
log.Fatalf("failed connect to RabbitMQ with start %v", conErr)
}
go rabbit.Reconnect()
cancelCtx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
for i := 0; i <= 3; i++ {
for _, queue := range queues {
go func() {
uniqId := fmt.Sprintf("%s_%d", queue, i)
logger.Info("start consumer", slog.String("queue", queue))
wg.Add(1)
done:
for {
time.Sleep(10 * time.Second)
ch, err := rabbit.Channel()
if err != nil {
logger.Error("failed to open channel. Retry", slog.String("queue", queue), sl.Err(err))
continue
}
defer ch.Close()
err = rabbit.InitQueue(ch, queue)
if err != nil {
logger.Error("failed init queue. Retry", slog.String("queue", queue), sl.Err(err))
continue
}
logger.Info("success init queue", slog.String("queue", queue))
msgs, err := rabbit.Consume(ch, queue)
if err != nil {
logger.Error("failed consume. Retry", slog.String("queue", queue), sl.Err(err))
continue
}
logger.Info("success consume. Waiting messages", slog.String("queue", queue))
closed:
for {
select {
case <-cancelCtx.Done():
break done
default:
}
select {
case d, ok := <-msgs:
if !ok {
logger.Info("channel closed", slog.String("queue", queue))
break closed
}
handleMessage(d, logger, cfg, uniqId)
default:
}
time.Sleep(1 * time.Second)
}
}
wg.Done()
}()
}
}
waitCh := make(chan struct{})
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGKILL)
<-ch
logger.Info("receive a shutdown signal")
go func() {
logger.Info("cancel, wait consumer")
cancel()
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
logger.Info("graceful shutdown")
case <-time.After(5 * time.Minute):
logger.Info("shutdown before 5 minutes timeout")
}
}
Логирование
Для логирования используем библиотеку slog, для группировки логов используем параметр request_id.
Dead Letter Queue
Для каждой очереди инициализируем одноимённую очередь с префиксом dead.
В случае ошибки конвертации делаем reject, и задание попадает в dead очередь, для последующего анализа. По умолчанию время жизни сообщения - неделя.
Где посмотреть?
Сервер разместил на GitHub, текущая версия 1.0.0. В дальнейшем возможно будет обновляться. Инструкция по развертыванию прилагается.
Итого
Надеюсь, это решение упростит отказ от CentOS там, где это необходимо. При возникновении проблем создавайте issue на GitHub.