Всем привет!
В этой статье мы рассмотрим следующий кейс. Есть некоторый внешний поставщик zip архивов, эти архивы содержат в себе множество папок, которые в свою очередь содержат различные js, html и css файлы. Архивы являются 3D турами, для обзора различных помещений музеев. Наша задача, как бекенд разработчика, предоставить пользователю возможность загружать эти архивы на сервер, для последующего просмотра их на соотвествующих страницах веб-сайта.
Графическое администрирование файлов, может осуществляться посредством сторонних средств бекенда. Например сайт, на основе Symfony, бандл SonataAdmin либо подобного.
Размер архива от 80 мб и выше, может доходить до 2-3 Гб.
Среднее кол-во файлов в одном архиве ~ 15 000 шт.
Задачи сервиса
Автоматизировать загрузку файлов в хранилище, загружать файлы посретством REST апи, контролировать сохранность файлов, возвращать фронту url до main точки каждого архива.
Стек.
Подготовим наш тестовый бекенд стек.
Хранилище файлов - S3 MinIO
Мета-данные - Postgresql
И собственно сам golang
Структура go-проекта.
Ниже представлена структура нашего проекта.

- Docker/docker-compose.yaml - наши minio и postgresql.
- cmd/main.go - главный файл приложения.
- internal/controller/loader.go - контроллер апи.
- internal/handler/file_bd_handler.go - обработчик архива для БД postgresql.
- internal/handler/file_handler.go - общий обработчик архива.
- internal/handler/file_storage_handler.go - обработчик архива для S3 Minio.
- internal/listener/event_listener.go - слушатель событий загрузки файлов (как для каждого файла в архиве, так и для архива в целом).
- internal/logger/logger.go - логер
- internal/repository/repository.go - интерфейс и структура-обёртка для работы с этим интерфейсом.
- internal/repository/pg_repository.go - ��еализация интерфейса для postgresql.
- internal/storage/storage.go - интерфейс для определения методов для работы с хранилищем.
- internal/storage/minio_storage.go - реализация интерфейса для S3 Minio.
- internal/pkg/file_actions.go - пакет функций для работы с файлами.
- config.go - работа с конфигурацией приложения.
- config.yaml - конфигурация приложения.
Тестовая конфигурация приложения
##Режим работы логирования## env: "dev" ##Адрес и порт web-сервера## url: "localhost" port: 8080 ##Путь до временной директории хранения архивов на сервере## load_dir: "./public" ##Ограничение на кол-во одновременно загружаемых архивов в одном http запросе## file_count_group: 10 ##Максимальное кол-во файлов которое может быть в одной группе, при делении zip архива ##на группы файлов для последующей загрузки в хранилище## size_zip_group: 2500 storage: end_point: "localhost:9010" access_key: "" secret_key: "" bucket: "loader-test" region: "ru-1" ##Наименование общей для всех архивов директории## name_path: "test" ##Наименование файла, "точки", наличие которой считается условно успешной ##загрузкой архива в хранилище## point: "Tour.html" database_url: "postgres://root:12345@localhost:54330/loader" database_table: "tour" ## Статусы обработки архива## statuses: ##Успех## ok: 6 ##Ошибка## err: 0 ##Загружено на сервер## server: 4 ##В процессе загрузки в хранилище## process_storage: 2
Опишу принцип работы приложения на основе конфигурации.
Каждый архив, загружаемый в апи, сохраняется в указанную в конфигурации папку, путь к ней указывается в параметре - load_dir. Приложением поддерживается multi-part загрузка, за ёё настройку отвечает параметр - file_count_group в котором указывается ограничение одновременно загружаемых архивов в запросе. После загрузки архива на сервер, начинается его параллельная распаковка и параллельно запускается загрузка в хранилище S3 Minio.
Т.к наше приложение нацелено на работу с объёмными архивами, то для ускорения их загрузки в хранилище, реализован функционал разбиения архива на группы файлов. Каждая группа содержит некоторое кол-во файлов, максимальное их кол-во указывается в парамете - size_zip_group.
В конфигурации присутствуют настройки S3 Minio и Postgresql , параметры - storage и database_url с database_table, рассмотрим их поподробнее. При настройке хранилища указываются стандартные ключи, но помимо них в конфигурации присутствуют дополнительные параметры - storage.name_pathи storage.point. Первый - отвечает за наименование объекта под которым будут храниться все архивы, а второй за наименование и тип искомого файла, так называемого main файла. Наличие данного файла считается условно успешной загрузкой тура в хранилище, путь до этого файла хранится в БД и передаётся фронту, для отображения тура на странице.
Стоит расписать принцип хранения файлов в Postgres и S3 Minio.
При запуске приложения в БД Postgres создаётся новая таблица (либо проверяется наличие таблицы, с необходимой структурой). Название таблицы берется из параметра (database_table). В начале обработки для каждого архива создаётся отдельная запись. (см. структура таблицы).
Столбец | Тип | Описание |
|
| Ключ |
|
| Статус загрузки |
|
| Тех. наименование |
|
| Ссылка до main файла, для отправки фронту |
|
| Дата и время последнего обновления |
|
| Текст ошибки (при наличии) |
В процессе обработки архива ему присваиваются статусы, они настраиваются так же в конфигурации, через параметр (statuses).
Код инициализации приложения
Весь код скидывать не вижу смысла, поэтому разобью его на основные блоки.
Подключаем конфигурацию.
flag.StringVar(&path, "c", "./config.yaml", "путь до конфиг. файла") flag.Parse() ctx := context.Background() cfg, err := goloader.NewConfig(path) if err != nil { log.Fatal(err) }
2. Инициализируем новый логер.
appLogger := logger.InitLogger(os.Stdout, cfg.Env)
3. Подключаемся к БД Postgres.
pgStg := repository.PgSettings{ Url: cfg.DataBaseUrl, Table: cfg.DataBaseTable, } pg, err := repository.PgInit(context.WithoutCancel(ctx), pgStg, true) if err != nil { appLogger.Error("Инициализация подключения к БД Postgres", slog.String("error", err.Error())) return }
4. Создаём клиента S3 Minio.
minio, err := storage.MinioInit(ctx, *cfg.S3, appLogger) if err != nil { appLogger.Error("Инициализация подключения к S3 Minio", slog.String("error", err.Error())) return }
5. Инициализируем загрузчик и объявляем апи роуты.
loader := controller.InitLoader(repository.InitFileFactory(pg), minio, *cfg, appLogger) mux := http.NewServeMux() mux.HandleFunc("/down", loader.Down(ctx)) mux.HandleFunc("/del", loader.Delete(ctx)) mux.HandleFunc("/check", loader.Sync(ctx))
На этом шаге остановимся подробнее, по сути это и есть связующее звено всех компонентов приложения. Его описание в следующем разделе.
6. Создаём и запускаем сервер.
server := &http.Server{ Addr: cfg.Url + ":" + strconv.Itoa(cfg.Port), IdleTimeout: 10 * time.Second, Handler: mux, } go func() { appLogger.Debug("Запуск http сервера") err = server.ListenAndServe() if err != nil && !errors.Is(err, http.ErrServerClosed) { appLogger.Error("Запуск http сервера", slog.String("error", err.Error())) } }()
Loader
Перейдем к главному компоненту системы - загрузчику, структура ниже.
type Loader struct { //Фабрика файлов, для работы с файлами в БД Postgres ff *repository.FileFactory //Интерфейс хранилища st storage.FileStorage //Конфигурация cfg goloader.Config //Логер log *slog.Logger }
Код инициализации.
func InitLoader(ff *repository.FileFactory, st storage.FileStorage, cfg goloader.Config, log *logger.Logger) *Loader { httpLogger := log.WithGroup("http") return &Loader{ ff: ff, st: st, cfg: cfg, log: httpLogger, } }
Надо заметить, тут наше подключение repository.PgInit дополнительно оборачивается в фабрику, подробнее об этом дальше в статье.
Методы загрузчика.
// Down - загрузить архив в БД и хранилище [POST] func (l *Loader) Down(ctx context.Context) func(w http.ResponseWriter, r *http.Request) // Delete - удалить архив из БД и хранилища [DELETE] func (l *Loader) Delete(ctx context.Context) func(w http.ResponseWriter, r *http.Request) // Sync - синхронизация архивов в БД и в хранилище [GET] func (l *Loader) Sync(ctx context.Context) func(w http.ResponseWriter, r *http.Request)
Методы содержат отдельные условия проверки типа http запроса и валидации нужных get параметров.
Использую стандартный пакет net/http, считаю для данного микросервиса его более чем достаточно.
Все методы возвращают следующую структуру ответа.
type Result struct { //сообщение Message string `json:"message"` //слайс сообщений валидации Valid []string `json:"valid,omitempty"` //слайс подробной информации Details []string `json:"details,omitempty"` }
Чтобы не растягивать статью, опишу основной и самый главный метод - "Down", который представляет можно сказать 80% всего функционала микросервиса.
Метод "Down"
По аналогии с предыдущим описанием, разобью метод на блоки кода.
Основная задача этого метода, пройтись по part (разделам) multipart тела запроса и запустить для каждого отдельный handler.
Метод принимает get-параметр - names это массив, в методе идёт сопоставление файл -> наименование.
Проходимся в цикле по разделам. Handler запускаются только для разделов с key = "files", вилидации в описании пропускаю.
var countFile uint8 var wg *sync.WaitGroup wg = &sync.WaitGroup{} // новая группа синхронизации, для группы handler for { var part *multipart.Part part, err = mr.NextPart() /*****тут код валидации******/ err = l.downHandler(ctx, part, names[countFile], wg) if err != nil && !errors.Is(err, handler.IsNotValid) { code = http.StatusInternalServerError res.Message = err.Error() break } if errors.Is(err, handler.IsNotValid) { res.Valid = append(res.Valid, fmt.Sprintf("Файл %s загружен не будет, т.к его тип неподдерживается", part.FileName())) continue } countFile++ }
Вызываю функцию downHandler, она создаёт группу handler, которые предназначенны для обработки архива. Получается, что для каждого архива создаётся отдельная группа обработчиков.
Аргументы функции downHandler - контекст, интерфейс для чтения потока данных, техническое наименование архива (которое указал пользователь в запросе), а так же группа для синхронизации обработчиков.
// downHandler - обработчик загрузки zip файла func (l *Loader) downHandler(ctx context.Context, r io.Reader, name string, wg *sync.WaitGroup) error { wg.Add(1) //добавляем один элемент в группу sth := handler.FileStorageHandler{ Group: l.cfg.S3.NamePath, Name: name, Storage: l.st, Object: l.cfg.S3.Point, } h := handler.FileHandler{ Dir: l.cfg.LoadDir, ParentDir: name, Handler: sth, } bdh := handler.FileBDHandler{ Log: l.log, Status: l.cfg.Statuses, Wg: wg, Ff: l.ff, Handler: sth, } he := &listener.EventHandler{ Workers: map[int]func(he *listener.EventHandler, event listener.FileEvent){ listener.Start: bdh.StartEvent(), listener.Error: bdh.ErrorEvent(), listener.OK: bdh.OkEvent(), }, Default: bdh.Event(), } err := h.Start(ctx, r, listener.Listener{Handler: he, Logger: l.log}, l.cfg.SizeZipGroup) if err != nil { l.log.Error("Обработка zip архива", slog.String("Ошибка", err.Error())) } return err }
После прохода цикла for по разделам multipart, запускается, ниже представленная, функция. Она отслеживает завершение загрузки всех архивов, и возвращает результат в консоль. (так же стоит учитывать, что, завершение не равно успех).
go func() { wg.Wait() l.log.Debug("Загрузка файлов закончена") }()
Handlers
Как вы видели выше, функция downHandler содержит ряд обработчиков, начнем с основного, ниже его структура.
handler.FileHandler{ //Директория для хранения архивов на сервере Dir: l.cfg.LoadDir, //Наименование временного файла на сервере ParentDir: name, //Handler для загрузки файлов в minio Handler: sth, }
Задача основного обработчика запустить загрузку архива на сервер, разбить его на группы и отправить на загрузку в s3 minio с использованием minio handler. Вот ряд методов которые он для этого использует.
1.Метод Start - запуск загрузки zip файлов (публичный метод)
func (handler *FileHandler) Start(ctx context.Context, r io.Reader, l listener.Listener, countFilesInGroup int) error
Код
func (handler *FileHandler) Start(ctx context.Context, r io.Reader, l listener.Listener, countFilesInGroup int) error { var err error st, r := pkg.ValidateZipFile(r) if !st { return IsNotValid } fl := l.Listener() nf, err := handler.loadFilesServer(r, fl) if err != nil { err = errors.Join(err, pkg.RemoveTmpFile(handler.ParentDir, nf.name)) fl.CloseFlow() return err } nf.gr = countFilesInGroup go handler.start(ctx, fl, nf) return nil }
2.Метод start - запуск обработки zip архива и загрузка его в хранилище, на этом этапе архив загружен на сервер и заполнена внутренняя структура newFile
func (handler *FileHandler) start(ctx context.Context, fl *listener.Listener, nf newFile)
Код
func (handler *FileHandler) start(ctx context.Context, fl *listener.Listener, nf newFile) { defer func(dir string, nf newFile, fl *listener.Listener) { err := errors.Join(nf.f.Close(), pkg.RemoveTmpFile(dir, nf.name)) if err != nil { fl.Add(listener.FileEvent{ Parent: dir, Name: nf.name, Description: "Завершение загрузки архива в хранилище", Status: listener.Error, Err: err, }) } fl.CloseFlow() }(handler.Dir, nf, fl) handler.Handler.Delete(fl) fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Description: "Запуск операции чтения архива и загрузки его в хранилище", Status: listener.StorageStart, }) zr, err := zip.NewReader(nf.f, nf.b) if err != nil { fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Description: "Чтение архива", Status: listener.Error, Err: err, }) return } if !handler.partZipLoadStorage(ctx, fl, handler.splitZip(zr, nf)) { fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Status: listener.OK, }) } else { handler.Handler.Delete(fl) } }
Стоти упомянуть, что внутри пакета, обработчик формирует вот такую структуру файла и передаёт ее между своими приватными методами.
type newFile struct { //Наименование файла name string //Указатель на файл f *os.File //Кол-во записанных байт во временный файл b int64 //Кол-во файлов в группе (если разбиваем архив на группы файлов) gr int }
3.Метод partZipLoadStorage - загрузка частей архива в хранилище S3 Minio.
func (handler *FileHandler) partZipLoadStorage(ctx context.Context, fl *listener.Listener, gr [][]*zip.File) bool
Код
func (handler *FileHandler) partZipLoadStorage(ctx context.Context, fl *listener.Listener, gr [][]*zip.File) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() var st bool wg := &sync.WaitGroup{} for _, files := range gr { wg.Add(1) go func(ctx context.Context, files []*zip.File, wg *sync.WaitGroup) { var err error err = handler.Handler.Load(ctx, files, fl) if (err != nil || errors.Is(err, IsStop)) && !errors.Is(err, io.EOF) { st = true cancel() fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Description: "Загрузка группы файлов закончилась ошибками", Status: listener.Error, Err: IsStop, }) } wg.Done() }(ctx, files, wg) } wg.Wait() return st }
4.Метод splitZip - разбить архив на части
func (handler *FileHandler) splitZip(r *zip.Reader, nf newFile) [][]*zip.File
Код
func (handler *FileHandler) splitZip(r *zip.Reader, nf newFile) [][]*zip.File { var ( gr [][]*zip.File fs []*zip.File ) if nf.gr <= 0 { nf.gr = defaultCountFilesInGroup } for _, zf := range r.File { if zf.FileInfo().IsDir() { continue } fs = append(fs, zf) if len(fs)%nf.gr == 0 { gr = append(gr, fs) fs = []*zip.File{} } } if len(fs) > 0 { gr = append(gr, fs) } return gr }
5.loadFilesServer - загрузить файлы на сервер
func (handler FileHandler) loadFilesServer(r io.Reader, fl listener.Listener) (newFile, error)
Код
func (handler *FileHandler) loadFilesServer(r io.Reader, fl *listener.Listener) (newFile, error) { var err error var f *os.File name := uuid.New().String() + ".zip" fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Name: name, Status: listener.Start, Description: "Запуск загрузки", }) f, err = os.Create(filepath.Join(handler.Dir, name)) if err != nil { fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Name: name, Status: listener.Error, Description: "Создание временного файла", Err: err, }) return newFile{name: name}, err } b, err := io.Copy(f, r) if err != nil { fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Name: name, Status: listener.Error, Description: "Копирование архива во временный файл", Err: err, }) return newFile{name: name}, err } _, err = f.Seek(0, 0) if err != nil { fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Name: name, Status: listener.Error, Description: "Сдвиг указателя на начало файла", Err: err, }) return newFile{name: name}, err } fl.Add(listener.FileEvent{ Parent: handler.ParentDir, Name: name, Status: listener.Server, Description: "Копирование на сервер завершено", }) return newFile{name: name, f: f, b: b}, nil }
Далее перейдем к структуре storage handler. Этот обработчик (его методы) используются главным обработчиком загрузки.
handler.FileStorageHandler{ //Наименование главного раздела в S3, куда будут загружаться все файлы Group: l.cfg.S3.NamePath, //Тех. наименование, раздела в который попадут все файлы Name: name, //Интерфейс для работы с файлами Storage: l.st, //Наименование main файла Object: l.cfg.S3.Point, }
Задача storage handler собственно загрузить/удалить/проверить наличие файлов в хранилище. Ниже его методы.
1.Метод Load - загрузить файлы в хранилище
func (handler *FileStorageHandler) Load(ctx context.Context, files []*zip.File, fl *listener.Listener) error
Код
func (handler *FileStorageHandler) Load(ctx context.Context, files []*zip.File, fl *listener.Listener) error { var err error for _, file := range files { select { case <-ctx.Done(): return errors.Join(err, IsStop) default: var fCloser io.ReadCloser fCloser, err = file.Open() if err != nil { fl.Add(listener.FileEvent{ Parent: handler.Name, Name: file.Name, Description: "Чтение файла из архива", Status: listener.StorageProcess, Err: err, }) break } var ctt string var fReader io.Reader fReader, err = pkg.GetTypeFile(fCloser, &ctt) fl.Add(listener.FileEvent{ Parent: handler.Name, Name: file.Name, Description: "Обработка файла", Status: listener.StorageProcess, }) if err != nil && !errors.Is(err, io.EOF) { fl.Add(listener.FileEvent{ Parent: handler.Name, Name: file.Name, Description: "Ошибка чтения файла из архива, попытка определения Content Type", Status: listener.StorageProcess, Err: err, }) break } stg := storage.Stg{ Name: filepath.Join(handler.Group, handler.Name, file.Name), Size: int64(file.UncompressedSize64), ContentType: ctt, } err = errors.Join(err, handler.Storage.Put(ctx, fReader, stg), fCloser.Close()) if err != nil && !errors.Is(err, io.EOF) { fl.Add(listener.FileEvent{ Parent: handler.Name, Name: file.Name, Description: "Ошибка загрузки файла", Status: listener.StorageProcess, Err: err, }) break } } } return err }
2.Метод Delete - удалить файл из хранилища
func (handler *FileStorageHandler) Delete(fl *listener.Listener)
Код
func (handler *FileStorageHandler) Delete(fl *listener.Listener) { _, err := handler.Storage.Delete(context.Background(), storage.Stg{Name: handler.Name, Dir: handler.Group}) if err != nil { fl.Add(listener.FileEvent{ Parent: handler.Name, Description: "Удаление из хранилища", Status: listener.Error, Err: err, }) } }
3.Метод IsObj - проверить существование объекта
func (handler *FileStorageHandler) IsObj(ctx context.Context, file repository.File) (repository.File, error)
Код
func (handler *FileStorageHandler) IsObj(ctx context.Context, file repository.File) (repository.File, error) { var message string st, key, err := handler.Storage.IsObj(ctx, storage.Stg{Dir: handler.Group, Name: handler.Name, Point: handler.Object}) if !st || err != nil || len(key) == 0 { message = fmt.Sprintf("Объект %s не обнаружен в хранилище, для файла %s", handler.Object, file.TechnicalName) file.Url = "" file.Err = message return file, errors.New(message) } else { file.Url = key file.Err = "" return file, nil } }
Репозиторий, методы PGSQL
Перед переходом к описанию последнего обработчика, стоит упомянуть методы репозитория файлов, а так же привести описание структуры для сохранения мета. данных в БД.
Ниже представлена структура файла, который сохраняется в БД. Поля структуры соответствуют с полями в таблице, из начала статьи (там же есть их описание).
type File struct { Id uuid.UUID Status StatusDown TechnicalName string Url string DateUpdate time.Time Err string }
Ниже интерфейс для репозитория, содержит набор функций для работы с БД.
type FileRepository interface { //создать новые записи Create(context.Context, []File) error //найти запись(-и) FindBy(context.Context, Filter) ([]File, error) //обновить записи Update(context.Context, []File) error //удалить запись(-и) Delete(context.Context, Filter) error //получить кол-во записей Count(context.Context, Filter) (int, error) }
Для фильтрации данных заранее создана такая структура фильтра.
type Filter struct { //Слайс наименований Names []string //Слайс Id Ids []uuid.UUID //Слайс статусов Statuses []StatusDown //Слайс Id которые нужно исключить ExcludeIds []uuid.UUID //Слайс статусов которые нужно исключить ExcludeStatuses []StatusDown //Дата обновления, До DateTo time.Time //Дата обновления, От DateFrom time.Time //Искать записи учитывающие все полученные в структуре фильтры (AND) по умолчанию OR IsAll bool //Ограничение по записям Limit uint64 }
Описанный интерфейс, реализован в пакете internal/repository/pg_repository.go, там же идёт и работа с фильтрами. В этой статье его не описываю, дабы не растягивать ее еще больше.
Для реализации набора функций под конкретную задачу, создана фабрика файлов.
type FileFactory struct { //Механизм управления файлами в БД rep FileRepository }
Ниже представлены методы фабрики.
1.Метод FindByNames - извлекает файлы из БД по наименованиям
func (fF *FileFactory) FindByNames(names []string) ([]File, error)
Код
func (fF *FileFactory) FindByNames(names []string) ([]File, error) { if len(names) == 0 { return nil, nil } files, err := fF.rep.FindBy(context.Background(), Filter{Names: names}) if err != nil { return nil, err } return files, nil }
2.Метод FindByName - извлекает файл из БД по наименованию
func (fF *FileFactory) FindByName(names string) (File, error)
Код
func (fF *FileFactory) FindByName(names string) (File, error) { files, err := fF.rep.FindBy(context.Background(), Filter{Names: []string{names}}) if err != nil { return File{}, err } if len(files) == 0 { return File{}, nil } return files[0], nil }
3.Метод SaveFile - сохранить или обновть файл в БД (по флагу up, если true выполняет UPDATE)
func (fF *FileFactory) SaveFile(f File, up bool) error
Код
func (fF *FileFactory) SaveFile(f File, up bool) error { var err error if !up { err = fF.rep.Create(context.Background(), []File{f}) } else { err = fF.rep.Update(context.Background(), []File{f}) } return err }
4.Метод FindByDate - извлекает файлы из БД от текущей даты и на 1 год младше.
func (fF *FileFactory) FindByDate(limit uint64) ([]File, error)
Метод FindByDate используется в апи, при синхронизации архивов в БД и в хранилище, извлекает файлы из БД с указанным лимитом, от текущей даты и на год младше.
Код
func (fF *FileFactory) FindByDate(limit uint64) ([]File, error) { ctx := context.Background() if limit == 0 { limit = LimitDefault } filter := Filter{ DateFrom: time.Now().AddDate(-1, 0, 0), DateTo: time.Now(), IsAll: true, Limit: limit, } return fF.rep.FindBy(ctx, filter) }
5.Метод DeleteByNames - удалить файлы по названию в БД
func (fF *FileFactory) DeleteByNames(names []string, ExcludeSt StatusDown) error
Метод DeleteByNames удаляет перечень файлов по их техническим наименованиям, исключая при этом файлы с указанными статусами.
Код
func (fF *FileFactory) DeleteByNames(names []string, ExcludeSt StatusDown) error { if len(names) == 0 { return nil } filter := Filter{ Names: names, ExcludeStatuses: []StatusDown{ExcludeSt}, IsAll: true, } return fF.rep.Delete(context.Background(), filter) }
Теперь перейдем к описанию последнего обработчика, это обработчик загрузки файла в БД, вот его структура.
handler.FileBDHandler{ //Логер Log: l.log, //Конфигурация статусов Status: l.cfg.Statuses, //Группа для синхронизации Wg: wg, //Фабрика файлов, для работы с файлами в БД Postgres Ff: l.ff, //Handler для обработки загрузки в хранилище Handler: sth, }
В него мы передаём handler для загрузки в хранилище, т.к при сохранении статусов в БД, необходимо проверять наличие main файла. А его наличие как я и описывал выше, считается условно успешным завершением загрузки тура.
Как видно сервис узкоспециализированный, т.к если загрузить архив без нужного main файла тур не запустится на странице.
Все статусы фиксируются в БД, и могут быть подхвачены сторонними приложениями, это конечно не потоковое web-soket вещание, но при желании можно переделать и на него.
Ниже его методы.
1. Метод StartEvent - обработчик событий запуска загрузки файла.
func (handler *FileBDHandler) StartEvent() func(he *listener.EventHandler, event listener.FileEvent)
Код
func (handler *FileBDHandler) StartEvent() func(he *listener.EventHandler, event listener.FileEvent) { return func(he *listener.EventHandler, event listener.FileEvent) { up := true f, err := handler.Ff.FindByName(event.Parent) if err != nil { handler.Log.Error("Ошибка при получении информации по файлу с БД", slog.String("error", err.Error()), slog.String("file", event.Parent)) } if f.Id == uuid.Nil { f = repository.File{ Id: uuid.New(), TechnicalName: event.Parent, DateUpdate: time.Now(), } up = false } f.DateUpdate = time.Now() f.Err = "" _ = handler.Ff.SaveFile(f, up) he.F = f } }
2. Метод Event - обработчик событий загрузки на сервер и в хранилище.
func (handler *FileBDHandler) Event() func(he *listener.EventHandler, event listener.FileEvent)
Код
func (handler *FileBDHandler) Event() func(he *listener.EventHandler, event listener.FileEvent) { return func(he *listener.EventHandler, event listener.FileEvent) { if event.Status == listener.Server { he.F.Status = handler.Status.Server } if event.Status == listener.StorageStart { he.F.Status = handler.Status.ProcessStorage } err := handler.Ff.SaveFile(he.F, true) if err != nil { handler.Log.Error("Ошибка при обновлении записи в БД", slog.String("error", err.Error()), slog.String("file", event.Parent)) } } }
3. Метод OkEvent - обработчик событий успешной загрузки.
func (handler *FileBDHandler) OkEvent() func(he *listener.EventHandler, event listener.FileEvent)
Код
func (handler *FileBDHandler) OkEvent() func(he *listener.EventHandler, event listener.FileEvent) { return func(he *listener.EventHandler, event listener.FileEvent) { var err error he.F, err = handler.Handler.IsObj(context.Background(), he.F) if err != nil { he.F.Status = handler.Status.Err } else { he.F.Status = handler.Status.Ok } err = handler.Ff.SaveFile(he.F, true) if err != nil { handler.Log.Error("Ошибка при обновлении записи в БД", slog.String("error", err.Error()), slog.String("file", event.Parent)) } handler.Wg.Done() } }
4. Метод ErrorEvent - обработчик событий ошибки загрузки
func (handler *FileBDHandler) ErrorEvent() func(he *listener.EventHandler, event listener.FileEvent)
Код
func (handler *FileBDHandler) ErrorEvent() func(he *listener.EventHandler, event listener.FileEvent) { return func(he *listener.EventHandler, event listener.FileEvent) { he.F.Status = handler.Status.Err he.F.Err = event.Err.Error() err := handler.Ff.SaveFile(he.F, true) if err != nil { handler.Log.Error("Ошибка при обновлении записи в БД", slog.String("error", err.Error()), slog.String("file", event.Parent)) } handler.Wg.Done() } }
Перейдём к запуску загрузки, и к логике взаимодействия обработчиков, вызываемых в функции downHandler. Запуск осуществляется вызовом следующего метода. Метод вызывается у главного обработчика.
h.Start(ctx, r, listener.Listener{Handler: he, Logger: l.log}, l.cfg.SizeZipGroup)
В аргементы функции передаются - контекст, интерфейс для чтения потока данных, кол-во файлов в группе (для ограничения при делении архива), и еще одна важная для правильной работы сохранения данных, вещь, а именно слушатель загрузки архива.
Слушатель создаёт отдельный канал событий загрузки, который используется всеми handler в процессе своей работы. У слушателя есть свой handler, ниже его структура.
listener.EventHandler{ // Мапа функций, где ключ это тип события, а значение метод который будет вызван // при получении этого события Workers: map[int]func(he *listener.EventHandler, event listener.FileEvent){ listener.Start: bdh.StartEvent(), listener.Error: bdh.ErrorEvent(), listener.OK: bdh.OkEvent(), }, // Функция которая отработает по умолчанию (если событие не будет найдено в мапе выше) Default: bdh.Event(), }
При создании слушателя, в него пробрасывается его обработчик и логер приложения. По аналогии, приведу часть кода запуска слушателя. Запуск случателя осуществляет так же главный handler, вызывая метод Listener().
func (l *Listener) Listener() *Listener { log := l.Logger.WithGroup("fileEvent") l.flow = make(chan FileEvent) go func() { for fe := range l.flow { switch fe.Status { case Error: log.Error("Загрузка файла", slog.String("error", fe.Err.Error()), slog.String("technicalName", fe.Parent), slog.String("fileName", fe.Name), slog.String("description", fe.Description), ) case StorageStart: log.Debug("Начало загрузки файла", slog.String("technicalName", fe.Parent), slog.String("fileName", fe.Name), slog.String("description", fe.Description), ) case StorageProcess: log.Debug("Загрузка файла", slog.String("technicalName", fe.Parent), slog.String("fileName", fe.Name), slog.String("description", fe.Description), ) case OK: log.Info("Загрузка файла успешно завершена", slog.String("technicalName", fe.Parent), slog.String("fileName", fe.Name), slog.String("description", fe.Description), ) default: } fe.worker(l.Handler) } }() return l }
Как видно из кода выше, создаётся новый канал событий, и запускается его прослушивание. Ниже по коду для каждого полученного события канала вызывается метод события worker(). Данный метод на основе статуса события, и переданного в него handler (той самой мапы которую мы определили выше), вызывает нужную функцию обработчик. Про эти функции писалось выше (StartEvent(), Event(), OkEvent(), ErrorEvent())
Рассмотри структуру события.
type FileEvent struct { //Родительская директория Parent string //Наименование файла Name string //Статус события Status int //Описание события Description string //Ошибка Err error }
События генерируются обработчиками, и отправляются в канал слушателя.
Для отслеживания загрузки архива внутри приложения используются свои статусы отличные от статусов в конфигурации. (см. ниже). Статусы в конфигурации используются только для хранения в БД.
const ( Start = iota //Запуск обработки файла OK //Успешное завершение загрузки Server //Загрузка архива на сервер StorageStart //Запуск загрузки в хранилище StorageProcess //Процесс загрузки в хранилище Error //Ошибка )
Стоит упомянуть, событие StorageProcess игнорируется при вызове
Default: bdh.Event(), под него можно при желании настроить отдельный обработчик и добавить его в мапу, однако в данном случае это не нужно. Данное событие срабатывает для каждого файла в архиве, и нет необходимости шатать БД из за этого.
Для управления каналом вне слушателя, присутствуют отдельные методы.
// Add - добавить событие в канал func (l *Listener) Add(fe FileEvent) *Listener { if !l.stop { l.flow <- fe } return l } // CloseFlow - закрыть канал func (l *Listener) CloseFlow() { l.stop = true close(l.flow) }
Заключение
По итогу в статье я привел описание микросервиса для загрузки файлов 3D туров в хранилище S3 Minio. В целом описание может быть не достаточно подробное, я не описал все реализованные http методы, дабы не растягивать статью. Не приводил описание docker-compose.yaml и деплоя в целом, но целью статьи скорее является рассмотрение самого принципа обработки архивов. Результат работы кстати достаточно хороший по моему мнению, я проверял на 10 архивах по 529 МБ каждый (в каждом приблизительно по 28 600 файлов, небольшого объёма с высокой степенью вложенности). Загружались такие файлы около 2,5~3 минут, с учётом распаковки.
