Overview
Once you’ve built and saved a workflow, you can execute it with different inputs and monitor its progress. The Noxus SDK provides both synchronous and asynchronous methods for running workflows and handling results.

Basic Workflow Execution
Running a Workflow
from noxus_sdk.client import Client
client = Client(api_key="your_api_key_here")
# Get an existing workflow
workflow = client.workflows.get("workflow_id_here")
# Run with input data
run = workflow.run(body={
    "User Input": "What are the benefits of renewable energy?",
    "Analysis Type": "comprehensive"
})
# Wait for completion
result = run.wait(interval=2) # Check every 2 seconds
print(f"Status: {result.status}")
print(f"Output: {result.output}")
Input Formats
The input format depends on how you configured your workflow nodes:

Node Labels
Use the label you assigned to input nodes:
# If you created an input node with label "User Question"
input_node = workflow_def.node("InputNode").config(
    label="User Question"
)
# Run with this input
run = workflow.run(body={
    "User Question": "How does photosynthesis work?"
})
Node IDs
Use the node’s unique ID:
# Get node ID from workflow definition
input_node = workflow_def.node("InputNode")
node_id = input_node.id
# Run with node ID
run = workflow.run(body={
    node_id: "Your input value here"
})
Node ID with Input Name
Use the format {node_id}::{input_name}:
# For input nodes, the default input name is "input"
run = workflow.run(body={
    f"{node_id}::input": "Your input value here"
})
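If you address inputs by node ID in more than one place, a small helper keeps the key construction in one spot. This is a minimal sketch assuming the {node_id}::{input_name} convention shown above; make_input_key is a hypothetical helper, not part of the SDK:

def make_input_key(node, input_name="input"):
    """Build a run body key in the {node_id}::{input_name} format (hypothetical helper)."""
    return f"{node.id}::{input_name}"

# Usage
input_node = workflow_def.node("InputNode")
run = workflow.run(body={
    make_input_key(input_node): "Your input value here"
})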
Monitoring Execution
Synchronous Monitoring
Wait for workflow completion with custom intervals:
# Run workflow
run = workflow.run(body={"input": "test data"})
# Monitor with custom interval and timeout
try:
    result = run.wait(
        interval=5,   # Check every 5 seconds
        timeout=300   # Maximum wait time (5 minutes)
    )
    print(f"Completed in {result.execution_time}ms")
    print(f"Final output: {result.output}")
except TimeoutError:
    print("Workflow execution timed out")
    print(f"Current status: {run.status}")
Asynchronous Monitoring
For non-blocking execution monitoring:
import asyncio

async def monitor_workflow_async():
    client = Client(api_key="your_api_key_here")
    workflow = await client.workflows.aget("workflow_id")

    # Start workflow execution
    run = await workflow.arun(body={"input": "test data"})

    # Monitor asynchronously
    result = await run.a_wait(interval=2)
    return result

# Run async monitoring
result = asyncio.run(monitor_workflow_async())
Manual Status Checking
For more control over monitoring:
import time
# Start workflow
run = workflow.run(body={"input": "test data"})
# Manual monitoring loop
while True:
    # Refresh run status
    run = run.refresh()
    print(f"Status: {run.status}")

    if run.status in ["completed", "failed", "cancelled"]:
        break

    time.sleep(3)  # Wait 3 seconds before next check

# Handle final result
if run.status == "completed":
    print(f"Success! Output: {run.output}")
elif run.status == "failed":
    print(f"Failed: {run.error_message}")
Handling Different Input Types
Text Inputs
# Simple text input
run = workflow.run(body={
    "Question": "Explain quantum computing",
    "Context": "For a general audience"
})
File Inputs
import base64
# For file input nodes
with open("document.pdf", "rb") as file:
    file_content = base64.b64encode(file.read()).decode("utf-8")

run = workflow.run(body={
    "Document": {
        "name": "document.pdf",
        "content": file_content,
        "mime_type": "application/pdf"
    }
})
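If your workflows take files regularly, the encoding step can be wrapped in a small reusable function. This is a minimal sketch assuming the payload shape shown above (name, content, mime_type); encode_file_input is a hypothetical helper, not part of the SDK:

import base64
from pathlib import Path

def encode_file_input(path, mime_type):
    """Read a file and return the base64 payload shape used above (hypothetical helper)."""
    path = Path(path)
    return {
        "name": path.name,
        "content": base64.b64encode(path.read_bytes()).decode("utf-8"),
        "mime_type": mime_type
    }

# Usage
run = workflow.run(body={
    "Document": encode_file_input("document.pdf", "application/pdf")
})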
Multiple Inputs
# Workflow with multiple input nodes
run = workflow.run(body={
    "Primary Text": "The main content to analyze",
    "Analysis Focus": "sentiment and themes",
    "Output Format": "bullet points",
    "Max Length": "300 words"
})
Fixed vs Dynamic Inputs
# Only provide values for dynamic inputs
# Fixed inputs (configured with fixed_value=True) are automatically used
run = workflow.run(body={
    "User Question": "What is machine learning?"
    # Fixed inputs like system prompts are handled automatically
})

# Override fixed inputs if needed (if the workflow allows it)
run = workflow.run(body={
    "User Question": "What is machine learning?",
    "System Prompt": "You are a technical expert. Be precise and detailed."
})
Result Handling
Understanding Run Results
result = run.wait()
# Basic result information
print(f"Run ID: {result.id}")
print(f"Status: {result.status}")
print(f"Started: {result.created_at}")
print(f"Completed: {result.updated_at}")
print(f"Execution time: {result.execution_time}ms")
# Output data
print(f"Final output: {result.output}")
# Error information (if failed)
if result.status == "failed":
    print(f"Error: {result.error_message}")
    print(f"Error details: {result.error_details}")
Processing Output Data
# Output is typically a string, but can be structured data
output = result.output
if isinstance(output, str):
    # Text output
    print(f"Generated text: {output}")
elif isinstance(output, dict):
    # Structured output
    for key, value in output.items():
        print(f"{key}: {value}")
elif isinstance(output, list):
    # List output
    for i, item in enumerate(output):
        print(f"Item {i}: {item}")
Saving Results
import json
from datetime import datetime
# Save result to file
result_data = {
    "workflow_id": workflow.id,
    "run_id": result.id,
    "timestamp": datetime.now().isoformat(),
    "status": result.status,
    "output": result.output,
    "execution_time": result.execution_time
}

with open(f"workflow_result_{result.id}.json", "w") as f:
    json.dump(result_data, f, indent=2)

print(f"Result saved to workflow_result_{result.id}.json")
Batch Processing
Running Multiple Workflows
import asyncio
async def run_multiple_workflows():
    client = Client(api_key="your_api_key_here")

    # Get workflows
    workflow1 = await client.workflows.aget("workflow_id_1")
    workflow2 = await client.workflows.aget("workflow_id_2")

    # Start multiple runs concurrently
    run1 = await workflow1.arun(body={"input": "data for workflow 1"})
    run2 = await workflow2.arun(body={"input": "data for workflow 2"})

    # Wait for all to complete
    result1, result2 = await asyncio.gather(
        run1.a_wait(),
        run2.a_wait()
    )
    return result1, result2

# Execute batch processing
results = asyncio.run(run_multiple_workflows())
Processing Multiple Inputs
async def process_multiple_inputs(workflow, inputs):
    """Process multiple inputs through the same workflow"""
    # Start all runs
    runs = []
    for input_data in inputs:
        run = await workflow.arun(body=input_data)
        runs.append(run)

    # Wait for all to complete
    results = []
    for run in runs:
        result = await run.a_wait()
        results.append(result)

    return results

# Usage
inputs = [
    {"Question": "What is AI?"},
    {"Question": "How does machine learning work?"},
    {"Question": "What are neural networks?"}
]

workflow = client.workflows.get("qa_workflow_id")
results = asyncio.run(process_multiple_inputs(workflow, inputs))

for i, result in enumerate(results):
    print(f"Answer {i+1}: {result.output}")
Error Handling
Handling Execution Errors
def run_workflow_safely(workflow, input_data):
    """Run workflow with comprehensive error handling"""
    try:
        # Start the workflow
        run = workflow.run(body=input_data)

        # Wait for completion with timeout
        result = run.wait(interval=2, timeout=600)  # 10 minute timeout

        if result.status == "completed":
            return {"success": True, "output": result.output}
        elif result.status == "failed":
            return {
                "success": False,
                "error": result.error_message,
                "details": result.error_details
            }
        else:
            return {"success": False, "error": f"Unexpected status: {result.status}"}
    except TimeoutError:
        return {"success": False, "error": "Workflow execution timed out"}
    except Exception as e:
        return {"success": False, "error": f"Execution error: {str(e)}"}

# Usage
result = run_workflow_safely(workflow, {"input": "test data"})

if result["success"]:
    print(f"Success: {result['output']}")
else:
    print(f"Error: {result['error']}")
Retry Logic
import time
import random
def run_workflow_with_retry(workflow, input_data, max_retries=3):
    """Run workflow with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            run = workflow.run(body=input_data)
            result = run.wait(interval=2, timeout=300)

            if result.status == "completed":
                return result
            elif result.status == "failed":
                if attempt == max_retries - 1:
                    raise Exception(f"Workflow failed: {result.error_message}")

                # Wait before retry with exponential backoff
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Error on attempt {attempt + 1}: {e}")
            print(f"Retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)

# Usage
try:
    result = run_workflow_with_retry(workflow, {"input": "test data"})
    print(f"Success after retries: {result.output}")
except Exception as e:
    print(f"Failed after all retries: {e}")
Performance Optimization
Efficient Polling
import time

def adaptive_wait(run, initial_interval=1, max_interval=30, timeout=600):
    """Wait for workflow completion with adaptive polling"""
    start_time = time.time()
    interval = initial_interval

    while True:
        run = run.refresh()

        if run.status in ["completed", "failed", "cancelled"]:
            return run

        # Check timeout
        if time.time() - start_time > timeout:
            raise TimeoutError("Workflow execution timed out")

        # Adaptive interval - increase polling interval over time
        time.sleep(interval)
        interval = min(interval * 1.5, max_interval)

# Usage
run = workflow.run(body={"input": "test data"})
result = adaptive_wait(run)
Concurrent Execution with Limits
import asyncio
from asyncio import Semaphore
async def run_workflows_with_limit(workflows_and_inputs, max_concurrent=5):
    """Run multiple workflows with concurrency limit"""
    semaphore = Semaphore(max_concurrent)

    async def run_single(workflow, input_data):
        async with semaphore:
            run = await workflow.arun(body=input_data)
            return await run.a_wait()

    # Create tasks for all workflows
    tasks = [
        run_single(workflow, input_data)
        for workflow, input_data in workflows_and_inputs
    ]

    # Execute with concurrency limit
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
workflows_and_inputs = [
    (workflow1, {"input": "data1"}),
    (workflow2, {"input": "data2"}),
    (workflow3, {"input": "data3"}),
    # ... more workflows
]

results = asyncio.run(run_workflows_with_limit(workflows_and_inputs))
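Because return_exceptions=True is set, failed runs come back as exception objects mixed in with normal results, so separate them before use:

# Split successes from failures
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]

print(f"{len(successes)} succeeded, {len(failures)} failed")
for error in failures:
    print(f"Run failed: {error}")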
Best Practices
Input Validation
Validate inputs before running workflows:
def validate_workflow_input(input_data, required_fields):
    """Validate workflow input data"""
    missing_fields = []

    for field in required_fields:
        if field not in input_data or not input_data[field]:
            missing_fields.append(field)

    if missing_fields:
        raise ValueError(f"Missing required fields: {missing_fields}")

    return True

# Usage
required_fields = ["Question", "Context"]
validate_workflow_input(input_data, required_fields)
Resource Management
Manage resources efficiently:
# Use try/finally (or a context manager) to guarantee cleanup
async def process_workflows():
    client = Client(api_key="your_api_key")
    try:
        # Process workflows
        results = await run_multiple_workflows()
        return results
    finally:
        # Cleanup resources if needed
        pass
Logging and Monitoring
Implement comprehensive logging:
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_workflow_with_logging(workflow, input_data):
    logger.info(f"Starting workflow {workflow.id}")
    logger.info(f"Input data: {input_data}")

    start_time = time.time()
    run = workflow.run(body=input_data)
    logger.info(f"Workflow started, run ID: {run.id}")

    result = run.wait()
    execution_time = time.time() - start_time

    logger.info(f"Workflow completed in {execution_time:.2f}s")
    logger.info(f"Status: {result.status}")

    if result.status == "failed":
        logger.error(f"Workflow failed: {result.error_message}")

    return result
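Usage mirrors the earlier helpers:

result = run_workflow_with_logging(workflow, {"input": "test data"})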