Но очень быстро оказывается, что настоящая сложность не в самих задачах, а в инфраструктуре вокруг них.

Сначала хочется просто запускать код в фоне. Потом появляются ретраи, таймауты, логирование, контроль выполнения. Ещё чуть позже всплывают совсем неприятные вопросы: что делать с пропущенными запусками после рестарта, как не дублировать задачи и где вообще должна жить логика планирования.

В какой-то момент становится очевидно:

ты уже пишешь не задачи, а pipeline вокруг задач

Именно в этот момент мне перестало хватать привычных инструментов.

  • системный Linux cron через crontab слишком примитивен

  • Celery требует брокеров и дополнительной инфраструктуры

  • APScheduler закрывает часть сценариев, но не даёт достаточной гибкости на уровне самого пайплайна

Мне хотелось не просто “ещё раз где-то запустить функцию”, а перестать каждый раз руками собирать вокруг задачи одну и ту же обвязку: от момента постановки до завершения или ошибки.

Так появился Jobify — асинхронный scheduler, с которым можно начать работать за несколько строк кода: без брокеров, со SQLite по умолчанию и с middleware-подходом.

Мне отдельно хотелось, чтобы он оставался лёгким. Многие решения в этой области за годы обросли легаси-подходом, легаси-кодом, дополнительными слоями абстракции и инфраструктурными допущениями. Мне не хотелось тащить ради фоновых задач половину отдельной инфры. Хотелось другого: гибкости, лёгкости и надёжности без тяжёлой инфраструктурной модели.

По умолчанию его задачи сохраняются на диск, а значит могут переживать перезапуск приложения. Для фоновых задач это важно: такой планировщик ощущается не только удобным, но и надёжным.

Здесь middleware управляют не только выполнением задач, но и их планированием через outer middleware.

Отдельно мне было важно сделать API знакомым и предсказуемым. При проектировании Jobify я во многом вдохновлялся подходом FastAPI, поэтому многие вещи здесь сразу ощущаются привычно.

Если вы уже работали с FastAPI, то, скорее всего, быстро поймёте и Jobify: без долгого вчитывания в документацию и без ощущения, что нужно осваивать ещё одну новую абстракцию.

А в самом простом виде это выглядит так:

import asyncio

from jobify import Jobify

app = Jobify()


@app.task
async def send_notification(user_id: int) -> None:
    print(f"Notify user {user_id}")


async def main() -> None:
    async with app:
        await send_notification.push(42)
        print("Job scheduled in background")


asyncio.run(main())

То есть для простого background job “запусти и не жди” не нужно ничего сложнее, чем task.push(...).

Где на самом деле начинаются проблемы

Проблемы начинаются не тогда, когда нужно запустить функцию “позже”. Проблемы начинаются, когда фоновые задачи перестают быть редким исключением и становятся нормальной частью приложения.

Их становится больше. У них появляются собственные правила, ограничения и побочные эффекты. С этого момента “запустить позже” уже недостаточно.

Обычно всё распадается на две части:

  1. Выполнение. Тут нужны ретраи через retry, timeout, обработка исключений, observability и dependency injection.

  2. Планирование. А вот тут интереснее: что делать, если задача с таким идентификатором уже существует? Можно ли заменить расписание? Что считать дубликатом? Нужно ли выполнять дополнительную логику в момент постановки? Что делать с задачами после перезапуска приложения? Нужно ли логгировать что мы запланировали задачу?

    спойлер: jobify уже решает проблему с дубликатами

Довольно быстро обвязка вокруг задач разрастается: часть логики живёт в декораторах, часть — в коде планирования, часть — в обработчиках ошибок. В итоге инфраструктура часто оказывается сложнее самих задач.

Задача превращается не в “функцию, которую надо вызвать потом”, а в сущность с собственным жизненным циклом.

А что насчёт встроенного sched

У Python уже есть встроенный модуль sched. Для самых простых сценариев это вполне нормальный инструмент.

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)


def send_email() -> None:
    print("Email sent")


scheduler.enter(10, 1, send_email)
scheduler.run()

Для “через 10 секунд выполнить функцию” этого достаточно. Но sched — это просто синхронный механизм отложенного вызова. Для фоновых задач уровня приложения этого быстро становится мало.

В нём нет хранения задач на диске, поэтому задачи не переживают перезапуск процесса. Нет встроенной модели для ретраев через retry, timeout, middleware, outer middleware, Cron, обработки пропущенных запусков и управления жизненным циклом задачи как отдельной сущности.

Кроме того, sched — это синхронная модель. Условный time.sleep() просто останавливает текущий поток. В Jobify ожидание устроено иначе: задача сразу ставится на таймер через loop.call_at, а цикл событий продолжает обслуживать остальные задачи. Для асинхронной системы это принципиальная разница: I/O-задачи не блокируют друг друга во время ожидания.

Почему привычные решения закрывают задачу только частично

Проблема не в том, что существующие инструменты плохие. Наоборот, у каждого из них есть своя понятная зона применения.

Системный Linux cron с настройкой через crontab отлично работает, когда нужно просто запускать что-то по расписанию на уровне системы. Он предсказуемый, понятный и живёт десятилетиями. Но как только хочется работать с задачами как с объектами приложения, его быстро перестаёт хватать. У него нет нормальной модели для middleware, контекста, обработки исключений, ретраев, статусов выполнения и программного управления жизненным циклом задачи.

Celery решает уже другой класс проблем. Это серьёзный инструмент для распределённого выполнения задач, очередей и продакшен-нагрузки. Но вместе с этим он приносит и свою цену: брокеры, инфраструктуру, отдельную операционную сложность и API, который ощущается тяжелее, чем хотелось бы для части сценариев.

APScheduler находится ближе всего к той области, которая была нужна мне. Он действительно закрывает важный кусок задачи: позволяет планировать задачи внутри приложения и по ощущениям уже намного ближе к тому, чего обычно хочешь от библиотеки.

Но в какой-то момент я упёрся в то, что мне нужен был не просто планировщик, а более цельная и при этом лёгкая модель вокруг задач.

Мне было важно иметь единый и понятный способ:

  • вмешиваться не только в выполнение, но и в сам момент планирования

  • работать с задачами как с сущностями приложения, а не просто зарегистрированными callback’ами

  • быть надёжным по умолчанию: задачи должны переживать рестарты, а не пропадать вместе с процессом

  • гибко управлять поведением после рестартов и пропущенных запусков

  • не тащить за собой легаси-подход и тяжёлую модель там, где это не нужно

  • держать знакомый, интуитивный API без тяжёлой внешней инфраструктуры

И отдельно мне хотелось, чтобы API ощущался по-настоящему привычно. Если разработчик уже работал с FastAPI, он ожидает увидеть знакомые идеи: middleware, lifespan, роутеры, внятную композицию приложения и нормальную интеграцию с DI. Для фоновых задач такой стиль оказывается не менее удобным, чем для HTTP-слоя.

Это особенно важно сейчас, когда вокруг Python уже сформировался довольно понятный developer experience. Те вещи, которые многим нравятся в FastAPI, хочется видеть и в планировщике задач: интуитивный API, расширяемость, понятные точки входа и интеграции вроде Dishka. Не отдельный мир со своими странными правилами, а инструмент, который нормально ложится в уже знакомый стек.

Именно в этом месте я понял, что проблема не в отсутствии инструмента “запусти позже”. Таких инструментов достаточно. Проблема в том, что мне нужен был планировщик, который изначально проектируется вокруг задачи целиком и при этом даёт удобный, знакомый API, а не только умеет запускать функцию по времени.

Каким в итоге должен быть удобный scheduler

Требования получились довольно простыми:

  • минимум инфраструктуры для старта

  • лёгкость самой модели и кода

  • надёжность по умолчанию: задачи должны переживать рестарты

  • понятный API

  • мидлвари не только на выполнении, но и на этапе планирования

  • нормальный жизненный цикл задачи: статусы, рестарты, ошибки, ретраи

  • нативность для asyncio

Из этого и вырос Jobify.

Что под капотом: отказ от polling

Большинство планировщиков работают по принципу

while True:
    sleep(1)
    check_tasks()

Jobify работает иначе: когда задача запланирована, она сразу ставится на таймер через loop.call_at. Это как раз один из ключевых плюсов его более современной архитектуры, о которых я пишу и в официальном разделе Why Jobify.

Это даёт:

  • отсутствие постоянного опроса

  • меньше нагрузки в простое

  • более точный запуск

Но есть и компромисс. Таймеры завязаны на монотонное время, поэтому при резком изменении системного времени задачи не пересчитываются автоматически.

То есть отказ от цикла опроса здесь не “магия лучше всех”, а осознанный trade-off: меньше накладных расходов, выше точность, но больше требований к окружению.

Что такое Jobify

Jobify — это асинхронный scheduler для Python, который:

  • поднимается за несколько строк кода

  • не требует брокеров

  • использует SQLite по умолчанию

  • хранит задачи на диске

  • даёт middleware-подход

Главная идея: задачи — это не просто функции, а объекты с жизненным циклом.

Как это выглядит в коде

Пример с ожиданием результата:

import asyncio
from jobify import Jobify

app = Jobify()


@app.task
async def send_email(to: str, subject: str) -> None:
    print(f"Sending email to {to}: {subject}")


async def main() -> None:
    async with app:
        job = await send_email.schedule(
            to="user@example.com",
            subject="Welcome!",
        ).delay(10)
        await job.wait()


asyncio.run(main())

Без брокеров. Без внешних воркеров. Без обязательной инфраструктуры.

Но при этом есть Job-объект:

  • можно ждать результат

  • проверять статус

  • отменять выполнение

Расширение: от простой задачи к системе

Через lifespan можно положить конфигурацию в состояние приложения:

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from jobify import INJECT, Jobify, State


@asynccontextmanager
async def lifespan(_: Jobify) -> AsyncIterator[dict[str, str]]:
    yield {"sender": "noreply@example.com"}


app = Jobify(lifespan=lifespan)


@app.task
def send_email(to: str, subject: str, state: State = INJECT) -> None:
    print(f"{state.sender} -> {to}: {subject}")

узнали схожесть лайфспана из фастапи?) да, в jobify он точно такой-же.

Политику выполнения можно задавать прямо на задаче:

from jobify import Jobify

app = Jobify()


@app.task(retry=3, timeout=30)
async def sync_user(user_id: int) -> None:
    ...

Outer middleware работают в момент планирования задачи:

import asyncio

from jobify import Jobify, OuterContext
from jobify.middleware import BaseOuterMiddleware, CallNextOuter


class ScheduleLoggerMiddleware(BaseOuterMiddleware):
    async def __call__(
        self,
        call_next: CallNextOuter,
        context: OuterContext,
    ) -> asyncio.Handle:
        print(f"Scheduling {context.job.id} with trigger: {context.trigger}")
        return await call_next(context)


app = Jobify(outer_middleware=[ScheduleLoggerMiddleware()])

Обычные middleware работают во время выполнения:

import logging
from typing import Any

from jobify import JobContext, Jobify
from jobify.middleware import BaseMiddleware, CallNext


class AuditMiddleware(BaseMiddleware):
    async def __call__(self, call_next: CallNext, context: JobContext) -> Any:
        logging.info("Job %s started", context.job.id)
        try:
            return await call_next(context)
        finally:
            logging.info("Job %s finished", context.job.id)


app = Jobify(middleware=[AuditMiddleware()])

Когда задач становится больше, помогают роутеры:

from jobify import JobRouter, Jobify

notifications_router = JobRouter(prefix="notifications")


@notifications_router.task
async def send_email_alert(recipient: str, subject: str) -> None:
    print(f"{recipient}: {subject}")


app = Jobify()
app.include_router(notifications_router)

Модель остаётся одной и той же: вы начинаете с простой функции и постепенно добавляете к ней dependency injection, ретраи, логику планирования и организацию по модулям.

Планирование задач

Поддерживаются базовые сценарии:

  • push() — сразу

  • delay(...) — через время

  • at(...) — в конкретный момент

  • cron(...) — по расписанию

При этом cron поддерживает секундную точность, а не только минуты.

Для простого background job этого уже достаточно: task.push(...) ставит задачу в выполнение без ожидания результата.

Пропущенные cron-задачи

Один из самых неприятных вопросов:

что делать, если задача должна была выполниться, пока сервис был выключен?

В Jobify это решается через misfire policy у Cron:

  • ALL — выполнить всё пропущенное

  • SKIP — пропустить

  • ONCE — выполнить один раз

  • GRACE(timedelta(seconds=600)) — выполнить только в заданном окне

Минимальный пример с MisfirePolicy.ALL:

from jobify import Cron, Jobify, MisfirePolicy

app = Jobify()


@app.task(
    cron=Cron("*/5 * * * * * *", misfire_policy=MisfirePolicy.ALL),
)
async def sync_metrics() -> None:
    ...

Если приложение было недоступно, Jobify попытается догнать все пропущенные срабатывания этой cron-задачи после запуска.

Ключевые возможности Jobify

Если совсем коротко, то для меня Jobify держится на четырёх вещах:

Ограничения и компромиссы

Важно понимать границы:

  1. Сейчас Jobify лучше всего чувствует себя в рамках одного процесса или одной инстанции. Поддержка распределённого запуска готовится к 1.0.0.

  2. Если в вашей среде часто меняется системное время, это тоже важно учитывать. Подробнее: System Time and Scheduling Trade-offs.

  3. Если прямо сейчас нужна тяжёлая распределённая очередь, стоит смотреть в сторону других инструментов.

Когда использовать Jobify

Jobify хорошо ложится в сценарии, где задачи — часть бизнес-логики:

  • синхронизация с внешними API

  • отправка уведомлений

  • фоновые вычисления

  • агрегация метрик

  • обработка событий

И особенно хорошо подходит, когда нужны:

  • единый и понятный способ запускать задачи сразу, с задержкой, в конкретное время или по cron

  • управление жизненным циклом задачи, а не только фактом запуска

  • лёгкий и гибкий инструмент без лишней инфраструктурной сложности

  • middleware не только на выполнении, но и на этапе планирования

  • сохранение задач на диск по умолчанию и восстановление после перезапуска

  • знакомый API, если вы привыкли к подходу FastAPI

  • секундный cron и точный запуск без polling

Итог

В какой-то момент становится ясно: запуск задачи — это самая простая часть. Сложность — в управлении её жизненным циклом.

Jobify — это попытка сделать этот цикл явным и управляемым, но без лишней тяжести: с привычным API, хранением задач на диск, секундным cron и точным запуском благодаря loop.call_at

Я и сам использую Jobify в одном из своих сервисов, то есть для меня это не “библиотека ради библиотеки”, а рабочий инструмент под реальные задачи.

Кроме того, я уже получил фидбэк от двух человек, которые переехали с APScheduler на Jobify. Для меня это хороший сигнал, что проблема, которую я пытался решить, не была чисто моей локальной болью.

Проект открытый, и я буду рад фидбеку:

Если у вас есть идеи по улучшению фреймворка, я всегда буду рад предложениям. Один из главных следующих шагов для Jobify — нормальная поддержка distributed-сценария, чтобы он умел работать уже не только как single-instance планировщик, но и в более распределённой модели, ближе к тому, как это умеет Celery.