Overview
Once you’ve built and saved a workflow, you can execute it with different inputs and monitor its progress. The Noxus SDK provides both synchronous and asynchronous methods for running workflows and handling results.Basic Workflow Execution
Running a Workflow
Copy
Ask AI
from noxus_sdk.client import Client
client = Client(api_key="your_api_key_here")
# Get an existing workflow
workflow = client.workflows.get("workflow_id_here")
# Run with input data
run = workflow.run(body={
"User Input": "What are the benefits of renewable energy?",
"Analysis Type": "comprehensive"
})
# Wait for completion
result = run.wait(interval=2) # Check every 2 seconds
print(f"Status: {result.status}")
print(f"Output: {result.output}")
Input Formats
The input format depends on how you configured your workflow nodes:Node Labels
Node Labels
Use the label you assigned to input nodes:
Copy
Ask AI
# If you created an input node with label "User Question"
input_node = workflow_def.node("InputNode").config(
label="User Question"
)
# Run with this input
run = workflow.run(body={
"User Question": "How does photosynthesis work?"
})
Node IDs
Node IDs
Use the node’s unique ID:
Copy
Ask AI
# Get node ID from workflow definition
input_node = workflow_def.node("InputNode")
node_id = input_node.id
# Run with node ID
run = workflow.run(body={
node_id: "Your input value here"
})
Node ID with Input Name
Node ID with Input Name
Use the format
{node_id}::{input_name}:Copy
Ask AI
# For input nodes, the default input name is "input"
run = workflow.run(body={
f"{node_id}::input": "Your input value here"
})
Monitoring Execution
Synchronous Monitoring
Wait for workflow completion with custom intervals:Copy
Ask AI
# Run workflow
run = workflow.run(body={"input": "test data"})
# Monitor with custom interval and timeout
try:
result = run.wait(
interval=5, # Check every 5 seconds
timeout=300 # Maximum wait time (5 minutes)
)
print(f"Completed in {result.execution_time}ms")
print(f"Final output: {result.output}")
except TimeoutError:
print("Workflow execution timed out")
print(f"Current status: {run.status}")
Asynchronous Monitoring
For non-blocking execution monitoring:Copy
Ask AI
import asyncio
async def monitor_workflow_async():
client = Client(api_key="your_api_key_here")
workflow = await client.workflows.aget("workflow_id")
# Start workflow execution
run = await workflow.arun(body={"input": "test data"})
# Monitor asynchronously
result = await run.a_wait(interval=2)
return result
# Run async monitoring
result = asyncio.run(monitor_workflow_async())
Manual Status Checking
For more control over monitoring:Copy
Ask AI
import time
# Start workflow
run = workflow.run(body={"input": "test data"})
# Manual monitoring loop
while True:
# Refresh run status
run = run.refresh()
print(f"Status: {run.status}")
if run.status in ["completed", "failed", "cancelled"]:
break
time.sleep(3) # Wait 3 seconds before next check
# Handle final result
if run.status == "completed":
print(f"Success! Output: {run.output}")
elif run.status == "failed":
print(f"Failed: {run.error_message}")
Handling Different Input Types
Text Inputs
Copy
Ask AI
# Simple text input
run = workflow.run(body={
"Question": "Explain quantum computing",
"Context": "For a general audience"
})
File Inputs
Copy
Ask AI
# Method 1: Using a file URL
run = workflow.run(body={
"Document": {
"file_metadata": {"uri": "https://path/to/file.pdf"}
}
})
# Method 2: Using base64-encoded file content (data URI format)
import base64
with open("document.pdf", "rb") as file:
file_content = file.read()
base64_content = base64.b64encode(file_content).decode("utf-8")
data_uri = f"data:application/pdf;base64,{base64_content}"
run = workflow.run(body={
"Document": {
"file_metadata": {"uri": data_uri}
}
})
Multiple Inputs
Copy
Ask AI
# Workflow with multiple input nodes
run = workflow.run(body={
"Primary Text": "The main content to analyze",
"Analysis Focus": "sentiment and themes",
"Output Format": "bullet points",
"Max Length": "300 words"
})
Fixed vs Dynamic Inputs
Copy
Ask AI
# Only provide values for dynamic inputs
# Fixed inputs (configured with fixed_value=True) are automatically used
run = workflow.run(body={
"User Question": "What is machine learning?"
# Fixed inputs like system prompts are handled automatically
})
# Override fixed inputs if needed (if the workflow allows it)
run = workflow.run(body={
"User Question": "What is machine learning?",
"System Prompt": "You are a technical expert. Be precise and detailed."
})
Result Handling
Understanding Run Results
Copy
Ask AI
result = run.wait()
# Basic result information
print(f"Run ID: {result.id}")
print(f"Status: {result.status}")
print(f"Started: {result.created_at}")
print(f"Completed: {result.updated_at}")
print(f"Execution time: {result.execution_time}ms")
# Output data
print(f"Final output: {result.output}")
# Error information (if failed)
if result.status == "failed":
print(f"Error: {result.error_message}")
print(f"Error details: {result.error_details}")
Processing Output Data
Copy
Ask AI
# Output is typically a string, but can be structured data
output = result.output
if isinstance(output, str):
# Text output
print(f"Generated text: {output}")
elif isinstance(output, dict):
# Structured output
for key, value in output.items():
print(f"{key}: {value}")
elif isinstance(output, list):
# List output
for i, item in enumerate(output):
print(f"Item {i}: {item}")
Saving Results
Copy
Ask AI
import json
from datetime import datetime
# Save result to file
result_data = {
"workflow_id": workflow.id,
"run_id": result.id,
"timestamp": datetime.now().isoformat(),
"status": result.status,
"output": result.output,
"execution_time": result.execution_time
}
with open(f"workflow_result_{result.id}.json", "w") as f:
json.dump(result_data, f, indent=2)
print(f"Result saved to workflow_result_{result.id}.json")
Batch Processing
Running Multiple Workflows
Copy
Ask AI
import asyncio
async def run_multiple_workflows():
client = Client(api_key="your_api_key_here")
# Get workflows
workflow1 = await client.workflows.aget("workflow_id_1")
workflow2 = await client.workflows.aget("workflow_id_2")
# Start multiple runs concurrently
run1 = await workflow1.arun(body={"input": "data for workflow 1"})
run2 = await workflow2.arun(body={"input": "data for workflow 2"})
# Wait for all to complete
result1, result2 = await asyncio.gather(
run1.a_wait(),
run2.a_wait()
)
return result1, result2
# Execute batch processing
results = asyncio.run(run_multiple_workflows())
Processing Multiple Inputs
Copy
Ask AI
async def process_multiple_inputs(workflow, inputs):
"""Process multiple inputs through the same workflow"""
# Start all runs
runs = []
for input_data in inputs:
run = await workflow.arun(body=input_data)
runs.append(run)
# Wait for all to complete
results = []
for run in runs:
result = await run.a_wait()
results.append(result)
return results
# Usage
inputs = [
{"Question": "What is AI?"},
{"Question": "How does machine learning work?"},
{"Question": "What are neural networks?"}
]
workflow = client.workflows.get("qa_workflow_id")
results = asyncio.run(process_multiple_inputs(workflow, inputs))
for i, result in enumerate(results):
print(f"Answer {i+1}: {result.output}")
Error Handling
Handling Execution Errors
Copy
Ask AI
def run_workflow_safely(workflow, input_data):
"""Run workflow with comprehensive error handling"""
try:
# Start the workflow
run = workflow.run(body=input_data)
# Wait for completion with timeout
result = run.wait(interval=2, timeout=600) # 10 minute timeout
if result.status == "completed":
return {"success": True, "output": result.output}
elif result.status == "failed":
return {
"success": False,
"error": result.error_message,
"details": result.error_details
}
else:
return {"success": False, "error": f"Unexpected status: {result.status}"}
except TimeoutError:
return {"success": False, "error": "Workflow execution timed out"}
except Exception as e:
return {"success": False, "error": f"Execution error: {str(e)}"}
# Usage
result = run_workflow_safely(workflow, {"input": "test data"})
if result["success"]:
print(f"Success: {result['output']}")
else:
print(f"Error: {result['error']}")
Retry Logic
Copy
Ask AI
import time
import random
def run_workflow_with_retry(workflow, input_data, max_retries=3):
"""Run workflow with exponential backoff retry"""
for attempt in range(max_retries):
try:
run = workflow.run(body=input_data)
result = run.wait(interval=2, timeout=300)
if result.status == "completed":
return result
elif result.status == "failed":
if attempt == max_retries - 1:
raise Exception(f"Workflow failed: {result.error_message}")
# Wait before retry with exponential backoff
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed, retrying in {wait_time:.1f}s...")
time.sleep(wait_time)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"Error on attempt {attempt + 1}: {e}")
print(f"Retrying in {wait_time:.1f}s...")
time.sleep(wait_time)
# Usage
try:
result = run_workflow_with_retry(workflow, {"input": "test data"})
print(f"Success after retries: {result.output}")
except Exception as e:
print(f"Failed after all retries: {e}")
Performance Optimization
Efficient Polling
Copy
Ask AI
def adaptive_wait(run, initial_interval=1, max_interval=30, timeout=600):
"""Wait for workflow completion with adaptive polling"""
start_time = time.time()
interval = initial_interval
while True:
run = run.refresh()
if run.status in ["completed", "failed", "cancelled"]:
return run
# Check timeout
if time.time() - start_time > timeout:
raise TimeoutError("Workflow execution timed out")
# Adaptive interval - increase polling interval over time
time.sleep(interval)
interval = min(interval * 1.5, max_interval)
# Usage
run = workflow.run(body={"input": "test data"})
result = adaptive_wait(run)
Concurrent Execution with Limits
Copy
Ask AI
import asyncio
from asyncio import Semaphore
async def run_workflows_with_limit(workflows_and_inputs, max_concurrent=5):
"""Run multiple workflows with concurrency limit"""
semaphore = Semaphore(max_concurrent)
async def run_single(workflow, input_data):
async with semaphore:
run = await workflow.arun(body=input_data)
return await run.a_wait()
# Create tasks for all workflows
tasks = [
run_single(workflow, input_data)
for workflow, input_data in workflows_and_inputs
]
# Execute with concurrency limit
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Usage
workflows_and_inputs = [
(workflow1, {"input": "data1"}),
(workflow2, {"input": "data2"}),
(workflow3, {"input": "data3"}),
# ... more workflows
]
results = asyncio.run(run_workflows_with_limit(workflows_and_inputs))
Best Practices
Input Validation
Input Validation
Validate inputs before running workflows:
Copy
Ask AI
def validate_workflow_input(input_data, required_fields):
"""Validate workflow input data"""
missing_fields = []
for field in required_fields:
if field not in input_data or not input_data[field]:
missing_fields.append(field)
if missing_fields:
raise ValueError(f"Missing required fields: {missing_fields}")
return True
# Usage
required_fields = ["Question", "Context"]
validate_workflow_input(input_data, required_fields)
Resource Management
Resource Management
Manage resources efficiently:
Copy
Ask AI
# Use context managers for resource cleanup
async def process_workflows():
client = Client(api_key="your_api_key")
try:
# Process workflows
results = await run_multiple_workflows()
return results
finally:
# Cleanup resources if needed
pass
Logging and Monitoring
Logging and Monitoring
Implement comprehensive logging:
Copy
Ask AI
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def run_workflow_with_logging(workflow, input_data):
logger.info(f"Starting workflow {workflow.id}")
logger.info(f"Input data: {input_data}")
start_time = time.time()
run = workflow.run(body=input_data)
logger.info(f"Workflow started, run ID: {run.id}")
result = run.wait()
execution_time = time.time() - start_time
logger.info(f"Workflow completed in {execution_time:.2f}s")
logger.info(f"Status: {result.status}")
if result.status == "failed":
logger.error(f"Workflow failed: {result.error_message}")
return result
Next Steps
Node Types
Learn about all available node types and their capabilities
Workflow Examples
Explore complete workflow examples for common use cases
Advanced Patterns
Master advanced workflow design and execution patterns
API Reference
Detailed API reference for workflow execution