Полный код проекта можно найти на github, а по этой ссылке посмотреть демо-версию приложения.
Краткий обзор
Идея
Идея приложения — дать пользователям возможность создавать GIF-файлы из видеороликов с YouTube. Для этого нужно просто указать ссылку на нужное видео, а также задать время начала и окончания GIF.
Функциональные возможности приложения
Создание GIF из видео на YouTube с определённым временным диапазоном.
Предварительный просмотр результата перед выполнением фактической конвертации.
Проектирование системы
Компоненты системы
Каждый компонент системы отвечает за работу в отдельности и взаимодействие с другими компонентами.
Клиентская часть на React (Next.js). Общается с бэкенд-сервером для создания новых запросов на конвертацию GIF, а также для получения информации об уже созданных конвертациях.
Бэкенд-сервер на Node. Обрабатывает запросы от клиента, а также отправляет новые задачи на конвертацию в брокер сообщений.
Node как Service Worker. Будет отвечать за выполнение / обработку задач по конвертации.
В качестве брокера сообщений — RabbitMQ. Будет выступать в качестве очереди задач (Task Queue), в которую бэкенд-сервер будет отправлять задачи, а Service Worker — потреблять задачи из неё.
Для хранения данных — MongoDB. Будет хранить информацию о задачах по конвертации GIF.
Для хранения медиафайлов — Google Cloud Storage. Будет использоваться для хранения конвертированных GIF-файлов.
Архитектура

Диаграмма последовательности
Это диаграмма показывает поток данных, включающий все компоненты, упомянутые выше.

Давайте подробнее рассмотрим бэкенд-сервер.
Бэкенд-сервер
Функциональные возможности

Как видно из приведённой выше диаграммы, бэкенд-сервер выполняет три основные функции:
Обработка запросов на конвертацию GIF путём создания новой записи задачи в базе данных.
Отправка событий в RabbitMQ, указывающих, что была создана новая задача на конвертацию (постановка задач в очередь).
Обработка запросов на получение задач путём запроса задачи по её ID из базы данных и возврата соответствующего ответа.
Архитектура проекта
Архитектура приложения состоит из трёх основных компонентов:
Обработчик маршрутов
Контроллер
Сервис
Каждый из них обладает определённым функционалом, который мы сейчас рассмотрим подробнее; а также разберёмся, почему архитектура построена именно таким образом.

Обработчик маршрутов
Отвечает за распределение запросов по обработчикам различных маршрутов. Обычно эти обработчики маршрутов состоят из массива обработчиков, которые мы называем "Middleware Chain", а конечным обработчиком в этой цепочке является контроллер маршрута (Route Controller).
Middleware Chain обычно отвечает за выполнение «проверок» входящего запроса, а также в некоторых случаях за модификацию объекта запроса. В нашем случае мы будем выполнять проверку с помощью своего обработчика-валидатора.
Контроллер
Извлекает данные из запроса, а также очищает при необходимости.
Делегирует управление соответствующему сервису.
Обрабатывает ответы.
Перенаправляет ошибки промежуточному обработчику ошибок.
Сервис
Содержит всю бизнес-логику.
Имеет доступ к данным с помощью уровня доступа к данным (ORM/ODM).
Контроллеры должны быть глупыми, то есть не иметь никаких подробностей о бизнес-логике. Всё, что они знают, это «какой сервис может обработать этот запрос», «какие данные нужны этому сервису», «как должен выглядеть ответ». Это позволяет избежать наличия толстых контроллеров (Fat Controllers).
Реализация
Схема базы данных
В этом проекте мы используем TypeORM, который представляет собой ORM с поддержкой TypeScript и поддерживает множество баз данных (мы будем использовать MongoDB).
Мы представим каждую конвертацию GIF в виде задачи (Job), которая будет нашей единственной коллекцией (Collection).
Job Collection в TypeORM выглядит так:
import { BaseEntity, Entity, ObjectID, Column, CreateDateColumn, UpdateDateColumn, ObjectIdColumn } from 'typeorm';
@Entity('jobs')
export class Job extends BaseEntity {
@ObjectIdColumn()
id: ObjectID;
@Column({
nullable: false,
})
youtubeUrl: string;
@Column({
nullable: false,
})
youtubeId: string;
@Column({
nullable: true,
})
gifUrl: string;
@Column({
nullable: false,
})
startTime: number;
@Column({
nullable: false,
})
endTime: number;
@Column({
type: 'enum',
enum: ['pending', 'processing', 'done', 'error'],
})
status: 'pending' | 'processing' | 'done' | 'error';
@Column()
@CreateDateColumn()
createdAt: Date;
@Column()
@UpdateDateColumn()
updatedAt: Date;
}
Здесь важно обратить внимание на поле status
— по сути, оно является перечислением (enum), указывающим на текущий статус конвертации GIF. Все остальные поля представляют собой стандартные данные, необходимые для выполнения конвертации.
Обработка маршрутов
У нас будет только два маршрута.
Маршрут для создания новой задачи по конвертации GIF.
Маршрут для получения данных о задачи преобразования из его идентификатора, который будет использоваться для опроса в дальнейшем на стороне клиента.
Вот как выглядит наш обработчик маршрутов:
//routes.interface
import { Router } from 'express';
interface Route {
path?: string;
router: Router;
}
export default Route;
//jobs.route.ts
import { Router } from 'express';
import { CreateJobDto } from '../../common/dtos/createJob.dto';
import Route from '../../common/interfaces/routes.interface';
import JobsController from '../../controllers/jobs.controller';
import validationMiddleware from '../middlewares/validation.middleware';
class JobsRoute implements Route {
public path = '/jobs';
public router = Router();
constructor(private jobsController = new JobsController()) {
this.initializeRoutes();
}
private initializeRoutes() {
this.router.get(`${this.path}/:id`, this.jobsController.getJobById);
this.router.post(`${this.path}`, validationMiddleware(CreateJobDto, 'body'), this.jobsController.createJob);
}
}
export default JobsRoute;
Для проверки мы используем свой обработчик-валидатор, который проверяет DTO с помощью class-validator и class-transformer
//createJob.dto
import { Expose } from 'class-transformer';
import { IsNotEmpty, IsNumber, IsString, Matches } from 'class-validator';
import { IsGreaterThan } from './validators/isGreaterThan';
import { MaximumDifference } from './validators/maximumDifference';
export class CreateJobDto {
@IsNotEmpty()
@IsString()
@Matches(/^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/, {
message: 'Invalid youtube url',
})
@Expose()
public youtubeUrl: string;
@IsNotEmpty()
@IsNumber()
@Expose()
public startTime: number;
@IsNotEmpty()
@IsNumber()
@IsGreaterThan('startTime', {
message: 'end time must be greater than start time',
})
@MaximumDifference('startTime', {
message: 'maximum gif duration is 30 seconds',
})
@Expose()
public endTime: number;
}
Обратите внимание, что IsGreaterThan
и MaximumDifference
— это пользовательские декораторы валидации для class-validator
. По сути, они выглядят так (подробнее читайте в документации):
//isGreaterThan.ts
import { registerDecorator, ValidationOptions, ValidationArguments } from 'class-validator';
export function IsGreaterThan(property: string, validationOptions?: ValidationOptions) {
return function (object: Object, propertyName: string) {
registerDecorator({
name: 'isGreaterThan',
target: object.constructor,
propertyName: propertyName,
constraints: [property],
options: validationOptions,
validator: {
validate(value: any, args: ValidationArguments) {
const [relatedPropertyName] = args.constraints;
const relatedValue = (args.object as any)[relatedPropertyName];
return typeof value === 'number' && typeof relatedValue === 'number' && value > relatedValue;
},
},
});
};
}
MaximumDifference
выглядит аналогично, но его возврат выглядит следующим образом:
return typeof value === 'number' && typeof relatedValue === 'number' && value - relatedValue <= difference;
И теперь наш обработчик (validation middleware) выглядит следующим образом:
validation.middleware.ts
import { plainToClass } from 'class-transformer';
import { validate, ValidationError } from 'class-validator';
import { RequestHandler } from 'express';
const validationMiddleware = (type: any, value: string | 'body' | 'query' | 'params' = 'body', skipMissingProperties = false): RequestHandler => {
return (req, res, next) => {
validate(plainToClass(type, req[value]), { skipMissingProperties }).then((errors: ValidationError[]) => {
if (errors.length > 0) {
const message = errors.map((error: ValidationError) => Object.values(error.constraints)).join(', ');
res.status(400).send(message);
} else {
next();
}
});
};
};
export default validationMiddleware;
Контроллер
Наш контроллер выглядит довольно стандартно. Единственное, что можно отметить — это извлечение объекта CreateJobDto
из тела с помощью plainToClass
из class-transformer
с excludeExtraneousValues: true
, который уничтожает только помеченные поля (с декоратором @Expose()
в классе CreateJobDto
) Подробнее об этом в документации по class-transformer.
//jobs.controllers.ts
import { plainToClass } from 'class-transformer';
import { NextFunction, Request, Response } from 'express';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import { Job } from '../entities/jobs.entity';
import JobsService from '../services/jobs.service';
class JobsController {
constructor(private jobService = new JobsService()) {}
public createJob = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobDto: CreateJobDto = plainToClass(CreateJobDto, req.body, { excludeExtraneousValues: true });
const createdJob: Job = await this.jobService.createJob(jobDto);
res.status(201).json(createdJob);
} catch (error) {
next(error);
}
};
public getJobById = async (req: Request, res: Response, next: NextFunction): Promise<void> => {
try {
const jobId = req.params.id;
const job: Job = await this.jobService.findJobById(jobId);
const responseStatus = job.status === 'done' ? 200 : 202;
res.status(responseStatus).json(job);
} catch (error) {
next(error);
}
};
}
export default JobsController;
Также стоит отметить, что код состояния ответа [GET] /job/{id} равен 202, если задача на преобразование всё ещё находится в обработке. Подробнее об этом читайте в посте «Асинхронный шаблон запроса-ответа».
В случае возникновения ошибки она передаётся в обработчик ошибок (error middleware), который является последним посредником в нашей цепочке и выглядит следующим образом:
//error.middleware.ts
import { NextFunction, Request, Response } from 'express';
import { isBoom, Boom } from '@hapi/boom';
import { logger } from '../../common/utils/logger';
function errorMiddleware(error: Boom | Error, req: Request, res: Response, next: NextFunction) {
const statusCode: number = isBoom(error) ? error.output.statusCode : 500;
const errorMessage: string = isBoom(error) ? error.message : 'Something went wrong';
logger.error(`StatusCode : ${statusCode}, Message : ${error}`);
return res.status(statusCode).send(errorMessage);
}
export default errorMiddleware;
Вы можете заметить, что мы импортировали пакет под названием Boom — о нём мы поговорим позже в разделе «Сервисы».
Сервисы
Служба задач (jobs.service)
JobService
имеет всю бизнес-логику и доступ к слою Data Access, а также взаимодействует с RabbitMQ Service для отправки событий в очередь.
//jobs.service.ts
import * as Boom from '@hapi/boom';
import Container from 'typedi';
import { CreateJobDto } from '../common/dtos/createJob.dto';
import EventEmitter from '../common/utils/eventEmitter';
import { Job } from '../entities/jobs.entity';
import RabbitMQService from './rabbitmq.service';
class JobsService {
private events = {
JobCreated: 'JobCreated',
};
constructor() {
this.intiializeEvents();
}
private intiializeEvents() {
EventEmitter.on(this.events.JobCreated, (job: Job) => {
const rabbitMQInstance = Container.get(RabbitMQService);
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
public async findJobById(jobId: string): Promise<Job> {
const job: Job = await Job.findOne(jobId);
if (!job) throw Boom.notFound();
return job;
}
public async createJob(jobDto: CreateJobDto): Promise<Job> {
const createdJob: Job = await Job.save({ ...jobDto, youtubeId: jobDto.youtubeUrl.split('v=')[1]?.slice(0, 11), status: 'pending' } as Job);
EventEmitter.emit(this.events.JobCreated, createdJob);
return createdJob;
}
}
export default JobsService;
Сразу же вы увидите два импорта, с которыми вы можете быть незнакомы — мы быстро пройдёмся по ним, а затем подробно объясним каждую функцию в этом классе.
Boom
Используется для создания http-объектов с мощным, простым и дружелюбным интерфейсом.TypeDI
TypeDI — это мощный пакет для инъекции зависимостей, который обладает множеством фичей. Одной из таких фич является наличие Singleton Services, что мы и используем в нашем случае.
Теперь давайте подробнее рассмотрим некоторые функции в классе.
intiializeEvents()
Эта функция использует глобальный EventEmitter
, который мы используем во всём проекте, чтобы добавить слой pub/sub.
это так же просто, как
//eventEmitter.ts
import { EventEmitter } from 'events';
export default new EventEmitter();
и теперь мы можем начать прослушивать события. В частности, событие, которое мы создадим позже при создании новой задачи под названием 'JobCreated'
// Defines all the events in our service
private events = {
JobCreated: 'JobCreated',
};
private intiializeEvents() {
// Start listening for the event 'JobCreated'
EventEmitter.on(this.events.JobCreated, (job: Job) => {
// Get a singleton instance of our RabbitMQService
const rabbitMQInstance = Container.get(RabbitMQService);
// Dispatch an event containing the data of the created job
rabbitMQInstance.sendToQueue(JSON.stringify(job));
});
}
Подробнее читайте в посте «Добавление слоя Pub/Sub в Express бэкенд».
createJob()
Эта функция:
Создаёт новый документ задачи в базе данных.
Отправляет событие 'JobCreated' о том, что была создана новая задача. Таким образом, слушатель события (event listener) будет обрабатывать логику отправки этого события в службу RabbitMQ.
RabbitMQ Service
Этот сервис отвечает за подключение к серверу RabbitMQ, создание канала и инициализацию очереди, которая будет использоваться для создания задач (и которые будут потребляться нашим Service Worker-ом).
В качестве клиента для RabbitMQ Server используется amqplib.
//rabbitmq.service.ts
import { Service } from 'typedi';
import amqp, { Channel, Connection } from 'amqplib';
import { logger } from '../common/utils/logger';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
} catch (err) {
logger.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
logger.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
logger.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
logger.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async sendToQueue(message: string) {
this.channel.sendToQueue(this.queueName, Buffer.from(message), {
persistent: true,
});
logger.info(`sent: ${message} to queue ${this.queueName}`);
}
}
Код для настройки соединения / каналов / очередей довольно стандартен. Подробнее ознакомиться с этими функциями можно в документации RabbitMQ или anqplib. Единственная функция, которую нам нужно будет использовать вне этого класса — это sendToQueue()
. Она используется для отправки сообщения в нашу очередь задач, как показано в JobService, путём отправки строкового объекта Job
.
rabbitMQInstance.sendToQueue(JSON.stringify(job));
Теперь нам нужно только инициализировать RabbitMQ Service в начале работы приложения следующим образом:
import Container from 'typedi';
// Call initializeRabbitMQ() somewhere when starting the app
private initializeRabbitMQ() {
Container.get(RabbitMqService);
}
Работа нашего бэкенд-сервиса завершена. Осталость только, чтобы Node Service Worker использовал очередь задач и выполнил фактическое преобразование GIF. Давайте рассмотрим, как это сделать.
Node Service Worker
Функциональные возможности

Как видите, Service Worker отвечает за:
Потребление задач из очереди задач;
Преобразование части видео на YouTube в GIF;
Загрузку GIF в облачное хранилище;
Обновление gifUrl и статуса задачи в базе данных.
Блок-схема
Эта блок-схема наглядно показывает работу Service Worker-а.

Реализация
RabbitMQ Service
Потребление задач из очереди
RabbitMQ Service в Service Worker-е похож на RabbitMQ Service на бэкенд-сервере, за исключением одной единственной функции — startConsuming()
//rabbitmq.service.ts
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';
import Container, { Service } from 'typedi';
import { Job } from '../entities/jobs.entity';
import ConversionService from './conversion.service';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
await this.startConsuming();
} catch (err) {
console.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
console.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
console.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
console.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async startConsuming() {
const conversionService = Container.get(ConversionService);
this.channel.prefetch(1);
console.info(' ? Waiting for messages in %s. To exit press CTRL+C', this.queueName);
this.channel.consume(
this.queueName,
async (msg: ConsumeMessage | null) => {
if (msg) {
const job: Job = JSON.parse(msg.content.toString());
console.info(`Received new job ? `, job.id);
try {
await conversionService.beginConversion(
job,
() => {
this.channel.ack(msg);
},
() => {
this.channel.reject(msg, false);
},
);
} catch (err) {
console.error('Failed to process job', job.id, err);
}
}
},
{
noAck: false,
},
);
}
}
startConsuming()
потребляет сообщение из очереди, разбирает его JSON-объект и делегирует процесс конвертации в ConversionService
.
Всё, что нужно ConversionService
для выполнения преобразования, — это объект Job
, а также два коллбэка, используемых для подтверждения или отклонения сообщения из очереди (рассмотрим ниже).
Также обратите внимание, что в этом примере мы используем
this.channel.prefetch(1);
Об этом мы также поговорим немного позже.
Подтверждение сообщения
Чтобы удалить задачу из очереди (что свидетельствует о том, что сервис успешно обработал задачу либо отрицательно, либо положительно), нам нужно сделать ручное подтверждение.
Это можно сделать в amqplib, используя либо
channel.ack(msg);
Чтобы указать на положительное подтверждение сообщения.
Либо
// Второй параметр указывает, следует ли повторно поставить сообщение в очередь или нет
channel.reject(msg, false);
чтобы указать на отрицательное подтверждение сообщения.
Обратите внимание, что в случае ошибки мы не возвращаем сообщение в очередь, а считаем его «неудачным преобразованием». Но это можно оставить на усмотрение программиста.
Подробнее о подтверждении сообщений в RabbitMQ можно прочитать здесь.
Сервис преобразования
Этот сервис содержит основную логику нашего Service Worker-а.

Он раскрывает функцию beginConversion()
, которая вызывается из службы RabbitMQ при получении сообщения
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
...
}
Эта функция выполнит все шаги, необходимые для преобразования, а затем вызовет либо onSuccess()
, либо onError()
в зависимости от успеха или неудачи.
Шаги, необходимые для преобразования видео с YouTube в GIF:
Загрузка видео с YouTube.
Видео с YouTube загружается локально.
Преобразование загруженного видео в GIF.
Видео конвертируется в GIF (конвертируется только выбранный диапазон по времени начала/окончания).
Загрузка GIF в облачное хранилище Google.
Обновление базы данных.
Вызывается onSuccess() или onError() соответственно.
Начнём с локальной загрузки видео с YouTube.
Загрузка видео с YouTube
Чтобы загрузить видео с YouTube локально, мы используем пакет ytdl-core, предназначенный для этой задачи.
За это отвечает функция downloadVideo()
, которая принимает URL/ID видео и возвращает ReadableStream, который можно использовать для сохранения видеофайла локально, а также его расширения, например: mp4, avi... и так далее.
//conversion.service.ts
import { Readable } from 'stream';
import ytdl from 'ytdl-core';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable ; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
video.on('progress', (chunkLength, downloaded, total) => {
//... Logic for showing progress to the user..i.e progress bar
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
//Video download started
console.log('Downloading Video');
})
.on('finish', async () => {
//Video finished downloading locally in srcFileName
console.info('Downloaded video for job ', job.id);
//...Logic for converting the locally downloaded video to GIF
})
.on('error', async () => {
//...handle failure logic
}),
);
} catch (err) {
//...handle failure logic
}
}
Преобразование видео в GIF
Для преобразования локального видео в GIF мы будем использовать ffmpeg.wasm, который по сути является Webassembly-портом FFmpeg. Этот процесс можно представить себе как асинхронное использование FFmpeg внутри ноды для выполнения преобразования. Никаких порождений внешних процессов, никаких зависимых инструментов и так далее — что одновременно очень мощно и просто.
//conversion.service.ts
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import GifConversion from '../common/interfaces/GifConversion';
//...somewhere in our code
const ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await ffmpeg.load();
//Converts a video range to GIF from srcFileName to destFileName
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
//... Video download logic
// GIF Conversion
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
} catch (err) {
//...handle failure logic
}
}
Загрузка GIF в облачное хранилище Google
После того как локальный видеофайл был сконвертирован в GIF, можно загрузить его в облачное хранилище Google.
Сначала создадим службу CloudStorageService
, которая будет отвечать именно за это.
В нашем случае используем Google Cloud Storage.
import { Storage } from '@google-cloud/storage';
import * as _ from 'lodash';
import { Service } from 'typedi';
@Service()
class CloudStorageService {
private storage;
private BUCKET_NAME;
constructor() {
const privateKey = _.replace(process.env.GCS_PRIVATE_KEY, new RegExp('\\\\n', 'g'), '\n');
this.BUCKET_NAME = 'yourbucketname';
this.storage = new Storage({
projectId: process.env.GCS_PROJECT_ID,
credentials: {
private_key: privateKey,
client_email: process.env.GCS_CLIENT_EMAIL,
},
});
}
async uploadGif(gifImage: Buffer, uploadName: string) {
try {
const bucket = await this.storage.bucket(this.BUCKET_NAME);
uploadName = `ytgif/${uploadName}`;
const file = bucket.file(uploadName);
await file.save(gifImage, {
metadata: { contentType: 'image/gif' },
public: true,
validation: 'md5',
});
return `https://storage.googleapis.com/${this.BUCKET_NAME}/${uploadName}`;
} catch (err) {
throw new Error('Something went wrong while uploading image');
}
}
}
export default CloudStorageService;
Теперь можно использовать его для загрузки сгенерированного GIF.
//conversion.service.ts
import Container from 'typedi';
import CloudStorageService from './cloudStorage.service';
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
} catch (err) {
//...handle failure logic
}
}
Обработка success/failure
Она довольно проста. Сначала мы должны обновить задачу в базе данных.
В случае успеха:
Установите статус задачи на 'done' и обновите gifUrl на загруженный GIF в Google Cloud Storage.В случае неудачи:
Установите статус задачи на 'error'.
После этого мы вызовем функцию onSuccess()
или onError()
, которая, по сути, будет обрабатывать положительное / отрицательное подтверждение сообщения RabbitMQ.
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
//Success scenario
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
} catch (err) {
//Failure scenario
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}
}
Собираем всё вместе
Если собрать всё вместе, а также добавить шкалу прогресса выполнения с помощью cli-progress, то ConversionService
будет выглядеть следующим образом:
import Container, { Service } from 'typedi';
import JobsService from './jobs.service';
import ytdl from 'ytdl-core';
import { Readable } from 'stream';
import { Job } from '../entities/jobs.entity';
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import fs from 'fs';
import cliProgress from 'cli-progress';
import CloudStorageService from './cloudStorage.service';
import GifConversion from '../common/interfaces/GifConversion';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic);
@Service()
export default class ConversionService {
private ffmpeg: FFmpeg = null;
constructor(private jobService = new JobsService()) {}
public async initializeService() {
try {
this.ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await this.ffmpeg.load();
} catch (err) {
console.error(err);
}
}
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
progressBar.start(100, 0);
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
progressBar.stop();
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
await this.jobService.updateJobById(job.id as any, { status: 'processing' });
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
video.on('progress', (chunkLength, downloaded, total) => {
let percent: any = downloaded / total;
percent = percent * 100;
progressBar.update(percent);
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
console.log('Downloading Video');
progressBar.start(100, 0);
})
.on('finish', async () => {
progressBar.stop();
console.info('Downloaded video for job ', job.id);
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
})
.on('error', async () => {
progressBar.stop();
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}),
);
} catch (err) {
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
throw err;
}
}
}
Вспомните, как мы использовали channel.prefetch(1)
, когда начали потребление из очереди
this.channel.prefetch(1);
Это позволяет убедиться, что каждый потребитель очереди получает только одно сообщение за раз. Это гарантирует, что нагрузка будет равномерно распределена между нашими потребителями, и всякий раз, когда потребитель освободится, он будет готов обрабатывать другие задачи.
Подробнее об этом можно прочитать в RabbitMQ Docs.
Это также означает, что если мы захотим масштабировать задачи на конвертацию / Service Worker-ы, мы можем добавить больше копий этого сервиса.
Подробнее об этом написано в заметке «Конкурирующие потребители».
Вот и всё для нашего Service Worker-а! Теперь мы можем начать копаться в клиентской части приложения.
Нам осталось рассмотреть, как реализовать Next.js Client, который будет отправлять запросы на конвертацию и просматривать сконвертированные GIF.
Создание клиентской части
Функциональные возможности
Клиентская часть нашего приложения очень проста, она должна делать только две вещи:
Предоставить интерфейс для создания запросов на конвертацию.
Предоставить страницу, которая будет опрашивать задачу на конвертацию GIF и просматривать сгенерированный GIF, когда задача выполнена.
Давайте сразу перейдём к первой задаче — созданию главной страницы.
Главная страница
Минимально эта страница должна иметь:
Поля ввода, содержащие:
URL видео
время начала GIF
время окончания GIF
Встроенный проигрыватель YouTube с превью выбранного видео и предварительным просмотром выбранного временного диапазона.
Две кнопки: для предварительного просмотра текущего выбора и для отправки текущего выбора для создания GIF.
Начнём с создания трёх необходимых полей ввода и их соответствующих состояний.
// pages/index.tsx
import React, { useState, useMemo } from 'react';
const Home: React.FC = () => {
const [youtubeUrl, setYoutubeUrl] = useState("");
const [startTime, setStartTime] = useState("");
const [endTime, setEndTime] = useState("");
const validYoutubeUrl = useMemo(() => {
const youtubeUrlRegex = /^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/;
return youtubeUrl.match(youtubeUrlRegex);
}, [youtubeUrl]);
return (
<>
<input
className={`input ${youtubeUrl === "" ? "is-dark" : validYoutubeUrl? "is-success": "is-danger" }`}
type="text"
placeholder="Youtube URL, eg: https://www.youtube.com/watch?v=I-QfPUz1es8"
value={youtubeUrl}
onChange={(e) => {
setYoutubeUrl(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="Start Second, eg: 38"
value={startTime}
onChange={(e) => {
setStartTime(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="End Second, eg: 72"
value={endTime}
onChange={(e) => {
setEndTime(e.target.value);
}}
/>
</>
)
}
Обратите внимание, что мы проверяем валидность URL-адреса с помощью регулярных выражений. Это не обязательно, но используется для обеспечения хорошей визуальной обратной связи, а также для условного отображения встроенного YouTube плеера позже, чтобы избежать показа пустого проигрывателя.
Теперь пришло время добавить встроенный YouTube проигрыватель.
Мы будем использовать проигрыватель из react-youtube
// pages/index.tsx
import React, { useState, useMemo } from 'react';
import YouTube from "react-youtube";
const Home: React.FC = () => {
// ...code from before
const [ytPlayer, setYtPlayer] = useState(null);
const ytVideoId = useMemo(() => {
return youtubeUrl.split("v=")[1]?.slice(0, 11);
}, [youtubeUrl]);
return (
<>
<div className="content">
{validYoutubeUrl ? (
<>
<h3>Preview</h3>
<YouTube
videoId={ytVideoId}
opts={{
playerVars: {
start: Number(startTime),
end: Number(endTime),
autoplay: 0,
},
}}
onReady={(e) => {
setYtPlayer(e.target);
}}
/>
</>
) : (
<h4>No Youtube Video Link Selected</h4>
)}
</div>
</>
)
}
Обратите внимание, что мы инициализировали состояние ytPlayer
объектом event target. Мы используем его позже, чтобы программно управлять плеером — в частности, когда будем добавлять кнопку предварительного просмотра.
Теперь пришло время добавить две кнопки: Preview и Generate.
Предварительный просмотр (Preview): Используется для воспроизведения выбранной части видео, чтобы дать пользователю представление о том, как будет выглядеть GIF.
Сгенерировать (Generate): Используется для отправки фактического запроса на конвертацию GIF, то есть запуска фактической конвертации.
// pages/index.tsx
import React, { useState } from 'react';
import axios from "axios";
import { useRouter } from "next/router";
const Home: React.FC = () => {
// ... code from before
const router = useRouter();
const [loading, setLoading] = useState(false);
const submitYoutubeVideo = async () => {
setLoading(true);
try {
const response = await axios.post(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs`,
{
youtubeUrl,
startTime: Number(startTime),
endTime: Number(endTime),
},
{}
);
router.push(`/jobs/${response.data.id}`);
} catch (err) {
alert(err?.response?.data?.message || "Something went wrong");
}
setLoading(false);
};
return (
<>
<button
className="button is-black"
onClick={() => {
if (ytPlayer)
ytPlayer.loadVideoById({
videoId: ytVideoId,
startSeconds: Number(startTime),
endSeconds: Number(endTime),
});
}}
>
Preview
</button>
<button
className={`button is-black is-outlined ${loading ? "is-loading" : ""}`}
onClick={submitYoutubeVideo}
>
Generate GIF
</button>
</>
)
}
Собираем всё вместе
// pages/index.tsx
import axios from "axios";
import { useRouter } from "next/router";
import React, { useMemo, useState } from "react";
import YouTube from "react-youtube";
const Home: React.FC = () => {
const router = useRouter();
const [youtubeUrl, setYoutubeUrl] = useState("");
const [startTime, setStartTime] = useState("");
const [endTime, setEndTime] = useState("");
const [loading, setLoading] = useState(false);
const [ytPlayer, setYtPlayer] = useState(null);
const validYoutubeUrl = useMemo(() => {
const youtubeUrlRegex = /^(?:https?:\/\/)?(?:www\.)?(?:youtu\.be\/|youtube\.com\/(?:embed\/|v\/|watch\?v=|watch\?.+&v=))((\w|-){11})(?:\S+)?$/;
return youtubeUrl.match(youtubeUrlRegex);
}, [youtubeUrl]);
const ytVideoId = useMemo(() => {
return youtubeUrl.split("v=")[1]?.slice(0, 11);
}, [youtubeUrl]);
const submitYoutubeVideo = async () => {
setLoading(true);
try {
const response = await axios.post(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs`,
{
youtubeUrl,
startTime: Number(startTime),
endTime: Number(endTime),
},
{}
);
router.push(`/jobs/${response.data.id}`);
} catch (err) {
console.log(err);
alert(err?.response?.data?.message || "Something went wrong");
}
setLoading(false);
};
return (
<>
{validYoutubeUrl ? (
<>
<h3>Preview</h3>
<YouTube
videoId={ytVideoId}
opts={{
playerVars: {
start: Number(startTime),
end: Number(endTime),
autoplay: 0,
},
}}
onReady={(e) => {
setYtPlayer(e.target);
}}
/>
</>
) : (
<h4>No Youtube Video Link Selected</h4>
)}
<input
className={`input ${youtubeUrl === ""? "is-dark": validYoutubeUrl? "is-success": "is-danger"}`}
type="text"
placeholder="Youtube URL, eg: https://www.youtube.com/watch?v=I-QfPUz1es8"
value={youtubeUrl}
onChange={(e) => {
setYoutubeUrl(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="Start Second, eg: 38"
value={startTime}
onChange={(e) => {
setStartTime(e.target.value);
}}
/>
<input
className="input is-dark"
type="number"
placeholder="End Second, eg: 72"
value={endTime}
onChange={(e) => {
setEndTime(e.target.value);
}}
/>
<button
className={`button is-black`}
onClick={() => {
if (ytPlayer)
ytPlayer.loadVideoById({
videoId: ytVideoId,
startSeconds: Number(startTime),
endSeconds: Number(endTime),
});
}}
>
Preview
</button>
<button
className={`button is-black is-outlined ${loading ? "is-loading" : ""}`}
onClick={submitYoutubeVideo}
>
Generate GIF
</button>
</>
);
};
export default Home;
Страница GIF
Частые опросы задачи преобразования GIF
Здесь мы хотим периодически получать данные о задачи преобразования GIF из бэкенда. Это известно как частые опросы (polling / поллинг).
Для этого мы будем использовать swr — библиотеку сбора данных для React. Она не обязательно используется для частого опроса, но у неё есть хороший API, который частый опрос поддерживает. Существуют и другие библиотеки для получения данных с похожими возможностями, в частности React Query. Также можно выполнять опрос с помощью axios (используя таймауты). Однако такие библиотеки сбора данных, как swr и React Query, имеют хуки для сбора данных — это улучшает опыт разработки, а также предоставляет другие возможности (например, кэширование).
Сначала мы должны предоставить функцию выборки данных:
import axios from "axios";
import Job from "../../common/interfaces/Job.interface";
export default async function fetchJobById(jobId: string): Promise<Job> {
try {
const response = await axios.get(
`${process.env.NEXT_PUBLIC_BASE_URL}/api/v1/jobs/${jobId}`
);
return response.data;
} catch (err) {
if (err.response?.status === 404) window.location.href = "/404";
throw err;
}
}
Затем можно использовать это вместе с swr для опроса задачи по преобразованию GIF:
// pages/jobs/[id].tsx
import { useRouter } from "next/router";
import React from "react";
import useSWR from "swr";
import Job from "../../lib/common/interfaces/Job.interface";
import fetchJobById from "../../lib/requests/fetchers/jobById";
export default function JobPage() {
const router = useRouter()
const { jobId } = router.query
const [jobDone, setJobDone] = React.useState(false);
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
initialData: null,
revalidateOnFocus: false,
// job will be polled from the backend every 2 seconds until its status change to 'done'
refreshInterval: jobDone ? 0 : 2000,
}
);
React.useEffect(() => {
if (job?.status === "done") setJobDone(true);
}, [job]);
const loadingJob = !job;
return (
<>
{/* rendering logic */}
</>
);
}
Обратите внимание, что в этом фрагменте кода refreshInterval
— это частота опроса данных с бэкенда. Мы использовали булево состояние, которое будет отслеживать статус задачи, и как только она будет выполнена, мы прекратим частый опрос бэкенда.
Рендеринг на стороне сервера
Мы можем использовать рендеринг Next на стороне сервера для динамического получения ID из URL, а также для первоначальной выборки задачи один раз перед загрузкой страницы.
Для этого мы используем getServerSideProps()
Подробнее об этом смотрите в документации Next.js.
// pages/jobs/[id].tsx
// ...other imports
import { InferGetServerSidePropsType } from "next";
export const getServerSideProps = async (context) => {
const jobId = context.params.id;
try {
const initialJob: Job = await fetchJobById(jobId);
return { props: { jobId, initialJob: initialJob } };
} catch (err) {
return { props: { jobId, initialJob: null } };
}
};
export default function JobPage({
jobId,
initialJob,
}: InferGetServerSidePropsType<typeof getServerSideProps>) {
//...other code
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
// use initialJob instead of null
initialData: initialJob,
revalidateOnFocus: false,
refreshInterval: jobDone ? 0 : 2000,
}
);
return (
<>
{/* rendering logic */}
</>
);
}
Обратите внимание, что мы использовали initialJob
в свойстве initialData
в swr опциях.
Собираем всё вместе
// pages/jobs/[id].tsx
import { InferGetServerSidePropsType } from "next";
import React from "react";
import useSWR from "swr";
import Job from "../../lib/common/interfaces/Job.interface";
import fetchJobById from "../../lib/requests/fetchers/jobById";
export default function JobPage({
jobId,
initialJob,
}: InferGetServerSidePropsType<typeof getServerSideProps>) {
const [jobDone, setJobDone] = React.useState(false);
const { data: job, error: errorJob, isValidating: isValidatingJob } = useSWR(
[`/api/jobs/${jobId}`, jobId],
async (url, jobId) => await fetchJobById(jobId),
{
initialData: initialJob,
revalidateOnFocus: false,
refreshInterval: jobDone ? 0 : 2000,
}
);
React.useEffect(() => {
if (job?.status === "done") setJobDone(true);
}, [job]);
const loadingJob = !job;
return (
<>
{loadingJob ? (
<>
<h4>Getting conversion status..</h4>
<progress className="progress is-medium is-dark" max="100">
45%
</progress>
</>
) : (
<div className="content">
{job.status === "error" ? (
<h4 style={{ color: "#FF0000" }}>Conversion Failed</h4>
) : job.status === "done" ? (
<>
{!job.gifUrl ? (
<h4 style={{ color: "#FF0000" }}>Conversion Failed</h4>
) : (
<>
<h4>Gif</h4>
<img src={job.gifUrl}></img>
<h6>
GIF Url : <a href={job.gifUrl}>{job.gifUrl}</a>
</h6>
<h6>
Converted from :
<a href={job.youtubeUrl}>{job.youtubeUrl}</a>
</h6>
</>
)}
</>
) : (
<>
<h4>Working..</h4>
<h5>Conversion Status : {job.status}</h5>
<progress className="progress is-medium is-dark" max="100">
45%
</progress>
</>
)}
</div>
)}
</>
);
}
export const getServerSideProps = async (context) => {
const jobId = context.params.id;
try {
const initialJob: Job = await fetchJobById(jobId);
return { props: { jobId, initialJob: initialJob } };
} catch (err) {
return { props: { jobId, initialJob: null } };
}
};
На этом туториал завершаем и надеемся, что вы узнали что-то новое. Полный исходный код можно посмотреть в github репозитории.
В заключение приглашаем всех желающих на открытый урок "React Testing Library", который пройдёт сегодня вечером. На нём научимся:
Применять на практике подход к разработке компонентов на React.js через тестирование. Писать модульные тесты для компонентов, контролировать корректность обработки событий и документировать техническое решение.
Упрощать рефакторинг, сокращать количество ошибок на проде и упрощать командную работу.
Использовать react testing library и jest. Описывать тестовые сценарии.
Записаться на урок можно на странице онлайн-курса "React.js Developer".