Overview
The Noxus Client SDK provides comprehensive support for asynchronous operations, allowing you to build high-performance applications that can handle multiple concurrent requests efficiently. This is particularly useful for I/O-bound operations like API calls, file uploads, and long-running workflows.
Why Use Async?
Better Performance
Handle multiple operations concurrently without blocking
Resource Efficiency
Use system resources more efficiently with non-blocking I/O
Scalability
Build applications that can handle more concurrent users
Responsiveness
Keep your application responsive during long operations
Async Method Pattern
All async methods in the SDK follow a consistent naming pattern - they're prefixed with `a`:
Copy
Ask AI
# Synchronous methods
workflows = client.workflows.list()
workflow = client.workflows.get("workflow_id")
conversation = client.conversations.create(name="Test", settings=settings)
# Asynchronous equivalents
workflows = await client.workflows.alist()
workflow = await client.workflows.aget("workflow_id")
conversation = await client.conversations.acreate(name="Test", settings=settings)
Basic Async Usage
Simple Async Function
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client

async def main():
    """List workflows and models, then report how many of each were found."""
    client = Client(api_key="your_api_key_here")

    # Both calls are awaited sequentially here; see the concurrency
    # section below for running them in parallel with asyncio.gather().
    workflows = await client.workflows.alist()
    models = await client.aget_models()

    print(f"Found {len(workflows)} workflows")
    print(f"Available models: {len(models)}")

# Entry point: drive the coroutine to completion on a fresh event loop.
asyncio.run(main())
Async Context Manager
For better resource management:
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client

async def main():
    """Fetch the first few workflows, guaranteeing cleanup via try/finally.

    Fix: removed the unused `import aiohttp` — the SDK client handles HTTP
    itself and nothing in this example referenced aiohttp.
    """
    client = Client(api_key="your_api_key_here")
    try:
        # Perform async operations
        workflows = await client.workflows.alist()
        for workflow in workflows[:3]:  # Process first 3
            details = await client.workflows.aget(workflow.id)
            print(f"Workflow: {details.name}")
    finally:
        # Cleanup if needed (close sessions, release resources, ...)
        pass

asyncio.run(main())
Concurrent Operations
Running Multiple Operations Concurrently
Use `asyncio.gather()` to run multiple operations simultaneously:
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client

async def get_platform_info():
    """Gather workflow, model, preset, and node counts in parallel.

    Returns:
        dict: item counts keyed by resource name.
    """
    client = Client(api_key="your_api_key_here")

    # Start every request before awaiting any of them, then let
    # asyncio.gather() drive all four coroutines concurrently.
    workflows, models, presets, nodes = await asyncio.gather(
        client.workflows.alist(),
        client.aget_models(),
        client.aget_chat_presets(),
        client.aget_nodes(),
    )

    return {
        "workflows": len(workflows),
        "models": len(models),
        "presets": len(presets),
        "nodes": len(nodes),
    }

# Usage
info = asyncio.run(get_platform_info())
print(info)
Processing Collections Concurrently
Process multiple items concurrently with controlled concurrency:
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client

async def process_workflow(client, workflow_id):
    """Process a single workflow"""
    # Fetch the workflow details and its run history.
    workflow = await client.workflows.aget(workflow_id)
    runs = await client.runs.alist(workflow_id=workflow_id)
    return {
        "id": workflow.id,
        "name": workflow.name,
        "run_count": len(runs),
    }

async def process_all_workflows():
    """Summarize every workflow with at most 5 API calls in flight at once."""
    client = Client(api_key="your_api_key_here")
    workflows = await client.workflows.alist()

    # The semaphore caps concurrency so we don't overwhelm the API.
    gate = asyncio.Semaphore(5)

    async def bounded(workflow_id):
        async with gate:
            return await process_workflow(client, workflow_id)

    # One task per workflow; gather preserves input order in the results.
    return await asyncio.gather(*(bounded(wf.id) for wf in workflows))

# Usage
results = asyncio.run(process_all_workflows())
for result in results:
    print(f"{result['name']}: {result['run_count']} runs")
Async Workflows
Creating and Running Workflows Asynchronously
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.workflows import WorkflowDefinition

async def create_and_run_workflow():
    """Build, save, execute, and await a simple three-node workflow."""
    client = Client(api_key="your_api_key_here")

    # Define the graph: fixed input -> LLM text generation -> output.
    workflow_def = WorkflowDefinition(name="Async Workflow")
    input_node = workflow_def.node("InputNode").config(
        label="Input",
        fixed_value=True,
        value="Tell me about async programming",
        type="str",
    )
    ai_node = workflow_def.node("TextGenerationNode").config(
        template="Explain: ((Input 1))",
        model=["gpt-4o-mini"],
    )
    output_node = workflow_def.node("OutputNode")

    # Wire the nodes together.
    workflow_def.link(input_node.output(), ai_node.input("variables", "Input 1"))
    workflow_def.link(ai_node.output(), output_node.input())

    # Persist the definition, start a run, and poll every 2s until done.
    workflow = await client.workflows.asave(workflow_def)
    print(f"Created workflow: {workflow.id}")
    run = await workflow.arun(body={})
    return await run.a_wait(interval=2)

# Usage
result = asyncio.run(create_and_run_workflow())
print(f"Result: {result.output}")
Async Conversations
Handling Multiple Conversations
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.conversations import ConversationSettings, MessageRequest

async def handle_multiple_conversations():
    """Create three conversations and message each of them concurrently."""
    client = Client(api_key="your_api_key_here")
    settings = ConversationSettings(
        model=["gpt-4o-mini"],
        temperature=0.7,
        max_tokens=150,
    )

    # Create all conversations in parallel.
    conversations = await asyncio.gather(*[
        client.conversations.acreate(name=f"Conversation {i}", settings=settings)
        for i in range(3)
    ])

    # Fan out one greeting per conversation, also in parallel.
    responses = await asyncio.gather(*[
        conv.aadd_message(MessageRequest(content=f"Hello from conversation {i}!"))
        for i, conv in enumerate(conversations)
    ])

    # Print responses
    for i, response in enumerate(responses):
        print(f"Conversation {i}: {response.message_parts}")

    return conversations

# Usage
conversations = asyncio.run(handle_multiple_conversations())
Async Knowledge Bases
Document Processing with Async
Copy
Ask AI
import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.knowledge_bases import KnowledgeBaseSettings

async def process_knowledge_base():
    """Create a knowledge base, upload documents, and wait until it is ready."""
    client = Client(api_key="your_api_key_here")

    kb = await client.knowledge_bases.acreate(
        name="Async KB",
        description="Created asynchronously",
        document_types=["pdf", "txt"],
    )

    # Upload every document in parallel; each call starts a processing run.
    files = ["doc1.txt", "doc2.txt", "doc3.txt"]
    run_ids = await asyncio.gather(*[
        kb.aupload_document(files=[file], prefix=f"/docs/{file}")
        for file in files
    ])
    print(f"Started {len(run_ids)} upload processes")

    # Poll every 5 seconds until processing completes.
    # NOTE(review): this loops forever if the KB enters a failure state —
    # consider also breaking on an error status if the API exposes one.
    while True:
        kb = await kb.arefresh()
        if kb.status == "ready":
            break
        print(f"KB status: {kb.status}")
        await asyncio.sleep(5)

    print("Knowledge base ready!")
    return kb

# Usage
kb = asyncio.run(process_knowledge_base())
Error Handling in Async Code
Handling Individual Errors
Copy
Ask AI
import asyncio
import httpx
from noxus_sdk.client import Client

async def safe_async_operation():
    """List workflows, degrading to an empty list on any failure."""
    client = Client(api_key="your_api_key_here")
    try:
        return await client.workflows.alist()
    except httpx.HTTPStatusError as e:
        # The server answered with a 4xx/5xx status.
        print(f"HTTP error: {e.response.status_code}")
    except httpx.RequestError as e:
        # Connection, DNS, or timeout problems before any response arrived.
        print(f"Network error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return []

# Usage
workflows = asyncio.run(safe_async_operation())
Handling Errors in Concurrent Operations
Copy
Ask AI
import asyncio
import httpx
from noxus_sdk.client import Client

async def fetch_workflow_safe(client, workflow_id):
    """Safely fetch a workflow with error handling"""
    try:
        return await client.workflows.aget(workflow_id)
    except httpx.HTTPStatusError as e:
        print(f"Failed to fetch workflow {workflow_id}: {e.response.status_code}")
    except Exception as e:
        print(f"Error fetching workflow {workflow_id}: {e}")
    return None

async def fetch_multiple_workflows_safe():
    """Fetch details for up to five workflows, dropping any failures."""
    client = Client(api_key="your_api_key_here")

    # Get workflow IDs
    listing = await client.workflows.alist()
    ids = [w.id for w in listing[:5]]  # First 5

    # fetch_workflow_safe already traps errors and yields None, but
    # return_exceptions=True keeps one bad task from cancelling the rest.
    outcomes = await asyncio.gather(
        *(fetch_workflow_safe(client, wf_id) for wf_id in ids),
        return_exceptions=True,
    )

    # Keep only genuine workflow objects.
    successful_results = [
        o for o in outcomes
        if o is not None and not isinstance(o, Exception)
    ]
    print(f"Successfully fetched {len(successful_results)} workflows")
    return successful_results

# Usage
workflows = asyncio.run(fetch_multiple_workflows_safe())
Performance Tips
Use Semaphores for Rate Limiting
Control concurrency to avoid overwhelming the API:
Copy
Ask AI
async def rate_limited_operations():
    """Run process_item over items with bounded concurrency."""
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent operations

    async def limited_operation(item):
        # Each task waits here until one of the 5 slots frees up.
        async with semaphore:
            return await process_item(item)

    return await asyncio.gather(*(limited_operation(item) for item in items))
Batch Operations When Possible
Group related operations to reduce API calls:
Copy
Ask AI
async def batch_workflow_creation():
    """Save ten workflows in batches of three, pausing briefly between batches."""
    # Create multiple workflows in one batch
    workflow_defs = [create_workflow_def(i) for i in range(10)]

    batch_size = 3
    results = []
    for start in range(0, len(workflow_defs), batch_size):
        batch = workflow_defs[start:start + batch_size]
        # Save the whole batch concurrently...
        results.extend(await asyncio.gather(
            *(client.workflows.asave(wf) for wf in batch)
        ))
        # ...then back off briefly so we don't hammer the API.
        await asyncio.sleep(0.1)
    return results
Use asyncio.as_completed for Progressive Results
Process results as they become available:
Copy
Ask AI
async def process_as_completed():
    """Handle each workflow as soon as its fetch finishes, not all at once."""
    client = Client(api_key="your_api_key_here")

    # Create tasks
    tasks = [client.workflows.aget(workflow_id) for workflow_id in workflow_ids]

    # as_completed yields awaitables in finish order, so fast responses
    # are processed immediately instead of waiting for the slowest one.
    for coro in asyncio.as_completed(tasks):
        try:
            workflow = await coro
            print(f"Processed: {workflow.name}")
            # Do something with the workflow immediately
        except Exception as e:
            print(f"Error: {e}")
Integration with Web Frameworks
FastAPI Integration
Copy
Ask AI
from fastapi import FastAPI, HTTPException
from noxus_sdk.client import Client
import os

app = FastAPI()
client = Client(api_key=os.getenv("NOXUS_API_KEY"))

@app.get("/workflows")
async def list_workflows():
    """Return id/name pairs for every workflow."""
    try:
        workflows = await client.workflows.alist()
        return {"workflows": [{"id": w.id, "name": w.name} for w in workflows]}
    except Exception as e:
        # Surface SDK failures as a 500 with the error text as detail.
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/workflows/{workflow_id}/run")
async def run_workflow(workflow_id: str, input_data: dict):
    """Run a workflow with the posted body and block until it completes."""
    try:
        workflow = await client.workflows.aget(workflow_id)
        run = await workflow.arun(body=input_data)
        result = await run.a_wait(interval=2)  # poll every 2 seconds
        return {"result": result.output}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Django Async Views
Copy
Ask AI
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from noxus_sdk.client import Client
import os

# Fix: removed the unused `from asgiref.sync import sync_to_async` import —
# nothing in this example wraps a sync callable, the SDK call is natively async.
client = Client(api_key=os.getenv("NOXUS_API_KEY"))

@require_http_methods(["GET"])
async def list_workflows(request):
    """Async Django view returning all workflows as JSON."""
    try:
        workflows = await client.workflows.alist()
        data = {"workflows": [{"id": w.id, "name": w.name} for w in workflows]}
        return JsonResponse(data)
    except Exception as e:
        # Report the failure as JSON rather than a 500 HTML error page.
        return JsonResponse({"error": str(e)}, status=500)
Testing Async Code
Using pytest-asyncio
Copy
Ask AI
import pytest
import asyncio
from unittest.mock import AsyncMock, Mock
from noxus_sdk.client import Client

@pytest.mark.asyncio
async def test_async_workflow_creation():
    """asave() should resolve to the saved workflow and be awaited exactly once."""
    # Mock the client so no real API calls are made.
    mock_client = Mock(spec=Client)
    mock_client.workflows.asave = AsyncMock(return_value=Mock(id="workflow_123"))

    result = await mock_client.workflows.asave(Mock())

    assert result.id == "workflow_123"
    mock_client.workflows.asave.assert_called_once()

@pytest.mark.asyncio
async def test_concurrent_operations():
    """Two mocked SDK calls should gather cleanly and keep their payloads."""
    mock_client = Mock(spec=Client)
    mock_client.workflows.alist = AsyncMock(return_value=[Mock(id="1"), Mock(id="2")])
    mock_client.aget_models = AsyncMock(return_value=[{"name": "gpt-4"}])

    workflows, models = await asyncio.gather(
        mock_client.workflows.alist(),
        mock_client.aget_models(),
    )

    assert len(workflows) == 2
    assert len(models) == 1