Once you’ve built and saved a workflow, you can execute it with different inputs and monitor its progress. The Noxus SDK provides both synchronous and asynchronous methods for running workflows and handling results.
```python
from noxus_sdk.client import Client

client = Client(api_key="your_api_key_here")

# Get an existing workflow
workflow = client.workflows.get("workflow_id_here")

# Run with input data
run = workflow.run(body={
    "User Input": "What are the benefits of renewable energy?",
    "Analysis Type": "comprehensive",
})

# Wait for completion
result = run.wait(interval=2)  # Check every 2 seconds
print(f"Status: {result.status}")
print(f"Output: {result.output}")
```
The input format depends on how you configured your workflow nodes:
Node Labels
Use the label you assigned to input nodes:
```python
# If you created an input node with label "User Question"
input_node = workflow_def.node("InputNode").config(
    label="User Question"
)

# Run with this input
run = workflow.run(body={
    "User Question": "How does photosynthesis work?"
})
```
Node IDs
Use the node’s unique ID:
```python
# Get node ID from workflow definition
input_node = workflow_def.node("InputNode")
node_id = input_node.id

# Run with node ID
run = workflow.run(body={
    node_id: "Your input value here"
})
```
Node ID with Input Name
Use the format `{node_id}::{input_name}`:
```python
# For input nodes, the default input name is "input"
run = workflow.run(body={
    f"{node_id}::input": "Your input value here"
})
```
Instead of polling for run completion, you can stream events in real time using Server-Sent Events (SSE). This gives you instant notifications as nodes execute and when the run finishes.
```python
# Create a run and stream events as they happen
run = workflow.run(body={"User Input": "Explain quantum computing"})

for event in run.stream():
    if event.type == "content":
        node_id = event.data.get("id")
        status = event.data.get("status")
        content = event.data.get("content", "")
        print(f"[{node_id}] {status}: {content}")
    elif event.type == "state":
        print(f"Progress: {event.data.get('status')}")
        if event.is_terminal:
            print(f"Run finished: {event.data['workflow_status']}")

# Refresh to get final output after streaming
run.refresh()
print(f"Output: {run.output}")
```
Create a run and immediately stream its events in a single call:
```python
for event in workflow.run_and_stream(body={"User Input": "Hello"}):
    if event.type == "content" and event.data.get("content"):
        print(event.data["content"], end="")
    if event.is_terminal:
        print(f"\nDone: {event.data['workflow_status']}")
```
Each event yielded by `stream()` is a `RunEvent` object:
| Property | Type | Description |
| --- | --- | --- |
| `event.type` | `str` | `"content"` (node output) or `"state"` (progress) |
| `event.data` | `dict` | Event payload with node ID, status, content, etc. |
| `event.is_terminal` | `bool` | `True` when `workflow_status` is `"completed"` or `"failed"` |
| `event.redis_id` | `str` | Stream cursor; pass as `etag` to resume from this point |
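If a long-lived stream drops, the cursor in `event.redis_id` lets you pick up where you left off instead of replaying every event. The sketch below is deliberately generic: `make_stream(etag)` stands in for a call such as `run.stream(etag=...)`, and the `etag` keyword name is an assumption based on the cursor description above, so verify it against your SDK version.

```python
def consume_with_resume(make_stream, handle):
    """Drive an event stream, resuming once from the last cursor on disconnect.

    make_stream(etag) -> iterator of events (etag=None starts from the beginning)
    handle(event)     -> per-event callback
    """
    last_etag = None
    try:
        for event in make_stream(None):
            last_etag = event.redis_id
            handle(event)
    except ConnectionError:
        # Resume from the last cursor we saw rather than from the start
        for event in make_stream(last_etag):
            last_etag = event.redis_id
            handle(event)
    return last_etag

# With the SDK this might look like (parameter name assumed):
# consume_with_resume(lambda etag: run.stream(etag=etag), print)
```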
`run.wait()` now uses SSE streaming internally for instant completion detection. If the SSE endpoint is unavailable, it automatically falls back to polling.
For event-driven integrations, you can provide a `callback_url` when creating a run. The platform will POST the run result to your URL when the run reaches a terminal state (`completed`, `failed`, or `stopped`).
```python
run = workflow.run(
    body={"User Input": "Analyze this data"},
    callback_url="https://your-server.com/webhook/noxus",
)

# No need to poll; your server will be notified
print(f"Run {run.id} started, webhook will fire on completion")
```
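On the receiving side, a minimal webhook handler only needs to read the POST body and acknowledge with a 200. This stdlib sketch assumes the callback payload is JSON carrying `id`, `status`, and `output` fields; those names are assumptions, so check what the platform actually sends before relying on them.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def parse_noxus_callback(body: bytes):
    """Extract run id, status, and output from a callback payload.

    The field names here are assumptions about the payload shape.
    """
    data = json.loads(body)
    return data.get("id"), data.get("status"), data.get("output")


class NoxusWebhook(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        run_id, status, output = parse_noxus_callback(self.rfile.read(length))
        print(f"Run {run_id} finished with status {status}: {output}")
        # Acknowledge receipt so the platform does not retry
        self.send_response(200)
        self.end_headers()


# To serve it (blocking):
# HTTPServer(("0.0.0.0", 8000), NoxusWebhook).serve_forever()
```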
```python
# Only provide values for dynamic inputs.
# Fixed inputs (configured with fixed_value=True) are used automatically.
run = workflow.run(body={
    "User Question": "What is machine learning?"
    # Fixed inputs like system prompts are handled automatically
})

# Override fixed inputs if needed (if the workflow allows it)
run = workflow.run(body={
    "User Question": "What is machine learning?",
    "System Prompt": "You are a technical expert. Be precise and detailed.",
})
```
```python
# Output is typically a string, but can be structured data
output = result.output

if isinstance(output, str):
    # Text output
    print(f"Generated text: {output}")
elif isinstance(output, dict):
    # Structured output
    for key, value in output.items():
        print(f"{key}: {value}")
elif isinstance(output, list):
    # List output
    for i, item in enumerate(output):
        print(f"Item {i}: {item}")
```
When a text output exceeds 16,000 characters, the API does not return the full string inline. Instead, it truncates the value in the response, uploads the full content to file storage, and returns a `TextContainer`-shaped dict with a `has_preview` flag and a file reference:
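As a rough illustration (the exact field names beyond `has_preview`, `text`, and `file.id` are assumptions inferred from the handling code below), a truncated output value looks something like this, and a small guard helper keeps the check in one place:

```python
# Illustrative shape of a truncated output value; the file id is hypothetical
truncated = {
    "text": "First 16,000 characters of the output...",
    "has_preview": True,
    "file": {"id": "file_abc123"},
}


def is_truncated(value):
    """Return True when an output value needs a Files API download."""
    return isinstance(value, dict) and bool(value.get("has_preview"))
```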
Detect this shape via `has_preview` and fetch the full content through the Files API:
```python
result = run.wait()

for key, value in result.output.items():
    if isinstance(value, dict) and value.get("has_preview"):
        # Output was truncated: download the full content from storage
        file_id = value["file"]["id"]
        full_content = client.files.get(file_id).decode("utf-8")
        print(f"{key}: full length {len(full_content)} chars")
    else:
        # Output fit under the limit: use `text` (or the raw string) directly
        text = value.get("text") if isinstance(value, dict) else value
        print(f"{key}: {text}")
```
`has_preview` only appears on outputs that were actually truncated. Outputs that fit under the 16,000-character limit are returned either as a plain string or as `{"text": "..."}` without a file reference, so always guard on `has_preview` before attempting to download.
```python
import asyncio

async def process_multiple_inputs(workflow, inputs):
    """Process multiple inputs through the same workflow."""
    # Start all runs
    runs = []
    for input_data in inputs:
        run = await workflow.arun(body=input_data)
        runs.append(run)

    # Wait for all to complete
    results = []
    for run in runs:
        result = await run.a_wait()
        results.append(result)

    return results

# Usage
inputs = [
    {"Question": "What is AI?"},
    {"Question": "How does machine learning work?"},
    {"Question": "What are neural networks?"},
]

workflow = client.workflows.get("qa_workflow_id")
results = asyncio.run(process_multiple_inputs(workflow, inputs))

for i, result in enumerate(results):
    print(f"Answer {i+1}: {result.output}")
```
```python
def validate_workflow_input(input_data, required_fields):
    """Validate workflow input data before starting a run."""
    missing_fields = []
    for field in required_fields:
        if field not in input_data or not input_data[field]:
            missing_fields.append(field)

    if missing_fields:
        raise ValueError(f"Missing required fields: {missing_fields}")

    return True

# Usage
required_fields = ["Question", "Context"]
validate_workflow_input(input_data, required_fields)
```
Resource Management
Ensure clients and connections are cleaned up even when a run fails:
```python
# Use try/finally (or a context manager) to guarantee cleanup
async def process_workflows():
    client = Client(api_key="your_api_key")
    try:
        # Process workflows
        results = await run_multiple_workflows()
        return results
    finally:
        # Cleanup resources if needed
        pass
```
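If you prefer `async with` over an explicit `try`/`finally`, the setup and teardown can be wrapped in an async context manager. This is a sketch, not the SDK's API: the `close()` teardown hook is an assumption, so substitute whatever cleanup method the client actually exposes.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_client(factory):
    """Build a client via `factory` and guarantee cleanup on exit.

    A `close()` teardown method is an assumption; swap in the SDK's
    real cleanup hook if it differs.
    """
    client = factory()
    try:
        yield client
    finally:
        close = getattr(client, "close", None)
        if callable(close):
            maybe_coro = close()
            if asyncio.iscoroutine(maybe_coro):
                await maybe_coro


# With the SDK this might look like:
# async with managed_client(lambda: Client(api_key="...")) as client:
#     workflow = client.workflows.get("workflow_id_here")
```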
Logging and Monitoring
Implement comprehensive logging:
```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def run_workflow_with_logging(workflow, input_data):
    logger.info(f"Starting workflow {workflow.id}")
    logger.info(f"Input data: {input_data}")

    start_time = time.time()
    run = workflow.run(body=input_data)
    logger.info(f"Workflow started, run ID: {run.id}")

    result = run.wait()
    execution_time = time.time() - start_time

    logger.info(f"Workflow completed in {execution_time:.2f}s")
    logger.info(f"Status: {result.status}")
    if result.status == "failed":
        logger.error(f"Workflow failed: {result.error_message}")

    return result
```