Здравствуйте дорогие хабровчане, в этом посте я хочу показать, как написать свой агрегатор новостей. Конечно, сразу становится очевидно, что это очередное изобретение велосипеда, однако анализируя существующие решения я всё время натыкался на камни преткновения. То они слишком медленно обновлялись, то не было нужных мне источников или часто бывало, что вообще ничего не работало без возможности починить. В итоге я написал своё решение.
Автор статьи приторговывает на бирже, и главной мотивацией было собрать все новости по интересующей теме в одном месте, чтобы не мониторить десяток различных источников вручную.
Текст под катом по большей части технический и будет, скорее всего, интересен читателям, которые сами торгуют на бирже и при этом в IT теме, либо тем, кто сам давно хотел написать агрегатор чего-нибудь.
Об агрегаторе новостей я размышлял уже давно. Во время торговли на бирже мне постоянно приходилось мониторить десяток авторитетных источников, особенно это напрягало, когда должна была выйти какая-нибудь новость, которая точно будет влиять на курс цены акций. В такие моменты было особенно сложно и обидно, когда подобную новость я пропускал. В общем, мне нужен был инструмент, с которым я мог бы оставаться в курсе всего.
Чтобы упростить понимание я написал два агрегатора, один — простой, его рассмотрю здесь. Код второго агрегатора, которым я пользуюсь сам, будет приложен в конце статьи. Простой агрегатор, в сущности, является более упрощённой версией сложного.
Основными источниками информации были телеграм каналы и новостные сайты. Для парсинга телеграма я выбрал telethon. Новости с сайтов можно забирать через RSS каналы с помощью feedparser. Однако, не на всех сайтах есть RSS, в этом случае буду парсить сайт напрямую используя scrapy. Полученные новости сливаются в отдельный телеграм канал с указанием источника.
Каждый парсер написан таким образом, чтобы его можно было запустить отдельно от остальных. Это значительно упрощает процесс добавления новых источников, их лучше проверять отдельно, чтобы убедиться в работоспособности. Например, feedparser может не прочитать RSS канал и тогда его придется парсить вручную.
Репозиторий с исходным кодом (простой парсер) — на GitHub.
1. Парсим телеграм канал
Чтобы telethon работал, необходимо для своего телеграм аккаунта создать переменные api_id
и api_hash
на сайте my.telegram.org и добавить эти параметры в скрипт. При первом запуске telethon сам создаёт файл с названием сессии (в нашем случае это gazp.session
), его удалять не нужно, иначе придётся проходить аутентификацию ещё раз при следующем запуске.
Парсер сам по себе очень простой, считывает посты из канала @prime1 и печатает их в консоль, либо отсылает их, если в параметрах метода telegram_parser
определена функция send_message_func
.
from telethon import TelegramClient, events
def telegram_parser(send_message_func=None, loop=None):
'''Телеграм парсер'''
# Параметры из my.telegram.org
api_id = <Твой api_id>
api_hash = <Твой api_hash>
# Канал источник новостей @prime1
channel_source = 'https://t.me/prime1'
# Сессия клиента telethon
session = 'gazp'
client = TelegramClient(session, api_id, api_hash, loop=loop)
client.start()
@client.on(events.NewMessage(chats=channel_source))
async def handler(event):
'''Забирает посты из телеграмм каналов и посылает их в наш канал'''
if send_message_func is None:
print(event.raw_text, '\n')
else:
await send_message_func(f'@prime1\n{event.raw_text}')
return client
if __name__ == "__main__":
client = telegram_parser()
client.run_until_disconnected()
2. Парсим RSS
Примечание: RSS — крайне удобная вещь, в сущности это xml-файл, который мало весит, и в котором нет ничего лишнего. Такой файл сервер отдаёт без лишней нагрузки и клиент может легко его распарсить, имея при этом минимальные задержки для обновления.
Так как парсер забирает каждый раз N новостей, то каждый раз скачиваются старые новости, которые уже были напечатаны/отправлены. Для решения этой проблемы я ввёл очередь posted_q
. Ясно, что вообще все сообщения сохранять не вариант, т.к. это потребует много памяти и, в конечном счёте, приведёт к ошибке MemoryError
, когда она закончится. Кроме того в большом массиве долго проверять сообщения на повтор.
Таким образом, устаревшие сообщения нужно удалять, а новые сохранять, что и происходит в очереди. Слева в неё входят свежие новости, а справа удаляются старые, т.е. хранится всего N сообщений в моменте. В качестве ключа в очередь сохраняются первые 50 символов от текста новости, что также сделано для ускорения работы скрипта.
Парсер скачивает новости с RSS канала сайта www.rbc.ru.
import httpx
import asyncio
from collections import deque
import feedparser
async def rss_parser(httpx_client, posted_q,
n_test_chars, send_message_func=None):
'''Парсер rss ленты'''
rss_link = 'https://rssexport.rbc.ru/rbcnews/news/20/full.rss'
while True:
try:
response = await httpx_client.get(rss_link)
except:
await asyncio.sleep(10)
continue
feed = feedparser.parse(response.text)
for entry in feed.entries[::-1]:
summary = entry['summary']
title = entry['title']
news_text = f'{title}\n{summary}'
head = news_text[:n_test_chars].strip()
if head in posted_q:
continue
if send_message_func is None:
print(news_text, '\n')
else:
await send_message_func(f'rbc.ru\n{news_text}')
posted_q.appendleft(head)
await asyncio.sleep(5)
if __name__ == "__main__":
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=20)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
httpx_client = httpx.AsyncClient()
asyncio.run(rss_parser(httpx_client, posted_q, n_test_chars))
3. Парсим сайт напрямую
Кастомный парсер работает так же, как и RSS парсер, за тем лишь исключением, что используется scrapy вместо feedparser и скачивается вся страница, в которой кроме новостей ещё есть куча всего. Из-за этого приходится выставлять бо́льшую паузу между обращениями, ведь если слишком активно напрягать сервер, он может и забанить на какое-то время.
Подобный вид скриптов приходится писать, если у новостного сайта нет оперативно обновляемого телеграм и/или RSS канала. Парсер скачивает новости напрямую с сайта www.bcs-express.ru.
import httpx
import asyncio
from collections import deque
from scrapy.selector import Selector
async def bcs_parser(httpx_client, posted_q,
n_test_chars, send_message_func=None):
'''Кастомный парсер сайта bcs-express.ru'''
bcs_link = 'https://bcs-express.ru/category'
while True:
try:
response = await httpx_client.get(bcs_link)
except:
await asyncio.sleep(20)
continue
selector = Selector(text=response.text)
for row in selector.xpath('//div[@class="feed__list"]/div/div')[::-1]:
raw_text = row.xpath('*//text()').extract()
title = raw_text[3]
summary = raw_text[5]
news_text = f'{title}\n{summary}'
head = news_text[:n_test_chars].strip()
if head in posted_q:
continue
if send_message_func is None:
print(news_text, '\n')
else:
await send_message_func(f'bcs-express.ru\n{news_text}')
posted_q.appendleft(head)
await asyncio.sleep(10)
if __name__ == "__main__":
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=20)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
httpx_client = httpx.AsyncClient()
asyncio.run(bcs_parser(httpx_client, posted_q, n_test_chars))
4. Запускаем все парсеры разом
Т.к. каждый парсер реализован асинхронно, то, чтобы они работали все вместе, добавим их в один общий цикл событий (event_loop). Это сделано для экономии ресурсов.
Примечание: в обычном синхронном коде, когда процесс в исполняющем потоке доходит до места, где требуются внешние ресурсы, он блокирует исполнение, ожидая ответа. При асинхронной реализации программы исполняющий поток занимается другим процессом — за счет этого и увеличивается производительность.
Тут же стоит отметить, что очередь posted_q
(класс deque() модуля collections в python) является потокобезопасной, т.е. можно спокойно добавлять в неё новости из разных парсеров.
import httpx
import asyncio
from collections import deque
from telegram_parser import telegram_parser
from rss_parser import rss_parser
from bcs_parser import bcs_parser
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Канал куда скидываем новости, например @habr_agg, сюда введи свой канал
channel_habr_agg = 'https://t.me/habr_agg'
# Очередь из уже опубликованных постов, чтобы их не дублировать
posted_q = deque(maxlen=40)
# 50 первых символов от текста новости - это ключ для проверки повторений
n_test_chars = 50
async def send_message_func(message):
'''Отправляет посты в канал'''
await client.send_message(entity=channel_habr_agg, message=message)
# Телеграм парсер
client = telegram_parser(send_message_func, loop)
httpx_client = httpx.AsyncClient()
# Добавляет в текущий event_loop rss парсер
loop.create_task(rss_parser(httpx_client, posted_q,
n_test_chars, send_message_func))
# Добавляет в текущий event_loop парсер сайта bcs-express.ru
loop.create_task(bcs_parser(httpx_client, posted_q,
n_test_chars, send_message_func))
# Запускает все парсеры
client.run_until_disconnected()
Заключение
Репозиторий с исходным кодом (сложный парсер) — на GitHub.
Как было сказано выше, сложный агрегатор — это усложнённый вариант агрегатора простого. Основные отличия — фильтр для постов, увеличенное количество источников новостей, логирование, доработанная обработка ошибок, имитация запроса пользователя через браузер, докер контейнер и др.
Сложный агрегатор написан таким образом, чтобы быть максимально живучим, однако в принципе состоит из тех же модулей, что и простой.
Можно конечно запустить готовый агрегатор новостей где-то в облаке, но лично у меня он работает на очень слабеньком тонком клиенте, в котором всего 4 Gb оперативной памяти и двухъядерный процессор 1.2 GHz, этого железа хватает с большим запасом. Для меня это удобно, т.к. не приходится постоянно держать включенным настольный компьютер или ноутбук, плюс тонкий клиент совершенно бесшумный.
В целом его работой я доволен, это действительно очень удобно, когда едешь куда-то или отошёл по делам, можно легко следить за новостями через мобильный телефон.
Спасибо за внимание.
UPD
Телеграм канал, на котором можно оценить работу агрегатора @gazp_news, новости добавляются в будние дни в промежутке с 9:30 до 18:45-23:00, в это время у меня обычно включен тонкий клиент.