Недавно ко мне обратился друг с просьбой написать бота, импортирующего новости из RSS-канала на сайте в Telegram-канал. Огромнейшим плюсом данного способа оповещения являются push-уведомления, которые приходят каждому подписанному пользователю на его устройство. Уже давно хотелось заняться чем-то подобным. Недолго думая, в качестве образца я выбрал канал Хабра telegram.me/habr_ru. В качестве языка программирования был выбран Python.
В итоге, мне надо было решить следующие проблемы:
- Парсинг RSS.
- Одним из условий был отложенный постинг сообщений (если после того, как новость была выложена, в течение n часов её скрыли/удалили/переименовали, то она не должна быть опубликована, вместо нее отправляется оповещение о корректной новости)
- Постинг сообщений в телеграм.
- Сокращение целевой ссылки с помощью сервиса bit.ly
От себя добавил еще:
- Ведение логов с помощью библиотеки (logging).
- Обработка конфига (configparser).
1. Отложенный постинг сообщений
Для решения данной проблемы было принято решение использовать SQLite базу данных. Для работы с БД использовалась библиотека SQLalchemy.
Структура до банального проста — всего одна таблица. Код объекта представлен ниже:
class News(Base): __tablename__ = 'news' id = Column(Integer, primary_key=True) # Порядковый номер новости text = Column(String) # Текст (Заголовок), который будет отправлен в сообщении link = Column(String) # Ссылка на статью на сайте. Так же отправляется в сообщении date = Column(Integer) # Дата появления новости на сайте. Носит Чисто информационный характер. UNIX_TIME. publish = Column(Integer) # Планируемая дата публикации. Сообщение будет отправлено НЕ РАНЬШЕ этой даты. UNIX_TIME. chat_id = Column(Integer) # Информационный столбец. В данное поле логируется чат, в который было отправлено сообщение message_id = Column(Integer) # Информационный столбец. В данный столбец логирует внутренний идентификатор сообщения в канале. def __init__(self, text, link, date, publish=0,chat_id=0,message_id=0): self.link = link self.text = text self.date = date self.publish = publish self.chat_id = chat_id self.message_id = message_id def _keys(self): return (self.text, self.link) def __eq__(self, other): return self._keys() == other._keys() def __hash__(self): return hash(self._keys()) def __repr__(self): return "<News ('%s','%s', %s)>" % (base64.b64decode(self.text).decode(),\ base64.b64decode(self.link).decode(),\ datetime.fromtimestamp(self.publish)) # Для зрительного восприятия данные декодируются
Для хранения текстовой информации и ссылок использется base64, форматом хранения даты-времени был выбран Unix Timestamp.
Обработка данных сессии осуществляется отдельным классом.
Base = declarative_base() class Database: """ Класс для обработки сессии SQLAlchemy. Также включает в себя минимальный набор методов, вызываемых в управляющем классе. Названия методов говорящие. """ def __init__(self, obj): engine = create_engine(obj, echo=False) Session = sessionmaker(bind=engine) self.session = Session() def add_news(self, news): self.session.add(news) self.session.commit() def get_post_without_message_id(self): return self.session.query(News).filter(and_(News.message_id == 0,\ News.publish<=int(time.mktime(time.localtime())))).all() def update(self, link, chat, msg_id): self.session.query(News).filter_by(link = link).update({"chat_id":chat, "message_id":msg_id}) self.session.commit() def find_link(self,link): if self.session.query(News).filter_by(link = link).first(): return True else: return False
При обнаружении новости, она добавляется в базу. Сразу же задается время публикации.
Для обнаружения новостей готовых к публикации используется метод get_post_withwithout_message_id. Фактически, мы выбираем из базы все посты, у которых message_id=0 и дата публикации меньше текущего времени.
Для проверки на новизну отправляем запрос базе данных на факт содержания ссылки на новость (метод find_link).
Метод update служит для обновления данных, после публикации новости в канале.
2. Парсинг RSS
Стоит признаться, что писать свой RSS парсер совсем не хотелось, поэтому в бой вступила библиотека feedparser.
import feedparser class Source(object): def __init__(self, link): self.link = link self.news = [] self.refresh() def refresh(self): data = feedparser.parse(self.link) self.news = [News(binascii.b2a_base64(i['title'].encode()).decode(),\ binascii.b2a_base64(i['link'].encode()).decode(),\ int(time.mktime(i['published_parsed']))) for i in data['entries']]
Код до смешного прост. При вызове метода refresh с помощью генератора формируется список объектов класса News из последних 30 размещенных постов в rss ленте.
3. Сокращение ссылок
Как упоминалось выше, в качестве сервиса был выбран bit.ly. API не вызвает лишних вопросов.
class Bitly: def __init__(self,access_token): self.access_token = access_token def short_link(self, long_link): url = 'https://api-ssl.bitly.com/v3/shorten?access_token=%s&longUrl=%s&format=json'\ % (self.access_token, long_link) try: return json.loads(urllib.request.urlopen(url).read().decode('utf8'))['data']['url'] except: return long_link
В инит метод передается только наш access_token. В случае неудачного получения сокращенной ссылки, метод short_link возвращает переданную ему изначальную ссылку.
4. Управляющий класс
class ExportBot: def __init__(self): config = configparser.ConfigParser() config.read('./config') log_file = config['Export_params']['log_file'] self.pub_pause = int(config['Export_params']['pub_pause']) self.delay_between_messages = int(config['Export_params']['delay_between_messages']) logging.basicConfig(format = u'%(filename)s[LINE:%(lineno)d]# %(levelname)-8s \ [%(asctime)s] %(message)s',level = logging.INFO, filename = u'%s'%log_file) self.db = database(config['Database']['Path']) self.src = source(config['RSS']['link']) self.chat_id = config['Telegram']['chat'] bot_access_token = config['Telegram']['access_token'] self.bot = telegram.Bot(token=bot_access_token) self.bit_ly = bitly(config['Bitly']['access_token']) def detect(self): #получаем 30 последних постов из rss-канала self.src.refresh() news = self.src.news news.reverse() #Проверяем на наличие в базе ссылки на новость. Если нет, то добавляем в базу данных с #отложенной публикацией for i in news: if not self.db.find_link(i.link): now = int(time.mktime(time.localtime())) i.publish = now + self.pub_pause logging.info( u'Detect news: %s' % i) self.db.add_news(i) def public_posts(self): #Получаем 30 последних записей из rss канала и новости из БД, у которых message_id=0 posts_from_db = self.db.get_post_without_message_id() self.src.refresh() line = [i for i in self.src.news] #Выбор пересечний этих списков for_publishing = list(set(line) & set(posts_from_db)) for_publishing.reverse() #Постинг каждого сообщений for post in for_publishing: text = '%s %s' % (base64.b64decode(post.text).decode('utf8'),\ self.bit_ly.short_link(base64.b64decode(post.link).decode('utf-8'))) a = self.bot.sendMessage(chat_id=self.chat_id, text=text, parse_mode=telegram.ParseMode.HTML) message_id = a.message_id chat_id = a['chat']['id'] self.db.update(post.link, chat_id, message_id) logging.info( u'Public: %s;%s;' % (post, message_id)) time.sleep(self.delay_between_messages)
При инциализации с помощью библиотеки configparser считываем наш конфиг-файл и настраиваем логгирование.
Чтобы детектировать новости, используем метод detect. Получаем последние 30 опубликованных постов, поочередно проверяем наличие ссылки в базе данных.
Перед публикацией, необходимо проверить наличие постов, выгруженных из базы данных в rss-канале. В этом нам помогут множества. И после этого уже публикуем новость с помощью библиотеки telegram. Её функционал довольно широк и ориентирован на написание ботов. После публикации необходимо обновить message_id и chat_id.
В итоге получаем:

Стоит отметить то, что если переписать класс rss, то так же можно будет импортировать новости из других источников (VK, facebook и т.д.).
Исходники можно найти на Github: https://github.com/Vispiano/rss2telegram
UPD: Да, случайно забравшийся "print" выглядит ужасно и названия классов не в CamelCase не лучше.
