Привет! Мы обучаем людей работе с большими данными. Невозможно себе представить образовательную программу по большим данным без своего кластера, на котором все участники совместно работают. По этой причине на нашей программе он всегда есть :) Мы занимаемся его настройкой, тюнингом и администрированием, а ребята непосредственно запускают там MapReduce-джобы и пользуются Spark'ом.
В этом посте мы расскажем, как мы решали проблему неравномерной загрузки кластера, написав свой автоскейлер, используя облако Mail.ru Cloud Solutions.
Проблема
Кластер у нас используется не совсем в типичном режиме. Утилизация сильно неравномерная. Например, есть практические занятия, когда все 30 человек и преподаватель заходят на кластер и начинают им пользоваться. Или опять же есть дни перед дедлайном, когда загрузка сильно возрастает. Во все остальное время кластер работает в режиме недозагрузки.
Решение №1 – это держать кластер, который будет выдерживать пиковые загрузки, но будет простаивать во все остальное время.
Решение №2 – это держать небольшой кластер, в который вручную добавлять ноды перед занятиями и во время пиковых нагрузок.
Решение №3 – это держать небольшой кластер и написать автоскейлер, который будет следить за текущей загрузкой кластера и сам, используя различные API, добавлять и удалять ноды из кластера.
В этом посте мы будем говорить о решении №3. Такой автоскейлер сильно зависит от внешних факторов, а не от внутренних, и провайдеры его часто не предоставляют. Мы пользуемся облачной инфраструктурой Mail.ru Cloud Solutions и написали автоскейлер, используя API MCS. А так как мы обучаем работе с данными, решили показать, как вы можете написать подобный автоскейлер для своих целей и использовать со своим облаком
Prerequisites
Во-первых, у вас должен быть Hadoop-кластер. Мы, например, пользуемся дистрибутивом HDP.
Чтобы у вас ноды могли быстро добавляться и удаляться, у вас должно быть определенное распределение ролей по нодам.
- Мастер-нода. Ну тут пояснять ничего особенно не надо: главная нода кластера, на которой запускается, например, драйвер Spark'а, если вы пользуетесь интерактивным режимом.
- Дата-нода. Это нода, на которой у вас хранятся данные на HDFS и на ней же происходят вычисления.
- Вычислительная нода. Это нода, на которой у вас ничего не хранится на HDFS, но на ней происходят вычисления.
Важный момент. Автоскейлинг будет происходить за счет нод третьего типа. Если вы начнете забирать и добавлять ноды второго типа, то скорость реагирования будет сильно низкой – декомишен и рекомишен будет занимать часы на вашем кластере. Это, конечно, не то, что ожидаешь от автоскейлинга. То есть ноды первого и второго типа мы не трогаем. Они будут представлять собой минимально жизнеспособный кластер, который будет существовать на протяжении всего действия программы.
Итак, наш автоскейлер написан на Python 3, использует Ambari API для управления сервисами кластера, использует API от Mail.ru Cloud Solutions (MCS) для запуска и остановки машин.
Архитектура решения
- Модуль
autoscaler.py
. В нем прописаны три класса: 1) функции для работы с Ambari, 2) функции для работы с MCS, 3) функции, связанные непосредственно с логикой работы автоскейлера. - Скрипт
observer.py
. По сути состоит из разных правил: когда и в какие моменты вызывать функции автоскейлера. - Файл с конфигурационными параметрами
config.py
. Там содержится, например, список нод, разрешенных для автоскейлинга и другие параметры, влияющие, например, на то, сколько времени подождать с того момента, когда была добавлена новая нода. Там же находятся еще и таймстемпы начала занятий, чтобы перед занятием была запущена максимальная разрешенная конфигурация кластера.
Давайте теперь посмотрим на куски кода, находящиеся внутри первых двух файлов.
1. Модуль autoscaler.py
Класс Ambari
Так выглядит кусочек кода, содержащий класс Ambari
:
class Ambari:
def __init__(self, ambari_url, cluster_name, headers, auth):
self.ambari_url = ambari_url
self.cluster_name = cluster_name
self.headers = headers
self.auth = auth
def stop_all_services(self, hostname):
url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/'
url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname
req0 = requests.get(url2, headers=self.headers, auth=self.auth)
services = req0.json()['host_components']
services_list = list(map(lambda x: x['HostRoles']['component_name'], services))
data = {
"RequestInfo": {
"context":"Stop All Host Components",
"operation_level": {
"level":"HOST",
"cluster_name": self.cluster_name,
"host_names": hostname
},
"query":"HostRoles/component_name.in({0})".format(",".join(services_list))
},
"Body": {
"HostRoles": {
"state":"INSTALLED"
}
}
}
req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth)
if req.status_code in [200, 201, 202]:
message = 'Request accepted'
else:
message = req.status_code
return message
Выше для примера можно посмотреть на реализацию функции stop_all_services
, которая останавливает все сервисы на нужной ноде кластера.
На вход классу Ambari
вы передаете:
ambari_url
, например, вида'http://localhost:8080/api/v1/clusters/'
,cluster_name
– название вашего кластера в Ambari,headers = {'X-Requested-By': 'ambari'}
- и внутри
auth
лежит ваш логин и пароль от Ambari:auth = ('login', 'password')
.
Сама функция представляет из себя не более чем парочку обращений через REST API к Ambari. С точки зрения логики мы вначале получаем список запущенных сервисов на ноде, а затем просим на данном кластере, на данной ноде перевести сервисы из списка в состояние INSTALLED
. Функции по запуску все сервисов, по переводу нод в состояние Maintenance
и др. выглядят похожим образом – это просто несколько запросов через API.
Класс Mcs
Так выглядит кусочек кода, содержащий класс Mcs
:
class Mcs:
def __init__(self, id1, id2, password):
self.id1 = id1
self.id2 = id2
self.password = password
self.mcs_host = 'https://infra.mail.ru:8774/v2.1'
def vm_turn_on(self, hostname):
self.token = self.get_mcs_token()
host = self.hostname_to_vmname(hostname)
vm_id = self.get_vm_id(host)
mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action'
headers = {
'X-Auth-Token': '{0}'.format(self.token),
'Content-Type': 'application/json'
}
data = {'os-start' : 'null'}
mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers)
return mcs.status_code
На вход классу Mcs
мы передаем id проекта внутри облака и id пользователя, а также его пароль. В функции vm_turn_on
мы хотим включить одну из машин. Логика здесь чуть сложнее. В начале кода идет вызов трех других функций: 1) нам нужно получить токен, 2) нам нужно конвертировать hostname в название машины в MCS, 3) получить id этой машины. Далее мы делаем просто post-запрос и запускаем эту машину.
Так выглядит сама функция по получению токена:
def get_mcs_token(self):
url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog'
headers = {'Content-Type': 'application/json'}
data = {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'id': self.id1,
'password': self.password
}
}
},
'scope': {
'project': {
'id': self.id2
}
}
}
}
params = (('nocatalog', ''),)
req = requests.post(url, data=json.dumps(data), headers=headers, params=params)
self.token = req.headers['X-Subject-Token']
return self.token
Класс Autoscaler
В этом классе содержатся функции, относящиеся к самой логике работы.
Так выглядит кусок кода этого класса:
class Autoscaler:
def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node):
self.scaling_hosts = scaling_hosts
self.ambari = ambari
self.mcs = mcs
self.q_ram = deque()
self.q_cpu = deque()
self.num = 0
self.yarn_ram_per_node = yarn_ram_per_node
self.yarn_cpu_per_node = yarn_cpu_per_node
def scale_down(self, hostname):
flag1 = flag2 = flag3 = flag4 = flag5 = False
if hostname in self.scaling_hosts:
while True:
time.sleep(5)
status1 = self.ambari.decommission_nodemanager(hostname)
if status1 == 'Request accepted' or status1 == 500:
flag1 = True
logging.info('Decomission request accepted: {0}'.format(flag1))
break
while True:
time.sleep(5)
status3 = self.ambari.check_service(hostname, 'NODEMANAGER')
if status3 == 'INSTALLED':
flag3 = True
logging.info('Nodemaneger decommissioned: {0}'.format(flag3))
break
while True:
time.sleep(5)
status2 = self.ambari.maintenance_on(hostname)
if status2 == 'Request accepted' or status2 == 500:
flag2 = True
logging.info('Maintenance request accepted: {0}'.format(flag2))
break
while True:
time.sleep(5)
status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER')
if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST':
flag4 = True
self.ambari.stop_all_services(hostname)
logging.info('Maintenance is on: {0}'.format(flag4))
logging.info('Stopping services')
break
time.sleep(90)
status5 = self.mcs.vm_turn_off(hostname)
while True:
time.sleep(5)
status5 = self.mcs.get_vm_info(hostname)['server']['status']
if status5 == 'SHUTOFF':
flag5 = True
logging.info('VM is turned off: {0}'.format(flag5))
break
if flag1 and flag2 and flag3 and flag4 and flag5:
message = 'Success'
logging.info('Scale-down finished')
logging.info('Cooldown period has started. Wait for several minutes')
return message
На вход мы принимаем классы Ambari
и Mcs
, список нод, которые разрешены для скейлинга, а также параметры конфигурации нод: память и cpu, выделенные на ноду в YARN. Также есть 2 внутренних параметра q_ram, q_cpu, являющихся очередями. При помощи них мы храним значения текущей нагрузки кластера. Если мы видим, что в течение последних 5 минут стабильно была повышенная нагрузка, то мы принимаем решение о том, что нужно добавить +1 ноду в кластер. То же самое справедливо и для состояния недозагрузки кластера.
В коде выше приведен пример функции, которая удаляет машину из кластера и останавливает ее в облаке. Вначале происходит декомишен YARN Nodemanager
, дальше включается режим Maintenance
, дальше мы останавливаем все сервисы на машине и выключаем виртуальную машину в облаке.
2. Скрипт observer.py
Пример кода оттуда:
if scaler.assert_up(config.scale_up_thresholds) == True:
hostname = cloud.get_vm_to_up(config.scaling_hosts)
if hostname != None:
status1 = scaler.scale_up(hostname)
if status1 == 'Success':
text = {"text": "{0} has been successfully scaled-up".format(hostname)}
post = {"text": "{0}".format(text)}
json_data = json.dumps(post)
req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'})
time.sleep(config.cooldown_period*60)
В нем мы проверяем, сложились ли условия для увеличения мощностей кластера и есть ли вообще в резерве машины, получаем хостнейм одной из них, добавляем ее в кластер и публикуем об этом сообщение в Slack нашей команды. После чего запускается cooldown_period
, когда мы ничего не добавляем и не убираем из кластера, а просто мониторим загрузку. Если она стабилизировалась и находится внутри коридора оптимальных значений загрузки, то мы просто продолжаем мониторинг. Если же одной ноды не хватило, то добавляем еще одну.
Для случаев когда у нас впереди занятие, мы уже знаем наверняка, что одной ноды не хватит, поэтому мы сразу стартуем все свободные ноды и держим их активными до конца занятия. Это происходит при помощи списка таймстемпов занятий.
Заключение
Автоскейлер – это хорошее и удобное решение для тех случаев, когда у вас наблюдается неравномерная загрузка кластера. Вы одновременно добиваетесь нужной конфигурации кластера под пиковые нагрузки и при этом не держите этот кластер во время недозагрузки, экономя средства. Ну и плюс это все происходит автоматизированно без вашего участия. Сам автоскейлер – это не более, чем набор запросов к API кластер-менеджера и API облачного провайдера, прописанных по определенной логике. О чем точно нужно помнить – это о разделении нод на 3 типа, как мы писали ранее. И будет вам счастье.