SchedulerConfig¶
The SchedulerConfig is simply an interface that allows you to integrate any scheduler into Lilya in a cleaner way.
Warning
If you want to run your own implementation and integration with different schedulers, feel free to skip this section.
This is possible due to the fact that Lilya implements the SchedulerConfig.
How to import it¶
You can import the configuration from the following:
from lilya.contrib.schedulers.config import SchedulerConfig
The SchedulerConfig class¶
When implementing a scheduler configuration you must implement two functions: start and shutdown.
This is what makes the SchedulerConfig modular. There are plenty of schedulers out there, each with many different options and configurations, but the one thing they all have in common is that they must start and shut down at some point. The only thing Lilya "cares" about is that this functionality is encapsulated in those two functions.
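As a rough illustration of that contract, the sketch below wraps a made-up scheduler behind the two functions. Everything in it is hypothetical: `SchedulerConfigSketch` is a local stand-in for Lilya's `SchedulerConfig` so the snippet runs without Lilya installed, and `ToyScheduler` is not a real library. In a real project you would subclass the imported `SchedulerConfig` instead.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class SchedulerConfigSketch(ABC):
    """Local stand-in for Lilya's SchedulerConfig (assumption: two async hooks)."""

    @abstractmethod
    async def start(self, **kwargs: Any) -> None: ...

    @abstractmethod
    async def shutdown(self, **kwargs: Any) -> None: ...


class ToyScheduler:
    """Hypothetical scheduler; real schedulers expose their own APIs."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


class ToySchedulerConfig(SchedulerConfigSketch):
    """Hides the scheduler-specific calls behind start/shutdown."""

    def __init__(self) -> None:
        self.handler = ToyScheduler()

    async def start(self, **kwargs: Any) -> None:
        self.handler.start()

    async def shutdown(self, **kwargs: Any) -> None:
        self.handler.stop()


async def demo() -> tuple[bool, bool]:
    config = ToySchedulerConfig()
    await config.start()
    running_after_start = config.handler.running
    await config.shutdown()
    return running_after_start, config.handler.running


result = asyncio.run(demo())
```

The scheduler-specific method names (`stop` here) stay inside the config class, so the application only ever sees `start` and `shutdown`.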
The start function¶
The start function, as the name suggests, is the function that Lilya calls to start the scheduler for you. It is usually passed into on_startup or placed in the upper part of the lifespan, above the yield.
The shutdown function¶
The shutdown function, as the name suggests, is the function that Lilya calls to shut down the scheduler for you. It is usually passed into on_shutdown or placed in the lower part of the lifespan, below the yield.
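To make the "above the yield" and "below the yield" placement concrete, here is a minimal sketch that drives a lifespan by hand and records the order of calls. `RecordingConfig` is a hypothetical object with the two functions, and `main` only imitates what a server would do; no Lilya API is used.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


class RecordingConfig:
    """Hypothetical config that only records when its hooks are called."""

    async def start(self) -> None:
        events.append("start")

    async def shutdown(self) -> None:
        events.append("shutdown")


config = RecordingConfig()


@asynccontextmanager
async def lifespan(app: object):
    await config.start()  # above the yield: runs on application startup
    yield
    await config.shutdown()  # below the yield: runs on application shutdown


async def main() -> None:
    # Stand-in for the server driving the lifespan; `app` is a placeholder.
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(main())
```

After running, `events` holds the start call, then the work done while the application is up, then the shutdown call.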
How to use it¶
Lilya already implements this interface with the custom AsynczConfig. This is very handy since Asyncz has a lot of configurations that can be passed and used within a Lilya application.
Let us see what the implementation looks like.
import warnings
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any
from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.typing import undefined, UndefinedType
from lilya.conf import settings
from lilya.contrib.schedulers.base import SchedulerConfig
from lilya.exceptions import ImproperlyConfigured


class AsynczConfig(SchedulerConfig):
    """
    Implements an integration with Asyncz, allowing to
    customise the scheduler with the provided configurations.
    """

    def __init__(
        self,
        scheduler_class: type[SchedulerType] = AsyncIOScheduler,
        tasks: dict[str, str] | None = None,
        timezone: dtimezone | str | None = None,
        configurations: dict[str, dict[str, str]] | None = None,
        **kwargs: dict[str, Any],
    ):
        """
        Initializes the AsynczConfig object.

        Args:
            scheduler_class: The class of the scheduler to be used.
            tasks: A dictionary of tasks to be registered in the scheduler.
            timezone: The timezone to be used by the scheduler.
            configurations: Extra configurations to be passed to the scheduler.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.scheduler_class = scheduler_class
        self.tasks = tasks
        self.timezone = timezone
        self.configurations = configurations
        self.options = kwargs

        for task, module in self.tasks.items():
            if not isinstance(task, str) or not isinstance(module, str):
                raise ImproperlyConfigured("The dict of tasks must be dict[str, str].")

        if not self.tasks:
            warnings.warn(
                "Lilya is starting the scheduler, yet there are no tasks declared.",
                UserWarning,
                stacklevel=2,
            )

        # Load the scheduler object
        self.handler = self.get_scheduler(
            scheduler=self.scheduler_class,
            timezone=self.timezone,
            configurations=self.configurations,
            **self.options,
        )

        self.register_tasks(tasks=self.tasks)

    def register_tasks(self, tasks: dict[str, str]) -> None:
        """
        Registers the tasks in the Scheduler.

        Args:
            tasks: A dictionary of tasks to be registered in the scheduler.
        """
        for task, _module in tasks.items():
            imported_task = f"{_module}.{task}"
            scheduled_task: "Task" = load(imported_task)

            if not scheduled_task.is_enabled:
                continue

            try:
                scheduled_task.add_task(self.handler)
            except Exception as e:
                raise ImproperlyConfigured(str(e)) from e

    def get_scheduler(
        self,
        scheduler: type[SchedulerType],
        timezone: dtimezone | str | None = None,
        configurations: dict[str, Any] | None = None,
        **options: dict[str, Any],
    ) -> SchedulerType:
        """
        Initiates the scheduler from the given time.
        If no value is provided, it will default to AsyncIOScheduler.

        The value of `scheduler_class` can be overwritten by any lilya custom settings.

        Args:
            scheduler: The class of the scheduler to be used.
            timezone: The timezone instance.
            configurations: A dictionary with extra configurations to be passed to the scheduler.
            **options: Additional options.

        Returns:
            SchedulerType: An instance of a Scheduler.
        """
        if not timezone:
            timezone = settings.timezone

        if not configurations:
            return scheduler(timezone=timezone, **options)

        return scheduler(global_config=configurations, timezone=timezone, **options)

    async def start(self, **kwargs: dict[str, Any]) -> None:
        """
        Starts the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.start(**kwargs)

    async def shutdown(self, **kwargs: dict[str, Any]) -> None:
        """
        Shuts down the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.shutdown(**kwargs)


class Task:
    """
    Base for the scheduler decorator that will auto discover the
    tasks in the application and add them to the internal scheduler.
    """

    def __init__(
        self,
        *,
        name: str | None = None,
        trigger: TriggerType | None = None,
        id: str | None = None,
        mistrigger_grace_time: int | UndefinedType | None = undefined,
        coalesce: bool | UndefinedType = undefined,
        max_instances: int | UndefinedType | None = undefined,
        next_run_time: datetime | str | UndefinedType | None = undefined,
        store: str = "default",
        executor: str = "default",
        replace_existing: bool = False,
        args: Any | None = None,
        kwargs: dict[str, Any] | None = None,
        is_enabled: bool = True,
    ) -> None:
        """
        Initializes a new instance of the `Task` class for the Scheduler.

        Args:
            name (str, optional): Textual description of the task.
            trigger (TriggerType, optional): An instance of a trigger class.
            id (str, optional): Explicit identifier for the task.
            mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
                (or None to allow the task to run no matter how late it is).
            coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
            max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
            next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
            store (str, optional): Alias of the task store to store the task in.
            executor (str, optional): Alias of the executor to run the task with.
            replace_existing (bool, optional): True to replace an existing task with the same id
                (but retain the number of runs from the existing one).
            args (Any, optional): List of positional arguments to call func with.
            kwargs (dict[str, Any], optional): Dict of keyword arguments to call func with.
            is_enabled (bool, optional): True if the task is to be added to the scheduler.
        """
        self.name = name
        self.trigger = trigger
        self.id = id
        self.mistrigger_grace_time = mistrigger_grace_time
        self.coalesce = coalesce
        self.max_instances = max_instances
        self.next_run_time = next_run_time
        self.store = store
        self.executor = executor
        self.replace_existing = replace_existing
        self.args = args
        self.kwargs = kwargs
        self.is_enabled = is_enabled
        self.fn = None

    def add_task(self, scheduler: SchedulerType) -> None:
        try:
            scheduler.add_task(
                self.fn,
                trigger=self.trigger,
                args=self.args,
                kwargs=self.kwargs,
                id=self.id,
                name=self.name,
                mistrigger_grace_time=self.mistrigger_grace_time,
                coalesce=self.coalesce,
                max_instances=self.max_instances,
                next_run_time=self.next_run_time,
                store=self.store,
                executor=self.executor,
                replace_existing=self.replace_existing,
            )
        except Exception as e:
            raise ImproperlyConfigured(str(e)) from e

We won't be dwelling on the technicalities of this configuration because it is specific to the Asyncz integration provided by Lilya. It is not mandatory to use it, as you can build your own.
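One detail of the listing is still worth a quick illustration: in `register_tasks`, each entry of `tasks` maps a task name to the module that contains it, and the two are joined into a dotted path (`f"{_module}.{task}"`) before being loaded. The sketch below imitates that step with the standard library only. `load_dotted` is a hypothetical helper, assumed to behave roughly like the `load` call in the listing, and the `math`/`json` functions merely stand in for real task objects.

```python
from importlib import import_module
from typing import Any


def load_dotted(path: str) -> Any:
    """Hypothetical stand-in for `load`: resolve 'package.module.attr'."""
    module_path, _, attr = path.rpartition(".")
    return getattr(import_module(module_path), attr)


# Same shape as the `tasks` argument: {task name: module path}.
tasks = {"sqrt": "math", "dumps": "json"}

resolved: dict[str, Any] = {}
for task, module in tasks.items():
    dotted = f"{module}.{task}"  # e.g. "math.sqrt"
    resolved[dotted] = load_dotted(dotted)
```

In other words, the dictionary keys are the names of the objects and the values are where to find them, not the other way around.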
SchedulerConfig and application¶
To use the SchedulerConfig in an application, like the one shown above with Asyncz, you can simply do this:
Note
We use the existing AsynczConfig as an example, but feel free to use your own if you require something else.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from lilya.apps import Lilya
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


# Build the configuration once so that startup and shutdown act on the same scheduler
scheduler_config = get_scheduler_config()

app = Lilya(
    routes=[...],
    on_startup=[scheduler_config.start],
    on_shutdown=[scheduler_config.shutdown],
)
Application lifecycle¶
The Lilya scheduler is tied to the application lifecycle, which means the on_startup/on_shutdown events and the lifespan.
You can read more about this in the appropriate section of the documentation.
By default, the scheduler is linked to the on_startup/on_shutdown events.
The following example serves as a suggestion, but feel free to use your own design. Let us check how we could manage this using the lifespan instead.
from contextlib import asynccontextmanager
from functools import lru_cache

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from lilya.apps import Lilya
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


@asynccontextmanager
async def lifespan(app: Lilya):
    # What happens on startup
    await get_scheduler_config().start()
    yield
    # What happens on shutdown
    await get_scheduler_config().shutdown()


@lru_cache
def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Lilya(
    routes=[...],
    lifespan=lifespan,
)
Pretty easy, right? Lilya then understands what needs to be done as normal.
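The `lru_cache` on `get_scheduler_config` in the lifespan example is doing real work: the function is called once before the yield and once after it, and both calls need to reach the same scheduler. The sketch below shows the difference with a hypothetical `FakeConfig` class standing in for `AsynczConfig`; it only compares object identity and does not touch Lilya or Asyncz.

```python
from functools import lru_cache


class FakeConfig:
    """Hypothetical stand-in for AsynczConfig; each instance would own a scheduler."""


def uncached_factory() -> FakeConfig:
    return FakeConfig()


@lru_cache
def cached_factory() -> FakeConfig:
    return FakeConfig()


# Without caching, "start" and "shutdown" would act on two different objects.
different = uncached_factory() is not uncached_factory()

# With lru_cache, every call returns the single instance built on first use.
same = cached_factory() is cached_factory()
```

Building the configuration once at module level and reusing that variable is an equally valid way to get the same guarantee.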
The SchedulerConfig and the settings¶
Like everything in Lilya, the SchedulerConfig can also be made available via settings.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore

from lilya.conf.global_settings import Settings
from lilya.contrib.schedulers.base import SchedulerConfig
from lilya.contrib.schedulers.asyncz.config import AsynczConfig


class CustomSettings(Settings):
    @property
    def scheduler_config(self) -> SchedulerConfig:
        # Define the stores
        stores = {"default": MongoDBStore()}

        # Define the executors
        # Override the default to be the AsyncIOExecutor
        executors = {
            "default": AsyncIOExecutor(),
            "threadpool": ThreadPoolExecutor(max_workers=20),
        }

        # Set the defaults
        task_defaults = {"coalesce": False, "max_instances": 4}

        return AsynczConfig(
            tasks=...,
            timezone="UTC",
            stores=stores,
            executors=executors,
            task_defaults=task_defaults,
        )
Important Notes¶
- You can create your own custom scheduler config.
- You must implement the start/shutdown functions in any scheduler configuration.
- You can use either the on_startup/on_shutdown or the lifespan events. The first is automatically managed for you.