Documentation Index: fetch the complete documentation index at https://docs.noxus.ai/llms.txt.
Use that index to discover all available pages before exploring further.
Custom nodes allow you to extend Noxus with specialized functionality tailored to your specific needs. This guide covers everything you need to know to build, test, and deploy custom nodes.
Node Architecture
BaseNode Class
All nodes inherit from BaseNode[ConfigType], a generic base class that provides the node lifecycle and interface.
from pydantic import BaseModel

from spotflow.integrations.credentials import ExecutionContext
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition


class MyCustomNodeConfig(BaseModel):
    """Configuration model named by the BaseNode type parameter below."""

    # Read by the node as self.config.api_key.
    api_key: str


class MyCustomNode(BaseNode["MyCustomNodeConfig"]):
    """Minimal example node: upper-cases the text it receives.

    The class attributes describe how the node is identified and displayed;
    `inputs` / `outputs` declare its connectors, and `call()` holds the logic.
    """

    # Node metadata
    node_name = "my_custom_node"
    title = "My Custom Node"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://your-icon-url.com/icon.png"
    visible = True  # Show in node palette

    # Input/output connectors
    inputs = [
        Connector(
            key="input_text",
            label="Input Text",
            type_=TypeDefinition.text(),
        )
    ]
    outputs = [
        Connector(
            key="output_text",
            label="Output Text",
            type_=TypeDefinition.text(),
        )
    ]

    # Main execution method
    async def call(
        self,
        ctx: ExecutionContext,
        input_text: str,
    ) -> dict[str, str]:
        """Return the upper-cased input under the "output_text" connector key.

        Args:
            ctx: Execution context supplied by the platform (unused here).
            input_text: Value of the "input_text" connector; the parameter
                name matches the connector key.

        Returns:
            Dictionary mapping the output connector key to its value.
        """
        # Your logic here
        result = input_text.upper()
        return {"output_text": result}
node_name (str, required)
Unique identifier for the node type
Use snake_case convention
Must be globally unique across all nodes
title (str, required)
Display name in UI
Use Title Case
Keep concise (2-4 words)
category (NodeCategory, required)
Groups nodes in palette
Options: AIText, Agent, Logic, Data, Integration, InputOutput, Utility
color (str, required)
Hex color code for node appearance
Use brand colors or category-standard colors
image (str, required)
Icon URL (PNG/SVG)
Displayed in node palette and on canvas
Recommended size: 48x48px
visible (bool, default: True)
Whether to show in node palette
Set to False for deprecated or internal nodes
Connector Types
Single-Value Connector :
from spotflow.nodes.data_types import Connector, TypeDefinition
# Declares a single required text input. The connector key is also the name
# of the call() parameter that receives the value.
inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text(),
        required=True  # Must be connected or have value
    )
]
Variable Connector (multiple named inputs/outputs):
from spotflow.nodes.data_types import VariableConnector
# One VariableConnector entry stands for a user-extensible group of inputs
# that all share the same type (text here).
inputs = [
    VariableConnector(
        key="variables",
        label="Variables",
        type_=TypeDefinition.text()
    )
]
# User can add multiple inputs: var1, var2, var3, etc.
# Accessed in call() as: variables: dict[str, str]
Variable Type-Size Connector (different types):
from spotflow.nodes.data_types import VariableTypeSizeConnector
# No type_ is passed here; presumably each user-added input carries its own
# type, which is why call() receives dict[str, Any] (confirm against the
# VariableTypeSizeConnector implementation).
inputs = [
    VariableTypeSizeConnector(
        key="inputs",
        label="Inputs"
    )
]
# User can add inputs with different types
# Accessed as: inputs: dict[str, Any]
Data Types
Noxus supports rich type definitions:
# Each factory call below produces the value passed as a connector's `type_`.

# Basic types
TypeDefinition.text()  # String
TypeDefinition.number()  # Float or int
TypeDefinition.boolean()  # True/False
# File types
TypeDefinition.file()  # Any file
TypeDefinition.image()  # Image file
TypeDefinition.audio()  # Audio file
# Structured types
TypeDefinition.json()  # JSON object
# Lists (any factory shown with is_list=True describes a list of that type)
TypeDefinition.text(is_list=True)  # List of strings
TypeDefinition.file(is_list=True)  # List of files
# Optional input: required=False lets the connector stay unconnected, so the
# matching call() parameter needs a default value.
inputs = [
    Connector(
        key="optional_param",
        label="Optional Parameter",
        type_=TypeDefinition.text(),
        required=False  # Can be left unconnected
    )
]
# In call(), check if provided:
async def call ( self , ctx , optional_param : str | None = None ):
if optional_param:
# Use parameter
pass
else :
# Use default behavior
pass
Node Configuration
Configuration fields allow users to customize node behavior without connections.
Configuration Schema
Define a Pydantic model for configuration:
from pydantic import BaseModel, Field
from spotflow.nodes.config_fields import (
ConfigText,
ConfigBigText,
ConfigSelect,
ConfigToggle,
ConfigNumberSlider
)
class MyNodeConfig(BaseModel):
    """Example configuration schema for a node.

    Each field pairs a Pydantic `Field` (title, description, optional
    default) with a `json_schema_extra` config-field object that selects the
    UI control used to edit it.
    """

    # Required (no default); edited through a single-line text input.
    api_key: str = Field(
        title="API Key",
        description="Your API key for the service",
        json_schema_extra=ConfigText()
    )
    # Dropdown limited to the three listed options; defaults to "fast".
    mode: str = Field(
        title="Mode",
        description="Processing mode",
        default="fast",
        json_schema_extra=ConfigSelect(
            options=["fast", "balanced", "quality"]
        )
    )
    # Slider from 0.0 to 2.0 in steps of 0.1. NOTE(review): the range is only
    # declared on the slider, not as a Pydantic constraint on the field.
    temperature: float = Field(
        title="Temperature",
        description="Randomness in generation",
        default=0.7,
        json_schema_extra=ConfigNumberSlider(
            min=0.0,
            max=2.0,
            step=0.1
        )
    )
    # Boolean switch; caching is on by default.
    enable_cache: bool = Field(
        title="Enable Caching",
        description="Cache results for faster retrieval",
        default=True,
        json_schema_extra=ConfigToggle()
    )
Configuration Field Types
ConfigText : Single-line text input
ConfigBigText : Multi-line textarea
ConfigRichTextVariables : Rich text editor with variable insertion
ConfigSelect : Dropdown selection
ConfigMultiSelect : Multi-select dropdown
ConfigToggle : Boolean switch
ConfigNumberSlider : Numeric slider
ConfigDictList : Key-value pair list
ConfigJsonSchemaBuilder : JSON schema designer
ConfigModelSelect : LLM model picker
ConfigToolsSelect : Agent tool selector
Dynamic Configuration
Generate configuration options dynamically:
@classmethod
async def get_config(
    cls,
    ctx: ExecutionContext,
    config: "MyNodeConfig",
) -> type["MyNodeConfig"]:
    """Build a config schema whose "Account" dropdown is filled at runtime.

    Declared `async` because the body uses `async with` and `await`, which
    are a SyntaxError inside a plain `def`.
    NOTE(review): confirm that the framework awaits `get_config`.

    Args:
        ctx: Execution context; provides the database session and the user.
        config: Current configuration values (unused in this example).

    Returns:
        A `MyNodeConfig` subclass with an extra `account` select field.
    """
    # Fetch options from database/API
    async with ctx.db() as db:
        accounts = await db.execute(
            select(Account).filter_by(user_id=ctx.user.id)
        )
        # NOTE(review): `select` and `Account` must be imported by the
        # surrounding module. If `db` is a SQLAlchemy AsyncSession, execute()
        # yields rows rather than Account objects - confirm whether
        # `.scalars()` is needed before reading `.name`.
        account_options = [acc.name for acc in accounts]

    # Update config schema
    class DynamicConfig(MyNodeConfig):
        account: str = Field(
            title="Account",
            json_schema_extra=ConfigSelect(options=account_options),
        )

    return DynamicConfig
Implementing Node Logic
The call() Method
The call() method is where your node’s logic executes:
async def call(
    self,
    ctx: ExecutionContext,
    # One parameter per input connector, named after the connector key
    input_text: str,
    number_input: float,
    optional_file: File | None = None,
) -> dict[str, Any]:
    """Run the node and produce its outputs.

    Args:
        ctx: Execution context with access to DB, Redis, credentials, etc.
        input_text: Text from input connector
        number_input: Number from input connector
        optional_file: Optional file input

    Returns:
        Dictionary mapping output connector keys to values
    """
    # Pull the user-supplied settings off the node configuration
    settings = self.config
    api_key = settings.api_key
    mode = settings.mode

    # Your logic here
    processed = await process_data(input_text, mode, api_key)

    # Assemble one entry per output connector
    outputs: dict[str, Any] = {}
    outputs["output_text"] = processed.text
    outputs["output_number"] = processed.score
    return outputs
Sync vs Async
Nodes can be synchronous or asynchronous:
Async (Recommended) :
async def call(self, ctx, input_text: str) -> dict:
    """Await the external call and expose its result on the "output" key."""
    return {"output": await async_api_call(input_text)}
Sync :
def call(self, ctx, input_text: str) -> dict:
    """Run the synchronous processing step and expose it on the "output" key."""
    return {"output": sync_processing(input_text)}
Use async for:
Database queries
External API calls
I/O operations
Use sync for:
Pure computation
Simple transformations
Execution Context
The ExecutionContext provides access to platform resources:
Database Access :
# ctx.db() is used as an async context manager; because of `async with` and
# `await`, this fragment must run inside an async function such as call().
async with ctx.db() as db:
    user = await db.get(User, ctx.user.id)
    # Perform database operations
Redis Access :
# Both operations are awaited, so use this inside an async function.
redis = ctx.redis
await redis.set("key", "value")
value = await redis.get("key")
Credentials :
# Access integration credentials
# (awaited lookup keyed by integration name; the token is read from the result)
credentials = await ctx.get_credentials(integration_name="google")
access_token = credentials.access_token
LLM Access :
# Use LLM providers
llms = ctx.llms()
# generate() is awaited, so call it from an async function.
response = await llms.generate(
    model="gpt-4o",
    prompt="Hello, world!"
)
Embeddings :
# embed() takes a list of texts and is awaited; presumably it returns one
# vector per input text (confirm against the embeddings client).
embeddings = ctx.embeddings()
vectors = await embeddings.embed(["text1", "text2"])
User/Group Info :
# Plain attribute reads on the execution context (nothing is awaited here).
user = ctx.user  # Current user
group = ctx.group  # Current workspace
tenant = ctx.group.tenant  # Organization
api_key = ctx.api_key  # If called via API
Fingerprint (Run Metadata):
# Synchronous call (not awaited) that returns the run metadata.
fingerprint = ctx.get_fingerprint()
# Contains: user_id, group_id, run_id, etc.
Error Handling
Raising Errors
Raise exceptions to signal errors:
from spotflow.nodes.exceptions import NodeExecutionError
async def call(self, ctx, input_text: str) -> dict:
    """Validate the input, call the external API, and return its result.

    Raises:
        NodeExecutionError: If the input is empty or the API call fails.
    """
    # Guard clause: reject missing input before doing any I/O
    if not input_text:
        raise NodeExecutionError("Input text cannot be empty")

    try:
        api_result = await external_api(input_text)
    except APIException as exc:
        # Re-raise as the node error type, keeping the original as the cause
        raise NodeExecutionError(f"API call failed: {exc}") from exc
    else:
        return {"output": api_result}
Continue on Error
Users can configure nodes to continue on error. Your node should return default values:
async def call(self, ctx, input_text: str) -> dict:
    """Run the risky operation, falling back to a default output on failure.

    Any exception raised by the operation is logged and replaced by an empty
    string on the "output" connector, so downstream nodes still receive a
    value. The fallback is unconditional in this code; it does not itself
    check whether continue-on-error is enabled.
    """
    import logging

    try:
        result = await risky_operation(input_text)
    except Exception:
        # Deliberate best-effort catch-all, but record the failure with its
        # traceback instead of discarding it silently.
        logging.getLogger(__name__).exception(
            "risky_operation failed; returning default output"
        )
        return {"output": ""}  # Empty string as default
    return {"output": result}
Timeout Configuration
Nodes can specify dynamic timeouts:
def calculate_timeout(
    self,
    ctx: ExecutionContext,
    **inputs
) -> int:
    """Pick a timeout, in seconds, from the node's inputs.

    Without a "file_input" the timeout is a flat 300 seconds; with one it is
    60 seconds plus 2 seconds per megabyte of file size, truncated to an int.

    Returns:
        Timeout in seconds
    """
    uploaded = inputs.get("file_input")
    if not uploaded:
        return 300  # Default 5 minutes

    # Example: Longer timeout for larger files
    megabytes = uploaded.size / (1024 * 1024)
    return int(60 + megabytes * 2)  # 60s + 2s per MB
List Handling
Nodes automatically handle list iteration when a list output connects to a non-list input.
Option 1: Non-List Input (Automatic Iteration) :
inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text(),  # NOT a list
    )
]


# When list connects here, node executes once per item
async def call(self, ctx, text_input: str) -> dict:
    """Upper-case one item; the input is a single string even if a list is upstream."""
    return {"output": text_input.upper()}
Option 2: List Input (Processes Entire List) :
inputs = [
    Connector(
        key="text_list",
        label="Text List",
        type_=TypeDefinition.text(is_list=True),  # List type
    )
]


# Node receives entire list
async def call(self, ctx, text_list: list[str]) -> dict:
    """Upper-case every item of the list in a single invocation."""
    # Process all items together
    uppercased = []
    for entry in text_list:
        uppercased.append(entry.upper())
    return {"output_list": uppercased}
File Handling
Reading Files
from spotflow.models import File
async def call(self, ctx, file_input: File) -> dict:
    """Show the accessors available on a File input.

    The path and metadata reads below only illustrate the API; only the bytes
    read from the file feed the returned "output" value.
    """
    # Read file contents
    content = await file_input.read_bytes()
    # Or get file path
    file_path = file_input.path
    # Access metadata
    filename = file_input.filename
    mime_type = file_input.mime_type
    size = file_input.size
    return {"output": process(content)}
Creating Files
from spotflow.models import File
async def call(self, ctx, text_input: str) -> dict:
    """Show the two File constructors: from text and from bytes.

    NOTE: both results are assigned to `output_file`, so the bytes-based file
    replaces the text-based one and is what gets returned. Keep only the
    constructor you need in real code.
    """
    # Create file from text
    output_file = File.from_text(
        text=text_input,
        filename="output.txt",
        mime_type="text/plain"
    )
    # Or from bytes
    output_file = File.from_bytes(
        content=b"...",
        filename="output.pdf",
        mime_type="application/pdf"
    )
    return {"output_file": output_file}
Testing Custom Nodes
Unit Tests
import pytest
from spotflow.nodes.test_utils import create_test_context
@pytest.mark.asyncio
async def test_my_custom_node():
    """Calling the node directly with "hello" must yield "HELLO"."""
    # Arrange: a test context and a configured node instance
    test_ctx = await create_test_context()
    node_under_test = MyCustomNode(config=MyCustomNodeConfig(api_key="test"))

    # Act: invoke the node logic without going through a workflow
    outputs = await node_under_test.call(test_ctx, input_text="hello")

    # Assert: the output connector carries the upper-cased text
    assert outputs["output_text"] == "HELLO"
Integration Tests
from spotflow.flow.runner import online_runner
from spotflow.models import WorkflowDefinition
@pytest.mark.asyncio
async def test_node_in_workflow():
    """Run the custom node inside a small workflow and check the final output."""
    from spotflow.nodes.test_utils import create_test_context

    # online_runner is given a context below, so one has to be created first;
    # the original snippet referenced `ctx` without defining it.
    ctx = await create_test_context()

    # Create workflow with your node
    workflow_def = WorkflowDefinition(
        nodes=[
            {"id": "input", "type": "input"},  # ...plus remaining node fields
            {"id": "custom", "type": "my_custom_node"},  # ...plus remaining node fields
            {"id": "output", "type": "output"},  # ...plus remaining node fields
        ],
        edges=[...],  # placeholder: connect input -> custom -> output
    )

    # Execute workflow
    result = await online_runner(
        workflow=workflow_def,
        inputs={"input": "test"},
        context=ctx,
    )

    # Verify results
    assert result["output"] == "EXPECTED"
Registering Nodes
Register your custom node with the node registry:
from spotflow.registry import get_registry
# Obtain the registry and register the node class itself (not an instance).
registry = get_registry()
registry.nodes.register(MyCustomNode)
For plugin-based distribution:
# In your plugin's __init__.py
def register_plugin():
    """Register this plugin's node classes with the node registry."""
    # Imports stay inside the function so they run only when it is called.
    from spotflow.registry import get_registry
    from .nodes import MyCustomNode

    get_registry().nodes.register(MyCustomNode)
Best Practices
Design
Single Responsibility : Each node should do one thing well
Composability : Design nodes to work together via connections
Clear Naming : Use descriptive names for nodes, inputs, and outputs
Consistent Style : Follow existing node conventions
Performance
Async I/O : Use async for network and database operations
Batch Operations : Process batches efficiently when possible
Resource Limits : Set appropriate timeouts for long operations
Memory Management : Clean up large objects after use
Error Handling
Descriptive Errors : Provide clear error messages
Validation : Validate inputs early
Graceful Degradation : Return sensible defaults when possible
Logging : Log errors with context for debugging
Security
Input Validation : Validate and sanitize all inputs
Credential Handling : Never log or expose credentials
API Rate Limits : Respect external API rate limits
Dependency Security : Keep dependencies updated
Advanced Topics
Progress Updates
Report progress for long-running operations:
async def call(self, ctx, items: list[str]) -> dict:
    """Process items one at a time, reporting progress after each.

    Progress is the fraction of items finished so far and reaches 1.0 after
    the last item. An empty list reports no progress and returns no results.
    """
    total = len(items)
    results = []
    for done, item in enumerate(items, start=1):
        results.append(await process(item))
        # Update progress (0.0 to 1.0)
        await ctx.update_progress(done / total)
    return {"results": results}
Streaming Outputs
Stream outputs for real-time updates:
async def call(self, ctx, prompt: str) -> dict:
    """Stream the LLM response to the UI chunk by chunk, then return it whole.

    Each chunk is forwarded on the "response" output as it arrives; the
    complete text is returned under the same key once the stream ends.
    """
    # Collect chunks and join once at the end instead of repeated `+=`,
    # which is not guaranteed to be linear-time.
    chunks = []
    async for chunk in llm_stream(prompt):
        chunks.append(chunk)
        # Stream to UI
        await ctx.stream_output("response", chunk)
    return {"response": "".join(chunks)}
Memory Nodes
Access persistent memory:
# Both calls are awaited (run them inside an async function) and use the same
# key and scope, so the read addresses the entry written above.
# Write to memory
await ctx.write_memory("key", "value", scope="workflow")
# Read from memory
value = await ctx.read_memory("key", scope="workflow")
Example: Complete Custom Node
from pydantic import BaseModel, Field
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition
from spotflow.nodes.config_fields import ConfigText, ConfigToggle
from spotflow.nodes.exceptions import NodeExecutionError
from spotflow.integrations.credentials import ExecutionContext
class WeatherNodeConfig(BaseModel):
    """User-editable settings for WeatherNode."""

    # Required (no default); sent to OpenWeatherMap as the `appid` parameter.
    api_key: str = Field(
        title="API Key",
        description="OpenWeatherMap API key",
        json_schema_extra=ConfigText()
    )
    # True selects "metric" units in the request, False selects "imperial".
    use_celsius: bool = Field(
        title="Use Celsius",
        description="Temperature in Celsius instead of Fahrenheit",
        default=True,
        json_schema_extra=ConfigToggle()
    )
class WeatherNode(BaseNode[WeatherNodeConfig]):
    """Node that fetches the current weather for a city from OpenWeatherMap.

    Takes a required "city" text input and produces a numeric "temperature"
    and a text "description" output.
    """

    node_name = "weather_node"
    title = "Get Weather"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://example.com/weather-icon.png"
    visible = True

    inputs = [
        Connector(
            key="city",
            label="City",
            type_=TypeDefinition.text(),
            required=True,
        )
    ]
    outputs = [
        Connector(
            key="temperature",
            label="Temperature",
            type_=TypeDefinition.number(),
        ),
        Connector(
            key="description",
            label="Description",
            type_=TypeDefinition.text(),
        ),
    ]

    def calculate_timeout(self, ctx, **inputs) -> int:
        """Return a fixed timeout, in seconds, for the single API request."""
        return 30  # 30 seconds for API call

    async def call(
        self,
        ctx: ExecutionContext,
        city: str,
    ) -> dict[str, float | str]:
        """Query the weather API for `city` and map the reply to the outputs.

        Args:
            ctx: Execution context supplied by the platform (unused here).
            city: Name of the city to look up; must be non-empty.

        Returns:
            Dictionary with the "temperature" and "description" output values.

        Raises:
            NodeExecutionError: If `city` is empty, the request fails, or the
                response does not have the expected shape.
        """
        import httpx

        if not city:
            raise NodeExecutionError("City name is required")

        api_key = self.config.api_key
        units = "metric" if self.config.use_celsius else "imperial"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.openweathermap.org/data/2.5/weather",
                    params={
                        "q": city,
                        "appid": api_key,
                        "units": units,
                    },
                )
                response.raise_for_status()
        except httpx.HTTPStatusError as e:
            # The message of HTTPStatusError embeds the full request URL,
            # including the `appid` query parameter. Report only the status
            # code and suppress the chained exception so the API key does not
            # reach node errors or logs.
            raise NodeExecutionError(
                f"Weather API error: HTTP {e.response.status_code}"
            ) from None
        except httpx.HTTPError as e:
            raise NodeExecutionError(f"Weather API error: {e}") from e

        try:
            data = response.json()
            return {
                "temperature": data["main"]["temp"],
                "description": data["weather"][0]["description"],
            }
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # Non-JSON body or missing fields: surface as a node error rather
            # than leaking a raw parsing/lookup exception.
            raise NodeExecutionError(
                "Weather API returned an unexpected response"
            ) from e
# Register the node
# (runs at import time of this module; the class itself is registered)
from spotflow.registry import get_registry
get_registry().nodes.register(WeatherNode)
Building custom nodes extends Noxus with unlimited possibilities. Start with simple nodes and gradually add complexity as you master the patterns.
Node Configuration Guide: a deep dive into configuration field types and dynamic configuration.