Привет, Хабр!
Сегодня поговорим о паттерне Poison Pill и его реализацию в Java.
Poison Pill, или «ядовитая пилюля», — это шаблон, используемый для мягкой остановки потока или процесса. Суть его заключается в отправке специального, заранее определенного сообщения, которое сигнализирует о необходимости завершения работы. Это сообщение обрабатывается в рамках нормального потока сообщений и позволяет потоку корректно завершить работу, освободить ресурсы и закрыть все активные соединения.
Принцип работы
Архитектура паттерна Poison Pill включает несколько компонентов: Message, Producer, Consumer и MessageQueue.
Message который определяет структуру сообщений. Сообщения могут включать различные заголовки и тело сообщения. Пример реализации — SimpleMessage:
MessageQueue объединяет точки публикации MqPublishPoint и подписки MqSubscribePoint. Он представляет собой очередь, через которую сообщения передаются от производителей к потребителям. Пример реализации — SimpleMessageQueue, использующий BlockingQueue для хранения сообщений.
Producer создает сообщения и помещает их в очередь. Т. е. когда производитель завершает свою работу, он отправляет сообщение Poison Pill, чтобы уведомить потребителей о необходимости остановки.
Consumer является потребителем и извлекает сообщения из очереди и обрабатывает их. Если потребитель получает Poison Pill, он завершает свою работу
В целом, принцип работы Poison Pill основан на циклическом взаимодействии между производителями и потребителями через очередь сообщений. Производители генерируют сообщения и помещают их в очередь. Каждое сообщение обрабатывается потребителями, которые извлекают его из очереди. Когда производитель завершает работу и отправляет Poison Pill, потребитель, получив этот сигнал, прекращает обработку и завершает выполнение.
Реализация в Java
Определим интерфейс Message и класс SimpleMessage:
public interface Message { void addHeader(String key, String value); String getHeader(String key); String getBody(); void setBody(String body); } public class SimpleMessage implements Message { private Map<String, String> headers = new HashMap<>(); private String body; @Override public void addHeader(String key, String value) { headers.put(key, value); } @Override public String getHeader(String key) { return headers.get(key); } @Override public void setBody(String body) { this.body = body; } @Override public String getBody() { return body; } }
Определим MessageQueue и его реализации SimpleMessageQueue:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {} public class SimpleMessageQueue implements MessageQueue { private BlockingQueue<Message> queue; public SimpleMessageQueue(int capacity) { this.queue = new LinkedBlockingQueue<>(capacity); } @Override public void put(Message message) throws InterruptedException { queue.put(message); } @Override public Message take() throws InterruptedException { return queue.take(); } }
Реализация Producer и Consumer:
public class Producer implements Runnable { private final MessageQueue queue; private volatile boolean isStopped; public Producer(MessageQueue queue) { this.queue = queue; } @Override public void run() { try { while (!isStopped) { SimpleMessage message = new SimpleMessage(); message.setBody("Important data"); queue.put(message); Thread.sleep(1000); // Имитация работы } queue.put(new PoisonPillMessage()); // Отправка Poison Pill } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void stop() { isStopped = true; } } public class Consumer implements Runnable { private final MessageQueue queue; public Consumer(MessageQueue queue) { this.queue = queue; } @Override public void run() { try { Message message; while (true) { message = queue.take(); if (message instanceof PoisonPillMessage) break; // Остановка если Poison Pill processMessage(message); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void processMessage(Message message) { System.out.println("Processed: " + message.getBody()); } } public class PoisonPillMessage extends SimpleMessage { public PoisonPillMessage() { setBody("POISON_PILL"); } }
Запуск и координация потоков:
public class Main { public static void main(String[] args) { MessageQueue queue = new SimpleMessageQueue(10); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); Thread producerThread = new Thread(producer); Thread consumerThread = new Thread(consumer); producerThread.start(); consumerThread.start(); try { Thread.sleep(5000); // позволяем производить сообщения producer.stop(); // останавливаем производителя producerThread.join(); consumerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Альтернативы
Manual Offset Management — один из подходов к управлению отказами заключается в ручном смещении офсета до первого сообщения после Poison Pill. Так можно получить контроль над тем, какие сообщения будут пропущены, но требует доступа к Kafka или другому брокеру сообщений.
Error‑Handling Deserializers: в случае использования Apache Kafka можно применить специальный десериализатор с обработчиком ошибок. Он пропускает некорректные сообщения, тем самым избегая зацикливания при обработке данных, и передаёт сообщения об ошибках в спец. лог.
С конфигурацией Spring Kafka можно использовать SeekToCurrentErrorHandler для управления ошибками при десериализации сообщений. Обработчик смещает офсет в точку после неудачного сообщения, избегая блокировки обработки последующих сообщений.
Используя паттерн команды, можно создать единый интерфейс для различных типов задач и использовать общий механизм завершения их выполнения, что также можно адаптировать под конкретные сценарии.
А можно воспользоваться просто прямым завершением потоков. Это можно реализовать через управление потоком через класс Thread:
public class Main { public static void main(String[] args) { Thread worker1 = new Thread(new Worker()); Thread worker2 = new Thread(new Worker()); worker1.start(); worker2.start(); try { Thread.sleep(1000); // даем время потокам поработать } catch (InterruptedException e) { e.printStackTrace(); } // прямое завершение потоков worker1.interrupt(); worker2.interrupt(); System.out.println("Потоки были прямо завершены."); } } class Worker implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { System.out.println("Работа потока " + Thread.currentThread().getName()); Thread.sleep(100); } catch (InterruptedException e) { System.out.println("Поток " + Thread.currentThread().getName() + " прерван."); Thread.currentThread().interrupt(); // рекомендуется восстановить статус прерванного состояния } } } }
Поток выполняет свою работу в цикле до тех пор, пока не будет прерван. Метод interrupt() используется для отправки запроса на прерывание потоку, который проверяет свое состояние с помощью Thread.currentThread().isInterrupted() .
В заключение напомню про открытый урок, который пройдет сегодня вечером в OTUS: «Переопределение, скрытие и передекларация в Java». Записаться можно по ссылке.
