Начало
Давненько я ничего не публиковал на Хабре — пора это исправлять.
В этот раз хочу поделиться темой, которая кажется простой, но на деле вызывает интерес у многих разработчиков и системных администраторов: как создать свою легковесную систему планирования задач на Python. Что-то вроде мини-аналога cron, но под свои задачи и со своими фишками.
Ведь часто бывает так: хочется, чтобы какие-то проверки или скрипты запускались в определённое время — например, в обеденный перерыв у сотрудников можно поставить автоматическую проверку всех машин на наличие вредоносного ПО. Или наоборот — распределить рутинные проверки так, чтобы они не мешали основной работе.
Сегодня я расскажу:
1.Этап: Теория
2.Этап: Теоретическая разработка такой программы
3.Этап: Техническая разработка ПО
4.Этап: Итоги
Итак, поехали!
1.Этап: Теория
Что такое планирование задач и зачем оно нужно?
В самой основе планирование задач – это механизм, который позволяет запустить функцию, скрипт или команду не прямо сейчас, а позже, основываясь на некотором правиле:
В определенное время (например, "каждый день в 3:00 ночи").
Через определенный интервал (например, "каждые 5 минут").
В определенную дату и время (например, "31 декабря 2023 года в 23:59").
При наступлении определенного события (хотя это уже более продвинутые планировщики).
Зачем это нужно?
Автоматизация: Избавление от рутинных, повторяющихся действий.
Выполнение задач вне рабочего времени: Запуск ресурсоемких процессов (обработка данных, отчеты) ночью, когда нагрузка на систему минимальна.
Периодические проверки: Мониторинг состояния, проверка обновлений.
Отложенное выполнение: Запуск задач, которые зависят от готовности данных или наступления определенного момента.
Теория под капотом: Как работают планировщики?
Представим, что у нас есть список задач, каждая из которых должна выполниться по своему расписанию. Как планировщик решает, когда и что запускать? Существует несколько базовых подходов:
Polling (Опрос): Самый простой метод. Планировщик постоянно (или через короткие промежутки времени) проверяет список всех задач. Для каждой задачи он смотрит: "Пришло ли время ее выполнять?". Если да, он запускает задачу.
Плюсы: Простота реализации.
Минусы: Неэффективность. Если задач много или интервал проверки слишком частый, это может потреблять много CPU. Если интервал проверки слишком редкий, задачи могут запускаться с опозданием. Также может быть проблема с "busy-waiting" (активное ожидание), если используется простой цикл while True.
Event-Driven (На основе событий/таймеров): Более изящный подход. Вместо постоянного опроса, планировщик вычисляет время до следующей запланированной задачи. Затем он "засыпает" (блокируется) на это время. Когда таймер срабатывает, планировщик просыпается, выполняет задачу и вычисляет время до следующей задачи (возможно, это та же задача, если она периодическая, или другая).
Плюсы: Эффективность. Планировщик не тратит ресурсы впустую, когда нет задач для выполнения. Более точное планирование.
Минусы: Чуть сложнее в реализации, нужно правильно управлять таймерами.
Использование системных планировщиков: Делегирование работы ОС (cron, systemd timers, Windows Task Scheduler). В этом случае ваше приложение или скрипт просто запускается самой операционной системой по расписанию.
Плюсы: Надежность (системные планировщики отказоустойчивы), не нужно писать свою логику расписаний.
Минусы: Меньше гибкости (сложно передавать контекст выполнения, обмениваться данными между запусками), нет централизованного управления из вашего приложения.
Выполнение задач:
Что происходит, когда приходит время запустить задачу?
Последовательное выполнение: Планировщик запускает задачу и ждет ее завершения, прежде чем перейти к проверке или запуску следующих задач.
Проблема: Если одна задача зависнет или выполняется долго, это заблокирует выполнение всех остальных задач.
Параллельное выполнение (в потоках или процессах): Планировщик запускает задачу в отдельном потоке (thread) или процессе. Сам планировщик при этом продолжает работать, проверяя и запуская другие задачи.
Плюсы: Долго выполняющиеся задачи не блокируют планировщик и другие задачи.
Минусы: Нужно управлять параллелизмом, избегать состояния гонки, следить за потреблением ресурсов (слишком много потоков/процессов могут нагрузить систему).
Для нашего простого планировщика на Python мы выберем комбинацию: polling (простой цикл проверки) для определения, когда запускать задачи, и параллельное выполнение в потоках (threading), чтобы сами задачи не блокировали главный цикл планировщика. Это хороший баланс между простотой и практичностью для базовых нужд.
Лично по своему опыту скажу: когда сталкиваешься с подобной теорией впервые, всё кажется сложным и запутанным. Но стоит один раз вникнуть в базовые принципы, и перед вами открывается огромный простор — можно строить целые центры автоматизации процессов.
2.Этап: Алгоритм нашей ПО
Наш планировщик будет уметь:
Добавлять задачи.
Запускать задачи по двум типам расписания:
С заданной периодичностью (интервал в секундах).
Один раз в определенное время.
Выполнять задачи в отдельных потоках.
Работать в одном процессе.
Он НЕ будет уметь (что отличает его от "взрослых" решений):
Сохранять список задач между перезапусками (нет персистентности).
Иметь гибкий синтаксис расписаний (как cron).
Обрабатывать ошибки выполнения задач.
Распределяться на несколько машин.
Учитывать часовые пояса.
Это будет именно простой планировщик для понимания принципов и выполнения базовых локальных задач.
Как же выглядит его алгоритм?

3.Этап: Техническая разработка ПО
Для создания нашего планировщика задач мы будем использовать только стандартные средства Python, без сторонних зависимостей. Это делает проект максимально лёгким и переносимым.
Нам понадобятся:
Python (подойдёт версия 3.8 и выше).
Библиотеки стандартной библиотеки Python:
datetime
uuid
threading
time
Давайте чуть подробнее рассмотрим две ключевые библиотеки, которые сыграют важную роль в работе нашего планировщика:
datetime
Модуль datetime
— это стандартный инструмент для работы с датой и временем в Python.
Он позволяет:
получать текущую дату и время (
datetime.datetime.now
()
),выполнять арифметику дат (например, прибавлять интервалы),
сравнивать моменты времени,
форматировать дату и время в разные строки.
Почему нам важен datetime
?
Мы будем с его помощью определять, когда пора запускать задачи — сравнивая текущее время с временем, назначенным для задачи.
uuid
Модуль uuid
позволяет генерировать уникальные идентификаторы.
UUID расшифровывается как Universally Unique Identifier.
Почему он нужен в нашем проекте?
Каждой задаче мы будем присваивать уникальный ID. Это удобно для:
внутренней идентификации задач,
безопасного удаления или поиска конкретной задачи,
предотвращения конфликтов между задачами.
Мы будем использовать uuid.uuid4()
, чтобы создавать случайные уникальные идентификаторы.
Теперь, когда все подготовительные шаги пройдены, можно переходить к написанию первой версии планировщика.
import threading
import time
from datetime import datetime
import uuid
class Task:
"""Класс для представления задачи"""
def __init__(self, name, function, execution_time=None, interval=None):
self.id = str(uuid.uuid4())
self.name = name
self.function = function
self.execution_time = execution_time
self.interval = interval
self.is_running = False
self.is_completed = False
self.thread = None
def should_run(self):
"""Проверяет, должна ли задача быть запущена"""
if self.is_completed and not self.interval:
return False
current_time = datetime.now()
if self.execution_time and current_time >= self.execution_time:
if self.interval:
# Обновляем время следующего запуска для периодических задач
while current_time >= self.execution_time:
self.execution_time = datetime.fromtimestamp(
self.execution_time.timestamp() + self.interval
)
return True
return False
def run(self):
"""Запускает задачу в отдельном потоке"""
if self.is_running:
return
self.is_running = True
self.thread = threading.Thread(target=self._execute)
self.thread.daemon = True
self.thread.start()
def _execute(self):
"""Выполняет функцию задачи"""
try:
print(f"[{datetime.now()}] Выполняется задача: {self.name}")
self.function()
print(f"[{datetime.now()}] Задача {self.name} завершена")
except Exception as e:
print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")
finally:
self.is_running = False
if not self.interval:
self.is_completed = True
class TaskScheduler:
"""Планировщик задач"""
def __init__(self, polling_interval=1):
"""
Инициализация планировщика
:param polling_interval: Интервал проверки задач в секундах
"""
self.tasks = []
self.polling_interval = polling_interval
self.is_running = False
self.scheduler_thread = None
def add_task(self, task):
"""Добавляет задачу в список задач"""
self.tasks.append(task)
print(f"Задача '{task.name}' добавлена в планировщик")
return task.id
def schedule_task(self, name, function, execution_time=None, interval=None):
"""
Создает и добавляет новую задачу
:param name: Название задачи
:param function: Функция для выполнения
:param execution_time: Время выполнения (datetime объект)
:param interval: Интервал повторения в секундах
:return: ID задачи
"""
task = Task(name, function, execution_time, interval)
return self.add_task(task)
def remove_task(self, task_id):
"""Удаляет задачу по ID"""
for i, task in enumerate(self.tasks):
if task.id == task_id:
self.tasks.pop(i)
print(f"Задача с ID {task_id} удалена")
return True
return False
def start(self):
"""Запускает планировщик задач"""
if self.is_running:
print("Планировщик уже запущен")
return
self.is_running = True
self.scheduler_thread = threading.Thread(target=self._main_loop)
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
print("Планировщик задач запущен")
def stop(self):
"""Останавливает планировщик задач"""
self.is_running = False
if self.scheduler_thread:
self.scheduler_thread.join(timeout=self.polling_interval*2)
print("Планировщик задач остановлен")
def _main_loop(self):
"""Основной цикл планировщика"""
while self.is_running:
# Проверяем и запускаем задачи, которые должны быть выполнены
for task in self.tasks:
if task.should_run():
task.run()
# Удаляем завершенные задачи
self.tasks = [task for task in self.tasks if not task.is_completed]
# Минимальная задержка в основном цикле
time.sleep(self.polling_interval)
# Пример использования
if __name__ == "__main__":
def task1():
print("Выполняется задача 1")
time.sleep(2) # Имитация работы
def task2():
print("Выполняется задача 2")
time.sleep(1) # Имитация работы
def task3():
print("Выполняется периодическая задача")
# Создаем планировщик с интервалом опроса 0.5 секунды
scheduler = TaskScheduler(polling_interval=0.5)
# Добавляем задачи
# Задача, которая выполнится через 3 секунды
scheduler.schedule_task(
"Отложенная задача",
task1,
execution_time=datetime.now().replace(microsecond=0) +
datetime.timedelta(seconds=3)
)
# Задача, которая выполнится сразу
scheduler.schedule_task(
"Мгновенная задача",
task2,
execution_time=datetime.now()
)
# Периодическая задача, которая будет выполняться каждые 5 секунд
scheduler.schedule_task(
"Периодическая задача",
task3,
execution_time=datetime.now(),
interval=5
)
# Запускаем планировщик
scheduler.start()
try:
# Даем планировщику поработать 30 секунд
time.sleep(30)
except KeyboardInterrupt:
print("Программа прервана пользователем")
finally:
# Останавливаем планировщик
scheduler.stop()
print("Программа завершена")
И так теперь смотрим что у на по итогу получилось?

Итак, наш базовый планировщик работает: задачи добавляются, запускаются по расписанию, всё хорошо.
Но возникает логичный вопрос:
Как это может помочь системным и сетевым администраторам в реальной жизни?
Ответ прост: используя планировщик, можно автоматизировать рутинные задачи, которые раньше приходилось выполнять вручную. Например:
Мониторинг использования ресурсов (CPU, память, диск).
Проверка доступности серверов и сервисов.
Запуск диагностических скриптов или утилит.
Автоматическое выполнение профилактических процедур.
Чтобы добавить такие возможности, нам понадобятся ещё две мощные библиотеки:
psutil
psutil
(process and system utilities) — сторонняя библиотека для Python, которая позволяет получать информацию о состоянии системы:
загрузка процессора,
использование оперативной памяти,
статистика по дискам и сетевым интерфейсам,
процессы, запущенные в системе.
Почему она важна?
С её помощью можно написать задачи, которые будут следить за состоянием машины и вовремя предупреждать о проблемах.
subprocess
subprocess
— стандартный модуль Python для работы с внешними процессами.
Позволяет:
запускать системные команды,
взаимодействовать с выводом консоли,
обрабатывать ошибки при запуске.
Почему это важно?
С помощью subprocess
можно, например:
запустить скрипт антивирусной проверки,
проверить доступность сервера через
ping
,перезапустить службы,
выполнить резервное копирование через системные утилиты.
Теперь давайте усложним наш планировщик и добавим несколько реальных примеров использования этих библиотек.
import threading
import time
from datetime import datetime, timedelta
import uuid
import psutil # Для мониторинга системных ресурсов
import subprocess # Для выполнения системных команд
class Task:
"""Класс для представления задачи"""
def __init__(self, name, function, execution_time=None, interval=None):
self.id = str(uuid.uuid4())
self.name = name
self.function = function
self.execution_time = execution_time
self.interval = interval
self.is_running = False
self.is_completed = False
self.thread = None
def should_run(self):
"""Проверяет, должна ли задача быть запущена"""
if self.is_completed and not self.interval:
return False
current_time = datetime.now()
if self.execution_time and current_time >= self.execution_time:
if self.interval:
# Обновляем время следующего запуска для периодических задач
while current_time >= self.execution_time:
self.execution_time = datetime.fromtimestamp(
self.execution_time.timestamp() + self.interval
)
return True
return False
def run(self):
"""Запускает задачу в отдельном потоке"""
if self.is_running:
return
self.is_running = True
self.thread = threading.Thread(target=self._execute)
self.thread.daemon = True
self.thread.start()
def _execute(self):
"""Выполняет функцию задачи"""
try:
print(f"[{datetime.now()}] Выполняется задача: {self.name}")
self.function()
print(f"[{datetime.now()}] Задача {self.name} завершена")
except Exception as e:
print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")
finally:
self.is_running = False
if not self.interval:
self.is_completed = True
class TaskScheduler:
"""Планировщик задач"""
def __init__(self, polling_interval=1):
"""
Инициализация планировщика
:param polling_interval: Интервал проверки задач в секундах
"""
self.tasks = []
self.polling_interval = polling_interval
self.is_running = False
self.scheduler_thread = None
def add_task(self, task):
"""Добавляет задачу в список задач"""
self.tasks.append(task)
print(f"Задача '{task.name}' добавлена в планировщик")
return task.id
def schedule_task(self, name, function, execution_time=None, interval=None):
"""
Создает и добавляет новую задачу
:param name: Название задачи
:param function: Функция для выполнения
:param execution_time: Время выполнения (datetime объект)
:param interval: Интервал повторения в секундах
:return: ID задачи
"""
task = Task(name, function, execution_time, interval)
return self.add_task(task)
def remove_task(self, task_id):
"""Удаляет задачу по ID"""
for i, task in enumerate(self.tasks):
if task.id == task_id:
self.tasks.pop(i)
print(f"Задача с ID {task_id} удалена")
return True
return False
def start(self):
"""Запускает планировщик задач"""
if self.is_running:
print("Планировщик уже запущен")
return
self.is_running = True
self.scheduler_thread = threading.Thread(target=self._main_loop)
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
print("Планировщик задач запущен")
def stop(self):
"""Останавливает планировщик задач"""
self.is_running = False
if self.scheduler_thread:
self.scheduler_thread.join(timeout=self.polling_interval*2)
print("Планировщик задач остановлен")
def _main_loop(self):
"""Основной цикл планировщика"""
while self.is_running:
# Проверяем и запускаем задачи, которые должны быть выполнены
for task in self.tasks:
if task.should_run():
task.run()
# Удаляем завершенные задачи
self.tasks = [task for task in self.tasks if not task.is_completed]
# Минимальная задержка в основном цикле
time.sleep(self.polling_interval)
# Функции для мониторинга системы
def monitor_cpu_usage():
"""Мониторинг загрузки ЦП компьютера"""
cpu_percent = psutil.cpu_percent(interval=1)
print(f"Текущая загрузка ЦП: {cpu_percent}%")
# Дополнительная информация о ЦП
cpu_count = psutil.cpu_count(logical=False)
cpu_count_logical = psutil.cpu_count(logical=True)
print(f"Количество физических ядер ЦП: {cpu_count}")
print(f"Количество логических ядер ЦП: {cpu_count_logical}")
# Информация о загрузке каждого ядра
per_cpu = psutil.cpu_percent(interval=1, percpu=True)
for i, percent in enumerate(per_cpu):
print(f"Загрузка ядра {i}: {percent}%")
def monitor_network_packets():
"""Мониторинг сетевых пакетов пользователя"""
try:
# Получаем информацию о сетевых интерфейсах
net_io = psutil.net_io_counters(pernic=True)
print("Информация о сетевых пакетах:")
for interface, stats in net_io.items():
print(f"\nИнтерфейс: {interface}")
print(f"Отправлено байт: {stats.bytes_sent}")
print(f"Получено байт: {stats.bytes_recv}")
print(f"Отправлено пакетов: {stats.packets_sent}")
print(f"Получено пакетов: {stats.packets_recv}")
print(f"Ошибки при отправке: {stats.errin}")
print(f"Ошибки при получении: {stats.errout}")
print(f"Пакеты отброшены при отправке: {stats.dropin}")
print(f"Пакеты отброшены при получении: {stats.dropout}")
# Дополнительно можно использовать netstat для Windows
print("\nАктивные сетевые соединения:")
result = subprocess.run(["netstat", "-n"], capture_output=True, text=True)
connections = result.stdout.split('\n')
# Выводим только первые 10 строк, чтобы не перегружать вывод
for line in connections[:10]:
print(line)
except Exception as e:
print(f"Ошибка при мониторинге сетевых пакетов: {e}")
# Пример использования
if __name__ == "__main__":
# Создаем планировщик с интервалом опроса 0.5 секунды
scheduler = TaskScheduler(polling_interval=0.5)
# Добавляем задачи
# Задача 1: Мониторинг загрузки ЦП (каждые 5 секунд)
scheduler.schedule_task(
"Мониторинг загрузки ЦП",
monitor_cpu_usage,
execution_time=datetime.now(),
interval=5
)
# Задача 2: Мониторинг сетевых пакетов (каждые 10 секунд)
scheduler.schedule_task(
"Мониторинг сетевых пакетов",
monitor_network_packets,
execution_time=datetime.now() + timedelta(seconds=2),
interval=10
)
# Запускаем планировщик
scheduler.start()
try:
# Даем планировщику поработать 60 секунд
print("Планировщик будет работать 60 секунд. Нажмите Ctrl+C для досрочного завершения.")
time.sleep(60)
except KeyboardInterrupt:
print("Программа прервана пользователем")
finally:
# Останавливаем планировщик
scheduler.stop()
print("Программа завершена")
и что мы получаем в ответ

4.Этап: Итоги
Что можно сделать на основе такого простого планировщика задач?
Да практически всё, что угодно!
Реализовать удалённую доставку задач для корпоративных компьютеров.
Построить центры автоматизации для системных администраторов.
Развить проект до полноценного решения для управления инфраструктурой.
Или даже запустить коммерческий продукт на базе этой идеи.
Код, который мы написали, — это фундамент, на котором можно строить сложные и надёжные системы. Он лёгкий, расширяемый и даёт отличную отправную точку для собственных экспериментов и разработок.
Спасибо, что дочитали статью до конца! Надеюсь, материал был для вас полезным и вдохновляющим.