Как стать автором
Обновить

Разработка собственной простой системы управления задачами по расписанию на Python

Уровень сложностиСредний
Время на прочтение12 мин
Количество просмотров2.5K

Начало

Давненько я ничего не публиковал на Хабре — пора это исправлять.

В этот раз хочу поделиться темой, которая кажется простой, но на деле вызывает интерес у многих разработчиков и системных администраторов: как создать свою легковесную систему планирования задач на Python. Что-то вроде мини-аналога cron, но под свои задачи и со своими фишками.

Ведь часто бывает так: хочется, чтобы какие-то проверки или скрипты запускались в определённое время — например, в обеденный перерыв у сотрудников можно поставить автоматическую проверку всех машин на наличие вредоносного ПО. Или наоборот — распределить рутинные проверки так, чтобы они не мешали основной работе.

Сегодня я расскажу:

  • 1.Этап: Теория

  • 2.Этап: Теоретическая разработка такой программы

  • 3.Этап: Техническая разработка ПО

  • 4.Этап: Итоги

Итак, поехали!

1.Этап: Теория

Что такое планирование задач и зачем оно нужно?

В самой основе планирование задач – это механизм, который позволяет запустить функцию, скрипт или команду не прямо сейчас, а позже, основываясь на некотором правиле:

  • В определенное время (например, "каждый день в 3:00 ночи").

  • Через определенный интервал (например, "каждые 5 минут").

  • В определенную дату и время (например, "31 декабря 2023 года в 23:59").

  • При наступлении определенного события (хотя это уже более продвинутые планировщики).

Зачем это нужно?

  • Автоматизация: Избавление от рутинных, повторяющихся действий.

  • Выполнение задач вне рабочего времени: Запуск ресурсоемких процессов (обработка данных, отчеты) ночью, когда нагрузка на систему минимальна.

  • Периодические проверки: Мониторинг состояния, проверка обновлений.

  • Отложенное выполнение: Запуск задач, которые зависят от готовности данных или наступления определенного момента.

Теория под капотом: Как работают планировщики?

Представим, что у нас есть список задач, каждая из которых должна выполниться по своему расписанию. Как планировщик решает, когда и что запускать? Существует несколько базовых подходов:

  1. Polling (Опрос): Самый простой метод. Планировщик постоянно (или через короткие промежутки времени) проверяет список всех задач. Для каждой задачи он смотрит: "Пришло ли время ее выполнять?". Если да, он запускает задачу.

    • Плюсы: Простота реализации.

    • Минусы: Неэффективность. Если задач много или интервал проверки слишком частый, это может потреблять много CPU. Если интервал проверки слишком редкий, задачи могут запускаться с опозданием. Также может быть проблема с "busy-waiting" (активное ожидание), если используется простой цикл while True.

  2. Event-Driven (На основе событий/таймеров): Более изящный подход. Вместо постоянного опроса, планировщик вычисляет время до следующей запланированной задачи. Затем он "засыпает" (блокируется) на это время. Когда таймер срабатывает, планировщик просыпается, выполняет задачу и вычисляет время до следующей задачи (возможно, это та же задача, если она периодическая, или другая).

    • Плюсы: Эффективность. Планировщик не тратит ресурсы впустую, когда нет задач для выполнения. Более точное планирование.

    • Минусы: Чуть сложнее в реализации, нужно правильно управлять таймерами.

  3. Использование системных планировщиков: Делегирование работы ОС (cron, systemd timers, Windows Task Scheduler). В этом случае ваше приложение или скрипт просто запускается самой операционной системой по расписанию.

    • Плюсы: Надежность (системные планировщики отказоустойчивы), не нужно писать свою логику расписаний.

    • Минусы: Меньше гибкости (сложно передавать контекст выполнения, обмениваться данными между запусками), нет централизованного управления из вашего приложения.

Выполнение задач:
Что происходит, когда приходит время запустить задачу?

  • Последовательное выполнение: Планировщик запускает задачу и ждет ее завершения, прежде чем перейти к проверке или запуску следующих задач.

    • Проблема: Если одна задача зависнет или выполняется долго, это заблокирует выполнение всех остальных задач.

  • Параллельное выполнение (в потоках или процессах): Планировщик запускает задачу в отдельном потоке (thread) или процессе. Сам планировщик при этом продолжает работать, проверяя и запуская другие задачи.

    • Плюсы: Долго выполняющиеся задачи не блокируют планировщик и другие задачи.

    • Минусы: Нужно управлять параллелизмом, избегать состояния гонки, следить за потреблением ресурсов (слишком много потоков/процессов могут нагрузить систему).

Для нашего простого планировщика на Python мы выберем комбинацию: polling (простой цикл проверки) для определения, когда запускать задачи, и параллельное выполнение в потоках (threading), чтобы сами задачи не блокировали главный цикл планировщика. Это хороший баланс между простотой и практичностью для базовых нужд.
Лично по своему опыту скажу: когда сталкиваешься с подобной теорией впервые, всё кажется сложным и запутанным. Но стоит один раз вникнуть в базовые принципы, и перед вами открывается огромный простор — можно строить целые центры автоматизации процессов.

2.Этап: Алгоритм нашей ПО

Наш планировщик будет уметь:

  • Добавлять задачи.

  • Запускать задачи по двум типам расписания:

    • С заданной периодичностью (интервал в секундах).

    • Один раз в определенное время.

  • Выполнять задачи в отдельных потоках.

  • Работать в одном процессе.

Он НЕ будет уметь (что отличает его от "взрослых" решений):

  • Сохранять список задач между перезапусками (нет персистентности).

  • Иметь гибкий синтаксис расписаний (как cron).

  • Обрабатывать ошибки выполнения задач.

  • Распределяться на несколько машин.

  • Учитывать часовые пояса.

Это будет именно простой планировщик для понимания принципов и выполнения базовых локальных задач.

Как же выглядит его алгоритм?

Алгоритм данной программы
Алгоритм данной программы

3.Этап: Техническая разработка ПО

Для создания нашего планировщика задач мы будем использовать только стандартные средства Python, без сторонних зависимостей. Это делает проект максимально лёгким и переносимым.

Нам понадобятся:

  • Python (подойдёт версия 3.8 и выше).

  • Библиотеки стандартной библиотеки Python:

    • datetime

    • uuid

    • threading

    • time

Давайте чуть подробнее рассмотрим две ключевые библиотеки, которые сыграют важную роль в работе нашего планировщика:

datetime

Модуль datetime — это стандартный инструмент для работы с датой и временем в Python.
Он позволяет:

  • получать текущую дату и время (datetime.datetime.now()),

  • выполнять арифметику дат (например, прибавлять интервалы),

  • сравнивать моменты времени,

  • форматировать дату и время в разные строки.

Почему нам важен datetime?
Мы будем с его помощью определять, когда пора запускать задачи — сравнивая текущее время с временем, назначенным для задачи.

uuid

Модуль uuid позволяет генерировать уникальные идентификаторы.
UUID расшифровывается как Universally Unique Identifier.

Почему он нужен в нашем проекте?
Каждой задаче мы будем присваивать уникальный ID. Это удобно для:

  • внутренней идентификации задач,

  • безопасного удаления или поиска конкретной задачи,

  • предотвращения конфликтов между задачами.

Мы будем использовать uuid.uuid4(), чтобы создавать случайные уникальные идентификаторы.
Теперь, когда все подготовительные шаги пройдены, можно переходить к написанию первой версии планировщика.

import threading
import time
from datetime import datetime
import uuid

class Task:
    """Класс для представления задачи"""
    def __init__(self, name, function, execution_time=None, interval=None):
        self.id = str(uuid.uuid4())
        self.name = name
        self.function = function
        self.execution_time = execution_time
        self.interval = interval
        self.is_running = False
        self.is_completed = False
        self.thread = None

    def should_run(self):
        """Проверяет, должна ли задача быть запущена"""
        if self.is_completed and not self.interval:
            return False
            
        current_time = datetime.now()
        
        if self.execution_time and current_time >= self.execution_time:
            if self.interval:
                # Обновляем время следующего запуска для периодических задач
                while current_time >= self.execution_time:
                    self.execution_time = datetime.fromtimestamp(
                        self.execution_time.timestamp() + self.interval
                    )
            return True
        return False

    def run(self):
        """Запускает задачу в отдельном потоке"""
        if self.is_running:
            return
            
        self.is_running = True
        self.thread = threading.Thread(target=self._execute)
        self.thread.daemon = True
        self.thread.start()

    def _execute(self):
        """Выполняет функцию задачи"""
        try:
            print(f"[{datetime.now()}] Выполняется задача: {self.name}")
            self.function()
            print(f"[{datetime.now()}] Задача {self.name} завершена")
        except Exception as e:
            print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")
        finally:
            self.is_running = False
            if not self.interval:
                self.is_completed = True


class TaskScheduler:
    """Планировщик задач"""
    def __init__(self, polling_interval=1):
        """
        Инициализация планировщика
        :param polling_interval: Интервал проверки задач в секундах
        """
        self.tasks = []
        self.polling_interval = polling_interval
        self.is_running = False
        self.scheduler_thread = None

    def add_task(self, task):
        """Добавляет задачу в список задач"""
        self.tasks.append(task)
        print(f"Задача '{task.name}' добавлена в планировщик")
        return task.id

    def schedule_task(self, name, function, execution_time=None, interval=None):
        """
        Создает и добавляет новую задачу
        :param name: Название задачи
        :param function: Функция для выполнения
        :param execution_time: Время выполнения (datetime объект)
        :param interval: Интервал повторения в секундах
        :return: ID задачи
        """
        task = Task(name, function, execution_time, interval)
        return self.add_task(task)

    def remove_task(self, task_id):
        """Удаляет задачу по ID"""
        for i, task in enumerate(self.tasks):
            if task.id == task_id:
                self.tasks.pop(i)
                print(f"Задача с ID {task_id} удалена")
                return True
        return False

    def start(self):
        """Запускает планировщик задач"""
        if self.is_running:
            print("Планировщик уже запущен")
            return

        self.is_running = True
        self.scheduler_thread = threading.Thread(target=self._main_loop)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        print("Планировщик задач запущен")

    def stop(self):
        """Останавливает планировщик задач"""
        self.is_running = False
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=self.polling_interval*2)
        print("Планировщик задач остановлен")

    def _main_loop(self):
        """Основной цикл планировщика"""
        while self.is_running:
            # Проверяем и запускаем задачи, которые должны быть выполнены
            for task in self.tasks:
                if task.should_run():
                    task.run()
            
            # Удаляем завершенные задачи
            self.tasks = [task for task in self.tasks if not task.is_completed]
            
            # Минимальная задержка в основном цикле
            time.sleep(self.polling_interval)


# Пример использования
if __name__ == "__main__":
    def task1():
        print("Выполняется задача 1")
        time.sleep(2)  # Имитация работы
        
    def task2():
        print("Выполняется задача 2")
        time.sleep(1)  # Имитация работы
        
    def task3():
        print("Выполняется периодическая задача")
    
    # Создаем планировщик с интервалом опроса 0.5 секунды
    scheduler = TaskScheduler(polling_interval=0.5)
    
    # Добавляем задачи
    # Задача, которая выполнится через 3 секунды
    scheduler.schedule_task(
        "Отложенная задача", 
        task1, 
        execution_time=datetime.now().replace(microsecond=0) + 
                       datetime.timedelta(seconds=3)
    )
    
    # Задача, которая выполнится сразу
    scheduler.schedule_task(
        "Мгновенная задача", 
        task2, 
        execution_time=datetime.now()
    )
    
    # Периодическая задача, которая будет выполняться каждые 5 секунд
    scheduler.schedule_task(
        "Периодическая задача", 
        task3, 
        execution_time=datetime.now(), 
        interval=5
    )
    
    # Запускаем планировщик
    scheduler.start()
    
    try:
        # Даем планировщику поработать 30 секунд
        time.sleep(30)
    except KeyboardInterrupt:
        print("Программа прервана пользователем")
    finally:
        # Останавливаем планировщик
        scheduler.stop()
        print("Программа завершена")

И так теперь смотрим что у на по итогу получилось?

Итак, наш базовый планировщик работает: задачи добавляются, запускаются по расписанию, всё хорошо.

Но возникает логичный вопрос:
Как это может помочь системным и сетевым администраторам в реальной жизни?

Ответ прост: используя планировщик, можно автоматизировать рутинные задачи, которые раньше приходилось выполнять вручную. Например:

  • Мониторинг использования ресурсов (CPU, память, диск).

  • Проверка доступности серверов и сервисов.

  • Запуск диагностических скриптов или утилит.

  • Автоматическое выполнение профилактических процедур.

Чтобы добавить такие возможности, нам понадобятся ещё две мощные библиотеки:

psutil

psutil (process and system utilities) — сторонняя библиотека для Python, которая позволяет получать информацию о состоянии системы:

  • загрузка процессора,

  • использование оперативной памяти,

  • статистика по дискам и сетевым интерфейсам,

  • процессы, запущенные в системе.

Почему она важна?
С её помощью можно написать задачи, которые будут следить за состоянием машины и вовремя предупреждать о проблемах.

subprocess

subprocess — стандартный модуль Python для работы с внешними процессами.

Позволяет:

  • запускать системные команды,

  • взаимодействовать с выводом консоли,

  • обрабатывать ошибки при запуске.

Почему это важно?
С помощью subprocess можно, например:

  • запустить скрипт антивирусной проверки,

  • проверить доступность сервера через ping,

  • перезапустить службы,

  • выполнить резервное копирование через системные утилиты.

Теперь давайте усложним наш планировщик и добавим несколько реальных примеров использования этих библиотек.

import threading
import time
from datetime import datetime, timedelta
import uuid
import psutil  # Для мониторинга системных ресурсов
import subprocess  # Для выполнения системных команд

class Task:
    """Класс для представления задачи"""
    def __init__(self, name, function, execution_time=None, interval=None):
        self.id = str(uuid.uuid4())
        self.name = name
        self.function = function
        self.execution_time = execution_time
        self.interval = interval
        self.is_running = False
        self.is_completed = False
        self.thread = None

    def should_run(self):
        """Проверяет, должна ли задача быть запущена"""
        if self.is_completed and not self.interval:
            return False
            
        current_time = datetime.now()
        
        if self.execution_time and current_time >= self.execution_time:
            if self.interval:
                # Обновляем время следующего запуска для периодических задач
                while current_time >= self.execution_time:
                    self.execution_time = datetime.fromtimestamp(
                        self.execution_time.timestamp() + self.interval
                    )
            return True
        return False

    def run(self):
        """Запускает задачу в отдельном потоке"""
        if self.is_running:
            return
            
        self.is_running = True
        self.thread = threading.Thread(target=self._execute)
        self.thread.daemon = True
        self.thread.start()

    def _execute(self):
        """Выполняет функцию задачи"""
        try:
            print(f"[{datetime.now()}] Выполняется задача: {self.name}")
            self.function()
            print(f"[{datetime.now()}] Задача {self.name} завершена")
        except Exception as e:
            print(f"[{datetime.now()}] Ошибка при выполнении задачи {self.name}: {e}")
        finally:
            self.is_running = False
            if not self.interval:
                self.is_completed = True


class TaskScheduler:
    """Планировщик задач"""
    def __init__(self, polling_interval=1):
        """
        Инициализация планировщика
        :param polling_interval: Интервал проверки задач в секундах
        """
        self.tasks = []
        self.polling_interval = polling_interval
        self.is_running = False
        self.scheduler_thread = None

    def add_task(self, task):
        """Добавляет задачу в список задач"""
        self.tasks.append(task)
        print(f"Задача '{task.name}' добавлена в планировщик")
        return task.id

    def schedule_task(self, name, function, execution_time=None, interval=None):
        """
        Создает и добавляет новую задачу
        :param name: Название задачи
        :param function: Функция для выполнения
        :param execution_time: Время выполнения (datetime объект)
        :param interval: Интервал повторения в секундах
        :return: ID задачи
        """
        task = Task(name, function, execution_time, interval)
        return self.add_task(task)

    def remove_task(self, task_id):
        """Удаляет задачу по ID"""
        for i, task in enumerate(self.tasks):
            if task.id == task_id:
                self.tasks.pop(i)
                print(f"Задача с ID {task_id} удалена")
                return True
        return False

    def start(self):
        """Запускает планировщик задач"""
        if self.is_running:
            print("Планировщик уже запущен")
            return

        self.is_running = True
        self.scheduler_thread = threading.Thread(target=self._main_loop)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        print("Планировщик задач запущен")

    def stop(self):
        """Останавливает планировщик задач"""
        self.is_running = False
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=self.polling_interval*2)
        print("Планировщик задач остановлен")

    def _main_loop(self):
        """Основной цикл планировщика"""
        while self.is_running:
            # Проверяем и запускаем задачи, которые должны быть выполнены
            for task in self.tasks:
                if task.should_run():
                    task.run()
            
            # Удаляем завершенные задачи
            self.tasks = [task for task in self.tasks if not task.is_completed]
            
            # Минимальная задержка в основном цикле
            time.sleep(self.polling_interval)


# Функции для мониторинга системы
def monitor_cpu_usage():
    """Мониторинг загрузки ЦП компьютера"""
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"Текущая загрузка ЦП: {cpu_percent}%")
    
    # Дополнительная информация о ЦП
    cpu_count = psutil.cpu_count(logical=False)
    cpu_count_logical = psutil.cpu_count(logical=True)
    print(f"Количество физических ядер ЦП: {cpu_count}")
    print(f"Количество логических ядер ЦП: {cpu_count_logical}")
    
    # Информация о загрузке каждого ядра
    per_cpu = psutil.cpu_percent(interval=1, percpu=True)
    for i, percent in enumerate(per_cpu):
        print(f"Загрузка ядра {i}: {percent}%")

def monitor_network_packets():
    """Мониторинг сетевых пакетов пользователя"""
    try:
        # Получаем информацию о сетевых интерфейсах
        net_io = psutil.net_io_counters(pernic=True)
        
        print("Информация о сетевых пакетах:")
        for interface, stats in net_io.items():
            print(f"\nИнтерфейс: {interface}")
            print(f"Отправлено байт: {stats.bytes_sent}")
            print(f"Получено байт: {stats.bytes_recv}")
            print(f"Отправлено пакетов: {stats.packets_sent}")
            print(f"Получено пакетов: {stats.packets_recv}")
            print(f"Ошибки при отправке: {stats.errin}")
            print(f"Ошибки при получении: {stats.errout}")
            print(f"Пакеты отброшены при отправке: {stats.dropin}")
            print(f"Пакеты отброшены при получении: {stats.dropout}")
        
        # Дополнительно можно использовать netstat для Windows
        print("\nАктивные сетевые соединения:")
        result = subprocess.run(["netstat", "-n"], capture_output=True, text=True)
        connections = result.stdout.split('\n')
        # Выводим только первые 10 строк, чтобы не перегружать вывод
        for line in connections[:10]:
            print(line)
            
    except Exception as e:
        print(f"Ошибка при мониторинге сетевых пакетов: {e}")

# Пример использования
if __name__ == "__main__":
    # Создаем планировщик с интервалом опроса 0.5 секунды
    scheduler = TaskScheduler(polling_interval=0.5)
    
    # Добавляем задачи
    # Задача 1: Мониторинг загрузки ЦП (каждые 5 секунд)
    scheduler.schedule_task(
        "Мониторинг загрузки ЦП", 
        monitor_cpu_usage, 
        execution_time=datetime.now(), 
        interval=5
    )
    
    # Задача 2: Мониторинг сетевых пакетов (каждые 10 секунд)
    scheduler.schedule_task(
        "Мониторинг сетевых пакетов", 
        monitor_network_packets, 
        execution_time=datetime.now() + timedelta(seconds=2), 
        interval=10
    )
    
    # Запускаем планировщик
    scheduler.start()
    
    try:
        # Даем планировщику поработать 60 секунд
        print("Планировщик будет работать 60 секунд. Нажмите Ctrl+C для досрочного завершения.")
        time.sleep(60)
    except KeyboardInterrupt:
        print("Программа прервана пользователем")
    finally:
        # Останавливаем планировщик
        scheduler.stop()
        print("Программа завершена")

и что мы получаем в ответ

4.Этап: Итоги

Что можно сделать на основе такого простого планировщика задач?
Да практически всё, что угодно!

  • Реализовать удалённую доставку задач для корпоративных компьютеров.

  • Построить центры автоматизации для системных администраторов.

  • Развить проект до полноценного решения для управления инфраструктурой.

  • Или даже запустить коммерческий продукт на базе этой идеи.

Код, который мы написали, — это фундамент, на котором можно строить сложные и надёжные системы. Он лёгкий, расширяемый и даёт отличную отправную точку для собственных экспериментов и разработок.

Спасибо, что дочитали статью до конца! Надеюсь, материал был для вас полезным и вдохновляющим.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Полезно ли оказалась вам статья?
50% Да6
50% Нет6
Проголосовали 12 пользователей. Воздержались 2 пользователя.
Теги:
Хабы:
+2
Комментарии3

Публикации

Работа

Data Scientist
41 вакансия
DevOps инженер
30 вакансий

Ближайшие события