Skip to main content

Overview

Once you’ve built and saved a workflow, you can execute it with different inputs and monitor its progress. The Noxus SDK provides both synchronous and asynchronous methods for running workflows and handling results.

Basic Workflow Execution

Running a Workflow

from noxus_sdk.client import Client

client = Client(api_key="your_api_key_here")

# Get an existing workflow
workflow = client.workflows.get("workflow_id_here")

# Run with input data
run = workflow.run(body={
    "User Input": "What are the benefits of renewable energy?",
    "Analysis Type": "comprehensive"
})

# Wait for completion
result = run.wait(interval=2)  # Check every 2 seconds

print(f"Status: {result.status}")
print(f"Output: {result.output}")

Input Formats

The input format depends on how you configured your workflow nodes:
Use the label you assigned to input nodes:
# If you created an input node with label "User Question"
input_node = workflow_def.node("InputNode").config(
    label="User Question"
)

# Run with this input
run = workflow.run(body={
    "User Question": "How does photosynthesis work?"
})
Use the node’s unique ID:
# Get node ID from workflow definition
input_node = workflow_def.node("InputNode")
node_id = input_node.id

# Run with node ID
run = workflow.run(body={
    node_id: "Your input value here"
})
Use the format {node_id}::{input_name}:
# For input nodes, the default input name is "input"
run = workflow.run(body={
    f"{node_id}::input": "Your input value here"
})

Monitoring Execution

Synchronous Monitoring

Wait for workflow completion with custom intervals:
# Run workflow
run = workflow.run(body={"input": "test data"})

# Monitor with custom interval and timeout
try:
    result = run.wait(
        interval=5,      # Check every 5 seconds
        timeout=300      # Maximum wait time (5 minutes)
    )

    print(f"Completed in {result.execution_time}ms")
    print(f"Final output: {result.output}")

except TimeoutError:
    print("Workflow execution timed out")
    print(f"Current status: {run.status}")

Asynchronous Monitoring

For non-blocking execution monitoring:
import asyncio

async def monitor_workflow_async():
    client = Client(api_key="your_api_key_here")
    workflow = await client.workflows.aget("workflow_id")

    # Start workflow execution
    run = await workflow.arun(body={"input": "test data"})

    # Monitor asynchronously
    result = await run.a_wait(interval=2)

    return result

# Run async monitoring
result = asyncio.run(monitor_workflow_async())

Manual Status Checking

For more control over monitoring:
import time

# Start workflow
run = workflow.run(body={"input": "test data"})

# Manual monitoring loop
while True:
    # Refresh run status
    run = run.refresh()

    print(f"Status: {run.status}")

    if run.status in ["completed", "failed", "cancelled"]:
        break

    time.sleep(3)  # Wait 3 seconds before next check

# Handle final result
if run.status == "completed":
    print(f"Success! Output: {run.output}")
elif run.status == "failed":
    print(f"Failed: {run.error_message}")

Handling Different Input Types

Text Inputs

# Simple text input
run = workflow.run(body={
    "Question": "Explain quantum computing",
    "Context": "For a general audience"
})

File Inputs

# Method 1: Using a file URL
run = workflow.run(body={
    "Document": {
        "file_metadata": {"uri": "https://path/to/file.pdf"}
    }
})

# Method 2: Using base64-encoded file content (data URI format)
import base64

with open("document.pdf", "rb") as file:
    file_content = file.read()
    base64_content = base64.b64encode(file_content).decode("utf-8")
    data_uri = f"data:application/pdf;base64,{base64_content}"

run = workflow.run(body={
    "Document": {
        "file_metadata": {"uri": data_uri}
    }
})

Multiple Inputs

# Workflow with multiple input nodes
run = workflow.run(body={
    "Primary Text": "The main content to analyze",
    "Analysis Focus": "sentiment and themes",
    "Output Format": "bullet points",
    "Max Length": "300 words"
})

Fixed vs Dynamic Inputs

# Only provide values for dynamic inputs
# Fixed inputs (configured with fixed_value=True) are automatically used
run = workflow.run(body={
    "User Question": "What is machine learning?"
    # Fixed inputs like system prompts are handled automatically
})

# Override fixed inputs if needed (if the workflow allows it)
run = workflow.run(body={
    "User Question": "What is machine learning?",
    "System Prompt": "You are a technical expert. Be precise and detailed."
})

Result Handling

Understanding Run Results

result = run.wait()

# Basic result information
print(f"Run ID: {result.id}")
print(f"Status: {result.status}")
print(f"Started: {result.created_at}")
print(f"Completed: {result.updated_at}")
print(f"Execution time: {result.execution_time}ms")

# Output data
print(f"Final output: {result.output}")

# Error information (if failed)
if result.status == "failed":
    print(f"Error: {result.error_message}")
    print(f"Error details: {result.error_details}")

Processing Output Data

# Output is typically a string, but can be structured data
output = result.output

if isinstance(output, str):
    # Text output
    print(f"Generated text: {output}")
elif isinstance(output, dict):
    # Structured output
    for key, value in output.items():
        print(f"{key}: {value}")
elif isinstance(output, list):
    # List output
    for i, item in enumerate(output):
        print(f"Item {i}: {item}")

Saving Results

import json
from datetime import datetime

# Save result to file
result_data = {
    "workflow_id": workflow.id,
    "run_id": result.id,
    "timestamp": datetime.now().isoformat(),
    "status": result.status,
    "output": result.output,
    "execution_time": result.execution_time
}

with open(f"workflow_result_{result.id}.json", "w") as f:
    json.dump(result_data, f, indent=2)

print(f"Result saved to workflow_result_{result.id}.json")

Batch Processing

Running Multiple Workflows

import asyncio

async def run_multiple_workflows():
    client = Client(api_key="your_api_key_here")

    # Get workflows
    workflow1 = await client.workflows.aget("workflow_id_1")
    workflow2 = await client.workflows.aget("workflow_id_2")

    # Start multiple runs concurrently
    run1 = await workflow1.arun(body={"input": "data for workflow 1"})
    run2 = await workflow2.arun(body={"input": "data for workflow 2"})

    # Wait for all to complete
    result1, result2 = await asyncio.gather(
        run1.a_wait(),
        run2.a_wait()
    )

    return result1, result2

# Execute batch processing
results = asyncio.run(run_multiple_workflows())

Processing Multiple Inputs

async def process_multiple_inputs(workflow, inputs):
    """Process multiple inputs through the same workflow"""

    # Start all runs
    runs = []
    for input_data in inputs:
        run = await workflow.arun(body=input_data)
        runs.append(run)

    # Wait for all to complete
    results = []
    for run in runs:
        result = await run.a_wait()
        results.append(result)

    return results

# Usage
inputs = [
    {"Question": "What is AI?"},
    {"Question": "How does machine learning work?"},
    {"Question": "What are neural networks?"}
]

workflow = client.workflows.get("qa_workflow_id")
results = asyncio.run(process_multiple_inputs(workflow, inputs))

for i, result in enumerate(results):
    print(f"Answer {i+1}: {result.output}")

Error Handling

Handling Execution Errors

def run_workflow_safely(workflow, input_data):
    """Run workflow with comprehensive error handling"""

    try:
        # Start the workflow
        run = workflow.run(body=input_data)

        # Wait for completion with timeout
        result = run.wait(interval=2, timeout=600)  # 10 minute timeout

        if result.status == "completed":
            return {"success": True, "output": result.output}
        elif result.status == "failed":
            return {
                "success": False,
                "error": result.error_message,
                "details": result.error_details
            }
        else:
            return {"success": False, "error": f"Unexpected status: {result.status}"}

    except TimeoutError:
        return {"success": False, "error": "Workflow execution timed out"}
    except Exception as e:
        return {"success": False, "error": f"Execution error: {str(e)}"}

# Usage
result = run_workflow_safely(workflow, {"input": "test data"})

if result["success"]:
    print(f"Success: {result['output']}")
else:
    print(f"Error: {result['error']}")

Retry Logic

import time
import random

def run_workflow_with_retry(workflow, input_data, max_retries=3):
    """Run workflow with exponential backoff retry"""

    for attempt in range(max_retries):
        try:
            run = workflow.run(body=input_data)
            result = run.wait(interval=2, timeout=300)

            if result.status == "completed":
                return result
            elif result.status == "failed":
                if attempt == max_retries - 1:
                    raise Exception(f"Workflow failed: {result.error_message}")

                # Wait before retry with exponential backoff
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)

        except Exception as e:
            if attempt == max_retries - 1:
                raise

            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Error on attempt {attempt + 1}: {e}")
            print(f"Retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)

# Usage
try:
    result = run_workflow_with_retry(workflow, {"input": "test data"})
    print(f"Success after retries: {result.output}")
except Exception as e:
    print(f"Failed after all retries: {e}")

Performance Optimization

Efficient Polling

def adaptive_wait(run, initial_interval=1, max_interval=30, timeout=600):
    """Wait for workflow completion with adaptive polling"""

    start_time = time.time()
    interval = initial_interval

    while True:
        run = run.refresh()

        if run.status in ["completed", "failed", "cancelled"]:
            return run

        # Check timeout
        if time.time() - start_time > timeout:
            raise TimeoutError("Workflow execution timed out")

        # Adaptive interval - increase polling interval over time
        time.sleep(interval)
        interval = min(interval * 1.5, max_interval)

# Usage
run = workflow.run(body={"input": "test data"})
result = adaptive_wait(run)

Concurrent Execution with Limits

import asyncio
from asyncio import Semaphore

async def run_workflows_with_limit(workflows_and_inputs, max_concurrent=5):
    """Run multiple workflows with concurrency limit"""

    semaphore = Semaphore(max_concurrent)

    async def run_single(workflow, input_data):
        async with semaphore:
            run = await workflow.arun(body=input_data)
            return await run.a_wait()

    # Create tasks for all workflows
    tasks = [
        run_single(workflow, input_data)
        for workflow, input_data in workflows_and_inputs
    ]

    # Execute with concurrency limit
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return results

# Usage
workflows_and_inputs = [
    (workflow1, {"input": "data1"}),
    (workflow2, {"input": "data2"}),
    (workflow3, {"input": "data3"}),
    # ... more workflows
]

results = asyncio.run(run_workflows_with_limit(workflows_and_inputs))

Best Practices

Validate inputs before running workflows:
def validate_workflow_input(input_data, required_fields):
    """Validate workflow input data"""
    missing_fields = []
    
    for field in required_fields:
        if field not in input_data or not input_data[field]:
            missing_fields.append(field)
    
    if missing_fields:
        raise ValueError(f"Missing required fields: {missing_fields}")
    
    return True

# Usage
required_fields = ["Question", "Context"]
validate_workflow_input(input_data, required_fields)
Manage resources efficiently:
# Use context managers for resource cleanup
async def process_workflows():
    client = Client(api_key="your_api_key")
    try:
        # Process workflows
        results = await run_multiple_workflows()
        return results
    finally:
        # Cleanup resources if needed
        pass
Implement comprehensive logging:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_workflow_with_logging(workflow, input_data):
    logger.info(f"Starting workflow {workflow.id}")
    logger.info(f"Input data: {input_data}")
    
    start_time = time.time()
    run = workflow.run(body=input_data)
    
    logger.info(f"Workflow started, run ID: {run.id}")
    
    result = run.wait()
    execution_time = time.time() - start_time
    
    logger.info(f"Workflow completed in {execution_time:.2f}s")
    logger.info(f"Status: {result.status}")
    
    if result.status == "failed":
        logger.error(f"Workflow failed: {result.error_message}")
    
    return result

Next Steps

Node Types

Learn about all available node types and their capabilities

Workflow Examples

Explore complete workflow examples for common use cases

Advanced Patterns

Master advanced workflow design and execution patterns

API Reference

Detailed API reference for workflow execution