
Иванов Максим
Младший Java программист
Рецепт по приготовлению своего «Telegram-монстра Франкенштейна»

Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.
Статьи будут разделены на 2 части, первая часть - создание основного бота с отправкой логов (Kafka Producer) и записью их в БД, вторая часть - обработка всех логов (Kafka Consumer).
Ингредиенты:
Создание Spring Boot проект, проще всего это сделать через Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Начинаем с нарезки:
Первостепенно нужно настроить build.gradle со всеми зависимостями
build.gradle
buildscript { repositories { mavenCentral() } } plugins { id 'org.springframework.boot' version '2.4.2' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } apply from: 'build-test.gradle' group 'com.sercetary.bot' sourceCompatibility = '14' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } configurations.all { exclude module: 'slf4j-log4j12' } dependencies { implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6' implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6' implementation 'org.springframework.data:spring-data-commons:2.6.0' implementation 'org.springframework.kafka:spring-kafka:2.7.6' implementation 'org.postgresql:postgresql:42.3.1' implementation 'com.h2database:h2:1.4.200' implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0' implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0' compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29' compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' }
Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer
application.yml
server: port: 9000 spring: kafka: producer: bootstrap-servers: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
Теперь настройки application.properties
application.properties
# HTTP port for incoming requests server.port=8081 app.http.bot=change-me telegram-bot.name=change-me telegram-bot.token=change-me # Bot db app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me app.db.bot-db.driver=org.postgresql.Driver app.db.bot-db.user=change-me app.db.bot-db.password=change-me app.db.bot-db.pool-size=10 # logging logging.level.root=INFO logging.level.org.springframework.web=DEBUG logging.level.ru.centerinform.webhook=TRACE logging.file.name=change-me
Хорошо, после настроек нашего проекта, давайте обговорим его структуру:

Пакеты:
config - описание бинов и конфигурации проекта
controller - обрабатывает запрос пользователя
dto - хранит данные, а так же описывает модель таблицы БД
exceptions - кастомный пакет обработчика ошибок
repository - логика работа с БД
service - основная бизнес логика проекта
Сейчас мы собираем ингредиенты и маринуем:
Настройки бинов:
- Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper
AppConfig
@Configuration public class AppConfig { @Bean ObjectMapper customObjectMapper() { return new ObjectMapper(); } @Bean TelegramBotsApi telegramBotsApi() throws TelegramApiException{ return new TelegramBotsApi(DefaultBotSession.class); } }
- Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc
DbConfig
@Configuration public class DbConfig extends DefaultDbConfig { @Bean @Qualifier("bot-db") @ConfigurationProperties(prefix = "app.db.bot-db") SpringDataJdbcProperties gitlabJdbcProperties() { return new SpringDataJdbcProperties(); } @Bean @Qualifier("bot-db") public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) { return hikariDataSource("db", properties); } @Bean @Qualifier("bot-db") JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) { return new JdbcTemplate(dataSource); } @Data @NoArgsConstructor public static class SpringDataJdbcProperties { // constants private static final String H2_DATABASE_DRIVER = "org.h2.Driver"; /** * JDBC URL property */ String url; /** * JDBC driver class name property */ String driver; /** * JDBC username property */ String user; /** * JDBC password property */ String password; /** * Hikari / Vertica maxPoolSize property */ String poolSize; /** * Minimum pool size */ int minPoolSize = 4; /** * Maximum pool size */ int maxPoolSize = 10; /** * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool. */ long idleTimeout; /** * This property controls the maximum lifetime of a connection in the pool. When a connection * reaches this timeout, even if recently used, it will be retired from the pool. * An in-use connection will never be retired, only when it is idle will it be removed */ long maxLifetime; /** * Bulk insert size */ Integer bulkSize; /** * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging) * * @param url JDBC driver class name property * @param driver JDBC driver class name property * @param user JDBC username property * @param password JDBC password property * @param poolSize Hikari / Vertica maxPoolSize property * @param bulkSize bulk insert size */ public SpringDataJdbcProperties( String url, String driver, String user, String password, String poolSize, Integer bulkSize) { this.url = url; this.driver = driver; this.user = user; this.password = password; this.poolSize = poolSize; this.bulkSize = bulkSize; } /** * Возвращает истину, если экземпляр описывает in-memory H2 database * * @return истина, если экземпляр описывает in-memory H2 database */ public boolean isH2Database() { return driver.equals(H2_DATABASE_DRIVER); } /** * Возвращает строковое представление экземпляра объекта в формате JSON * * @return строковое представление экземпляра объекта в формате JSON */ @Override public String toString() { var props = new SpringDataJdbcProperties( url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize); return Json.encode(props); } } }
- Создадим базовый класс для уменьшения дублирования кода инициализации бинов
DefaultDbConfig
@Slf4j class DefaultDbConfig { protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) { log.info("[{}] настройки БД: [{}]", tag, properties.toString()); HikariDataSource ds = new HikariDataSource(); ds.setJdbcUrl(properties.getUrl()); ds.setDriverClassName(properties.getDriver()); ds.setUsername(properties.getUser()); ds.setPassword(properties.getPassword()); ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize())); return ds; } }
- После напишем утилитный класс для логирования
Json
public class Json { static final ObjectMapper mapper = new ObjectMapper(); /** * Encode instance as JSON * * @param obj instance * @return JSON */ public static String encode(Object obj) { try { return mapper.writeValueAsString(obj); } catch (JsonProcessingException e) { return obj.toString(); } } public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException { return mapper.readValue(json, clazz); }
Далее мы напишем контроллер, для доступа к сервису из вне
- Создаем простенький контроллер, для получения списка записей из БД
UsersController
@Slf4j @RestController @RequestMapping("${app.http.bot") @RequiredArgsConstructor @SuppressWarnings("unused") public class UsersController { private final UserService userService; /** * Возвращает список пользователей и связанных с ними планами */ @RequestMapping(path = "/users_idea", method = RequestMethod.GET) public List<User> getIdeaList() { log.debug("Method - getIdeaList was called"); return userService.getUserList(); } }
После переходим к созданию модели
- Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице
User
@Data @RequiredArgsConstructor public class User { /** * user's id */ @JsonProperty("id") private final int id; /** * user's name */ @JsonProperty("name") private final String name; /** * description */ @JsonProperty("description") private final String description; private String startWord = ""; @Override public String toString() { return startWord + description; } }
UserMapper
@Slf4j public class UserMapper implements RowMapper<User> { @Override public User mapRow(ResultSet rs, int rowNum) throws SQLException { var entity = new User( rs.getInt("id"), rs.getString("user_name"), rs.getString("description") ); log.trace("mapRow(): entity = [{}]", entity); return entity; } }
Переходим к созданию кастомных exception
Для чего они нужны
Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.
- BaseException - класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра - сообщение и тело ошибки
BaseException
@Slf4j public class BaseException extends RuntimeException{ public BaseException(String msg, Throwable t) { super(msg, t); log.error(msg, t); } public BaseException(String msg) { super(msg); log.error(msg); } }
- NotFoundException - класс, который вызывается, когда ответ не найден, наследуется от BaseException
NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND) public class NotFoundException extends BaseException { private final static String MESSAGE = "Not Found"; public NotFoundException(Throwable t) { super(MESSAGE, t); } public NotFoundException() { super(MESSAGE); } }
- DbException - класс, который обрабатывает ошибки связанные с БД, наследуется от RuntimeException
DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) public class DbException extends RuntimeException { private static final String MESSAGE = "Ошибка БД"; public DbException(String message) { super(message); } public DbException(Throwable cause) { super(MESSAGE, cause); } }
Теперь для работы с БД, создаем repository
- Создадим интерфейс, который описывает методы, для работы с записями в БД
IUserRepository
public interface IUserRepository { /** * Возвращает список записей по id * * @return запрашиваемая запись * @throws DbException в случае ошибки БД */ User getById(int id); /** * Возвращает список записей * * @return список всех записей * @throws DbException в случае ошибки БД */ List<User> getUserList(); /** * Вставка новой записи * * @param entity новая запись * @throws DbException в случае ошибки БД */ void insert(User entity); /** * Удаление записи * * @param entity удаляемая запись * @throws DbException в случае ошибки БД */ void delete(User entity); }
- Теперь напишем класс, который реализует методы интерфейса
UserRepository
@Slf4j @Repository public class UserRepository implements IUserRepository { // constants private static final String SQL_SELECT_BY_NAME = "" + "SELECT id, user_name, description FROM user_table WHERE id=?"; private static final String SQL_SELECT_LIST = "" + "SELECT id, user_name, description FROM user_table"; private static final String SQL_INSERT = "" + "INSERT INTO user_table (user_name, description) VALUES (?, ?)"; private static final String SQL_DELETE = "" + "DELETE FROM user_table WHERE id = ?"; protected final static UserMapper USER_MAPPER = new UserMapper(); // beans protected final JdbcTemplate template; /** * Req-args constructor for Spring DI */ public UserRepository(@Qualifier("bot-db") JdbcTemplate template) { this.template = template; } /** * Возвращает список записей по id * * @return запрашиваемая запись * @throws DbException в случае ошибки БД */ @Override public User getById(int id) throws DbException { try { return DataAccessUtils.singleResult( template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id)); } catch (DataAccessException exception) { throw new DbException(exception); } } /** * Возвращает список записей * * @return запрашиваемая запись * @throws DbException в случае ошибки БД */ @Override public List<User> getUserList() throws DbException { try { return template.query(SQL_SELECT_LIST, USER_MAPPER); } catch (DataAccessException exception) { throw new DbException(exception); } } /** * Вставка новой записи * * @param entity новая запись * @throws DbException в случае ошибки БД */ @Override public void insert(User entity) throws DbException { try { // В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом var result = template.update(SQL_INSERT, entity.getName(), entity.getDescription()); if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity); log.info("insert({}) result={}", entity, result); } catch (DataAccessException exception) { throw new DbException(exception); } } /** * Удаление записи * * @param entity удаляемая запись * @throws DbException в случае ошибки БД */ @Override public void delete(User entity) throws DbException { try { var result = template.update(SQL_DELETE, entity.getId()); if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity); log.info("delete({}) result={}", entity, result); } catch (DataAccessException exception) { throw new DbException(exception); } } }
- Далее у нас идет логика бота, тут все тривиально, в от наследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД
TelegramBot
@Slf4j @Getter @Component public class TelegramBot extends TelegramLongPollingBot { private Message requestMessage = new Message(); private final SendMessage response = new SendMessage(); private final Producer producerService; private final UserService userService; private final String botUsername; private final String botToken; public TelegramBot( TelegramBotsApi telegramBotsApi, @Value("${telegram-bot.name}") String botUsername, @Value("${telegram-bot.token}") String botToken, Producer producerService, UserService userService) throws TelegramApiException { this.botUsername = botUsername; this.botToken = botToken; this.producerService = producerService; this.userService = userService; telegramBotsApi.registerBot(this); } /** * Этот метод вызывается при получении обновлений через метод GetUpdates. * * @param request Получено обновление */ @SneakyThrows @Override public void onUpdateReceived(Update request) { requestMessage = request.getMessage(); response.setChatId(requestMessage.getChatId().toString()); var entity = new User( 0, requestMessage.getChat().getUserName(), requestMessage.getText()); if (request.hasMessage() && requestMessage.hasText()) log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText()); if (requestMessage.getText().equals("/start")) defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли"); else if (requestMessage.getText().equals("/idea")) onIdea(response); else defaultMsg(response, "Я записал вашу мысль :) \n "); log.info("Working, text[{}]", requestMessage.getText()); if (requestMessage.getText().startsWith("/")) { entity.setStartWord("команда: "); producerService.sendMessage( entity); } else { entity.setStartWord("мысль: "); producerService.sendMessage( entity); userService.insert(entity); } } /** * Метод отправки сообщения со списком мыслей - по команде "/idea" * * @param response - метод обработки сообщения */ private void onIdea(SendMessage response) throws TelegramApiException { if (userService.getUserList().isEmpty()) { defaultMsg(response, "В списке нет мыслей. \n"); } else { defaultMsg(response, "Вот список ваших мыслей: \n"); for (User txt : userService.getUserList()) { response.setText(txt.toString()); execute(response); } } } /** * Шабонный метод отправки сообщения пользователю * * @param response - метод обработки сообщения * @param msg - сообщение */ private void defaultMsg(SendMessage response, String msg) throws TelegramApiException { response.setText(msg); execute(response); } }
Фрагмент кода с отправкой в Kafka и записью в БД
if (requestMessage.getText().startsWith("/")) { entity.setStartWord("команда: "); producerService.sendMessage( entity); } else { entity.setStartWord("мысль: "); producerService.sendMessage( entity); userService.insert(entity); }
Переходим к созданию бизнес логики приложения
- BaseService - реализует базовые методы сервисов проекта
BaseService
public class BaseService { /** * Обёртка результата * * @param result результат * @return результат * @throws NotFoundException если результат null */ public <T> T wrapResult(T result) { if(result == null) throw new NotFoundException(); return result; } /** * Обёртка результата * * @param result результат * @return результат * @throws NotFoundException если результат null или пустой */ public <T> List<T> wrapResults(List<T> result) { if(result == null || result.size() == 0) throw new NotFoundException(); return result; } }
- Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД
UserService
@Service @Slf4j @RequiredArgsConstructor public class UserService extends BaseService { //beans protected final IUserRepository repo; /** * Возвращает список записей * * @return список записей * @throws DbException в случае ошибки БД */ public List<User> getUserList() { log.trace("#### getUserList() - working"); return wrapResults(repo.getUserList()); } /** * Возвращает список записей по id * * @throws DbException в случае ошибки БД */ public User getById(int id) { log.trace("#### getById() [id={}]", id); return wrapResult(repo.getById(id)); } /** * Вставка новой записи * * @param entity новая запись * @throws DbException в случае ошибки БД */ public void insert(User entity) { log.trace("#### insert() [entity={}]", entity); repo.insert(entity); } /** * Удаление записи * * @param entity удаляемая запись * @throws DbException в случае ошибки БД */ public void delete(User entity) { log.trace("#### delete() [entity={}]", entity); repo.delete(entity); } }
- Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет
Producer
@Service @Slf4j public class Producer { private static final String TOPIC = "users"; protected final IUserRepository repo; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public Producer(IUserRepository repo) { this.repo = repo; } public void sendMessage(User user) { if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message"); log.info("#### Producing message [user={}]", user); kafkaTemplate.send(TOPIC, "Writing in log -> " + user); } }
В конце класс, который собственно и запускает все наше приложение
WebHookApp
@Slf4j @SpringBootApplication public class WebHookApp { public static void main(String[] args) { SpringApplication.run(WebHookApp.class, args); } }
Теперь мы замариновали все ингредиенты и подготовили блюдо к запеканию:
- Сначала проверим, запущена ли Kafka

- После, запускаем Conductor и видим, что у нас работает брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer

- Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:
CREATE TABLE public.log ( id serial4 NOT NULL, message varchar(500) NOT NULL, date_time date NOT NULL, topic varchar(100) NOT NULL, CONSTRAINT log_pkey PRIMARY KEY (id) );
CREATE TABLE public.user_table ( id serial4 NOT NULL, user_name varchar(100) NOT NULL, description varchar(500) NULL, CONSTRAINT user_table_pkey PRIMARY KEY (id) );



Отлично, блюдо запеклось и готово к подаче:
- Запускаем проект, проверяем, что все настроено и корректно работает

- Открываем телеграм и пробуем на вкус нашего "монстра-Франкенштейна"
Пишем - /start и начинаем тест ... Я в шоке, оно живое !

- Давайте посмотрим, что же нам написал Spring Boot в логах и записались ли данные в Kafka и БД ?
Логи нашего бота, ошибок не наблюдается
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.4.2) 2022-01-15 16:46:19.248 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : The following profiles are active: bot 2022-01-15 16:46:19.291 WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath 2022-01-15 16:46:19.882 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http) 2022-01-15 16:46:19.887 INFO 412498 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2022-01-15 16:46:19.887 INFO 412498 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41] 2022-01-15 16:46:19.956 INFO 412498 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2022-01-15 16:46:19.957 INFO 412498 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms 2022-01-15 16:46:20.013 INFO 412498 --- [ main] c.secretary.bot.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}] 2022-01-15 16:46:20.565 INFO 412498 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2022-01-15 16:46:20.574 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice 2022-01-15 16:46:20.598 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping' 2022-01-15 16:46:20.619 DEBUG 412498 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/**, /**] in 'resourceHandlerMapping' 2022-01-15 16:46:20.627 DEBUG 412498 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice 2022-01-15 16:46:20.702 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path '' 2022-01-15 16:46:20.709 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : Started WebHookApp in 1.65 seconds (JVM running for 1.962) SSS2022-01-15 16:52:33.916 INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [localhost:9092] buffer.memory = 33554432 client.dns.lookup = use_all_dns_ips client.id = producer-1 compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] internal.auto.downgrade.txn.commit = true key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metadata.max.idle.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2022-01-15 16:52:33.947 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0 2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651 2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642254753947 2022-01-15 16:52:34.056 INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw 2022-01-15 16:54:01.115 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting... 2022-01-15 16:54:01.188 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
- Как мы видим, сообщения отправленные Боту появились в БД

- Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users

- Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA

- В открывшемся окне, ставим такие же настройки (самая важная из них это Start From - указывает, с какого момента показывать сообщения в Kafka, наша настройка - показывает все сообщения, включая отправленные ранее)

- Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении

Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.
