Documentation Index: Fetch the complete documentation index at https://docs.noxus.ai/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The Noxus Client SDK provides comprehensive support for asynchronous operations, allowing you to build high-performance applications that can handle multiple concurrent requests efficiently. This is particularly useful for I/O-bound operations like API calls, file uploads, and long-running workflows.
Why Use Async?
Better Performance: Handle multiple operations concurrently without blocking.
Resource Efficiency: Use system resources more efficiently with non-blocking I/O.
Scalability: Build applications that can handle more concurrent users.
Responsiveness: Keep your application responsive during long operations.
Async Method Pattern
All async methods in the SDK follow a consistent naming pattern: each one is the synchronous method name prefixed with `a` (for example, `list()` becomes `alist()`):
# Synchronous methods
workflows = client.workflows.list()
workflow = client.workflows.get( "workflow_id" )
conversation = client.conversations.create( name = "Test" , settings = settings)
# Asynchronous equivalents
workflows = await client.workflows.alist()
workflow = await client.workflows.aget( "workflow_id" )
conversation = await client.conversations.acreate( name = "Test" , settings = settings)
Basic Async Usage
Simple Async Function
import asyncio
from noxus_sdk.client import Client
async def main():
    """List workflows and models with the async client and print their counts."""
    client = Client(api_key="your_api_key_here")

    # Async operations: each call is awaited before the next one starts
    workflows = await client.workflows.alist()
    models = await client.aget_models()

    print(f"Found {len(workflows)} workflows")
    print(f"Available models: {len(models)}")


# Run the async function
asyncio.run(main())
Async Context Manager
For better resource management, wrap the async operations in try/finally so that cleanup code runs even if a call fails:
import asyncio
import aiohttp
from noxus_sdk.client import Client
async def main():
    """Print the names of the first three workflows, fetching each one's details."""
    client = Client(api_key="your_api_key_here")
    try:
        # Perform async operations
        workflows = await client.workflows.alist()
        for workflow in workflows[:3]:  # Process first 3
            details = await client.workflows.aget(workflow.id)
            print(f"Workflow: {details.name}")
    finally:
        # Cleanup if needed: this branch runs whether or not the calls above
        # raised. It is a placeholder here and does nothing.
        pass


asyncio.run(main())
Concurrent Operations
Running Multiple Operations Concurrently
Use asyncio.gather() to run multiple operations simultaneously:
import asyncio
from noxus_sdk.client import Client
async def get_platform_info():
    """Fetch workflows, models, chat presets and nodes together and count each.

    Returns:
        A dict with the keys "workflows", "models", "presets" and "nodes",
        each mapped to the number of items returned by the matching call.
    """
    client = Client(api_key="your_api_key_here")

    # gather() awaits all four concurrently and returns the results in the
    # same order as its arguments.
    workflows, models, presets, nodes = await asyncio.gather(
        client.workflows.alist(),
        client.aget_models(),
        client.aget_chat_presets(),
        client.aget_nodes(),
    )

    collections = {
        "workflows": workflows,
        "models": models,
        "presets": presets,
        "nodes": nodes,
    }
    return {label: len(items) for label, items in collections.items()}


# Usage
info = asyncio.run(get_platform_info())
print(info)
Processing Collections Concurrently
Process multiple items concurrently with controlled concurrency:
import asyncio
from noxus_sdk.client import Client
async def process_workflow(client, workflow_id):
    """Process a single workflow.

    Args:
        client: Object exposing ``workflows.aget`` and ``runs.alist``.
        workflow_id: Identifier of the workflow to look up.

    Returns:
        A dict holding the workflow's "id" and "name" plus its "run_count"
        (the number of runs listed for that workflow).
    """
    workflow = await client.workflows.aget(workflow_id)
    runs = await client.runs.alist(workflow_id=workflow_id)
    summary = dict(id=workflow.id, name=workflow.name, run_count=len(runs))
    return summary
async def process_all_workflows():
    """Process every workflow concurrently, at most five at a time.

    Returns:
        A list with one ``process_workflow`` result per workflow, in the same
        order as the workflows returned by ``client.workflows.alist()``.
    """
    client = Client(api_key="your_api_key_here")

    # Get all workflows
    workflows = await client.workflows.alist()

    # Process workflows concurrently (limit concurrency to 5)
    semaphore = asyncio.Semaphore(5)

    async def process_with_semaphore(workflow_id):
        # The semaphore slot is held for the whole call, so no more than five
        # process_workflow() calls are in flight at once.
        async with semaphore:
            return await process_workflow(client, workflow_id)

    # Wait for all to complete; gather() keeps results in input order
    return await asyncio.gather(
        *(process_with_semaphore(workflow.id) for workflow in workflows)
    )


# Usage
results = asyncio.run(process_all_workflows())
for result in results:
    print(f"{result['name']}: {result['run_count']} runs")
Async Workflows
Creating and Running Workflows Asynchronously
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.workflows import WorkflowDefinition
async def create_and_run_workflow():
    """Build a three-node workflow, save it, run it and wait for the result.

    Returns:
        The value returned by awaiting ``run.a_wait(interval=2)``.
    """
    client = Client(api_key="your_api_key_here")

    # Create workflow definition
    workflow_def = WorkflowDefinition(name="Async Workflow")
    input_node = workflow_def.node("InputNode").config(
        label="Input",
        fixed_value=True,
        value="Tell me about async programming",
        type="str",
    )
    ai_node = workflow_def.node("TextGenerationNode").config(
        template="Explain: ((Input 1))",
        model=["gpt-4o-mini"],
    )
    output_node = workflow_def.node("OutputNode")

    # Connect nodes: input -> text generation ("Input 1" variable) -> output
    workflow_def.link(input_node.output(), ai_node.input("variables", "Input 1"))
    workflow_def.link(ai_node.output(), output_node.input())

    # Save workflow asynchronously
    workflow = await client.workflows.asave(workflow_def)
    print(f"Created workflow: {workflow.id}")

    # Run workflow asynchronously
    run = await workflow.arun(body={})

    # Wait for completion asynchronously
    return await run.a_wait(interval=2)


# Usage
result = asyncio.run(create_and_run_workflow())
print(f"Result: {result.output}")
Async Conversations
Handling Multiple Conversations
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.conversations import ConversationSettings, MessageRequest
async def handle_multiple_conversations():
    """Create three conversations, send one message to each, print the replies.

    Returns:
        The list of created conversation objects.
    """
    client = Client(api_key="your_api_key_here")
    settings = ConversationSettings(
        model=["gpt-4o-mini"],
        temperature=0.7,
        max_tokens=150,
    )

    # Create multiple conversations concurrently
    conversations = await asyncio.gather(
        *(
            client.conversations.acreate(name=f"Conversation {i}", settings=settings)
            for i in range(3)
        )
    )

    # Send messages to all conversations concurrently
    responses = await asyncio.gather(
        *(
            conv.aadd_message(MessageRequest(content=f"Hello from conversation {i}!"))
            for i, conv in enumerate(conversations)
        )
    )

    # Print responses; gather() keeps them in conversation order
    for i, response in enumerate(responses):
        print(f"Conversation {i}: {response.message_parts}")

    return conversations


# Usage
conversations = asyncio.run(handle_multiple_conversations())
Async Knowledge Bases
Document Processing with Async
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.knowledge_bases import KnowledgeBaseSettings
async def process_knowledge_base(poll_interval=5, timeout=600):
    """Create a knowledge base, upload three documents and wait until ready.

    Args:
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum number of seconds to wait for the "ready" status.

    Returns:
        The refreshed knowledge base object once its status is "ready".

    Raises:
        TimeoutError: If the status is still not "ready" after ``timeout``
            seconds. Without this bound the loop would poll forever when
            processing never completes.
    """
    client = Client(api_key="your_api_key_here")

    # Create knowledge base
    kb = await client.knowledge_bases.acreate(
        name="Async KB",
        description="Created asynchronously",
        document_types=["pdf", "txt"],
    )

    # Upload multiple documents concurrently
    files = ["doc1.txt", "doc2.txt", "doc3.txt"]
    run_ids = await asyncio.gather(
        *(
            kb.aupload_document(files=[filename], prefix=f"/docs/{filename}")
            for filename in files
        )
    )
    print(f"Started {len(run_ids)} upload processes")

    # Monitor processing status, measuring the deadline on the loop's
    # monotonic clock so wall-clock adjustments cannot affect it.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        kb = await kb.arefresh()
        if kb.status == "ready":
            break
        if loop.time() >= deadline:
            raise TimeoutError(
                f"Knowledge base still {kb.status!r} after {timeout} seconds"
            )
        print(f"KB status: {kb.status}")
        await asyncio.sleep(poll_interval)

    print("Knowledge base ready!")
    return kb


# Usage
kb = asyncio.run(process_knowledge_base())
Error Handling in Async Code
Handling Individual Errors
import asyncio
import httpx
from noxus_sdk.client import Client
async def safe_async_operation():
    """List workflows, returning an empty list if the request fails.

    Returns:
        The workflows from ``client.workflows.alist()``, or ``[]`` when any
        exception is raised; the error is printed rather than propagated.
    """
    client = Client(api_key="your_api_key_here")
    try:
        return await client.workflows.alist()
    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
    except httpx.RequestError as e:
        print(f"Network error: {e}")
    except Exception as e:
        # The broad handler comes last so the httpx-specific ones above get
        # the first chance to match.
        print(f"Unexpected error: {e}")
    return []


# Usage
workflows = asyncio.run(safe_async_operation())
Handling Errors in Concurrent Operations
import asyncio
import httpx
from noxus_sdk.client import Client
async def fetch_workflow_safe(client, workflow_id):
    """Safely fetch a workflow with error handling.

    Args:
        client: Object exposing ``workflows.aget``.
        workflow_id: Identifier of the workflow to fetch.

    Returns:
        The fetched workflow, or ``None`` if fetching raised an exception
        (the failure is printed instead of propagated).
    """
    try:
        return await client.workflows.aget(workflow_id)
    except httpx.HTTPStatusError as e:
        print(f"Failed to fetch workflow {workflow_id}: {e.response.status_code}")
    except Exception as e:
        print(f"Error fetching workflow {workflow_id}: {e}")
    return None
async def fetch_multiple_workflows_safe():
    """Fetch details for the first five workflows, skipping any that fail.

    Returns:
        The list of workflows that were fetched successfully.
    """
    client = Client(api_key="your_api_key_here")

    # Get workflow IDs
    workflows = await client.workflows.alist()
    workflow_ids = [w.id for w in workflows[:5]]  # First 5

    # Fetch details concurrently with error handling
    results = await asyncio.gather(
        *(fetch_workflow_safe(client, workflow_id) for workflow_id in workflow_ids),
        return_exceptions=True,
    )

    # Filter out None results and exceptions. BaseException (not Exception) is
    # checked because gather() can also hand back asyncio.CancelledError,
    # which has not derived from Exception since Python 3.8.
    successful_results = [
        result
        for result in results
        if result is not None and not isinstance(result, BaseException)
    ]
    print(f"Successfully fetched {len(successful_results)} workflows")
    return successful_results


# Usage
workflows = asyncio.run(fetch_multiple_workflows_safe())
Use Semaphores for Rate Limiting
# Control concurrency to avoid overwhelming the API:
async def rate_limited_operations():
    """Run ``process_item`` over ``items`` with at most five calls in flight."""
    limiter = asyncio.Semaphore(5)  # Max 5 concurrent operations

    async def limited_operation(item):
        # Each call waits for a free slot before processing its item.
        async with limiter:
            result = await process_item(item)
        return result

    return await asyncio.gather(*map(limited_operation, items))
Batch Operations When Possible
# Group related operations to reduce API calls:
async def batch_workflow_creation(batch_size=3, pause=0.1):
    """Save ten workflow definitions in fixed-size concurrent batches.

    Args:
        batch_size: Number of workflows saved concurrently per batch.
        pause: Seconds to sleep between consecutive batches.

    Returns:
        The results of ``client.workflows.asave``, in definition order.
    """
    # Build the definitions up front, then save them a batch at a time
    workflow_defs = [create_workflow_def(i) for i in range(10)]

    results = []
    for start in range(0, len(workflow_defs), batch_size):
        if start:
            # Small delay between batches only; there is nothing to wait for
            # before the first batch or after the last one.
            await asyncio.sleep(pause)
        batch = workflow_defs[start:start + batch_size]
        batch_results = await asyncio.gather(
            *(client.workflows.asave(wf) for wf in batch)
        )
        results.extend(batch_results)
    return results
Use asyncio.as_completed for Progressive Results
# Process results as they become available:
async def process_as_completed():
    """Fetch several workflows and print each name as soon as it arrives."""
    client = Client(api_key="your_api_key_here")

    # Create one awaitable per workflow ID
    pending = [client.workflows.aget(workflow_id) for workflow_id in workflow_ids]

    # as_completed() yields awaitables in completion order, not input order
    for next_done in asyncio.as_completed(pending):
        try:
            workflow = await next_done
            print(f"Processed: {workflow.name}")
            # Do something with the workflow immediately
        except Exception as e:
            # One failed fetch is reported without stopping the others
            print(f"Error: {e}")
Integration with Web Frameworks
FastAPI Integration
from fastapi import FastAPI, HTTPException
from noxus_sdk.client import Client
import os
# Application object plus one SDK client created at import time; the API key
# is read from the NOXUS_API_KEY environment variable.
app = FastAPI()
client = Client(api_key=os.environ.get("NOXUS_API_KEY"))
@app.get("/workflows")
async def list_workflows():
    """Return the ID and name of every workflow.

    Raises:
        HTTPException: With status 500 and the error text if anything fails.
    """
    try:
        workflows = await client.workflows.alist()
        return {"workflows": [{"id": w.id, "name": w.name} for w in workflows]}
    except Exception as e:
        # Chain the cause so the original exception stays attached as __cause__
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/workflows/{workflow_id}/run")
async def run_workflow(workflow_id: str, input_data: dict):
    """Run the given workflow with ``input_data`` and return its output.

    Args:
        workflow_id: Path parameter identifying the workflow to run.
        input_data: Passed as the ``body`` of the run.

    Raises:
        HTTPException: With status 500 and the error text if anything fails.
    """
    try:
        workflow = await client.workflows.aget(workflow_id)
        run = await workflow.arun(body=input_data)
        result = await run.a_wait(interval=2)
        return {"result": result.output}
    except Exception as e:
        # Chain the cause so the original exception stays attached as __cause__
        raise HTTPException(status_code=500, detail=str(e)) from e
Django Async Views
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from asgiref.sync import sync_to_async
from noxus_sdk.client import Client
import os
# One SDK client created at import time; the API key is read from the
# NOXUS_API_KEY environment variable.
client = Client(api_key=os.environ.get("NOXUS_API_KEY"))
@require_http_methods(["GET"])
async def list_workflows(request):
    """Respond with every workflow's ID and name, or a 500 JSON error."""
    try:
        workflows = await client.workflows.alist()
        summaries = [{"id": w.id, "name": w.name} for w in workflows]
        return JsonResponse({"workflows": summaries})
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
Testing Async Code
Using pytest-asyncio
import pytest
import asyncio
from unittest.mock import AsyncMock, Mock
from noxus_sdk.client import Client
@pytest.mark.asyncio
async def test_async_workflow_creation():
    """Awaiting the mocked ``asave`` yields the configured workflow object."""
    # Mock the client
    mock_client = Mock(spec=Client)
    saved_workflow = Mock(id="workflow_123")
    mock_client.workflows.asave = AsyncMock(return_value=saved_workflow)

    # Test async operation
    result = await mock_client.workflows.asave(Mock())

    assert result.id == "workflow_123"
    mock_client.workflows.asave.assert_called_once()
@pytest.mark.asyncio
async def test_concurrent_operations():
    """Two mocked async calls run through ``gather`` return their canned data."""
    mock_client = Mock(spec=Client)

    list_mock = AsyncMock()
    list_mock.return_value = [Mock(id="1"), Mock(id="2")]
    mock_client.workflows.alist = list_mock

    models_mock = AsyncMock()
    models_mock.return_value = [{"name": "gpt-4"}]
    mock_client.aget_models = models_mock

    # Test concurrent operations
    workflows, models = await asyncio.gather(
        mock_client.workflows.alist(),
        mock_client.aget_models(),
    )

    assert len(workflows) == 2
    assert len(models) == 1
Next Steps
Error Handling: Learn advanced error handling patterns for async code.
Performance Optimization: Discover best practices for high-performance applications.
Workflow Examples: See async patterns in workflow management.
API Reference: Explore all available async methods.