Adding a trigger

Triggers run a workflow in response to an event. To define one, subclass BaseTriggerDefinition and implement its required methods.

Triggers are similar to nodes.

from datetime import datetime, timedelta
from noxus.triggers import BaseTriggerDefinition, TriggerConfiguration, BaseEvent

class PeriodicTriggerConfig(TriggerConfiguration):
    """Configuration model for the periodic trigger."""

    type: Literal["periodic"] = Parameter(default="periodic")

    @model_validator(mode="before")
    @classmethod
    def make_routing_key(cls, data: dict) -> dict:
        """Set the routing key on the raw input before validation.

        The routing key is used to identify the events that run this workflow.

        Args:
            data: Raw input for the configuration; it is modified in place.

        Returns:
            The same ``data`` mapping with ``"routing_key"`` set.
        """
        # Plain literal: there is nothing to interpolate, so no f-prefix.
        data["routing_key"] = "periodic:somethingunique"
        return data

class PeriodicEvent(BaseEvent):
    """Event registered by the periodic task each time it fires."""

    # Event type tag; the same "periodic" literal is used by the trigger config and definition.
    type: Literal["periodic"] = "periodic"
    # Moment the event was created; get_inputs forwards it as the "timestamp" input.
    timestamp: datetime

class PeriodicTriggerDefinition(BaseTriggerDefinition):
    """Trigger definition that pairs PeriodicTriggerConfig with periodic events."""

    type = "periodic"
    title = "Periodic Trigger"
    description = "Trigger workflows every 3 days"
    config_class = PeriodicTriggerConfig

    # Declares which event-specific values can be mapped onto workflow inputs.
    input_map_config: list[TriggerInput] | None = [
        TriggerInput(value="timestamp", label="Trigger Time", type=DataType.datetime),
    ]

    def check_event(self, evt) -> bool:
        """Return True when the event's routing key equals the configured one."""
        incoming_key = evt.routing_key
        configured_key = self.config.routing_key
        return incoming_key == configured_key

    def get_inputs(self, evt) -> dict:
        """Return the workflow inputs for ``evt``: its timestamp as an ISO 8601 string."""
        iso_timestamp = evt.timestamp.isoformat()
        return {"timestamp": iso_timestamp}

The code that fires the trigger must also be defined: create an asyncio task that handles the specific event source and calls register_event whenever something happens.

import asyncio
from datetime import datetime

async def periodic_task(interval_seconds: float = 60 * 60 * 24 * 3):
    """Register a PeriodicEvent, sleep, and repeat forever.

    Args:
        interval_seconds: Pause between two consecutive events, in seconds.
            Defaults to 3 days.
    """
    # Imported locally so the snippet does not depend on a file-level import of timezone.
    from datetime import timezone

    while True:
        event = PeriodicEvent(
            # Same key that PeriodicTriggerConfig.make_routing_key writes into the config.
            routing_key="periodic:somethingunique",
            # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a naive value.
            timestamp=datetime.now(timezone.utc),
        )
        await register_event(event)  # this will cause workflows with this trigger to run
        await asyncio.sleep(interval_seconds)

# Strong references to the scheduled trigger tasks. The event loop only keeps
# weak references, so an unreferenced task can be garbage-collected mid-run.
_trigger_tasks: set = set()


# The install_triggers function in the definition of your plugin should start the logic of the trigger.
def install_triggers():
    """Schedule periodic_task on the event loop and keep the task referenced."""
    # NOTE(review): get_event_loop() is deprecated when no loop is running; if the
    # plugin host always calls this from a running loop, asyncio.get_running_loop()
    # would be the stricter choice. Confirm before changing.
    loop = asyncio.get_event_loop()
    task = loop.create_task(periodic_task())
    _trigger_tasks.add(task)
    # Drop the reference once the task finishes so the set does not grow.
    task.add_done_callback(_trigger_tasks.discard)

The final step is to return the trigger and event in the triggers method of your plugin.