Skip to main content

Overview

Once you’ve built and saved a workflow, you can execute it with different inputs and monitor its progress. The Noxus SDK provides both synchronous and asynchronous methods for running workflows and handling results.

Basic Workflow Execution

Running a Workflow

from noxus_sdk.client import Client

# Create the SDK client with your API key
client = Client(api_key="your_api_key_here")

# Get an existing workflow
workflow = client.workflows.get("workflow_id_here")

# Run with input data; each key in `body` names one workflow input
run = workflow.run(body={
    "User Input": "What are the benefits of renewable energy?",
    "Analysis Type": "comprehensive"
})

# Wait for completion
result = run.wait(interval=2)  # Check every 2 seconds

# Inspect the finished run
print(f"Status: {result.status}")
print(f"Output: {result.output}")

Input Formats

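The keys of the `body` dictionary identify which input each value belongs to. Depending on how you configured your workflow nodes, you can key an input in one of three ways: by label, by node ID, or by `{node_id}::{input_name}`: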
Use the label you assigned to input nodes:
# If you created an input node with label "User Question"
# NOTE: `workflow_def` is not defined in this snippet — presumably the
# workflow definition object used when the workflow was built; confirm.
input_node = workflow_def.node("InputNode").config(
    label="User Question"
)

# Run with this input: the body key is the same string as the node's label
run = workflow.run(body={
    "User Question": "How does photosynthesis work?"
})
Use the node’s unique ID:
# Get node ID from workflow definition
# NOTE: `workflow_def` is not defined in this snippet — presumably the
# workflow definition object used when the workflow was built; confirm.
input_node = workflow_def.node("InputNode")
node_id = input_node.id

# Run with node ID: the value of `node_id` (not the literal text "node_id")
# is used as the dictionary key
run = workflow.run(body={
    node_id: "Your input value here"
})
Use the format {node_id}::{input_name}:
# For input nodes, the default input name is "input"
# `node_id` must already hold the input node's ID (e.g. from `input_node.id`);
# the f-string builds a key of the form "<node id>::input"
run = workflow.run(body={
    f"{node_id}::input": "Your input value here"
})

Monitoring Execution

Synchronous Monitoring

Wait for workflow completion with custom intervals:
# Run workflow
run = workflow.run(body={"input": "test data"})

# Monitor with custom interval and timeout
try:
    result = run.wait(
        interval=5,      # Check every 5 seconds
        timeout=300      # Maximum wait time (5 minutes)
    )

    # Reached only when wait() returned without raising
    print(f"Completed in {result.execution_time}ms")
    print(f"Final output: {result.output}")

except TimeoutError:
    # wait() did not finish within the timeout given above
    print("Workflow execution timed out")
    # NOTE(review): `run` is not refreshed here, so this may be the status
    # as of the last poll rather than the live status — confirm.
    print(f"Current status: {run.status}")

Asynchronous Monitoring

For non-blocking execution monitoring:
import asyncio

async def monitor_workflow_async():
    """Fetch a workflow, start a run, and await its result without blocking.

    Returns:
        Whatever ``a_wait`` resolves to for the started run, polled with
        ``interval=2``.
    """
    sdk_client = Client(api_key="your_api_key_here")
    target_workflow = await sdk_client.workflows.aget("workflow_id")

    # Kick off the execution, then wait for it asynchronously
    active_run = await target_workflow.arun(body={"input": "test data"})
    return await active_run.a_wait(interval=2)

# Run async monitoring
# asyncio.run() runs the coroutine to completion on a new event loop and
# returns the coroutine's return value
result = asyncio.run(monitor_workflow_async())

Manual Status Checking

For more control over monitoring:
import time

# Start workflow
run = workflow.run(body={"input": "test data"})

# Statuses that end the monitoring loop
terminal_statuses = ("completed", "failed", "cancelled")

# Manual monitoring loop
while True:
    # Refresh run status
    run = run.refresh()

    print(f"Status: {run.status}")

    if run.status in terminal_statuses:
        break

    time.sleep(3)  # Wait 3 seconds before next check

# Handle final result
if run.status == "completed":
    print(f"Success! Output: {run.output}")
elif run.status == "failed":
    print(f"Failed: {run.error_message}")
else:
    # The loop also exits on "cancelled"; report it instead of ending silently
    print(f"Run ended with status: {run.status}")

Handling Different Input Types

Text Inputs

# Simple text input
# Each key names one workflow input (presumably an input node label);
# each value is the text passed to that input
run = workflow.run(body={
    "Question": "Explain quantum computing",
    "Context": "For a general audience"
})

File Inputs

import base64

# For file input nodes: read the raw bytes first, then base64-encode them
# into a UTF-8 string
with open("document.pdf", "rb") as pdf_file:
    raw_bytes = pdf_file.read()

file_content = base64.b64encode(raw_bytes).decode("utf-8")

# The file is passed as a dict carrying its name, encoded content and MIME type
document_payload = {
    "name": "document.pdf",
    "content": file_content,
    "mime_type": "application/pdf"
}

run = workflow.run(body={"Document": document_payload})

Multiple Inputs

# Workflow with multiple input nodes
# One key per input; note that every value in this example, including
# "300 words", is passed as a string
run = workflow.run(body={
    "Primary Text": "The main content to analyze",
    "Analysis Focus": "sentiment and themes",
    "Output Format": "bullet points",
    "Max Length": "300 words"
})

Fixed vs Dynamic Inputs

# Only provide values for dynamic inputs
# Fixed inputs (configured with fixed_value=True) are automatically used
run = workflow.run(body={
    "User Question": "What is machine learning?"
    # Fixed inputs like system prompts are handled automatically
})

# Override fixed inputs if needed (if the workflow allows it)
# NOTE: this is a second, separate workflow.run() call; `run` is rebound to it
run = workflow.run(body={
    "User Question": "What is machine learning?",
    "System Prompt": "You are a technical expert. Be precise and detailed."
})

Result Handling

Understanding Run Results

# wait() is called without arguments here, so no interval or timeout is passed
result = run.wait()

# Basic result information
print(f"Run ID: {result.id}")
print(f"Status: {result.status}")
print(f"Started: {result.created_at}")
# NOTE(review): `updated_at` is printed as the completion time — presumably the
# last update of a finished run is its completion; confirm.
print(f"Completed: {result.updated_at}")
print(f"Execution time: {result.execution_time}ms")

# Output data
print(f"Final output: {result.output}")

# Error information (if failed)
if result.status == "failed":
    print(f"Error: {result.error_message}")
    print(f"Error details: {result.error_details}")

Processing Output Data

# The output is usually text, but it may also be a dict or a list
output = result.output

if isinstance(output, str):
    # Plain text
    print(f"Generated text: {output}")
elif isinstance(output, dict):
    # Structured output: print one "key: value" line per entry
    for field_name, field_value in output.items():
        print(f"{field_name}: {field_value}")
elif isinstance(output, list):
    # List output: print each element with its zero-based position
    for position, element in enumerate(output):
        print(f"Item {position}: {element}")

Saving Results

import json
from datetime import datetime

# Save result to file
result_data = {
    "workflow_id": workflow.id,
    "run_id": result.id,
    "timestamp": datetime.now().isoformat(),
    "status": result.status,
    "output": result.output,
    "execution_time": result.execution_time
}

# Build the file name once so the file written and the message printed
# cannot drift apart
output_path = f"workflow_result_{result.id}.json"

# Explicit encoding so the file does not depend on the platform default.
# NOTE: json.dump raises TypeError if result.output is not JSON-serializable.
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(result_data, f, indent=2)

print(f"Result saved to {output_path}")

Batch Processing

Running Multiple Workflows

import asyncio

async def run_multiple_workflows():
    """Run two different workflows and wait for both results together.

    Returns:
        A ``(result1, result2)`` tuple, in the same order as the workflows.
    """
    sdk_client = Client(api_key="your_api_key_here")

    # Look up both workflows
    first_workflow = await sdk_client.workflows.aget("workflow_id_1")
    second_workflow = await sdk_client.workflows.aget("workflow_id_2")

    # Start both runs; each arun() call is awaited before the next one begins
    first_run = await first_workflow.arun(body={"input": "data for workflow 1"})
    second_run = await second_workflow.arun(body={"input": "data for workflow 2"})

    # The two waits are awaited together via asyncio.gather
    first_result, second_result = await asyncio.gather(
        first_run.a_wait(),
        second_run.a_wait(),
    )
    return first_result, second_result

# Execute batch processing
# `results` is the (result1, result2) tuple returned by run_multiple_workflows()
results = asyncio.run(run_multiple_workflows())

Processing Multiple Inputs

async def process_multiple_inputs(workflow, inputs):
    """Send every item of ``inputs`` through one workflow and collect the results.

    Args:
        workflow: Object exposing an awaitable ``arun(body=...)``.
        inputs: Iterable of ``body`` payloads, one per run.

    Returns:
        List of awaited ``a_wait()`` results, in the same order as ``inputs``.
    """
    # Start one run per payload, awaiting each start in turn
    started_runs = [await workflow.arun(body=payload) for payload in inputs]

    # Then await each run's completion, again in order
    return [await started.a_wait() for started in started_runs]

# Usage
# Each dict is the `body` for one run
inputs = [
    {"Question": "What is AI?"},
    {"Question": "How does machine learning work?"},
    {"Question": "What are neural networks?"}
]

# NOTE: `client` is not created in this snippet; it is assumed to be an
# existing Client instance
workflow = client.workflows.get("qa_workflow_id")
results = asyncio.run(process_multiple_inputs(workflow, inputs))

# Results come back in the same order as `inputs`; numbering starts at 1
for i, result in enumerate(results):
    print(f"Answer {i+1}: {result.output}")

Error Handling

Handling Execution Errors

def run_workflow_safely(workflow, input_data):
    """Execute a workflow and report the outcome as a dict instead of raising.

    Args:
        workflow: Object exposing ``run(body=...)``.
        input_data: Payload passed as the run's ``body``.

    Returns:
        ``{"success": True, "output": ...}`` when the run completed; otherwise
        a dict with ``"success": False`` and an ``"error"`` message (plus
        ``"details"`` when the run itself reported failure).
    """
    try:
        # Start the run and block for at most 10 minutes, polling every 2 seconds
        finished = workflow.run(body=input_data).wait(interval=2, timeout=600)

        if finished.status == "completed":
            return {"success": True, "output": finished.output}

        if finished.status == "failed":
            return {
                "success": False,
                "error": finished.error_message,
                "details": finished.error_details,
            }

        # Any other status is reported verbatim
        return {"success": False, "error": f"Unexpected status: {finished.status}"}

    except TimeoutError:
        return {"success": False, "error": "Workflow execution timed out"}
    except Exception as exc:
        # Catch-all boundary: turn any other error into an error result
        return {"success": False, "error": f"Execution error: {str(exc)}"}

# Usage
# `result` is a plain dict: it always has "success", plus "output" on success
# or "error" on failure
result = run_workflow_safely(workflow, {"input": "test data"})

if result["success"]:
    print(f"Success: {result['output']}")
else:
    print(f"Error: {result['error']}")

Retry Logic

import time
import random

def run_workflow_with_retry(workflow, input_data, max_retries=3):
    """Run workflow with exponential backoff retry.

    Each attempt starts a new run and waits up to 300 seconds for it. An
    attempt counts as failed when starting/waiting raises, or when the run
    ends in any status other than "completed".

    Args:
        workflow: Object exposing ``run(body=...)``.
        input_data: Payload passed as the run's ``body``.
        max_retries: Total number of attempts (must be at least 1).

    Returns:
        The result of the first attempt whose status is "completed".

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: After the last attempt — either the error raised by that
            attempt, or an ``Exception`` describing the run's final status.
    """
    if max_retries < 1:
        # Without this the loop would not run and None would be returned
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1

        try:
            run = workflow.run(body=input_data)
            result = run.wait(interval=2, timeout=300)
        except Exception as e:
            if is_last_attempt:
                raise

            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Error on attempt {attempt + 1}: {e}")
            print(f"Retrying in {wait_time:.1f}s...")
        else:
            if result.status == "completed":
                return result

            if result.status == "failed":
                reason = f"Workflow failed: {result.error_message}"
            else:
                # e.g. "cancelled": treat as a failed attempt rather than
                # silently looping and eventually returning None
                reason = f"Workflow ended with unexpected status: {result.status}"

            if is_last_attempt:
                raise Exception(reason)

            # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time:.1f}s...")

        time.sleep(wait_time)

# Usage
# run_workflow_with_retry raises once the final attempt fails, so the call is
# wrapped in try/except
try:
    result = run_workflow_with_retry(workflow, {"input": "test data"})
    print(f"Success after retries: {result.output}")
except Exception as e:
    print(f"Failed after all retries: {e}")

Performance Optimization

Efficient Polling

def adaptive_wait(run, initial_interval=1, max_interval=30, timeout=600):
    """Wait for workflow completion with adaptive polling.

    The run is refreshed repeatedly; the pause between polls starts at
    ``initial_interval`` seconds and grows by a factor of 1.5 after each poll,
    capped at ``max_interval``.

    Args:
        run: Object exposing ``refresh()`` (returning the updated run) and a
            ``status`` attribute.
        initial_interval: Seconds to sleep after the first poll.
        max_interval: Upper bound, in seconds, for the growing interval.
        timeout: Maximum total seconds to wait.

    Returns:
        The refreshed run once its status is "completed", "failed" or
        "cancelled".

    Raises:
        TimeoutError: If no terminal status is seen within ``timeout`` seconds.
    """
    terminal_statuses = ("completed", "failed", "cancelled")

    # Monotonic clock: the deadline is unaffected by system clock adjustments
    deadline = time.monotonic() + timeout
    interval = initial_interval

    while True:
        run = run.refresh()

        if run.status in terminal_statuses:
            return run

        # Check timeout
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Workflow execution timed out")

        # Never sleep past the deadline; otherwise a long interval could
        # overshoot the timeout by up to max_interval seconds
        time.sleep(min(interval, remaining))

        # Adaptive interval - increase polling interval over time
        interval = min(interval * 1.5, max_interval)

# Usage
run = workflow.run(body={"input": "test data"})
# Returns the refreshed run once its status is completed/failed/cancelled,
# or raises TimeoutError
result = adaptive_wait(run)

Concurrent Execution with Limits

import asyncio
from asyncio import Semaphore

async def run_workflows_with_limit(workflows_and_inputs, max_concurrent=5):
    """Execute many (workflow, input) pairs, at most ``max_concurrent`` at a time.

    Args:
        workflows_and_inputs: Iterable of ``(workflow, input_data)`` pairs.
        max_concurrent: Size of the semaphore guarding each execution.

    Returns:
        List with one entry per pair, in input order. Because the tasks are
        gathered with ``return_exceptions=True``, an entry is either the
        awaited result or the exception that pair raised.
    """
    gate = Semaphore(max_concurrent)

    async def _execute(wf, payload):
        # Starting the run and waiting for it both happen while holding a slot
        async with gate:
            started = await wf.arun(body=payload)
            return await started.a_wait()

    # One coroutine per pair; the semaphore decides how many proceed at once
    pending = (_execute(wf, payload) for wf, payload in workflows_and_inputs)
    return await asyncio.gather(*pending, return_exceptions=True)

# Usage
# NOTE: workflow1, workflow2 and workflow3 are not defined in this snippet;
# they are assumed to be workflow objects obtained beforehand
workflows_and_inputs = [
    (workflow1, {"input": "data1"}),
    (workflow2, {"input": "data2"}),
    (workflow3, {"input": "data3"}),
    # ... more workflows
]

# run_workflows_with_limit gathers with return_exceptions=True, so an entry of
# `results` may be an exception object instead of a run result
results = asyncio.run(run_workflows_with_limit(workflows_and_inputs))

Best Practices

Validate inputs before running workflows:
def validate_workflow_input(input_data, required_fields):
    """Validate workflow input data.

    A field counts as missing when its key is absent, its value is ``None``,
    or its value is an empty string or empty collection. Falsy but meaningful
    values such as ``0`` and ``False`` are accepted.

    Args:
        input_data: Mapping of input names to values.
        required_fields: Iterable of keys that must be present and non-empty.

    Returns:
        ``True`` when every required field is present.

    Raises:
        ValueError: Listing the missing fields, in ``required_fields`` order.
    """

    def _is_blank(value):
        # None, or anything sized with length zero ("", [], {}, ...)
        if value is None:
            return True
        try:
            return len(value) == 0
        except TypeError:
            # Unsized values (numbers, booleans, ...) are never blank
            return False

    missing_fields = [
        field
        for field in required_fields
        if field not in input_data or _is_blank(input_data[field])
    ]

    if missing_fields:
        raise ValueError(f"Missing required fields: {missing_fields}")

    return True

# Usage
required_fields = ["Question", "Context"]

# Example payload; without it `input_data` is undefined at this point
input_data = {
    "Question": "Explain quantum computing",
    "Context": "For a general audience"
}

# Raises ValueError if a required field is missing; returns True otherwise
validate_workflow_input(input_data, required_fields)
Manage resources efficiently:
# Use try/finally (or a context manager) so cleanup runs even if processing fails
async def process_workflows():
    """Run the batch of workflows and return its results.

    The ``finally`` clause is a placeholder for releasing resources; it
    executes whether or not the batch raises.
    """
    client = Client(api_key="your_api_key")
    try:
        # Process workflows and hand the results straight back
        return await run_multiple_workflows()
    finally:
        # Cleanup resources if needed
        pass
Implement comprehensive logging:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_workflow_with_logging(workflow, input_data):
    """Run a workflow to completion, logging each stage of the execution.

    Args:
        workflow: Object exposing ``id`` and ``run(body=...)``.
        input_data: Payload passed as the run's ``body``; it is logged in full
            at INFO level.

    Returns:
        The result returned by ``run.wait()``, regardless of its status.
    """
    # %-style arguments let the logging module skip formatting when the
    # level is disabled
    logger.info("Starting workflow %s", workflow.id)
    logger.info("Input data: %s", input_data)

    # perf_counter is meant for measuring durations and is not affected by
    # system clock adjustments
    start_time = time.perf_counter()
    run = workflow.run(body=input_data)

    logger.info("Workflow started, run ID: %s", run.id)

    result = run.wait()
    execution_time = time.perf_counter() - start_time

    logger.info("Workflow completed in %.2fs", execution_time)
    logger.info("Status: %s", result.status)

    if result.status == "failed":
        logger.error("Workflow failed: %s", result.error_message)

    return result

Next Steps