Пару слов обо мне

Меня зовут Дмитрий, я являюсь PHP разработчиком. Работаю с Битрикс24, Laravel и Go.

Проблема Битрикса

Как известно, рекомендуемое окружение для Битрикса – их собственная разработка BitrixVM на базе CentOS. Иногда такое окружение не устраивает заказчиков, поэтому выбирают Docker или сервер с установленным LEMP стеком. 

При переходе на окружение отличное от BitrixVM, существует две основные проблемы – отсутствие сервера очередей Push&Pull и сервера конвертации файлов. 

Первая проблема решаема: на просторах гитхаба уже существует рабочее решение для развертывания локального сервера в Docker. А также можно использовать облачный сервер, так как для его работы не требуется, чтобы портал был доступен извне. 

Со второй проблемой облако уже не всегда возможно использовать по ряду причин:

  • необходимость доступности сайта извне (не подходит для полностью закрытых окружений);

  • опасения заказчиков по поводу передачи конфиденциальных документов на облачный сервер.

Единственное решение – установка BitrixVM на отдельном сервере/в докере, с развертыванием бэкапа внутри и использование штатного сервера, встроенного в окружение, что далеко не всегда удобно.

С этим сталкивался и я на закрытых окружениях крупных российских заказчиков, которые далеко не всегда соглашались открывать доступ к порталу из-за соображений безопасности.

После нескольких таких кейсов было решено написать свой полноценный аналог, который бы отвечал нескольким критериям:

  • Работал в докере и запускался в несколько кликов;

  • Была возможность легко задать количество воркеров под различную нагрузку.

В качестве языка программирования было решено выбрать Go.

Реализация

Для начала нам необходимо понять, что представляет собой штатный сервер конвертации: по сути, это не более чем:

  • API сервер с одним эндпоинтом;

  • RabbitMQ сервер;

  • Consumer для обработки заданий из очереди, со следующим установленным ПО: LibreOffice, MPEG, ImageMagic.

Посмотреть реализацию на PHP можно в исходниках модуля Битрикса. Для упрощения я покажу только основной код, остальное всегда можно будет посмотреть в репозитории.

API сервер

Для API мы будем использовать библиотеку go-chi/chi.

Реализуем единственный обработчик – convert, с преобразованием данных в структуру ConvertTask и последующим добавлением в одну из очередей:

  • main_preview – используется в предпросмотре файлов;

  • documentgenerator_create – используется в генераторе документов CRM.

// internal/http-server/handlers/convert/convert.go
func New(ctx context.Context, log *slog.Logger, rabbit *rabbitmq.Rabbit) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {

		const op = "handlers.convert.New"

		reqId := middleware.GetReqID(r.Context())

		log = log.With(
			slog.String("op", op),
			slog.String("request_id", reqId),
		)

		err := r.ParseForm()

		if err != nil {
			log.Error("failed to decode request body", sl.Err(err))
			render.JSON(w, r, resp.Error("failed to decode request body", 152))
		}

		task, err := prepareOptions(r.Form, reqId)

		if err != nil {
			log.Error("failed to prepare options", sl.Err(err))
			render.JSON(w, r, resp.Error("failed to parse task", 0))
			return
		}

		if task.Queue == "" {
			task.Queue = rabbit.DefaultQueue()
			log.Warn("not found queue. Set default", slog.String("default_queue", task.Queue))
		}

		taskMsg, err := json.Marshal(task)

		if err != nil {
			log.Error("Error parse request", sl.Err(err))

			render.JSON(w, r, resp.Error("Error parse request", 0))

			return
		}

		err = rabbit.Publish(task.Queue, taskMsg)
		if err != nil {
			log.Error("error publish task", slog.String("queue", task.Queue), sl.Err(err))
			render.JSON(w, r, resp.Error("error publish task", 0))
		}
		render.JSON(w, r, resp.Success())
	}
}

FileUploader 

Создадим структуру FileUploader и реализуем следующие базовые методы:

  • Download – скачивание конвертируемого файла из Б24;

  • uploadFile – загрузка готового файла в Б24;

  • Complete – отправка запроса о завершении конвертации;

  • getUploadInfo – получение информации о загружаемом файле.

    Для каждого запроса заводим структуры. Оборачиваем запросы в функцию retry.Do из библиотеки avast/retry-go. Устанавливаем 3 попытки на подключение (на случай сетевых проблем).

// internal/lib/fileuploader/fileuploader.go
Пример метода получения информации о файле
func (f *FileUploader) getUploadInfo(file string, key string) (*uploadInfoResp, error) {
	fileInfo, err := os.Stat(file)

	if err != nil {
		return nil, fmt.Errorf("error get file info [%s]: [%w]", file, err)
	}

	uploadReq := uploadInfoRequest{
		FileId:   key,
		FileSize: fileInfo.Size(),
		Upload:   "where",
	}

	v, err := query.Values(uploadReq)

	if err != nil {
		return nil, fmt.Errorf("error convert struct request to query: [%w]", err)
	}

	res, err := http.PostForm(f.url, v)

	if err != nil {
		return nil, fmt.Errorf("error get upload info from [%s]: [%w]", f.url, err)
	}

	var uploadInfoRes uploadInfoResp

	body, err := io.ReadAll(res.Body)

	if err != nil {
		return nil, fmt.Errorf("wrong response upload info request to url [%s]: [%w]", f.url, err)
	}

	if err = json.Unmarshal(body, &uploadInfoRes); err != nil {
		return nil, fmt.Errorf("error unmarshal upload info request to url [%s]: [%w]", f.url, err)
	}

	return &uploadInfoRes, nil
}

Клиент RabbitMQ

Создаем структуру RabbitMQ и реализуем базовые методы:

  • Connect - подключение к rabbitMQ;

  • Reconnect - реконнект в случае разрыва соединения, запускаем в отдельной горутине;

  • InitQueue - инициализация основной очереди и Dead Letter;

  • Consume - чтение сообщений из очереди;

  • Publish - отправка сообщения в очередь.

// internal/lib/rabbitmq/rabbitmq.go
package rabbitmq

import (
	"bitrix-converter/internal/config"
	"bitrix-converter/internal/lib/logger/sl"
	"context"
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"log/slog"
	"time"
)

type Rabbit struct {
	conn *amqp.Connection
	log  *slog.Logger
	cfg  config.RabbitConfig
}

func New(log *slog.Logger, cfg config.RabbitConfig) *Rabbit {
	return &Rabbit{
		log: log,
		cfg: cfg,
	}
}

func (r *Rabbit) DefaultQueue() string {
	return r.cfg.DefaultQueue
}

func (r *Rabbit) Connect() error {
	url := fmt.Sprintf("amqp://%s:%s@%s:%s", r.cfg.User, r.cfg.Password, r.cfg.Host, r.cfg.Port)

	conn, err := amqp.Dial(url)
	if err != nil {
		return fmt.Errorf("failed to connect to RabbitMQ: [%w]", err)
	}

	r.conn = conn
	return nil
}

func (r *Rabbit) Channel() (*amqp.Channel, error) {
	return r.conn.Channel()
}

func (r *Rabbit) Reconnect() {
	for {
		_, ok := <-r.conn.NotifyClose(make(chan *amqp.Error))
		if !ok {
			r.log.Error("failed notifying rabbitMQ channel. Reconnecting...")
		}
		r.log.Error("rabbitmq connection closed unexpectedly. Reconnecting...")

		for {

			err := r.Connect()

			if err == nil {
				r.log.Info("rabbitMQ reconnect success")
				break
			}

			r.log.Error("rabbitmq reconnect failed. Retry after 10 seconds", sl.Err(err))
			time.Sleep(10 * time.Second)
		}

	}
}

func (r *Rabbit) InitQueue(ch *amqp.Channel, queue string) error {

	dlQueue := queue + "_dead"

	_, err := ch.QueueDeclare(
		dlQueue,
		true,
		false,
		false,
		false,
		amqp.Table{},
	)

	if err != nil {
		return fmt.Errorf("failed to declare dead letter queue: [%w]", err)
	}

	_, err = ch.QueueDeclare(
		queue,
		true,
		false,
		false,
		false,
		amqp.Table{
			"x-dead-letter-exchange":    "",
			"x-dead-letter-routing-key": dlQueue,
			"x-message-ttl":             60480000,
		},
	)

	if err != nil {
		return fmt.Errorf("failed to declare queue: [%w]", err)
	}
	return nil
}

func (r *Rabbit) Consume(ch *amqp.Channel, queue string) (msgs <-chan amqp.Delivery, err error) {
	msgs, err = ch.Consume(
		queue,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to consume queue: [%w]", err)
	}
	return msgs, nil
}

func (r *Rabbit) Publish(queue string, message []byte) error {
	ch, err := r.Channel()
	if err != nil {
		return fmt.Errorf("failed to open channel: [%w]", err)
	}

	defer ch.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = ch.PublishWithContext(
		ctx,
		"",
		queue,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        message,
		},
	)

	if err != nil {
		return fmt.Errorf("failed to publish message: [%w]", err)
	}
	return nil
}

func (r *Rabbit) Connection() *amqp.Connection {
	return r.conn
}

Command

Создадим BaseCommand с общим методом Execute и две реализации: DocumentCommand (LibreOffice) и VideoCommand (MPEG).

// internal/lib/command/command.go
func (bs *BaseCommand) Execute() error {

	if err := bs.validate(); err != nil {
		return fmt.Errorf("failed validate transform task: [%w]", err)
	}

	directory := bs.DownloadDir()

	err := os.MkdirAll(directory, 0755)

	if err != nil {
		return fmt.Errorf("error creating directory [%s]: [%w]", directory, err)
	}

	filePath := bs.genTmpFilePath(directory)

	err = retry.Do(
		func() error {
			return bs.uploader.Download(bs.task.File, filePath, bs.MaxSize())
		},
		retry.Attempts(3),
		retry.OnRetry(func(n uint, err error) {
			time.Sleep(1 * time.Second)
		}),
	)

	bs.uploader.AddFileToDelete(filePath)

	defer bs.uploader.DeleteFiles()

	if err != nil {
		return fmt.Errorf("error download file [%s]: [%w]", bs.task.File, err)
	}

	bs.file = filePath

	for _, format := range bs.task.Formats {

		if _, ok := bs.files[format]; ok {
			continue
		}
		pre, err := bs.preConvert(format, filePath)
		if err != nil {
			return err
		}
		if pre {
			continue
		}

		convertedFile, err := bs.transform(format, filePath)
		bs.uploader.AddFileToDelete(convertedFile)
		if err != nil {
			return fmt.Errorf("error transform file [%s] to [%s]: [%w]", bs.task.File, format, err)
		}
		bs.files[format] = convertedFile
	}

	bs.uploader.SetFiles(bs.files)

	err = bs.uploader.UploadFiles()
	if err != nil {
		return fmt.Errorf("error uploading files: [%w]", err)
	}

	err = bs.uploader.Complete()
	if err != nil {
		return fmt.Errorf("failed complete: [%w]", err)
	}
	return nil
}

Consumer

В текущей реализации запускаем 3 горутины на каждую очередь, в одном контейнере.

Настроим graceful завершение (с таймаутом в 5 минут) и реконнект к rabbitmq (в случае разрыва соединения обработчики будут подключаться заново и после восстановления соединения продолжат работу в штатном режиме)

// cmd/consumer/main.go
func main() {

	cfg := config.MustLoad()

	logger := sl.SetupLogger(cfg.Env)

	rabbit := rabbitmq.New(logger, cfg.Rabbit)

	conErr := rabbit.Connect()
	if conErr != nil {
		log.Fatalf("failed connect to RabbitMQ with start %v", conErr)
	}

	go rabbit.Reconnect()

	cancelCtx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	for i := 0; i <= 3; i++ {
		for _, queue := range queues {

			go func() {
				uniqId := fmt.Sprintf("%s_%d", queue, i)
				logger.Info("start consumer", slog.String("queue", queue))
				wg.Add(1)
			done:
				for {
					time.Sleep(10 * time.Second)
					ch, err := rabbit.Channel()
					if err != nil {
						logger.Error("failed to open channel. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}
					defer ch.Close()

					err = rabbit.InitQueue(ch, queue)

					if err != nil {
						logger.Error("failed init queue. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}

					logger.Info("success init queue", slog.String("queue", queue))
					msgs, err := rabbit.Consume(ch, queue)

					if err != nil {
						logger.Error("failed consume. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}

					logger.Info("success consume. Waiting messages", slog.String("queue", queue))
				closed:
					for {
						select {
						case <-cancelCtx.Done():
							break done
						default:
						}
						select {
						case d, ok := <-msgs:
							if !ok {
								logger.Info("channel closed", slog.String("queue", queue))
								break closed
							}
							handleMessage(d, logger, cfg, uniqId)
						default:
						}
                        time.Sleep(1 * time.Second)
					}
				}
				wg.Done()
			}()

		}
	}
	waitCh := make(chan struct{})
	ch := make(chan os.Signal)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGKILL)

	<-ch
	logger.Info("receive a shutdown signal")
	go func() {
		logger.Info("cancel, wait consumer")
		cancel()
		wg.Wait()
		close(waitCh)
	}()

	select {
	case <-waitCh:
		logger.Info("graceful shutdown")
	case <-time.After(5 * time.Minute):
		logger.Info("shutdown before 5 minutes timeout")
	}

}

Логирование

Для логирования используем библиотеку slog, для группировки логов используем параметр request_id.

Dead Letter Queue

Для каждой очереди инициализируем одноимённую очередь с префиксом dead.

В случае ошибки конвертации делаем reject, и задание попадает в dead очередь, для последующего анализа. По умолчанию время жизни сообщения - неделя.

Где посмотреть?

Сервер разместил на GitHub, текущая версия 1.0.0. В дальнейшем возможно будет обновляться. Инструкция по развертыванию прилагается.

Итого

Надеюсь, это решение упростит отказ от CentOS там, где это необходимо. При возникновении проблем создавайте issue на GitHub.