Skip to content

SchedulerConfig

The SchedulerConfig is simply an interface that allows you to integrate any scheduler into Lilya in a cleaner way.

Warning

If you want to run your own implementation and integration with different schedulers, please feel free to jump this section and ignore all of this.

This is possible due to the fact that Lilya implements the SchedulerConfig.

How to import it

You can import the configuration from the following:

from lilya.contrib.schedulers.config import SchedulerConfig

The SchedulerConfig class

When implementing a scheduler configurations you must implement two functions.

  1. async def start()
  2. async def shutdown()

This is what makes the SchedulerConfig modular because there are plenty of schedulers out there and each one of them with a lot of different options and configurations but the one thing they all have in common is the fact that all of them must start and shutdown at some point. The only thing Lilya "cares" is that by encapsulating that functionality into two simple functions.

The start function

The start function, as the name suggests, its the function that Lilya calls to start the scheduler for you. These are usually passed into the on_startup or in the upper part of the lifespan above the yield.

The shutdown function

The shutdown function, as the name suggests, its the function that Lilya calls to shutdown the scheduler for you. These are usually passed into the on_shutdowb or in the lower part of the lifespan below the yield.

How to use it

Lilya already implements this interface with the custom AsynczConfig. This functionality is very handy since Asyncz has a lot of configurations that can be passed and used within an Lilya application.

Let us see how the implementation looks like.

import warnings
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any

from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.typing import undefined, UndefinedType
from lilya.conf import settings
from lilya.contrib.schedulers.base import SchedulerConfig
from lilya.exceptions import ImproperlyConfigured


class AsynczConfig(SchedulerConfig):
    """
    Implements an integration with Asyncz, allowing to
    customise the scheduler with the provided configurations.
    """

    def __init__(
        self,
        scheduler_class: type[SchedulerType] = AsyncIOScheduler,
        tasks: dict[str, str] | None = None,
        timezone: dtimezone | str | None = None,
        configurations: dict[str, dict[str, str]] | None = None,
        **kwargs: dict[str, Any],
    ):
        """
        Initializes the AsynczConfig object.

        Args:
            scheduler_class: The class of the scheduler to be used.
            tasks: A dictionary of tasks to be registered in the scheduler.
            timezone: The timezone to be used by the scheduler.
            configurations: Extra configurations to be passed to the scheduler.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.scheduler_class = scheduler_class
        self.tasks = tasks
        self.timezone = timezone
        self.configurations = configurations
        self.options = kwargs

        for task, module in self.tasks.items():
            if not isinstance(task, str) or not isinstance(module, str):
                raise ImproperlyConfigured("The dict of tasks must be dict[str, str].")

        if not self.tasks:
            warnings.warn(
                "Lilya is starting the scheduler, yet there are no tasks declared.",
                UserWarning,
                stacklevel=2,
            )

        # Load the scheduler object
        self.handler = self.get_scheduler(
            scheduler=self.scheduler_class,
            timezone=self.timezone,
            configurations=self.configurations,
            **self.options,
        )

        self.register_tasks(tasks=self.tasks)

    def register_tasks(self, tasks: dict[str, str]) -> None:
        """
        Registers the tasks in the Scheduler.

        Args:
            tasks: A dictionary of tasks to be registered in the scheduler.
        """
        for task, _module in tasks.items():
            imported_task = f"{_module}.{task}"
            scheduled_task: "Task" = load(imported_task)

            if not scheduled_task.is_enabled:
                continue

            try:
                scheduled_task.add_task(self.handler)
            except Exception as e:
                raise ImproperlyConfigured(str(e)) from e

    def get_scheduler(
        self,
        scheduler: type[SchedulerType],
        timezone: dtimezone | str | None = None,
        configurations: dict[str, Any] | None = None,
        **options: dict[str, Any],
    ) -> SchedulerType:
        """
        Initiates the scheduler from the given time.
        If no value is provided, it will default to AsyncIOScheduler.

        The value of `scheduler_class` can be overwritten by any lilya custom settings.

        Args:
            scheduler: The class of the scheduler to be used.
            timezone: The timezone instance.
            configurations: A dictionary with extra configurations to be passed to the scheduler.
            **options: Additional options.

        Returns:
            SchedulerType: An instance of a Scheduler.
        """
        if not timezone:
            timezone = settings.timezone

        if not configurations:
            return scheduler(timezone=timezone, **options)

        return scheduler(global_config=configurations, timezone=timezone, **options)

    async def start(self, **kwargs: dict[str, Any]) -> None:
        """
        Starts the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.start(**kwargs)

    async def shutdown(self, **kwargs: dict[str, Any]) -> None:
        """
        Shuts down the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.shutdown(**kwargs)


class Task:
    """
    Base for the scheduler decorator that will auto discover the
    tasks in the application and add them to the internal scheduler.
    """

    def __init__(
        self,
        *,
        name: str | None = None,
        trigger: TriggerType | None = None,
        id: str | None = None,
        mistrigger_grace_time: int | UndefinedType | None = undefined,
        coalesce: bool | UndefinedType = undefined,
        max_instances: int | UndefinedType | None = undefined,
        next_run_time: datetime | str | UndefinedType | None = undefined,
        store: str = "default",
        executor: str = "default",
        replace_existing: bool = False,
        args: Any | None = None,
        kwargs: dict[str, Any] | None = None,
        is_enabled: bool = True,
    ) -> None:
        """
        Initializes a new instance of the `Task` class for the  Scheduler.

        Args:
            name (str, optional): Textual description of the task.
            trigger (TriggerType, optional): An instance of a trigger class.
            id (str, optional): Explicit identifier for the task.
            mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
                (or None to allow the task to run no matter how late it is).
            coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
            max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
            next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
            store (str, optional): Alias of the task store to store the task in.
            executor (str, optional): Alias of the executor to run the task with.
            replace_existing (bool, optional): True to replace an existing task with the same id
                (but retain the number of runs from the existing one).
            args (Any, optional): List of positional arguments to call func with.
            kwargs (dict[str, Any], optional): Dict of keyword arguments to call func with.
            is_enabled (bool, optional): True if the task is to be added to the scheduler.
        """
        self.name = name
        self.trigger = trigger
        self.id = id
        self.mistrigger_grace_time = mistrigger_grace_time
        self.coalesce = coalesce
        self.max_instances = max_instances
        self.next_run_time = next_run_time
        self.store = store
        self.executor = executor
        self.replace_existing = replace_existing
        self.args = args
        self.kwargs = kwargs
        self.is_enabled = is_enabled
        self.fn = None

    def add_task(self, scheduler: SchedulerType) -> None:
        try:
            scheduler.add_task(
                self.fn,
                trigger=self.trigger,
                args=self.args,
                kwargs=self.kwargs,
                id=self.id,
                name=self.name,
                mistrigger_grace_time=self.mistrigger_grace_time,
                coalesce=self.coalesce,
                max_instances=self.max_instances,
                next_run_time=self.next_run_time,
                store=self.store,
                executor=self.executor,
                replace_existing=self.replace_existing,
            )
        except Exception as e:
            raise ImproperlyConfigured(str(e)) from e

We won't be dueling on the technicalities of this configuration because its unique to Asyncz provided by Lilya but it is not mandatory to use it as you can build your own.

SchedulerConfig and application

To use the SchedulerConfig in an application, like the one shown above with asyncz, you can simply do this:

Note

We use the existing AsynczConfig as example but feel free to use your own if you require something else.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from lilya.apps import Lilya
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore to become RedisStore where the db is 0
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default ot be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Lilya(
    routes=[...],
    on_startup=[get_scheduler_config().start],
    on_shutdown=[get_scheduler_config().shutdown]
)

Application lifecycle

Lilya scheduler is tight to the application lifecycle and that means the on_startup/on_shutdown and lifespan. You can read more about this in the appropriate section of the documentation.

By default, the scheduler is linked to on_startup/on_shutdown events.

The following example serves as a suggestion but feel free to use your own design. Let us check how we could manage this using the lifespan instead.

from contextlib import asynccontextmanager
from functools import lru_cache

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from lilya.apps import Lilya
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


@asynccontextmanager
async def lifespan(app: Lilya):
    # What happens on startup
    await get_scheduler_config().start()
    yield
    # What happens on shutdown
    await get_scheduler_config().shutdown()


@lru_cache
def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore to become RedisStore where the db is 0
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default ot be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Lilya(
    routes=[...],
    lifespan=lifespan,
)

Pretty easy, right? Lilya then understands what needs to be done as normal.

The SchedulerConfig and the settings

Like everything in Lilya, the SchedulerConfig can be also made available via settings.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from lilya.conf.global_settings import Settings
from lilya.contrib.schedulers.base import SchedulerConfig
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


class CustomSettings(Settings):

    @property
    def scheduler_config(self) -> SchedulerConfig:
        stores = {"default": MongoDBStore()}

        # Define the executors
        # Override the default ot be the AsyncIOExecutor
        executors = {
            "default": AsyncIOExecutor(),
            "threadpool": ThreadPoolExecutor(max_workers=20),
        }

        # Set the defaults
        task_defaults = {"coalesce": False, "max_instances": 4}

        return AsynczConfig(
            tasks=...,
            timezone="UTC",
            stores=stores,
            executors=executors,
            task_defaults=task_defaults,
        )

Important Notes

  • You can create your own custom scheduler config.
  • You must implement the start/shutdown functions in any scheduler configuration.
  • You can use or on_startup/shutdown or lifespan events. The first is automatically managed for you.