Pull to refresh
0
Rating

Собрать миллион «лайков» или очереди задач в Node.js

Likeastore corporate blog Node.JS *API *
На прошлой неделе мы отметили одну круглую дату — в базе данных Likeastore скопилось, ни много, ни мало — один миллион пользовательских «лайков».

Мы используем JavaScript, все текущие сервисы написаны на JavaScript/Node.js. В общем и целом, я не жалею о использовании Node.js в нашем проекте, он отлично зарекомендовал себя как лучшее средство реализации HTTP API. Но для сбора «лайков», это должен быть daemon, который работает постоянно. Наверно, не самая типичная задача для Node.js — про специфику реализации и некоторые подводные камни, читаем далее.

Сollector


Коллектор (collector), ключевая компонента архитектуры Likeastore. Он собирает данные, через открытые API сервисов, обрабатывает ответы, сохраняет данные и свое текущее состояние. По сути, это «бесконечный цикл», который строит список выполняемых задач, запускает их, после этого процедура повторяется.

Для максимальной эффективности работы, мы запускаем 2 экземпляра коллектора, работающего в разных режимах: initial, normal. В initial режиме, коллектор только обрабатывает только что подключенные сети. Тем самым, пользователь максимально быстро получает «лайки», после подключения сети. После того, как все «лайки» были выгружены, сеть переходит в normal mode, обрабатывется уже другим инстансом, где время между сборами значительно выше.

var argv = require('optimist').argv;

var env = process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var mode = process.env.COLLECTOR_MODE = process.env.COLLECTOR_MODE || argv.mode || 'normal';

var scheduler = require('./source/engine/scheduler');
scheduler(mode).run();


Scheduler


Планировщик, по сути является тем самым циклом while(true), написанным для Node.js. Признаюсь честно, переключение мышления с «синхронного» в «асинхронный» режим, был не самым простым процессом для меня. Запуск бесконечного числа задач в Node.js казался не простой задачей, в результате раздумий родился этот вопрос на SO.

Одним из вариантов, было использование async.queue, которое казалось очевидным, но не лучшим для этой задачи. После нескольких попыток, самым лучшим вариантом ассинхронного while(true) оказался setTimeout.

function scheduler (mode) {
	function schedulerLoop() {
		runCollectingTasks(restartScheduler);
	}

	function restartScheduler (err, results) {
		if (err) {
			logger.error(err);
		}

		// http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak
		var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart);
	}

	// ...

	return {
		run: function () {
			schedulerLoop();
		}
	};
}


Тут следует отметить тот самый подводный камень daemon'ов на Node.js — утечки памяти. Я заметил, что после продолжительной работы collector'а он начинал потреблять большое кол-во памяти в и самый неожиданный момент просто останавливался. Обратите внимание на комментарий в коде с вопросом на SO. После того как я добавил var timeout =, ситуация улучшилась, но не радикально.

Другая причина открылась после эпичного поста об утечках памяти и расследования инженеров Joyent и Wallmart. С переходом на Node.js v0.10.22 коллектор стал работать еще стабильней. Тем не менее, спонтанные остановки происходят, причину понять крайне тяжело.

Networks and states


Когда пользователь подключает новую сеть, мы кладем в коллекцию networks документ, который содержит: идентификатор пользователя, токен доступа и прочую служебную информацию. В этот же документ, коллектор денормализует свое текущее состояние (в каком режиме он работает, были ли ошибки, сколько их, какая текущая страница данных обрабатывается). Т.е. по сути, это один и тотже документ, на основе которого создается исполняемая задача.

function runCollectingTasks(callback) {
	prepareCollectingTasks(function (err, tasks) {
		if (err) {
			return callback(err);
		}

		runTasks(tasks, 'collecting', callback);
	});
}

function prepareCollectingTasks(callback) {
	networks.findByMode(mode, function (err, states) {
		if (err) {
			return callback({message: 'error during networks query', err: err});
		}

		if (!states) {
			return callback({message: 'failed to read networks states'});
		}

		callback(null, createCollectingTasks(states));
	});
}


Tasks


На основе состояния, мы создаем список исполняемых задач. Почти все открытые API популярных сервисов имеют ограничения по количеству реквестов на период времени. Задача коллектора сводится к тому, чтобы выполнить наиболее эффективное число запросов и не уйти в rate-limit.

К запуску разрешены только те задачи, которые были запланированы после текущего момента времени.

function createCollectingTasks(states) {
	var tasks = states.map(function (state) {
		return allowedToExecute(state) ? collectingTask(state) : null;
	}).filter(function (task) {
		return task !== null;
	});

	return tasks;
}

function allowedToExecute (state) {
	return moment().diff(state.scheduledTo) > 0;
}

function collectingTask(state) {
	return function (callback) { return executor(state, connectors, callback); };
}


Массив данных, преобразуется в массив функций, которые идут на вход runTasks. runTasks последовательно выполняет каждую задачу, через async.series.

function runTasks(tasks, type, callback) {
	async.series(tasks, function (err) {
		if (err) {
			// report error but continue execution to do not break execution chain..
			logger.error(err);
		}

		callback(null, tasks.length);
	});
}


Executor


Обобщенная функция, которая принимает текущее состояние, список существующих коннекторов и функцию обратного вызова (я поубирал всякую обработку ошибок и логгирование, чтобы показать ее суть).

function executor(state, connectors, callback) {
	var service = state.service;
	var connector = connectors[service];
	var connectorStarted = moment();

	connector(state, user, connectorExecuted);

	function connectorExecuted(err, updatedState, results) {
		saveConnectorState(state, connectorStateSaved);

		function saveConnectorState (state, callback) {
			// ...
		}

		function connectorStateSaved (err) {
			// ...

			saveConnectorResults(results, connectorResultsSaved);
		}

		function saveConnectorResults(results, callback) {
			// ...

			connectorResultsSaved(results, connectorResultsSaved);
		}

		function connectorResultsSaved (err, saveDuration) {
			// ...

			callback(null);
		}
	}
}


Connectors


Коннектор (connector), это функция, которая осуществляет HTTP реквест к АПИ, обрабатывает его ответ, обновляет состояние задачи (планируемый следующий запуск) и возвращает собранные данные. Именно коннектор, различает в каком состоянии находится сбор «лайков», в зависимости от этого делает нужный реквест (строит нужный request URI).

На данный момент, у нас реализованно 9 коннекторов, код который более менее схожий. Первоначально я всегда искал готовые библиотеки доступа к API, но это оказалась неверная стратегия: надо выбирать из нескольких вариантов, они не работают либо имеют не удобный интерфейс, отстают от текущей версии API и т.д. Самым гибким и простым решением оказалось использование request (лучший HTTP клиент для Node.js).

Приведу пример кода для GitHub (опять же сокращу детали, чтобы показать саму суть).

var API = 'https://api.github.com';

function connector(state, user, callback) {
	var accessToken = state.accessToken;

	if (!accessToken) {
		return callback('missing accessToken for user: ' + state.user);
	}

	initState(state);

	var uri = formatRequestUri(accessToken, state);
	var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'};

	request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) {
		if (err) {
			return handleUnexpected(response, body, state, err, function (err) {
				callback (err, state);
			});
		}

		return handleResponse(response, body);
	});

	function initState(state) {
		// intialize state fields (page, errors, mode etc.) ...
	}

	function formatRequestUri(accessToken, state) {
		// create request URI based on values from state ...
	}

	function handleResponse(response, body) {
		var rateLimit = +response.headers['x-ratelimit-remaining'];

		var stars = body.map(function (r) {
			return {
				// transforms response into likeastore item
			};
		});

		return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars);
	}

	function updateState(state, data, rateLimit, failed) {
		state.lastExecution = moment().toDate();

		// update other state fields (next page, mode) ...

		return state;
	}
}


scheduleTo


Наконец, когда коннектор выполнился и состояние обновленно, нужно рассчитать следующий момент запуска. Он рассчитывается на основании ограничений API и режима работы коллектора (для initial mode пауза минимальна, для normal mode больше, как правило 15 минут, если коннектор уходи в rate limit то пауза максимальная).

function scheduleTo(state) {
	var currentMoment = moment();
	var service = state.service;

	var scheduleForMode = {
		initial: config.collector.quotes[service].runAfter,
		normal: config.collector.nextNormalRunAfter,
		rateLimit: config.collector.nextRateLimitRunAfter
	};

	var next = scheduleForMode[state.mode];
	var scheduledTo = currentMoment.add(next, 'milliseconds');

	state.scheduledTo = scheduledTo.toDate();

	return state;
}


Вот такой вот незамысловатый код, который «устоялся» примерно в сентябре прошлого года и все что мы добавляем с тех времен, это новые коннекторы, сам движок остается не изменным. Я задумываюсь о том, чтобы выделить отдельную библиотеку, для запуска очередей задач в Node.js, но не уверен на сколько это обобщенная задача.

Время идет, количество пользователей растет и на данный момент момент обработка 3 тысяч задач занимает около 30 минут, что довольно долго (стараемся держать время цикла не более 15 минут). Думаю, что в будущем архитектура коллектора изменится в сторону очередей сообщений и разделения коллекторов не по режиму работы, а по другому признаку (тип сети, кластер пользователей) для более легкой горизонтальной маштабируемости.
Tags:
Hubs:
Total votes 42: ↑33 and ↓9 +24
Views 15K
Comments Comments 12

Information

Founded
Location
Украина
Website
likeastore.com
Employees
2–10 employees
Registered