Как стать автором
Поиск
Написать публикацию
Обновить

Игровой бот в Telegram с нуля: Как я автоматизировал создание контента с помощью Apache NiFi и LLM

Уровень сложностиПростой
Время на прочтение13 мин
Количество просмотров3.2K

Предисловие

Как быстро протестировать игровую идею без лишней сложности? Я создал текстовую игру в Telegram за выходные на Apache NiFi и Groovy, весь контент для которой генерируют языковые модели.

В итоге — легковесный, почти не требующий поддержки бот, которого не больно обновлять. Идеально для MVP.

Под катом — архитектура, этапы развития и как AI не просто отвечает, а становится движком продукта.

Все началось с желания сделать что-то интересное для подписчиков моего канала в Telegram. Захотелось интерактива — простой текстовой викторины или квеста. Но разворачивать полноценный бэкенд... Не для такого пет-проекта.

Цель была ясна: создать максимально простого и легковесного бота, которого было бы не больно поддерживать. Выбор пал на связку Apache NiFi для оркестрации и Groovy для скриптовой логики.

P.S. Полная инструкция по запуску, исходники скриптов и шаблон для NiFi я выложил в открытый доступ на GitHub. Буду рад звёздочкам и пул-реквестам!

FutureGuest Bot repository


Ключевые особенности первой версии (As-Is):

  • Текстовая с Markdown: Интерфейс простой и быстрый.

  • Нет состояния: Прогресс не сохраняется. Пользователь всегда начинает заново. Это сознательное решение для упрощения.

  • Контент — это JSON: Вся игра в одном файле. Замена сценария — это просто загрузка нового JSON.

Архитектурное решение: Pull vs Push

Я сознательно выбрал модель опроса (Polling). NiFi раз в 2 секунды сам спрашивает у Telegram Bot API (getUpdates), есть ли новые сообщения.

Почему?

  • Проще: Не нужен белый IP, SSL-сертификаты и открытые порты.

  • Надежнее: Если сервер падает, он просто запросит пропущенные сообщения позже. Ничего не теряется.

  • Идеально для прототипа.

Архитектура as is
Архитектура as is

Магия генерации контента

Вручную писать сотни ответвлений сюжета — скучно. Поэтому весь JSON я генерирую языковыми моделями (например, DeepSeek). Достаточно дать промт: "Сгенерируй JSON по шаблону <прикладываю шаблон task.json> для квеста на тему 'Космос'..." — и через минуту получаешь готовый, валидный файл. Это полностью меняет процесс!

Эволюция архитектуры: от спагетти-кода в PureData к контролируемому хаосу в NiFi

Изначально я пытался реализовать всю логику исключительно стандартными процессорами NiFi, вообще без скриптов.

Почему это казалось хорошей идеей?

  • Полное визуальное программирование: Весь workflow был собран в графе процессоров. RouteOnAttributeUpdateAttributeReplaceText — все это связывалось в причудливые цепочки.

  • Иллюзия простоты: Для простого сценария "вопрос-ответ" это даже работало.

  • Соответствие философии Low-Code/No-Code: Казалось, что это идеальное использование NiFi.

Увы, довольно быстро мой граф начал превращаться в подобие перформанса live-coding на  PureData.

Мой поток на второй день работы
Мой поток на второй день работы
Скрытый текст

Для тех, кто не в теме, PureData — это визуальная среда для создания музыки и мультимедиа-инсталляций, где логика также строится из соединенных друг с другом блоков. И там ценность часто имеет не только звучание, но и сам визуальный вид получившегося «патча» — сложное, причудливое переплетение связей становится частью шоу.

Со мной произошло ровно то же самое! Мой NiFi-граф стремительно терял сходство с инженерной конструкцией и всё больше напоминал авангардное digital-искусство. Было красиво и, признаюсь, даже весело наблюдать за этим хаосом. Но ровно до того момента, пока мне не потребовалось внести первое же изменение.
Я понял, что создаю не систему, а арт-объект. Ценность которого стремится к нулю, когда на кону стоит поддержка и развитие проекта.

Вывод, к которому я пришел:
Pure No-Code подход в NiFi идеален для линейных ETL-задач, но катастрофически не подходит для реализации сложной бизнес-логики, такой как игровой движок.

Решение использовать Groovy в ExecuteScript стало переломным моментом. Оно дало:

  • Гибкость: Сложная логика умещается в одном процессоре, а не в двадцати.

  • Читаемость: Код с условиями и циклами проще для понимания, чем запутанный граф.

  • Лёгкость поддержки: Чтобы добавить новую фичу, я правлю код в одном месте, а не перестраиваю половину потока.

  • Мощь: Полный доступ к Java-стеку и библиотекам прямо из скрипта.

Это идеальный компромисс: визуальный дизайн потоков данных (куда идут сообщения) остался за NiFi, а сложная логика — за кодом. Каждый инструмент делает то, что у него получается лучше всего.

Как это работает?

Вся логика получается наглядной и управляется через UI NiFi. Добавить новое условие или ветвление — дело нескольких кликов.

Готовый поток телеграм бота
Готовый поток телеграм бота

Сердце системы: скрипты на Groovy

Основная бизнес-логика реализована в скриптах на Groovy, работающих внутри процессоров ExecuteScript. Это дает гибкость мощного языка на базе JVM с прямым доступом ко всем возможностям NiFi.

1. Роутинг входящих сообщений
Первый скрипт определяет тип входящего события: нажатие на inline-кнопку (callback), текстовую команду (command) или что-то иное.

Скрытый текст
import groovy.json.JsonSlurper

def flowFile = session.get()
if (!flowFile) return

try {
    def content = session.read(flowFile).getText('UTF-8')
    def update = new JsonSlurper().parseText(content)

    def eventType = update.callback_query ? "callback" :
                    update.message?.text?.startsWith('/') ? "command" : "unknown"
    def messageData = update.callback_query?.data ?: update.message?.text ?: ""
    def chatId = update.message?.chat?.id ?: update.callback_query?.message?.chat?.id ?: ""

    // Сохраняем все в атрибуты и передаем дальше
    flowFile = session.putAllAttributes(flowFile, [
        'telegram.event_type': eventType,
        'telegram.message_data': messageData,
        'telegram.chat_id': chatId.toString()
    ])

    // Просто передаем дальше, а роутинг сделаем RouteOnAttribute
    session.transfer(flowFile, REL_SUCCESS)

} catch (e) {
    log.error("Ошибка формирования запроса: " + e, e)
    flowFile = session.putAttribute(flowFile, 'error.message', e.message)
    session.transfer(flowFile, REL_FAILURE)
}

После этого скрипта процессор RouteOnAttribute направляет FlowFile по соответствующим путям (telegram.event_type), например, на обработку команд или callback'ов.

2. Обработка сцен и игровой логики
Самый объемный скрипт обрабатывает нажатия на кнопки, управляет переходами между сценами и формирует ответы. Вот его ключевые части:

Скрытый текст
import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def flowFile = session.get()
if (!flowFile) return

try {
    // 1. Получаем данные из атрибутов
    def callbackData = flowFile.getAttribute('telegram.message_data')
    def currentScene = flowFile.getAttribute('current_scene') ?: "start"
    def configFile = flowFile.getAttribute('configFile')  

    // 2. Загружаем конфигурацию
    def config = new JsonSlurper().parse(new File(configFile))
    // 3. Обрабатываем callback и получаем результат
    def result = processCallback(callbackData, currentScene, config)
    def response = result.response
    def newScene = result.scene 

    // Определяем endpoint based on media type
    def endpoint = 'sendMessage'
        if (response.photo) {
            endpoint = 'sendPhoto'
        } else if (response.video) {
            endpoint = 'sendVideo'
        } else if (response.audio) {
            endpoint = 'sendAudio'
        }

    // 4. Добавляем кнопки меню
    addMenuButtons(response)

    // Сохраняем результат
    flowFile = session.putAllAttributes(flowFile, [
        'current_scene': newScene,
        'telegram.endpoint': endpoint,
        'response.json': JsonOutput.toJson(response),
        'response.parse_mode': 'MarkdownV2'
    ])

    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Ошибка формирования запроса: " + e, e)
    flowFile = session.putAttribute(flowFile, 'error.message', e.message)
    session.transfer(flowFile, REL_FAILURE)
}

// Главная функция обработки callback
def processCallback(callbackData, currentScene, config) {
    def result = [response: [:], scene: currentScene]
    switch (callbackData) {
        case { it?.startsWith("scene:") }:
            result = handleSceneTransition(callbackData, config, currentScene)
            break
        case { it?.startsWith("quest:") && !it?.contains(":answer:") }:
            result = handleQuestStart(callbackData, config, currentScene)
            break
        case { it?.startsWith("quest:") && it?.contains(":answer:") }:
            result = handleQuestAnswer(callbackData, config, currentScene)
            break
        default:
            result.response = [text: "⚠️ Неизвестная команда"]
            break
    }
    return result
} 

// Обработка переходов между сценами
def handleSceneTransition(callbackData, config, currentScene) {
    def sceneName = callbackData.replace("scene:", "")
    return [
        response: config.scenes[sceneName],
        scene: sceneName
    ]
}  

// Обработка начала квеста
def handleQuestStart(callbackData, config, currentScene) {
    def questId = callbackData.replace("quest:", "")
    def quest = config.quests[questId]
    if (quest) {
        return [
            response: buildQuestResponse(quest),
            scene: "quest:" + questId
        ]
    } else {
        return [
            response: [text: "❌ Задание не найдено"],
            scene: currentScene
        ]
    }
}

// Обработка ответов на вопросы квестов
def handleQuestAnswer(callbackData, config, currentScene) {
    def parts = callbackData.split(":")
    if (parts.size() >= 4) {
        def questId = parts[1]
        def quest = config.quests[questId]
        if (quest) {
            def answer = quest.answers[callbackData]
            if (answer) {
                return handleAnswerTransition(answer, config, currentScene)
            } else {
                return [
                    response: [text: "❌ Ответ не найден"],
                    scene: currentScene
                ]
            }
        } else {
            return [
                response: [text: "❌ Задание не найдено"],
                scene: currentScene
            ]
        }
    } else {
        return [
            response: [text: "⚠️ Неверный формат ответа"],
            scene: currentScene
        ]
    }
}

// Обработка перехода к следующему квесту (тоже нужно обновить)
def handleQuestTransition(nextQuestId, config, currentScene) {
    def questId = nextQuestId.replace("quest:", "")
    def nextQuest = config.quests[questId]
    if (nextQuest) {
        return [
            response: buildQuestResponse(nextQuest),
            scene: "quest:" + questId
        ]
    } else {
        return [
            response: [text: "❌ Следующее задание не найдено"],
            scene: currentScene
        ]
    }
}

  

// Обновленная функция создания ответа для квеста
def buildQuestResponse(quest) {
    if (!quest) return [text: "❌ Ошибка: задание не найдено"]
    def response = [:]
    // Добавляем медиафайлы если есть
    if (quest.image) {
        response.photo = "/opt/nifi/quest/images/" + quest.image
        response.caption = quest.text
        response.parse_mode = 'MarkdownV2'
    } else if (quest.video) {
        response.video = "/opt/nifi/quest/video/" + quest.video  
        response.caption = quest.text
        response.parse_mode = 'MarkdownV2'
    } else if (quest.audio) {
        response.audio = "/opt/nifi/quest/audio/" + quest.audio
        response.caption = quest.text
        response.parse_mode = 'MarkdownV2'
    } else {
        // Если нет медиа - просто текст
        response.text = quest.text
        response.parse_mode = 'MarkdownV2'
    }

    // Добавляем кнопки ответов если есть
    if (quest.answers) {
        def keyboard = []
        quest.answers.each { key, answer ->
            keyboard << [text: answer.text, callback_data: key]
        }
        response.reply_markup = [inline_keyboard: [keyboard]]
    }
    return response
}

// Добавление кнопок меню к ответу
def addMenuButtons(response) {
    if (response && response instanceof Map) {
        // Если response - простой текст (без reply_markup)
        if (!response.reply_markup && response.text) {
            response.reply_markup = [inline_keyboard: []]
        }

        // Добавляем кнопки меню
        if (response.reply_markup) {
            def menuButton = [text: "🏠 Меню", callback_data: "scene:start"]
            def helpButton = [text: "❓ Помощь", callback_data: "scene:help"]
            def menuRow = [menuButton, helpButton]
            if (response.reply_markup.inline_keyboard instanceof List) {
                response.reply_markup.inline_keyboard.add(menuRow)
            } else {
                response.reply_markup.inline_keyboard = [menuRow]
            }
        }
    }
}

// Обработка переходов по ответам
def handleAnswerTransition(answer, config, currentScene) {
    if (!answer) {
        return [response: [text: "❌ Ответ не распознан"], scene: currentScene]
    }

    def result = [scene: currentScene]
    // Обрабатываем переход
    if (answer.next) {
        if (answer.next.startsWith("quest:")) {
            def nextQuestId = answer.next.replace("quest:", "")
            def nextQuest = config.quests[nextQuestId]
            if (nextQuest) {
                result.response = buildQuestResponse(nextQuest)
                result.scene = "quest:" + nextQuestId
            }
        } else if (answer.next.startsWith("scene:")) {
            def sceneName = answer.next.replace("scene:", "")
            def nextScene = config.scenes[sceneName]
            if (nextScene) {
                result.response = nextScene
                result.scene = sceneName
            }
        }
    }

    // Если есть response - добавляем/заменяем текст

    if (answer.response) {
        if (result.response && result.response.text) {
            // Добавляем response к существующему тексту
            result.response.text = answer.response + "\n\n" + result.response.text
        } else {
            // Создаем новое текстовое сообщение
            result.response = [text: answer.response, parse_mode: 'MarkdownV2']
        }
    }
    return result
}

Это ядро игры. Скрипт загружает JSON-конфигурацию, определяет, какую сцену показать пользователю next, и формирует ответ с кнопками.

3. Подготовка исходящего запроса к Telegram
Финальный скрипт гарантирует, что данные для отправки в Telegram API имеют правильный формат.

Скрытый текст
import org.apache.nifi.processor.io.StreamCallback
import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def flowFile = session.get()
if (!flowFile) return

try {
    // Берем готовый JSON из атрибута
    def responseJson = flowFile.getAttribute('response.json')
    def telegramRequest = new JsonSlurper().parseText(responseJson)
    
    // Добавляем обязательные поля
    telegramRequest.chat_id = flowFile.getAttribute('telegram.chat_id')
    telegramRequest.parse_mode = flowFile.getAttribute('response.parse_mode') ?: 'MarkdownV2'
    
    // Конвертируем обратно в JSON
    def jsonOutput = JsonOutput.toJson(telegramRequest)
    
    // Перезаписываем содержимое FlowFile
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        outputStream.write(jsonOutput.getBytes('UTF-8'))
    } as StreamCallback)
    
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Ошибка формирования запроса: " + e, e)
    flowFile = session.putAttribute(flowFile, 'error.message', e.message)
    session.transfer(flowFile, REL_FAILURE)
}

Этот скрипт гарантирует, что каждый запрос к API Telegram содержит правильный chat_id и форматирование, независимо от того, что вернула игровая логика.

Эволюция: дорожная карта (To-Be)

Прототип готов. Но как превратить это в систему, которая работает сама?
Основываясь на опыте работы с RAG-системами, я вижу следующее:

Архитектура to be включающая в себя модуль генерации контента.
Архитектура to be включающая в себя модуль генерации контента.

Полностью автоматизированный AI-конвейер:

  1. Планирование: Владелец бота просто готовит CSV-файл с промтами и темами на год вперед.

  2. Генерация: NiFi парсит CSV, отправляет промт в локальную LLM (Ollama) через API, что дает полный контроль над данными и их приватностью, а также снижает стоимость генерации и получает JSON конфигурации игры по каждому промпту.

  3. Валидация и коррекция: Сгенерированный JSON автоматически проверяется. Если есть ошибки, система сама отправляет его и описание ошибок обратно в модель для исправления.

  4. Деплой: Валидный JSON автоматически подкладывается в бота по расписанию (например раз в 2 недели, или раз в месяц).

  5. Анонсирование: Система может сама публиковать анонсы новых игр в канале.

Такой подход превращает бота из статичного приложения в динамичный, самообновляющийся сервис.

Выводы
Этот проект — отличный пример того, как можно использовать нестандартные инструменты для быстрого прототипирования. Apache NiFi показал себя как гибкая среда для создания бэкенда, а LLM — не просто чат, а движок для генерации структурированного контента.

Такой подход отлично подходит для создания прототипов, образовательных проектов или простых сервисов, где важна скорость разработки.

Этот проект стал для меня отличным полигоном для испытания гипотез. Он наглядно показал, что современные LLM и low-code инструменты вроде NiFi позволяют буквально за несколько дней создать работающий прототип и сразу проверить его на реальной аудитории. Главное — не бояться экспериментировать и находить нестандартные применения знакомым технологиям.

Как вы думаете, насколько оправдано использовать ETL-инструмент вроде NiFi для бэкенда?

Теги:
Хабы:
+1
Комментарии6

Публикации

Ближайшие события