# Creating Custom Nodes

> Extend Noxus by building custom workflow nodes

Custom nodes extend Noxus with functionality tailored to your needs. This guide covers how to define, configure, test, and register them.

## Node Architecture

### BaseNode Class

All nodes inherit from `BaseNode[ConfigType]`, a generic base class that provides the node lifecycle and interface.

```python theme={null}
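from pydantic import BaseModel
from spotflow.integrations.credentials import ExecutionContext
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition

class MyCustomNodeConfig(BaseModel):
    # Configuration fields are covered in "Node Configuration" below
    api_key: str = ""

class MyCustomNode(BaseNode[MyCustomNodeConfig]):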
    # Node metadata
    node_name = "my_custom_node"
    title = "My Custom Node"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://your-icon-url.com/icon.png"
    visible = True  # Show in node palette

    # Input/output connectors
    inputs = [
        Connector(
            key="input_text",
            label="Input Text",
            type_=TypeDefinition.text()
        )
    ]

    outputs = [
        Connector(
            key="output_text",
            label="Output Text",
            type_=TypeDefinition.text()
        )
    ]

    # Main execution method
    async def call(
        self,
        ctx: ExecutionContext,
        input_text: str
    ) -> dict[str, str]:
        # Your logic here
        result = input_text.upper()
        return {"output_text": result}
```

### Node Metadata

**node\_name** (str, required)

* Unique identifier for the node type
* Use snake\_case convention
* Must be globally unique across all nodes

**title** (str, required)

* Display name in UI
* Use Title Case
* Keep concise (2-4 words)

**category** (NodeCategory, required)

* Groups nodes in palette
* Options: `AIText`, `Agent`, `Logic`, `Data`, `Integration`, `InputOutput`, `Utility`

**color** (str, required)

* Hex color code for node appearance
* Use brand colors or category-standard colors

**image** (str, required)

* Icon URL (PNG/SVG)
* Displayed in node palette and on canvas
* Recommended size: 48x48px

**visible** (bool, default: True)

* Whether to show in node palette
* Set to False for deprecated or internal nodes
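
The naming and color conventions above can be checked mechanically before a node is registered. The following is a minimal sketch, assuming a hypothetical helper `check_metadata` that is not part of Noxus and only covers the rules listed in this section:

```python
import re

# Patterns for the conventions described above (assumed, not taken from Noxus source)
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")
HEX_COLOR = re.compile(r"^#[0-9A-Fa-f]{6}$")


def check_metadata(node_name: str, title: str, color: str) -> list:
    """Return a list of convention violations (empty means all checks pass)."""
    problems = []
    if not SNAKE_CASE.match(node_name):
        problems.append(f"node_name {node_name!r} is not snake_case")
    if len(title.split()) > 4:
        problems.append(f"title {title!r} is longer than 4 words")
    if not HEX_COLOR.match(color):
        problems.append(f"color {color!r} is not a #RRGGBB hex code")
    return problems
```

Running a check like this in a unit test catches naming slips early; uniqueness of `node_name` still has to be verified against the registry itself.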

## Defining Inputs and Outputs

### Connector Types

**Single-Value Connector**:

```python theme={null}
from spotflow.nodes.data_types import Connector, TypeDefinition

inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text(),
        required=True  # Must be connected or have value
    )
]
```

**Variable Connector** (multiple named inputs/outputs):

```python theme={null}
from spotflow.nodes.data_types import VariableConnector

inputs = [
    VariableConnector(
        key="variables",
        label="Variables",
        type_=TypeDefinition.text()
    )
]

# User can add multiple inputs: var1, var2, var3, etc.
# Accessed in call() as: variables: dict[str, str]
```

**Variable Type-Size Connector** (multiple named inputs of different types):

```python theme={null}
from spotflow.nodes.data_types import VariableTypeSizeConnector

inputs = [
    VariableTypeSizeConnector(
        key="inputs",
        label="Inputs"
    )
]

# User can add inputs with different types
# Accessed as: inputs: dict[str, Any]
```
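
Because variable connectors reach `call()` as a dictionary keyed by user-chosen names, node logic usually iterates over that dictionary instead of naming each parameter. The sketch below illustrates this with two hypothetical helpers (`join_variables` and `describe_inputs`), which are plain functions for illustration and not Noxus APIs:

```python
from typing import Any


def join_variables(variables: dict, separator: str = "\n") -> str:
    """Concatenate text variables as 'name: value' pairs, in dictionary order."""
    return separator.join(f"{name}: {value}" for name, value in variables.items())


def describe_inputs(inputs: dict) -> dict:
    """Map each input name to the Python type name of the value it carries."""
    described: dict[str, Any] = {}
    for name, value in inputs.items():
        described[name] = type(value).__name__
    return described
```

With mixed-type inputs, inspecting each value's type before using it keeps the node from failing on an unexpected connection.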

### Data Types

Noxus supports rich type definitions:

```python theme={null}
# Basic types
TypeDefinition.text()            # String
TypeDefinition.number()          # Float or int
TypeDefinition.boolean()         # True/False

# File types
TypeDefinition.file()            # Any file
TypeDefinition.image()           # Image file
TypeDefinition.audio()           # Audio file

# Structured types
TypeDefinition.json()            # JSON object

# Lists
TypeDefinition.text(is_list=True)    # List of strings
TypeDefinition.file(is_list=True)    # List of files
```

### Optional Inputs

```python theme={null}
inputs = [
    Connector(
        key="optional_param",
        label="Optional Parameter",
        type_=TypeDefinition.text(),
        required=False  # Can be left unconnected
    )
]

# In call(), check if provided:
async def call(self, ctx, optional_param: str | None = None):
    if optional_param is not None:
        # Use parameter
        pass
    else:
        # Use default behavior
        pass
```

## Node Configuration

Configuration fields allow users to customize node behavior without connections.

### Configuration Schema

Define a Pydantic model for configuration:

```python theme={null}
from pydantic import BaseModel, Field
from spotflow.nodes.config_fields import (
    ConfigText,
    ConfigBigText,
    ConfigSelect,
    ConfigToggle,
    ConfigNumberSlider
)

class MyNodeConfig(BaseModel):
    api_key: str = Field(
        title="API Key",
        description="Your API key for the service",
        json_schema_extra=ConfigText()
    )

    mode: str = Field(
        title="Mode",
        description="Processing mode",
        default="fast",
        json_schema_extra=ConfigSelect(
            options=["fast", "balanced", "quality"]
        )
    )

    temperature: float = Field(
        title="Temperature",
        description="Randomness in generation",
        default=0.7,
        json_schema_extra=ConfigNumberSlider(
            min=0.0,
            max=2.0,
            step=0.1
        )
    )

    enable_cache: bool = Field(
        title="Enable Caching",
        description="Cache results for faster retrieval",
        default=True,
        json_schema_extra=ConfigToggle()
    )
```

### Configuration Field Types

* **ConfigText**: Single-line text input
* **ConfigBigText**: Multi-line textarea
* **ConfigRichTextVariables**: Rich text editor with variable insertion
* **ConfigSelect**: Dropdown selection
* **ConfigMultiSelect**: Multi-select dropdown
* **ConfigToggle**: Boolean switch
* **ConfigNumberSlider**: Numeric slider
* **ConfigDictList**: Key-value pair list
* **ConfigJsonSchemaBuilder**: JSON schema designer
* **ConfigModelSelect**: LLM model picker
* **ConfigToolsSelect**: Agent tool selector

### Dynamic Configuration

Generate configuration options dynamically:

```python theme={null}
from sqlalchemy import select  # Assumes ctx.db() yields a SQLAlchemy async session

@classmethod
async def get_config(  # Must be async because the body awaits the database
    cls,
    ctx: ExecutionContext,
    config: "MyNodeConfig"
) -> type["MyNodeConfig"]:
    # Fetch options from database/API (Account is a placeholder model)
    async with ctx.db() as db:
        result = await db.execute(
            select(Account).filter_by(user_id=ctx.user.id)
        )
        account_options = [acc.name for acc in result.scalars()]

    # Update config schema
    class DynamicConfig(MyNodeConfig):
        account: str = Field(
            title="Account",
            json_schema_extra=ConfigSelect(options=account_options)
        )

    return DynamicConfig

## Implementing Node Logic

### The call() Method

The `call()` method is where your node's logic executes:

```python theme={null}
async def call(
    self,
    ctx: ExecutionContext,
    # Input parameters match connector keys
    input_text: str,
    number_input: float,
    optional_file: File | None = None
) -> dict[str, Any]:
    """
    Node execution logic.

    Args:
        ctx: Execution context with access to DB, Redis, credentials, etc.
        input_text: Text from input connector
        number_input: Number from input connector
        optional_file: Optional file input

    Returns:
        Dictionary mapping output connector keys to values
    """

    # Access configuration
    api_key = self.config.api_key
    mode = self.config.mode

    # Your logic here
    result = await process_data(input_text, mode, api_key)

    # Return outputs
    return {
        "output_text": result.text,
        "output_number": result.score
    }
```

### Sync vs Async

Nodes can be synchronous or asynchronous:

**Async (Recommended)**:

```python theme={null}
async def call(self, ctx, input_text: str) -> dict:
    result = await async_api_call(input_text)
    return {"output": result}
```

**Sync**:

```python theme={null}
def call(self, ctx, input_text: str) -> dict:
    result = sync_processing(input_text)
    return {"output": result}
```

Use async for:

* Database queries
* External API calls
* I/O operations

Use sync for:

* Pure computation
* Simple transformations

### Execution Context

The `ExecutionContext` provides access to platform resources:

**Database Access**:

```python theme={null}
async with ctx.db() as db:
    user = await db.get(User, ctx.user.id)
    # Perform database operations
```

**Redis Access**:

```python theme={null}
redis = ctx.redis
await redis.set("key", "value")
value = await redis.get("key")
```

**Credentials**:

```python theme={null}
# Access integration credentials
credentials = await ctx.get_credentials(integration_name="google")
access_token = credentials.access_token
```

**LLM Access**:

```python theme={null}
# Use LLM providers
llms = ctx.llms()
response = await llms.generate(
    model="gpt-4o",
    prompt="Hello, world!"
)
```

**Embeddings**:

```python theme={null}
embeddings = ctx.embeddings()
vectors = await embeddings.embed(["text1", "text2"])
```

**User/Group Info**:

```python theme={null}
user = ctx.user  # Current user
group = ctx.group  # Current workspace
tenant = ctx.group.tenant  # Organization
api_key = ctx.api_key  # If called via API
```

**Fingerprint** (Run Metadata):

```python theme={null}
fingerprint = ctx.get_fingerprint()
# Contains: user_id, group_id, run_id, etc.
```

## Error Handling

### Raising Errors

Raise exceptions to signal errors:

```python theme={null}
from spotflow.nodes.exceptions import NodeExecutionError

async def call(self, ctx, input_text: str) -> dict:
    if not input_text:
        raise NodeExecutionError("Input text cannot be empty")

    try:
        result = await external_api(input_text)
    except APIException as e:
        raise NodeExecutionError(f"API call failed: {e}") from e

    return {"output": result}
```

### Continue on Error

Users can configure a node to continue on error. To support this, catch the failures you expect and return a default value for each output:

```python theme={null}
async def call(self, ctx, input_text: str) -> dict:
    try:
        result = await risky_operation(input_text)
        return {"output": result}
    except Exception:
        # Fall back to a default so downstream nodes still receive a value
        return {"output": ""}  # Empty string as default
```
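
Catching every `Exception` also hides programming errors, so a narrower variant is often easier to debug. The following sketch assumes a hypothetical `UpstreamError` and a stand-in `risky_operation`; neither is a Noxus API:

```python
import asyncio
import logging

logger = logging.getLogger("custom_node")


class UpstreamError(Exception):
    """Hypothetical error raised by the wrapped operation."""


async def risky_operation(text: str) -> str:
    """Stand-in for an external call that can fail."""
    if not text.strip():
        raise UpstreamError("nothing to process")
    return text.upper()


async def call(input_text: str) -> dict:
    try:
        return {"output": await risky_operation(input_text)}
    except UpstreamError as exc:
        # Catch only the expected failure, log it, and fall back to a default.
        logger.warning("risky_operation failed: %s", exc)
        return {"output": ""}
```

Unexpected exceptions still propagate, so genuine bugs surface as node failures instead of silent empty outputs.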

## Timeout Configuration

Override `calculate_timeout()` to set a per-run timeout based on the node's inputs:

```python theme={null}
def calculate_timeout(
    self,
    ctx: ExecutionContext,
    **inputs
) -> int:
    """
    Calculate timeout in seconds based on inputs.

    Returns:
        Timeout in seconds
    """
    # Example: Longer timeout for larger files
    file_input = inputs.get("file_input")
    if file_input:
        file_size_mb = file_input.size / (1024 * 1024)
        return int(60 + file_size_mb * 2)  # 60s + 2s per MB

    return 300  # Default 5 minutes
```
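
As a worked example of the formula above, a 50 MB file yields 60 + 50 × 2 = 160 seconds. The sketch below reproduces that arithmetic and shows one way a runner might enforce the result with `asyncio.wait_for`. The names `timeout_for` and `run_with_timeout` are illustrative, and how Noxus actually enforces timeouts is not described in this guide:

```python
import asyncio


def timeout_for(size_bytes: int = 0) -> int:
    """Mirror the formula above: 60s base plus 2s per MB, or 300s without a file."""
    if size_bytes:
        size_mb = size_bytes / (1024 * 1024)
        return int(60 + size_mb * 2)
    return 300


async def run_with_timeout(coro, seconds: float):
    """Await coro, raising asyncio.TimeoutError if it exceeds the limit."""
    return await asyncio.wait_for(coro, timeout=seconds)
```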

## List Handling

Nodes automatically handle list iteration when a list output connects to a non-list input.

**Option 1: Non-List Input (Automatic Iteration)**:

```python theme={null}
inputs = [
    Connector(
        key="text_input",
        label="Text Input",
        type_=TypeDefinition.text()  # NOT a list
    )
]

# When list connects here, node executes once per item
async def call(self, ctx, text_input: str) -> dict:
    # Receives single string, even if list upstream
    result = text_input.upper()
    return {"output": result}
```

**Option 2: List Input (Processes Entire List)**:

```python theme={null}
inputs = [
    Connector(
        key="text_list",
        label="Text List",
        type_=TypeDefinition.text(is_list=True)  # List type
    )
]

# Node receives entire list
async def call(self, ctx, text_list: list[str]) -> dict:
    # Process all items together
    results = [t.upper() for t in text_list]
    return {"output_list": results}
```
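
The difference between the two options can be pictured as a dispatch step in front of the node. This is a simplified sketch of the idea, not the Noxus runner: `run_node` and `upper_node` are hypothetical, and how the platform collects per-item results is an assumption here.

```python
import asyncio


async def upper_node(text_input: str) -> dict:
    """Stand-in for a node whose input connector is not a list."""
    return {"output": text_input.upper()}


async def run_node(node_call, value):
    """Call the node once per item when a list arrives, otherwise once."""
    if isinstance(value, list):
        return [await node_call(item) for item in value]
    return await node_call(value)
```

Choosing a list-typed input instead skips this per-item dispatch and hands the whole list to a single `call()`, which is the better fit when items must be processed together.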

## File Handling

### Reading Files

```python theme={null}
from spotflow.models import File

async def call(self, ctx, file_input: File) -> dict:
    # Read file contents
    content = await file_input.read_bytes()

    # Or get file path
    file_path = file_input.path

    # Access metadata
    filename = file_input.filename
    mime_type = file_input.mime_type
    size = file_input.size

    return {"output": process(content)}
```

### Creating Files

```python theme={null}
from spotflow.models import File

async def call(self, ctx, text_input: str) -> dict:
    # Create file from text
    output_file = File.from_text(
        text=text_input,
        filename="output.txt",
        mime_type="text/plain"
    )

    # Or from bytes
    output_file = File.from_bytes(
        content=b"...",
        filename="output.pdf",
        mime_type="application/pdf"
    )

    return {"output_file": output_file}
```

## Testing Custom Nodes

### Unit Tests

```python theme={null}
import pytest
from spotflow.nodes.test_utils import create_test_context

@pytest.mark.asyncio
async def test_my_custom_node():
    # Create test context
    ctx = await create_test_context()

    # Create node instance
    node = MyCustomNode(config=MyCustomNodeConfig(api_key="test"))

    # Execute node
    result = await node.call(ctx, input_text="hello")

    # Assert results
    assert result["output_text"] == "HELLO"
```
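
When a full test context is unnecessary, node logic that touches `ctx` can also be exercised with a test double from the standard library. The sketch below assumes a hypothetical caching function `cached_upper` and a helper `make_fake_ctx`; only `unittest.mock` is real API here.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def cached_upper(ctx, input_text: str) -> dict:
    """Stand-in node logic that caches its result through ctx.redis."""
    cached = await ctx.redis.get(input_text)
    if cached is not None:
        return {"output_text": cached}
    result = input_text.upper()
    await ctx.redis.set(input_text, result)
    return {"output_text": result}


def make_fake_ctx(cached_value=None):
    """Build a context double whose redis.get/set are awaitable mocks."""
    ctx = MagicMock()
    ctx.redis.get = AsyncMock(return_value=cached_value)
    ctx.redis.set = AsyncMock()
    return ctx
```

`AsyncMock` records each awaited call, so a test can assert both the returned output and that the cache was written exactly once.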

### Integration Tests

```python theme={null}
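import pytest
from spotflow.flow.runner import online_runner
from spotflow.models import WorkflowDefinition
from spotflow.nodes.test_utils import create_test_context

@pytest.mark.asyncio
async def test_node_in_workflow():
    # Create test context
    ctx = await create_test_context()

    # Create workflow with your node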
    workflow_def = WorkflowDefinition(
        nodes=[
            {"id": "input", "type": "input", ...},
            {"id": "custom", "type": "my_custom_node", ...},
            {"id": "output", "type": "output", ...}
        ],
        edges=[...]
    )

    # Execute workflow
    result = await online_runner(
        workflow=workflow_def,
        inputs={"input": "test"},
        context=ctx
    )

    # Verify results
    assert result["output"] == "EXPECTED"
```

## Registering Nodes

Register your custom node with the node registry:

```python theme={null}
from spotflow.registry import get_registry

registry = get_registry()
registry.nodes.register(MyCustomNode)
```

For plugin-based distribution:

```python theme={null}
# In your plugin's __init__.py
def register_plugin():
    from spotflow.registry import get_registry
    from .nodes import MyCustomNode

    registry = get_registry()
    registry.nodes.register(MyCustomNode)
```
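
Since `node_name` must be globally unique, it helps to picture the registry as a mapping from name to class that rejects collisions. The toy `NodeRegistry` below is an illustration under that assumption; it is not the real `spotflow.registry` implementation, whose behavior on duplicates is not documented here.

```python
class NodeRegistry:
    """Toy registry keyed by node_name; not the real spotflow registry."""

    def __init__(self) -> None:
        self._nodes = {}

    def register(self, node_cls: type) -> type:
        name = node_cls.node_name
        existing = self._nodes.get(name)
        # Re-registering the same class is harmless; a different class is a clash.
        if existing is not None and existing is not node_cls:
            raise ValueError(f"node_name {name!r} is already registered")
        self._nodes[name] = node_cls
        return node_cls

    def get(self, name: str) -> type:
        return self._nodes[name]
```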

## Best Practices

### Design

* **Single Responsibility**: Each node should do one thing well
* **Composability**: Design nodes to work together via connections
* **Clear Naming**: Use descriptive names for nodes, inputs, and outputs
* **Consistent Style**: Follow existing node conventions

### Performance

* **Async I/O**: Use async for network and database operations
* **Batch Operations**: Process batches efficiently when possible
* **Resource Limits**: Set appropriate timeouts for long operations
* **Memory Management**: Clean up large objects after use

### Error Handling

* **Descriptive Errors**: Provide clear error messages
* **Validation**: Validate inputs early
* **Graceful Degradation**: Return sensible defaults when possible
* **Logging**: Log errors with context for debugging

### Security

* **Input Validation**: Validate and sanitize all inputs
* **Credential Handling**: Never log or expose credentials
* **API Rate Limits**: Respect external API rate limits
* **Dependency Security**: Keep dependencies updated
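
One way to follow the credential-handling rule is to mask sensitive values before anything reaches a log line. The sketch below is a minimal illustration; the `SENSITIVE_KEYS` set and the `redact` helper are assumptions, not part of Noxus, and the key list should be adapted to the node's own configuration.

```python
import logging

logger = logging.getLogger("custom_node")

# Field names treated as secrets (an illustrative list, extend as needed)
SENSITIVE_KEYS = {"api_key", "access_token", "password", "secret"}


def redact(values: dict) -> dict:
    """Return a copy that is safe to log: sensitive values are masked."""
    return {
        key: "***" if key.lower() in SENSITIVE_KEYS else value
        for key, value in values.items()
    }


def log_config(config: dict) -> None:
    """Log the configuration with secrets masked."""
    logger.info("node config: %s", redact(config))
```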

## Advanced Topics

### Progress Updates

Report progress for long-running operations:

```python theme={null}
async def call(self, ctx, items: list[str]) -> dict:
    results = []
    total = len(items)

    for i, item in enumerate(items):
        result = await process(item)
        results.append(result)

        # Update progress (0.0 to 1.0)
        await ctx.update_progress((i + 1) / total)

    return {"results": results}
```
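
Progress reporting is easy to verify in isolation with a context double that records each value. In this sketch, `RecordingContext` and `process_items` are hypothetical stand-ins; only the `update_progress` method name comes from the example above.

```python
import asyncio


class RecordingContext:
    """Test double that records progress values instead of sending them to the UI."""

    def __init__(self) -> None:
        self.progress = []

    async def update_progress(self, fraction: float) -> None:
        self.progress.append(fraction)


async def process_items(ctx, items: list) -> dict:
    """Uppercase each item and report progress after every step."""
    results = []
    total = len(items)
    for i, item in enumerate(items):
        results.append(item.upper())
        await ctx.update_progress((i + 1) / total)
    return {"results": results}
```

An empty list never enters the loop, so the division by `total` is safe even when there is nothing to process.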

### Streaming Outputs

Stream outputs for real-time updates:

```python theme={null}
async def call(self, ctx, prompt: str) -> dict:
    full_response = ""

    async for chunk in llm_stream(prompt):
        full_response += chunk
        # Stream to UI
        await ctx.stream_output("response", chunk)

    return {"response": full_response}
```
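
The streaming loop above relies on an async generator that yields chunks as they arrive. The sketch below shows that shape with a stand-in `fake_llm_stream` and a callback in place of `ctx.stream_output`; both are assumptions made so the example runs without an LLM client.

```python
import asyncio


async def fake_llm_stream(prompt: str):
    """Async generator standing in for a streaming LLM client."""
    for word in prompt.split():
        await asyncio.sleep(0)  # Yield control, as a network read would
        yield word + " "


async def collect_stream(prompt: str, on_chunk) -> str:
    """Forward each chunk to on_chunk and return the assembled text."""
    parts = []
    async for chunk in fake_llm_stream(prompt):
        await on_chunk(chunk)
        parts.append(chunk)
    return "".join(parts)
```

Collecting chunks in a list and joining once at the end avoids repeated string concatenation on long responses.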

### Memory Nodes

Access persistent memory:

```python theme={null}
# Write to memory
await ctx.write_memory("key", "value", scope="workflow")

# Read from memory
value = await ctx.read_memory("key", scope="workflow")
```

## Example: Complete Custom Node

```python theme={null}
from pydantic import BaseModel, Field
from spotflow.nodes.base import BaseNode, NodeCategory
from spotflow.nodes.data_types import Connector, TypeDefinition
from spotflow.nodes.config_fields import ConfigText, ConfigToggle
from spotflow.nodes.exceptions import NodeExecutionError
from spotflow.integrations.credentials import ExecutionContext

class WeatherNodeConfig(BaseModel):
    api_key: str = Field(
        title="API Key",
        description="OpenWeatherMap API key",
        json_schema_extra=ConfigText()
    )

    use_celsius: bool = Field(
        title="Use Celsius",
        description="Temperature in Celsius instead of Fahrenheit",
        default=True,
        json_schema_extra=ConfigToggle()
    )

class WeatherNode(BaseNode[WeatherNodeConfig]):
    node_name = "weather_node"
    title = "Get Weather"
    category = NodeCategory.DATA
    color = "#4A90E2"
    image = "https://example.com/weather-icon.png"
    visible = True

    inputs = [
        Connector(
            key="city",
            label="City",
            type_=TypeDefinition.text(),
            required=True
        )
    ]

    outputs = [
        Connector(
            key="temperature",
            label="Temperature",
            type_=TypeDefinition.number()
        ),
        Connector(
            key="description",
            label="Description",
            type_=TypeDefinition.text()
        )
    ]

    def calculate_timeout(self, ctx, **inputs) -> int:
        return 30  # 30 seconds for API call

    async def call(
        self,
        ctx: ExecutionContext,
        city: str
    ) -> dict[str, float | str]:
        import httpx

        if not city:
            raise NodeExecutionError("City name is required")

        api_key = self.config.api_key
        units = "metric" if self.config.use_celsius else "imperial"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.openweathermap.org/data/2.5/weather",
                    params={
                        "q": city,
                        "appid": api_key,
                        "units": units
                    }
                )
                response.raise_for_status()
                data = response.json()

            return {
                "temperature": data["main"]["temp"],
                "description": data["weather"][0]["description"]
            }

        except httpx.HTTPError as e:
            raise NodeExecutionError(f"Weather API error: {e}") from e

# Register the node
from spotflow.registry import get_registry
get_registry().nodes.register(WeatherNode)
```

***

Custom nodes let you add functionality that the built-in nodes do not cover. Start with simple nodes and add complexity as the patterns in this guide become familiar.

<Card title="Node Configuration Guide" icon="sliders" href="/developers/nodes/node-configuration">
  Deep dive into configuration field types and dynamic configuration
</Card>
