Как стать автором
Обновить

Читаем telegram-каналы в виде новостной ленты, часть вторая, с осмыслением

Время на прочтение8 мин
Количество просмотров17K

В течение нескольких часов после публикации статьи о запуске Телегрегатора приходили сообщения с благодарностями и вопросами по нюансам работы.

Но недолго я радовался: потом пошел поток сообщений о том, что бот перестал добавлять в поток новые каналы либо вообще не работал.

Я это списывал на не слишком подробную инструкцию по запуску, сбои в работе самого бота (о множестве существующих лимитов я узнал из недавней статьи), возможные ошибки пользователей.

Путем общения с пользователями, чтением логов и раздумий пришел к выводу, что юзербот (а именно так, в соответствие со статьей выше, принято называть аккаунт телеграма, работающий в режиме бота) упёрся в лимит каналов (500 на аккаунт, спасибо @Slavenin999 за наводку). Я не уточнял, приватных или публичных, да это и не важно. Юзер бот упёрся в них за день.

У меня было в планах выложить код Телегрегатора в открытый доступ, но, перед этим, я хотел его хоть немного привести в подобающий вид. Однако, учитывая события с лимитом, решил не откладывать в долгий ящик, и публикую его на гитхабе, чтобы каждый желающий смог развернуть его для себя и не ждать решения с обходом лимитов, т.к. это может затянуться..

Сами подробности по установке будут в репе с ботом. В общем-то, на этом всё, ниже приведены несколько ключевых моментов из кода, который местами даже прокомментирован.

inb4: пинать за код не надо, питон только осваиваю. это прототип, который не взлетел.

Инициализация всего

from telethon import TelegramClient, events, sync, functions, types, utils
from telethon.tl.custom import Button
import asyncio
import os
import logging
from TelegramCommand import *
from database import *
import importlib
from dotenv import load_dotenv
load_dotenv()

logger = logging.getLogger("tggt")
logger.setLevel(logging.DEBUG)

fh = logging.FileHandler("app.log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)

api_id = os.getenv("API_ID")
api_hash = os.getenv("API_HASH")
master_account = os.getenv("MASTER_ACCOUNT")
bot_token = os.getenv("BOT_TOKEN")

client = TelegramClient('telegregator_session', api_id, api_hash)

В начале подключаем все необходимые библиотеки, настраиваю логгер, подключение самого telethon'а. Дальнейшее общение с телегой будет происходить через client и его декораторы.

Все действия Телегрегатора разделены на три группы: реакция на команды пользователя, реакция на новые сообщения в добавленных каналах и реакция на служебные события (бот добавлен в чат, написал новый пользователь и т.д.).

Команды

На все входящие сообщения от пользователя бот реагирует через декоратор @client.on(events.NewMessage):

@client.on(events.NewMessage)
    async def my_event_handler(event):
        logger.info("\r\n\r\n\r\n-------------Новое сообщение----------------")

        ## Определяем кто к нам постучался
        if event.message.from_id:
            contact = await add_contact(event.message.from_id)

        # личка бота
        management = 0
        handled = 0

        if hasattr(event.message.to_id, 'channel_id'):
            handled = 1
            # logger.info('сообщение пришло в канал (тип channel_id), нужно обрабатывать для пересылки')

            try:
                chat = await client.get_input_entity(event.message.to_id)
            except Exception as e:
                logger.error(e)

        if hasattr(event.message.to_id, 'user_id'):
            if event.message.to_id.user_id == 887053090 or event.message.to_id.user_id == None:
                logger.info("сообщение в личку телегрегатора, обрабатывать как управляющую команду")
                management = 1
                try:
                    chat = await client.get_input_entity(event.message.from_id)
                    await client.send_read_acknowledge(chat, event.message)
                except Exception as e:
                    logger.error(e)
            else:
                try:
                    chat = await client.get_input_entity(event.message.to_id)
                except Exception as e:
                    logger.error(e)
        if hasattr(event.message.to_id, 'chat_id'):
            logger.info('сообщение в группу (chat_id) %s, нужно обрабатывать как команду' % event.message.to_id.chat_id)
            try:
                chat = await client.get_input_entity(event.message.to_id)
            except Exception as e:
                logger.error(e)

        if hasattr(event.message.to_id, 'channel_id'):
            logger.info('сообщение в канал (channel_id) %s, нужно обрабатывать как команду' % event.message.to_id.channel_id)

            try:
                chat = await client.get_input_entity(event.message.to_id)
            except Exception as e:
                logger.error(e)

            # для отладки, пересылаем сообщение админу
            await event.message.forward_to(master_account)

        # mark message as read
        await client.send_read_acknowledge(chat, event.message)

        #Парсим команду. 
        command_handler = TelegramCommand()
        command, command_arg = command_handler.parseMessage(event.message.message)
        # если распознана команда
        if command:
            # помечаем сообщение прочитанным
            await send_read(event.message.from_id, event.message)

            chat_command = command
            logger.info('Команда: {}, аргумент: {}'.format(chat_command, command_arg))

            # на присоединение к каналу
            if chat_command == 'help':
                await call_command(module='helpmessage', answer_to=chat)
            if chat_command == 'new':
                await call_command(module='new', answer_to=chat, contact=contact, group_name = command_arg)
            if chat_command == 'test':
                await call_command(module='test')
            if chat_command in ['join', 'add']:
                channels_list = command_arg.split(' ')
                for channel in channels_list:
                    await call_command(module='join', answer_to=chat, channel_name=channel)
            # на отписку от канала
            if chat_command in ['leave', 'remove', 'delete']:
                channels_list = command_arg.split(' ')
                for channel in channels_list:
                    await call_command(module='leave', answer_to=chat, channel_name=channel)
            # на список подписанных каналов
            if chat_command == 'list':
                await call_command(module='listchannels', answer_to=chat, feed_id=event.message.to_id.chat_id)
            ## Вкл/выкл поток
            if chat_command in ['stop', 'pause']:
                await call_command(module='stop', answer_to=chat)
            if chat_command in ['start', 'resume']:
                await call_command(module='start', answer_to=chat)
            if chat_command in ['deleteall', 'exit']:
                await call_command(module='deleteall', answer_to=chat)
            ## Фильтры и стоп слова
            if chat_command == 'filter':
                await call_command(module='filter', answer_to=chat, word = command_arg, contact_id = contact.contact_id)
            if chat_command == 'filterremove' or chat_command == 'removefilter' or chat_command == 'unfilter':
                await call_command(module='unfilter', answer_to=chat, word = command_arg, contact_id = contact.contact_id)
            if chat_command == 'filterlist':
                await call_command(module='filterlist', answer_to=chat, contact_id = contact.contact_id)
            if chat_command == 'filterclear' or chat_command == 'clearfilter':
                await call_command(module='filterclear', answer_to=chat, contact_id = contact.contact_id)
            if chat_command == 'filterstop' or chat_command == 'stopfilter':
                await call_command(module='filterstop', answer_to=chat)
            if chat_command == 'filterstart' or chat_command == 'sartfilter':
                await call_command(module='filterstart', answer_to=chat)
            if chat_command == 'message':
                await call_command(module='message', message=command_arg)

Тут проверяются наличие необходимых полей во входящем сообщении event.message. Определяем, в личку, в канал или в чат написали., И, через кучу ifов, выбирается модуль с соответствующей командой. Они лежат в отдельных файлах, которые импортируются динамически. Это первый кандидат на рефакторинг, надо обертки написать, чтобы поменьше кода дублировалось.

Все команды /join, /filter и прочие обрабатываются здесь.

Как поток подписывается на новые каналы

Тот же самый декоратор @client.on(events.NewMessage)обрабатывает сообщения в группах и каналах (PeerChat и PeerChannel), если переслать в него сообщение из канала для подписки:

				if event.message.fwd_from and event.message.fwd_from.channel_id and event.message.to_id.__class__.__name__ == 'PeerChat':
            logger.info("Вижу репост канала в группу")
            try:
                channelInfo = await client.get_entity(event.message.fwd_from.channel_id)
            except Exception as e:
                logger.error(e)
                logger.error("не можем получить channelInfo для канала %s" % event.message.fwd_from.channel_id)
                channelInfo = None
                await event.reply('не могу подписаться. Если канал закрытый, попробуйте по инвайту')
            else:
                channelEntity = await client.get_input_entity(event.message.fwd_from.channel_id)
                logger.info("Репост канала {} (@{}), id {}".format(channelInfo.title, channelInfo.username, channelInfo.id))
                await call_command(module='join', answer_to=chat, channel_name=channelInfo.username)

Пересылка постов из каналов в ленту

Ну и главный функционал, это пересылка всех постов из подписанных каналов в одну ленту (поток):

async def handle_message(event):
    try:
        logger.info('ищем в БД поток, где channel_id = %s' % event.message.to_id.channel_id)
        forwards = Forward.select().where(Forward.channel_id == event.message.to_id.channel_id)
    except Forward.DoesNotExist:
        logger.info('Канала нет среди пересылаемых')
        forwards = None
        return
    if not len(forwards):
        logger.info('нет подписок на этот канал')
        return

    logger.info("Нашли следующие потоки, подписанные на канал channel_id %s" % event.message.to_id.channel_id)
    logger.info(forwards)

    chat = await client.get_input_entity(event.message.to_id)
    await client.send_read_acknowledge(chat, event.message)

    for forward in forwards:
        # определяем надо ли пересылать
        try:
            feed = Feed.get(Feed.feed_id == forward.feed_id)
        except Feed.DoesNotExist:
            logger.info('Поток не найден')
            feed = None
            return

        if not feed.is_enable:
            logger.info("Поток был отключен, не пересылаем")
            return
        # logger.info('Определяем создателя потока, куда должен транслироваться канал, откуда пришло сообщение')
        log_message = 'Сообщение пришло в канал {}'.format(event.message.to_id)
        if hasattr(event.message.to_id, 'channel_id'):
            feed_id = forward.feed_id
            try:
                owner = Contact.select().join(Feed, on=(Contact.contact_id == Feed.contact_id)).where(Feed.feed_id == feed_id).get()
            except Contact.DoesNotExist:
                log_message = log_message + ', владелец канала не определен'
                owner = None
            else:
                log_message = log_message + ', id владельца канала: {}'.format(owner.contact_id)
            logger.info(log_message)
        else:
            logger.error('нет параметра event.message.to_id.chat_id')

        log_message = 'пробуем фильтровать.'
        filterlist = []

        # TODO получать для конкретного юзера. нужно будет поменять логику и порядок, перенести на 438 строку
        try:
            filters = Filter.select().where(Filter.contact_id == owner.contact_id)
        except filterlist.DoesNotExist:
            log_message = log_message + ' нет фильтров'
            filters = []
        else:
            log_message = log_message + ' найдены фильтры для юзера'
            filterlist = [filter.word.lower() for filter in filters]
        blacklistword = filterlist + ['запрещенный', 'выиграй', 'удалю', 'читать продолжение', 'joinchat', 'подписаться', 'подписывайся', 'подпишитесь', 'переходи', 'перейти в канал', 'подписываемся', 'дамы и господа', 'автор канала']
        message = event.message
        message_text = message.message
        if message.entities:
            for entity in message.entities:
                if entity.__class__.__name__ == 'MessageEntityTextUrl':
                    message_text = message_text + entity.url
        if message.reply_markup:
            for row in message.reply_markup.rows:
                for button in row.buttons:
                    if hasattr(button, 'url'):
                        message_text = message_text + button.url
                    if hasattr(button, 'text'):
                        message_text = message_text + button.text

        if any([word in message_text.lower() for word in blacklistword]): # ищем стоп слова во всех текстах и ссылках
            log_message = log_message + "Найдены стоп-слова, не пересылаем"
            logger.info(log_message)
            return
        logger.info(log_message)
        logger.info("Фильтры прошли, пересылаем из канала %s в поток %s" % (forward.channel_id, forward.feed_id))
        await event.message.forward_to(forward.feed_id)

Здесь по id канала (event.message.to_id.channel_id), куда пришло сообщение определяется, в какие потоки его следует переслать, путем выборки из БД через пивотную таблицу Forward. Forward это many-to-many связь между Feed и Channel (структура будет ниже). Проверяется, активен ли поток (я иногда отключаю потоки, чтобы не флудили, когда нет желания ничего читать).

Сообщения фильтруются по стоп-словам во второй части приведенного кода.

Надо будет вытащить список стоп-слов из кода куда-нибудь в конфиг. Как это лучше сделать?

Ну и вообще, много текстовых переменных в коде, нужно прикрутить какой-нибудь i18n со словарями. Что обычно для такого юзают?

Структура БД для хранения подписок

from peewee import *
db = SqliteDatabase('telegregator.db')

class BaseModel(Model):
    class Meta:
        database = db
class Contact(BaseModel):
    contact_id = IntegerField(unique=True)
    added_at = DateTimeField(default=datetime.datetime.now)
class Channel(BaseModel):
    channel_id = IntegerField(unique=True)
    channel_name = CharField(default='')
    channel_title = CharField(default='')
class Feed(BaseModel):
    feed_id = IntegerField(unique=True)
    contact = ForeignKeyField(Contact, backref='contacts')
    is_filter = BooleanField(default=True)
    is_enable = BooleanField(default=True)
    feed_title = CharField(default='')
class Forward(BaseModel):
    feed = ForeignKeyField(Feed, backref='feeds')
    channel = ForeignKeyField(Channel, backref='channels')
class Filter(BaseModel):
    word = CharField(unique=True)
    contact = ForeignKeyField(Contact, backref='contacts')

На этом, пожалуй, всё. Надеюсь, кому-то будет полезно. Хотелось бы обсудить, как вы решали схожие задачи (призываю @aav в комменты).

Я старался везде оставить отладочные сообщения логгера с комментариями, так что код не должен оказаться слишком запутанным. Еще раз ссылка на репу.

Если будут какие-то вопросы, то можно писать в телегу @parotikov. Ну, и, перед работой с внешними сервисами и API, проверяйте их возможности и лимиты, чтобы не быть как я :)

Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+1
Комментарии2

Публикации

Истории

Работа

Data Scientist
61 вакансия
Python разработчик
138 вакансий

Ближайшие события