Custom nodes allow you to extend Noxus with specialized functionality tailored to your specific needs. This guide covers everything you need to know to build, test, and deploy custom nodes.

Node Architecture

BaseNode Class

All nodes inherit from BaseNode[ConfigType], a generic base class that provides the node lifecycle and interface.
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition
from spotflow.integrations.credentials import ExecutionContext
from pydantic import BaseModel

class MyCustomNode(BaseNode["MyCustomNodeConfig"]):
    # Node metadata
    node_name = "my_custom_node"
    title = "My Custom Node"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://your-icon-url.com/icon.png"
    visible = True  # Show in node palette

    # Input/output connectors
    inputs = [
        Connector(
            key="input_text",
            label="Input Text",
            type_=TypeDefinition.text()
        )
    ]

    outputs = [
        Connector(
            key="output_text",
            label="Output Text",
            type_=TypeDefinition.text()
        )
    ]

    # Main execution method
    async def call(
        self,
        ctx: ExecutionContext,
        input_text: str
    ) -> dict[str, str]:
        # Your logic here
        result = input_text.upper()
        return {"output_text": result}

Node Metadata

node_name (str, required)
  • Unique identifier for the node type
  • Use snake_case convention
  • Must be globally unique across all nodes
title (str, required)
  • Display name in UI
  • Use Title Case
  • Keep concise (2-4 words)
category (NodeCategory, required)
  • Groups nodes in palette
  • Options: AIText, Agent, Logic, Data, Integration, InputOutput, Utility
color (str, required)
  • Hex color code for node appearance
  • Use brand colors or category-standard colors
image (str, required)
  • Icon URL (PNG/SVG)
  • Displayed in node palette and on canvas
  • Recommended size: 48x48px
visible (bool, default: True)
  • Whether to show in node palette
  • Set to False for deprecated or internal nodes

Defining Inputs and Outputs

Connector Types

Single-Value Connector:
from spotflow.nodes.data_types import Connector, TypeDefinition

inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text(),
        required=True  # Must be connected or have value
    )
]
Variable Connector (multiple named inputs/outputs):
from spotflow.nodes.data_types import VariableConnector

inputs = [
    VariableConnector(
        key="variables",
        label="Variables",
        type_=TypeDefinition.text()
    )
]

# User can add multiple inputs: var1, var2, var3, etc.
# Accessed in call() as: variables: dict[str, str]
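For example, a hedged sketch of consuming those values inside call(); the combining logic here is illustrative:
async def call(
    self,
    ctx: ExecutionContext,
    variables: dict[str, str]  # one entry per user-added input (var1, var2, ...)
) -> dict[str, str]:
    # Combine all provided variables into a single text output
    combined = "\n".join(f"{name}: {value}" for name, value in variables.items())
    return {"output_text": combined}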
Variable Type-Size Connector (different types):
from spotflow.nodes.data_types import VariableTypeSizeConnector

inputs = [
    VariableTypeSizeConnector(
        key="inputs",
        label="Inputs"
    )
]

# User can add inputs with different types
# Accessed as: inputs: dict[str, Any]

Data Types

Noxus supports rich type definitions:
# Basic types
TypeDefinition.text()            # String
TypeDefinition.number()          # Float or int
TypeDefinition.boolean()         # True/False

# File types
TypeDefinition.file()            # Any file
TypeDefinition.image()           # Image file
TypeDefinition.audio()           # Audio file

# Structured types
TypeDefinition.json()            # JSON object

# Lists
TypeDefinition.text(is_list=True)    # List of strings
TypeDefinition.file(is_list=True)    # List of files

Optional Inputs

inputs = [
    Connector(
        key="optional_param",
        label="Optional Parameter",
        type_=TypeDefinition.text(),
        required=False  # Can be left unconnected
    )
]

# In call(), check if provided:
async def call(self, ctx, optional_param: str | None = None):
    if optional_param:
        # Use parameter
        pass
    else:
        # Use default behavior
        pass

Node Configuration

Configuration fields allow users to customize node behavior without connections.

Configuration Schema

Define a Pydantic model for configuration:
from pydantic import BaseModel, Field
from spotflow.nodes.config_fields import (
    ConfigText,
    ConfigBigText,
    ConfigSelect,
    ConfigToggle,
    ConfigNumberSlider
)

class MyNodeConfig(BaseModel):
    api_key: str = Field(
        title="API Key",
        description="Your API key for the service",
        json_schema_extra=ConfigText()
    )

    mode: str = Field(
        title="Mode",
        description="Processing mode",
        default="fast",
        json_schema_extra=ConfigSelect(
            options=["fast", "balanced", "quality"]
        )
    )

    temperature: float = Field(
        title="Temperature",
        description="Randomness in generation",
        default=0.7,
        json_schema_extra=ConfigNumberSlider(
            min=0.0,
            max=2.0,
            step=0.1
        )
    )

    enable_cache: bool = Field(
        title="Enable Caching",
        description="Cache results for faster retrieval",
        default=True,
        json_schema_extra=ConfigToggle()
    )

Configuration Field Types

  • ConfigText: Single-line text input
  • ConfigBigText: Multi-line textarea
  • ConfigRichTextVariables: Rich text editor with variable insertion
  • ConfigSelect: Dropdown selection
  • ConfigMultiSelect: Multi-select dropdown
  • ConfigToggle: Boolean switch
  • ConfigNumberSlider: Numeric slider
  • ConfigDictList: Key-value pair list
  • ConfigJsonSchemaBuilder: JSON schema designer
  • ConfigModelSelect: LLM model picker
  • ConfigToolsSelect: Agent tool selector
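As an illustrative sketch, a config model mixing a few of the field types above; the constructor parameters for ConfigMultiSelect and ConfigDictList are assumptions modeled on ConfigSelect:
from pydantic import BaseModel, Field
from spotflow.nodes.config_fields import ConfigBigText, ConfigMultiSelect, ConfigDictList

class EnrichmentNodeConfig(BaseModel):
    prompt: str = Field(
        title="Prompt",
        description="Instructions applied to each record",
        json_schema_extra=ConfigBigText()  # multi-line textarea
    )

    fields: list[str] = Field(
        title="Fields",
        description="Which fields to enrich",
        default_factory=list,
        json_schema_extra=ConfigMultiSelect(  # assumed to accept options like ConfigSelect
            options=["name", "email", "company"]
        )
    )

    headers: dict[str, str] = Field(
        title="Headers",
        description="Extra HTTP headers as key-value pairs",
        default_factory=dict,
        json_schema_extra=ConfigDictList()  # key-value pair list
    )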

Dynamic Configuration

Generate configuration options dynamically:
from sqlalchemy import select

@classmethod
async def get_config(
    cls,
    ctx: ExecutionContext,
    config: "MyNodeConfig"
) -> type["MyNodeConfig"]:
    # Fetch options from database/API (Account is an illustrative model)
    async with ctx.db() as db:
        result = await db.execute(
            select(Account).filter_by(user_id=ctx.user.id)
        )
        account_options = [acc.name for acc in result.scalars()]

    # Update config schema
    class DynamicConfig(MyNodeConfig):
        account: str = Field(
            title="Account",
            json_schema_extra=ConfigSelect(options=account_options)
        )

    return DynamicConfig

Implementing Node Logic

The call() Method

The call() method is where your node’s logic executes:
async def call(
    self,
    ctx: ExecutionContext,
    # Input parameters match connector keys
    input_text: str,
    number_input: float,
    optional_file: File | None = None
) -> dict[str, Any]:
    """
    Node execution logic.

    Args:
        ctx: Execution context with access to DB, Redis, credentials, etc.
        input_text: Text from input connector
        number_input: Number from input connector
        optional_file: Optional file input

    Returns:
        Dictionary mapping output connector keys to values
    """

    # Access configuration
    api_key = self.config.api_key
    mode = self.config.mode

    # Your logic here
    result = await process_data(input_text, mode, api_key)

    # Return outputs
    return {
        "output_text": result.text,
        "output_number": result.score
    }

Sync vs Async

Nodes can be synchronous or asynchronous.

Async (Recommended):
async def call(self, ctx, input_text: str) -> dict:
    result = await async_api_call(input_text)
    return {"output": result}
Sync:
def call(self, ctx, input_text: str) -> dict:
    result = sync_processing(input_text)
    return {"output": result}
Use async for:
  • Database queries
  • External API calls
  • I/O operations
Use sync for:
  • Pure computation
  • Simple transformations

Execution Context

The ExecutionContext provides access to platform resources.

Database Access:
async with ctx.db() as db:
    user = await db.get(User, ctx.user.id)
    # Perform database operations
Redis Access:
redis = ctx.redis
await redis.set("key", "value")
value = await redis.get("key")
Credentials:
# Access integration credentials
credentials = await ctx.get_credentials(integration_name="google")
access_token = credentials.access_token
LLM Access:
# Use LLM providers
llms = ctx.llms()
response = await llms.generate(
    model="gpt-4o",
    prompt="Hello, world!"
)
Embeddings:
embeddings = ctx.embeddings()
vectors = await embeddings.embed(["text1", "text2"])
User/Group Info:
user = ctx.user  # Current user
group = ctx.group  # Current workspace
tenant = ctx.group.tenant  # Organization
api_key = ctx.api_key  # If called via API
Fingerprint (Run Metadata):
fingerprint = ctx.get_fingerprint()
# Contains: user_id, group_id, run_id, etc.

Error Handling

Raising Errors

Raise exceptions to signal errors:
from spotflow.nodes.exceptions import NodeExecutionError

async def call(self, ctx, input_text: str) -> dict:
    if not input_text:
        raise NodeExecutionError("Input text cannot be empty")

    try:
        result = await external_api(input_text)
    except APIException as e:
        raise NodeExecutionError(f"API call failed: {e}") from e

    return {"output": result}

Continue on Error

Users can configure nodes to continue on error. Your node should return default values:
async def call(self, ctx, input_text: str) -> dict:
    try:
        result = await risky_operation(input_text)
        return {"output": result}
    except Exception as e:
        # When continue-on-error is enabled, return a sensible default instead of failing
        return {"output": ""}  # Empty string as default

Timeout Configuration

Nodes can specify dynamic timeouts:
def calculate_timeout(
    self,
    ctx: ExecutionContext,
    **inputs
) -> int:
    """
    Calculate timeout in seconds based on inputs.

    Returns:
        Timeout in seconds
    """
    # Example: Longer timeout for larger files
    file_input = inputs.get("file_input")
    if file_input:
        file_size_mb = file_input.size / (1024 * 1024)
        return int(60 + file_size_mb * 2)  # 60s + 2s per MB

    return 300  # Default 5 minutes

List Handling

Nodes automatically handle list iteration when a list output connects to a non-list input.

Option 1: Non-List Input (Automatic Iteration):
inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text()  # NOT a list
    )
]

# When list connects here, node executes once per item
async def call(self, ctx, text_input: str) -> dict:
    # Receives single string, even if list upstream
    result = text_input.upper()
    return {"output": result}
Option 2: List Input (Processes Entire List):
inputs = [
    Connector(
        key="text_list",
        label="Text List",
        type_=TypeDefinition.text(is_list=True)  # List type
    )
]

# Node receives entire list
async def call(self, ctx, text_list: list[str]) -> dict:
    # Process all items together
    results = [t.upper() for t in text_list]
    return {"output_list": results}

File Handling

Reading Files

from spotflow.models import File

async def call(self, ctx, file_input: File) -> dict:
    # Read file contents
    content = await file_input.read_bytes()

    # Or get file path
    file_path = file_input.path

    # Access metadata
    filename = file_input.filename
    mime_type = file_input.mime_type
    size = file_input.size

    return {"output": process(content)}

Creating Files

from spotflow.models import File

async def call(self, ctx, text_input: str) -> dict:
    # Create file from text
    output_file = File.from_text(
        text=text_input,
        filename="output.txt",
        mime_type="text/plain"
    )

    # Or from bytes
    output_file = File.from_bytes(
        content=b"...",
        filename="output.pdf",
        mime_type="application/pdf"
    )

    return {"output_file": output_file}

Testing Custom Nodes

Unit Tests

import pytest
from spotflow.nodes.test_utils import create_test_context

@pytest.mark.asyncio
async def test_my_custom_node():
    # Create test context
    ctx = await create_test_context()

    # Create node instance
    node = MyCustomNode(config=MyCustomNodeConfig(api_key="test"))

    # Execute node
    result = await node.call(ctx, input_text="hello")

    # Assert results
    assert result["output_text"] == "HELLO"

Integration Tests

import pytest
from spotflow.flow.runner import online_runner
from spotflow.models import WorkflowDefinition
from spotflow.nodes.test_utils import create_test_context

@pytest.mark.asyncio
async def test_node_in_workflow():
    # Create a test context (same helper as in the unit test above)
    ctx = await create_test_context()

    # Create workflow with your node
    workflow_def = WorkflowDefinition(
        nodes=[
            {"id": "input", "type": "input", ...},
            {"id": "custom", "type": "my_custom_node", ...},
            {"id": "output", "type": "output", ...}
        ],
        edges=[...]
    )

    # Execute workflow
    result = await online_runner(
        workflow=workflow_def,
        inputs={"input": "test"},
        context=ctx
    )

    # Verify results
    assert result["output"] == "EXPECTED"

Registering Nodes

Register your custom node with the node registry:
from spotflow.registry import get_registry

registry = get_registry()
registry.nodes.register(MyCustomNode)
For plugin-based distribution:
# In your plugin's __init__.py
def register_plugin():
    from spotflow.registry import get_registry
    from .nodes import MyCustomNode

    registry = get_registry()
    registry.nodes.register(MyCustomNode)

Best Practices

Design

  • Single Responsibility: Each node should do one thing well
  • Composability: Design nodes to work together via connections
  • Clear Naming: Use descriptive names for nodes, inputs, and outputs
  • Consistent Style: Follow existing node conventions

Performance

  • Async I/O: Use async for network and database operations
  • Batch Operations: Process batches efficiently when possible (see the sketch below)
  • Resource Limits: Set appropriate timeouts for long operations
  • Memory Management: Clean up large objects after use
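A minimal sketch of the batch-operations point, using only the standard library; process_item and the concurrency limit of 10 are illustrative:
import asyncio

async def call(self, ctx: ExecutionContext, items: list[str]) -> dict[str, list[str]]:
    semaphore = asyncio.Semaphore(10)  # cap concurrent work

    async def process_one(item: str) -> str:
        async with semaphore:
            return await process_item(item)  # illustrative async helper

    # Process the batch concurrently instead of one item at a time
    results = await asyncio.gather(*(process_one(item) for item in items))
    return {"results": list(results)}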

Error Handling

  • Descriptive Errors: Provide clear error messages
  • Validation: Validate inputs early
  • Graceful Degradation: Return sensible defaults when possible
  • Logging: Log errors with context for debugging (see the sketch below)
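A hedged sketch of the logging point, combining the standard logging module with the fingerprint metadata described earlier; external_api is an illustrative call:
import logging

logger = logging.getLogger(__name__)

async def call(self, ctx: ExecutionContext, input_text: str) -> dict[str, str]:
    try:
        result = await external_api(input_text)  # illustrative call
    except Exception as e:
        fingerprint = ctx.get_fingerprint()  # run metadata (user_id, run_id, ...)
        logger.error("external_api failed: %s (fingerprint=%s)", e, fingerprint)
        raise NodeExecutionError(f"External API failed: {e}") from e
    return {"output": result}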

Security

  • Input Validation: Validate and sanitize all inputs
  • Credential Handling: Never log or expose credentials
  • API Rate Limits: Respect external API rate limits
  • Dependency Security: Keep dependencies updated

Advanced Topics

Progress Updates

Report progress for long-running operations:
async def call(self, ctx, items: list[str]) -> dict:
    results = []
    total = len(items)

    for i, item in enumerate(items):
        result = await process(item)
        results.append(result)

        # Update progress (0.0 to 1.0)
        await ctx.update_progress((i + 1) / total)

    return {"results": results}

Streaming Outputs

Stream outputs for real-time updates:
async def call(self, ctx, prompt: str) -> dict:
    full_response = ""

    async for chunk in llm_stream(prompt):
        full_response += chunk
        # Stream to UI
        await ctx.stream_output("response", chunk)

    return {"response": full_response}

Memory Nodes

Access persistent memory:
# Write to memory
await ctx.write_memory("key", "value", scope="workflow")

# Read from memory
value = await ctx.read_memory("key", scope="workflow")
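A brief sketch of using memory inside call() to keep a running count across executions; the key name and the assumption that read_memory returns None for a missing key are illustrative:
async def call(self, ctx: ExecutionContext, input_text: str) -> dict[str, str]:
    # Read the previous count (assumed to be None if never written)
    previous = await ctx.read_memory("processed_count", scope="workflow")
    count = int(previous or 0) + 1

    # Persist the updated count for the next execution
    await ctx.write_memory("processed_count", str(count), scope="workflow")

    return {"output_text": f"Processed item #{count}: {input_text}"}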

Example: Complete Custom Node

from pydantic import BaseModel, Field
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition
from spotflow.nodes.config_fields import ConfigText, ConfigToggle
from spotflow.nodes.exceptions import NodeExecutionError
from spotflow.integrations.credentials import ExecutionContext

class WeatherNodeConfig(BaseModel):
    api_key: str = Field(
        title="API Key",
        description="OpenWeatherMap API key",
        json_schema_extra=ConfigText()
    )

    use_celsius: bool = Field(
        title="Use Celsius",
        description="Temperature in Celsius instead of Fahrenheit",
        default=True,
        json_schema_extra=ConfigToggle()
    )

class WeatherNode(BaseNode[WeatherNodeConfig]):
    node_name = "weather_node"
    title = "Get Weather"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://example.com/weather-icon.png"
    visible = True

    inputs = [
        Connector(
            key="city",
            label="City",
            type_=TypeDefinition.text(),
            required=True
        )
    ]

    outputs = [
        Connector(
            key="temperature",
            label="Temperature",
            type_=TypeDefinition.number()
        ),
        Connector(
            key="description",
            label="Description",
            type_=TypeDefinition.text()
        )
    ]

    def calculate_timeout(self, ctx, **inputs) -> int:
        return 30  # 30 seconds for API call

    async def call(
        self,
        ctx: ExecutionContext,
        city: str
    ) -> dict[str, float | str]:
        import httpx

        if not city:
            raise NodeExecutionError("City name is required")

        api_key = self.config.api_key
        units = "metric" if self.config.use_celsius else "imperial"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"https://api.openweathermap.org/data/2.5/weather",
                    params={
                        "q": city,
                        "appid": api_key,
                        "units": units
                    }
                )
                response.raise_for_status()
                data = response.json()

            return {
                "temperature": data["main"]["temp"],
                "description": data["weather"][0]["description"]
            }

        except httpx.HTTPError as e:
            raise NodeExecutionError(f"Weather API error: {e}") from e

# Register the node
from spotflow.registry import get_registry
get_registry().nodes.register(WeatherNode)

Building custom nodes extends Noxus with unlimited possibilities. Start with simple nodes and gradually add complexity as you master the patterns.

See also: Node Configuration Guide, a deep dive into configuration field types and dynamic configuration.