Здравствуйте дорогие хабровчане, в этом посте я хочу показать, как написать свой агрегатор новостей. Конечно, сразу становится очевидно, что это очередное изобретение велосипеда, однако анализируя существующие решения я всё время натыкался на камни преткновения. То они слишком медленно обновлялись, то не было нужных мне источников или часто бывало, что вообще ничего не работало без возможности починить. В итоге я написал своё решение.
Автор статьи приторговывает на бирже, и главной мотивацией было собрать все новости по интересующей теме в одном месте, чтобы не мониторить десяток различных источников вручную.
Текст под катом по большей части технический и будет, скорее всего, интересен читателям, которые сами торгуют на бирже и при этом в IT теме, либо тем, кто сам давно хотел написать агрегатор чего-нибудь.
Об агрегаторе новостей я размышлял уже давно. Во время торговли на бирже мне постоянно приходилось мониторить десяток авторитетных источников, особенно это напрягало, когда должна была выйти какая-нибудь новость, которая точно будет влиять на курс цены акций. В такие моменты было особенно сложно и обидно, когда подобную новость я пропускал. В общем, мне нужен был инструмент, с которым я мог бы оставаться в курсе всего.
Чтобы упростить понимание я написал два агрегатора, один — простой, его рассмотрю здесь. Код второго агрегатора, которым я пользуюсь сам, будет приложен в конце статьи. Простой агрегатор, в сущности, является более упрощённой версией сложного.
Основными источниками информации были телеграм каналы и новостные сайты. Для парсинга телеграма я выбрал telethon. Новости с сайтов можно забирать через RSS каналы с помощью feedparser. Однако, не на всех сайтах есть RSS, в этом случае буду парсить сайт напрямую используя scrapy. Полученные новости сливаются в отдельный телеграм канал с указанием источника.
Каждый парсер написан таким образом, чтобы его можно было запустить отдельно от остальных. Это значительно упрощает процесс добавления новых источников, их лучше проверять отдельно, чтобы убедиться в работоспособности. Например, feedparser может не прочитать RSS канал и тогда его придется парсить вручную.
Репозиторий с исходным кодом (простой парсер) — на GitHub.
1. Парсим телеграм канал

Чтобы telethon работал, необходимо для своего телеграм аккаунта создать переменные api_id и api_hash на сайте my.telegram.org и добавить эти параметры в скрипт. При первом запуске telethon сам создаёт файл с названием сессии (в нашем случае это gazp.session), его удалять не нужно, иначе придётся проходить аутентификацию ещё раз при следующем запуске.
Парсер сам по себе очень простой, считывает посты из канала @prime1 и печатает их в консоль, либо отсылает их, если в параметрах метода telegram_parser определена функция send_message_func.
from telethon import TelegramClient, events def telegram_parser(send_message_func=None, loop=None): '''Телеграм парсер''' # Параметры из my.telegram.org api_id = <Твой api_id> api_hash = <Твой api_hash> # Канал источник новостей @prime1 channel_source = 'https://t.me/prime1' # Сессия клиента telethon session = 'gazp' client = TelegramClient(session, api_id, api_hash, loop=loop) client.start() @client.on(events.NewMessage(chats=channel_source)) async def handler(event): '''Забирает посты из телеграмм каналов и посылает их в наш канал''' if send_message_func is None: print(event.raw_text, '\n') else: await send_message_func(f'@prime1\n{event.raw_text}') return client if __name__ == "__main__": client = telegram_parser() client.run_until_disconnected()
2. Парсим RSS

Примечание: RSS — крайне удобная вещь, в сущности это xml-файл, который мало весит, и в котором нет ничего лишнего. Такой файл сервер отдаёт без лишней нагрузки и клиент может легко его распарсить, имея при этом минимальные задержки для обновления.
Так как парсер забирает каждый раз N новостей, то каждый раз скачиваются старые новости, которые уже были напечатаны/отправлены. Для решения этой проблемы я ввёл очередь posted_q. Ясно, что вообще все сообщения сохранять не вариант, т.к. это потребует много памяти и, в конечном счёте, приведёт к ошибке MemoryError, когда она закончится. Кроме того в большом массиве долго проверять сообщения на повтор.
Таким образом, устаревшие сообщения нужно удалять, а новые сохранять, что и происходит в очереди. Слева в неё входят свежие новости, а справа удаляются старые, т.е. хранится всего N сообщений в моменте. В качестве ключа в очередь сохраняются первые 50 символов от текста новости, что также сделано для ускорения работы скрипта.
Парсер скачивает новости с RSS канала сайта www.rbc.ru.
import httpx import asyncio from collections import deque import feedparser async def rss_parser(httpx_client, posted_q, n_test_chars, send_message_func=None): '''Парсер rss ленты''' rss_link = 'https://rssexport.rbc.ru/rbcnews/news/20/full.rss' while True: try: response = await httpx_client.get(rss_link) except: await asyncio.sleep(10) continue feed = feedparser.parse(response.text) for entry in feed.entries[::-1]: summary = entry['summary'] title = entry['title'] news_text = f'{title}\n{summary}' head = news_text[:n_test_chars].strip() if head in posted_q: continue if send_message_func is None: print(news_text, '\n') else: await send_message_func(f'rbc.ru\n{news_text}') posted_q.appendleft(head) await asyncio.sleep(5) if __name__ == "__main__": # Очередь из уже опубликованных постов, чтобы их не дублировать posted_q = deque(maxlen=20) # 50 первых символов от текста новости - это ключ для проверки повторений n_test_chars = 50 httpx_client = httpx.AsyncClient() asyncio.run(rss_parser(httpx_client, posted_q, n_test_chars))
3. Парсим сайт напрямую

Кастомный парсер работает так же, как и RSS парсер, за тем лишь исключением, что используется scrapy вместо feedparser и скачивается вся страница, в которой кроме новостей ещё есть куча всего. Из-за этого приходится выставлять бо́льшую паузу между обращениями, ведь если слишком активно напрягать сервер, он может и забанить на какое-то время.
Подобный вид скриптов приходится писать, если у новостного сайта нет оперативно обновляемого телеграм и/или RSS канала. Парсер скачивает новости напрямую с сайта www.bcs-express.ru.
import httpx import asyncio from collections import deque from scrapy.selector import Selector async def bcs_parser(httpx_client, posted_q, n_test_chars, send_message_func=None): '''Кастомный парсер сайта bcs-express.ru''' bcs_link = 'https://bcs-express.ru/category' while True: try: response = await httpx_client.get(bcs_link) except: await asyncio.sleep(20) continue selector = Selector(text=response.text) for row in selector.xpath('//div[@class="feed__list"]/div/div')[::-1]: raw_text = row.xpath('*//text()').extract() title = raw_text[3] summary = raw_text[5] news_text = f'{title}\n{summary}' head = news_text[:n_test_chars].strip() if head in posted_q: continue if send_message_func is None: print(news_text, '\n') else: await send_message_func(f'bcs-express.ru\n{news_text}') posted_q.appendleft(head) await asyncio.sleep(10) if __name__ == "__main__": # Очередь из уже опубликованных постов, чтобы их не дублировать posted_q = deque(maxlen=20) # 50 первых символов от текста новости - это ключ для проверки повторений n_test_chars = 50 httpx_client = httpx.AsyncClient() asyncio.run(bcs_parser(httpx_client, posted_q, n_test_chars))
4. Запускаем все парсеры разом

Т.к. каждый парсер реализован асинхронно, то, чтобы они работали все вместе, добавим их в один общий цикл событий (event_loop). Это сделано для экономии ресурсов.
Примечание: в обычном синхронном коде, когда процесс в исполняющем потоке доходит до места, где требуются внешние ресурсы, он блокирует исполнение, ожидая ответа. При асинхронной реализации программы исполняющий поток занимается другим процессом — за счет этого и увеличивается производительность.
Тут же стоит отметить, что очередь posted_q (класс deque() модуля collections в python) является потокобезопасной, т.е. можно спокойно добавлять в неё новости из разных парсеров.
import httpx import asyncio from collections import deque from telegram_parser import telegram_parser from rss_parser import rss_parser from bcs_parser import bcs_parser loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Канал куда скидываем новости, например @habr_agg, сюда введи свой канал channel_habr_agg = 'https://t.me/habr_agg' # Очередь из уже опубликованных постов, чтобы их не дублировать posted_q = deque(maxlen=40) # 50 первых символов от текста новости - это ключ для проверки повторений n_test_chars = 50 async def send_message_func(message): '''Отправляет посты в канал''' await client.send_message(entity=channel_habr_agg, message=message) # Телеграм парсер client = telegram_parser(send_message_func, loop) httpx_client = httpx.AsyncClient() # Добавляет в текущий event_loop rss парсер loop.create_task(rss_parser(httpx_client, posted_q, n_test_chars, send_message_func)) # Добавляет в текущий event_loop парсер сайта bcs-express.ru loop.create_task(bcs_parser(httpx_client, posted_q, n_test_chars, send_message_func)) # Запускает все парсеры client.run_until_disconnected()
Заключение
Репозиторий с исходным кодом (сложный парсер) — на GitHub.
Как было сказано выше, сложный агрегатор — это усложнённый вариант агрегатора простого. Основные отличия — фильтр для постов, увеличенное количество источников новостей, логирование, доработанная обработка ошибок, имитация запроса пользователя через браузер, докер контейнер и др.
Сложный агрегатор написан таким образом, чтобы быть максимально живучим, однако в принципе состоит из тех же модулей, что и простой.
Можно конечно запустить готовый агрегатор новостей где-то в облаке, но лично у меня он работает на очень слабеньком тонком клиенте, в котором всего 4 Gb оперативной памяти и двухъядерный процессор 1.2 GHz, этого железа хватает с большим запасом. Для меня это удобно, т.к. не приходится постоянно держать включенным настольный компьютер или ноутбук, плюс тонкий клиент совершенно бесшумный.
В целом его работой я доволен, это действительно очень удобно, когда едешь куда-то или отошёл по делам, можно легко следить за новостями через мобильный телефон.
Спасибо за внимание.
UPD
Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.

