Как стать автором
Обновить

Всё хорошо, но нужно переписать или почему ваш код не попадает в прод

Уровень сложностиПростой
Время на прочтение7 мин
Количество просмотров4K

Разберу простую задачу, получение rss-ленты, и то, чем будет отличаться код, который просто получает ленту, от того, который собственно используется в производстве. 

Надеюсь материал будет полезен начинающим программистам и покажет, как примерно должна осуществляться разработка с прицелом на получение результата применимого в проектах. 

Всё же работает, почему не берут?

Получение ответа сервера по гиперссылке_Kandinsky 2.1
Получение ответа сервера по гиперссылке_Kandinsky 2.1

Начнем с конфликта: решение задачи получения rss-ленты выглядит очень просто, вот так:

import requests
import feedparser

response = requests.get('https://lenta.ru/rss')
lenta = feedparser.parse(response.text)

for items in lenta['entries']:
    print(items["title"])

Для одноразового удовлетворения любопытства вполне достаточно, но что если нам нужно получать несколько рсс-лент в плотном режиме (например каждую минуту)?

Пожалуй главное отличие прода в том, что код запускается не вручную (в целях любопытства: “А что мы интересно получим?”), а для получения стабильного предсказуемого результата в больших количествах. В нашем случае, это мониторинг ленты рсс, т.е. нам нужно будет слать много запросов и получать много ответов. 

Разберу самые необходимые элементы, которые нужно добавить, к этому коду, чтобы он мог стабильно работать, получая несколько фидов или даже несколько фидов одновременно).

Как будет работать код, какой результат нужен

Картинка: Программист, понимающий предназначение написанного им кода

Программист, понимающий предназначение написанного им кода_Kandinsky 2.1
Программист, понимающий предназначение написанного им кода_Kandinsky 2.1

Вопрос номер 1 где и как будет запускаться код. В данном случае, это будет или бесконечный цикл while true в виде сервиса на сервере или запуск по расписанию. В целом, оба подхода требуют одного: нам нужен стабильный перезапуск, чтобы если получим одну ошибку, вся система не падала. Но это несколько забегая вперёд. Сперва разберемся с самым простым.

Тут важно понимать, где и в каких условиях будет запускаться то, что вы пишите.

Проверка 200-ответа

Картинка: Проверка 200-ответа
Проверка 200-ответа_Kandinsky 2.1
Проверка 200-ответа_Kandinsky 2.1

Итак requests.get(url), что не так, и что нужно добавить. 

Начнем с того, что requests.get довольно капризная история, и если вы планируете посылать регулярные запросы к серверу, хорошо бы обрабатывать ответы с кодом отличным от 200. 

Добавляем проверку, строчку будет лучше поместить в функцию.

def get_response(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response
    else:
        return False

Если ответ 200, получаем ответ и двигаемся дальше, если нет, тоже двигаемся дальше с небольшим но.

Логгирование, контроль исполнения и отладка

Картинка: Логгирование, контроль исполнения и отладка
Логгирование, контроль исполнения и отладка_Kandinsky 2.1
Логгирование, контроль исполнения и отладка_Kandinsky 2.1

Если задуматься о ситуации когда сервер вернет не 200-й ответ, тут думаю, должно возникнуть интуитивное желание записать происходящее, с тем чтобы:

  1. Отследить, случаи когда ответ не получен

  2. Понять почему это происходит.

Лучше вынести эту проверку в отдельную функцию:

def response_res(response):
    status = True
    if response.status_code != 200:
        status = False    
    return {'status':status, 'code': response.status_code, 'reason':response.reason}

Функция возвращает словарь, в котором есть код (нужен для проверки следующего шага) и причину (нужна для отладки).

Благодаря такой проверке мы можем получить что-то вроде:

HTTPSConnectionPool(host='riafan.ru', port=443): Max retries exceeded with url: /feed (Caused by ProxyError('Cannot connect to proxy.', timeout('_ssl.c:1114: The handshake operation timed out')))

И думать что с этим делать.

Немного маскировки

Картинка: Немного маскировки
Немного маскировки_Kandinsky 2.1
Немного маскировки_Kandinsky 2.1

Как можно понять из приведенной выше ошибки, автоматический сбор данных не очень приветствуется, даже в таком вроде бы легальном поле как получение рсс-ленты. Поэтому для устойчивого функционирования кода, эту ситуацию тоже нужно учитывать.

Как хорошо известно более опытным товарищам, голый запрос, скорее всего или словит капчу на второй-третий раз или просто будет заблокирован сервером, хорошо бы добавить маскировку и какой-то заголовок. Немного усовершенствуем функцию:

import fake_useragent
import logging

def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {res.status_code}, {s.cookies}")
    return response

Этого конечно же мало, как минимум не хорошо слать запросы с пустыми cookies, referer и так далее, но в этой статье в такие подробности углубляться не буду, главное, чтобы направление дальнейших исследований узких мест было понятным.

Если вообще не сработает

Картинка: try-except декоратор
try-except декоратор_Kandinsky 2.1
try-except декоратор_Kandinsky 2.1

Идём дальше, капризность requests не ограничивается ответами сервера, очень часто она может нам вернуть неприятность в виде ошибки. Если мы будем работать с запросами нескольких лент, ошибка в одной убьёт весь сбор. 

Добавляем, так любимый многими try-except, получаем ещё одну функцию:

def try_request():
    try:
        return get_response(url)
    except:
        return False

Тут видно, что в случае успеха, мы получаем наш response, а вот с исключением возникает, вопрос, как его правильно обработать.

Чтобы не писать дополнительных функций, используем в исключении объект Response() с ответом отличным от 200, и передадим с ним ошибку. Примерно так:

from requests.models import Response

def try_request():
    try:
        return get_response(url)
    except Exception as ex:
        response = Response()
        response.reason = ex    
        response.status_code = 444
        return response

Внесём немного разнообразия в процесс, и сделаем функцию try_request() в виде декоратора.

import sys

def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = ex
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap


@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random    
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

Опять используем словарь, на случай получения ошибок и их отладки. Если функция не сработает вернётся сгенерированный нами response, который отловит функция по неверному коду ответа.

Всё готово к развертыванию

Картинка: А теперь запустим многопоточный режим
А теперь запустим многопоточный режим_Kandinsky 2.1
А теперь запустим многопоточный режим_Kandinsky 2.1

Узкие места учтены, можно пробовать развернуть скрипт в рабочем варианте. При этом, можно использовать многопоточный режим, и отдельные возможные проблемы не скажутся на общем выполнении. Для одновременных запросов многопоточность вообще хороша, так как экономит много времени на исполнение.

from multiprocessing.pool import ThreadPool

def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"многопоточность сломалась {ex}")
        return []

Тут всё просто, при помощи ThreadPool создаём количество потоков, равное количеству лент, и всё одновременно отрабатываем.

Ошибок на этой функции ловить не приходилось, возможно try-except тут излишний, но вроде как есть не просит и особо не мешает.

Вроде всё готово..

Запускаем программу... и кладём сервер

Стабильно это работать не будет. Мы забыли указать timeout в s.get!

Если запустить программу в режиме планировщика (например каждые 30 секунд), может возникнуть ситуация, когда ожидается ответ сервера, и уходит новый запрос, потом ещё и ещё, и

out of memory killed process

Добавим таймаут:

response = s.get(url, headers=header, timeout=3)

Ответ 200 не гарантирует, что вы получили, что хотели, ещё одна проверка

И ещё нужно проверить, что в ответе сервера есть, то что вы хотите. Ответ сервера может быть и с кодом 200, но внутри может не оказаться содержания которого вы ждёте. Например капча может быть вполне с 200 кодом или страница блокировки ваших бесконечных запросов без заголовка. 

В нашем случае, мы получаем словарь с определенными полями, поэтому можно сделать универсальную проверку. 

def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)     
    if lenta['entries']:
        status = True    
    return {'status':status, 'lenta': lenta['entries']} 

Как выглядит строчка requests.get(url) в готовом проекте

Строчка программного кода превращается в Лернейскую гидру_Kandinsky 2.1
Строчка программного кода превращается в Лернейскую гидру_Kandinsky 2.1

Итоговый вид такой: у нас список адресов рсс-лент, который мы передаём в программу, запросы по всем адресам отправляются одновременно, после чего проверяется:

  1. Что запрос вообще отработал без ошибки

  2. Полученный ответ сервера (с фиксацией причин проблем)

  3. Что в ответе есть нужное содержание.

При этом, если какая-то ссылка по какой-то причине не отработает, мы получим код и значение, которое позволит разобраться в причинах неполадок, без прекращения работы скрипта. 

В финале, вот так строчка response = requests.get(url) выглядит в рабочем проекте:

Как выглядит итоговый код
import requests
import feedparser
import sys
from requests.models import Response
import fake_useragent
from multiprocessing.pool import ThreadPool
import logging

# проверка статуса ответа
def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {"status": status, "code": response.status_code, "reason": response.reason}

# try-except декоратор
def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = ex
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap

# основная функция запроса
@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random   
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

# проверка содержания ответа
def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta["entries"]:
        status = True
    return {"status": status, "lenta": lenta["entries"]}

# сборная всех проверок и запроса
def harvest_all(url):
    response = get_response(url)
    response_stat = response_res(response["response"])
    feed_res = check_feed(response["response"])
    res_dict = {
        "feed": url,
        "response": response,
        "response_status": response_stat,
        "feed_cheker": feed_res,
    }
    return res_dict

# многопоточная функция
def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.exception(f"многопоточность сломалась")
        return []


def main():
    rss_list = [
        "https://feed1.xml",
        "https://feed2.xml",
        "https://feed3.xml",
    ]
    feeds = pool_data(rss_list)
    for item in feeds:
        if item["feed_cheker"]["status"]:
            lenta = feedparser.parse(item["response"]["response"].text)
            for titles in lenta["entries"]:
                print(titles["title"])


if __name__ == "__main__":
    main()

Чтобы код использовался в рабочем проекте, его желательно продумывать схожим образом, на все возможные случаи, которые очевидные и не совсем.

PS

Как ещё больше углубиться в проблему и довести до ума код очень рекомендую к прочтению комментарий ув. andreymal

Теги:
Хабы:
Всего голосов 12: ↑6 и ↓60
Комментарии15

Публикации

Истории

Работа

Data Scientist
84 вакансии
Python разработчик
139 вакансий

Ближайшие события

Конференция HR API 2024
Дата14 – 15 июня
Время10:00 – 18:00
Место
Санкт-ПетербургОнлайн
Конференция «IT IS CONF 2024»
Дата20 июня
Время09:00 – 19:00
Место
Екатеринбург
Summer Merge
Дата28 – 30 июня
Время11:00
Место
Ульяновская область