Skip to main content

Overview

The Noxus Client SDK provides comprehensive support for asynchronous operations, allowing you to build high-performance applications that can handle multiple concurrent requests efficiently. This is particularly useful for I/O-bound operations like API calls, file uploads, and long-running workflows.

Why Use Async?

Better Performance

Handle multiple operations concurrently without blocking

Resource Efficiency

Use system resources more efficiently with non-blocking I/O

Scalability

Build applications that can handle more concurrent users

Responsiveness

Keep your application responsive during long operations

Async Method Pattern

All async methods in the SDK follow a consistent naming pattern - they’re prefixed with a:
# Synchronous methods
workflows = client.workflows.list()
workflow = client.workflows.get("workflow_id")
conversation = client.conversations.create(name="Test", settings=settings)

# Asynchronous equivalents
workflows = await client.workflows.alist()
workflow = await client.workflows.aget("workflow_id")
conversation = await client.conversations.acreate(name="Test", settings=settings)

Basic Async Usage

Simple Async Function

import asyncio
from noxus_sdk.client import Client

async def main():
    client = Client(api_key="your_api_key_here")

    # Async operations
    workflows = await client.workflows.alist()
    models = await client.aget_models()

    print(f"Found {len(workflows)} workflows")
    print(f"Available models: {len(models)}")

# Run the async function
asyncio.run(main())

Async Context Manager

For better resource management:
import asyncio
import aiohttp
from noxus_sdk.client import Client

async def main():
    client = Client(api_key="your_api_key_here")

    try:
        # Perform async operations
        workflows = await client.workflows.alist()

        for workflow in workflows[:3]:  # Process first 3
            details = await client.workflows.aget(workflow.id)
            print(f"Workflow: {details.name}")

    finally:
        # Cleanup if needed
        pass

asyncio.run(main())

Concurrent Operations

Running Multiple Operations Concurrently

Use asyncio.gather() to run multiple operations simultaneously:
import asyncio
from noxus_sdk.client import Client

async def get_platform_info():
    client = Client(api_key="your_api_key_here")

    # Run multiple operations concurrently
    workflows_task = client.workflows.alist()
    models_task = client.aget_models()
    presets_task = client.aget_chat_presets()
    nodes_task = client.aget_nodes()

    # Wait for all to complete
    workflows, models, presets, nodes = await asyncio.gather(
        workflows_task,
        models_task,
        presets_task,
        nodes_task
    )

    return {
        "workflows": len(workflows),
        "models": len(models),
        "presets": len(presets),
        "nodes": len(nodes)
    }

# Usage
info = asyncio.run(get_platform_info())
print(info)

Processing Collections Concurrently

Process multiple items concurrently with controlled concurrency:
import asyncio
from noxus_sdk.client import Client

async def process_workflow(client, workflow_id):
    """Process a single workflow"""
    workflow = await client.workflows.aget(workflow_id)
    runs = await client.runs.alist(workflow_id=workflow_id)

    return {
        "id": workflow.id,
        "name": workflow.name,
        "run_count": len(runs)
    }

async def process_all_workflows():
    client = Client(api_key="your_api_key_here")

    # Get all workflows
    workflows = await client.workflows.alist()

    # Process workflows concurrently (limit concurrency to 5)
    semaphore = asyncio.Semaphore(5)

    async def process_with_semaphore(workflow_id):
        async with semaphore:
            return await process_workflow(client, workflow_id)

    # Create tasks for all workflows
    tasks = [
        process_with_semaphore(workflow.id)
        for workflow in workflows
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks)

    return results

# Usage
results = asyncio.run(process_all_workflows())
for result in results:
    print(f"{result['name']}: {result['run_count']} runs")

Async Workflows

Creating and Running Workflows Asynchronously

import asyncio
from noxus_sdk.client import Client
from noxus_sdk.workflows import WorkflowDefinition

async def create_and_run_workflow():
    client = Client(api_key="your_api_key_here")

    # Create workflow definition
    workflow_def = WorkflowDefinition(name="Async Workflow")

    input_node = workflow_def.node("InputNode").config(
        label="Input",
        fixed_value=True,
        value="Tell me about async programming",
        type="str"
    )

    ai_node = workflow_def.node("TextGenerationNode").config(
        template="Explain: ((Input 1))",
        model=["gpt-4o-mini"]
    )

    output_node = workflow_def.node("OutputNode")

    # Connect nodes
    workflow_def.link(input_node.output(), ai_node.input("variables", "Input 1"))
    workflow_def.link(ai_node.output(), output_node.input())

    # Save workflow asynchronously
    workflow = await client.workflows.asave(workflow_def)
    print(f"Created workflow: {workflow.id}")

    # Run workflow asynchronously
    run = await workflow.arun(body={})

    # Wait for completion asynchronously
    result = await run.a_wait(interval=2)

    return result

# Usage
result = asyncio.run(create_and_run_workflow())
print(f"Result: {result.output}")

Async Conversations

Handling Multiple Conversations

import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.conversations import ConversationSettings, MessageRequest

async def handle_multiple_conversations():
    client = Client(api_key="your_api_key_here")

    settings = ConversationSettings(
        model=["gpt-4o-mini"],
        temperature=0.7,
        max_tokens=150
    )

    # Create multiple conversations concurrently
    conversation_tasks = [
        client.conversations.acreate(
            name=f"Conversation {i}",
            settings=settings
        )
        for i in range(3)
    ]

    conversations = await asyncio.gather(*conversation_tasks)

    # Send messages to all conversations concurrently
    message_tasks = []
    for i, conv in enumerate(conversations):
        message = MessageRequest(content=f"Hello from conversation {i}!")
        message_tasks.append(conv.aadd_message(message))

    responses = await asyncio.gather(*message_tasks)

    # Print responses
    for i, response in enumerate(responses):
        print(f"Conversation {i}: {response.message_parts}")

    return conversations

# Usage
conversations = asyncio.run(handle_multiple_conversations())

Async Knowledge Bases

Document Processing with Async

import asyncio
from noxus_sdk.client import Client
from noxus_sdk.resources.knowledge_bases import KnowledgeBaseSettings

async def process_knowledge_base():
    client = Client(api_key="your_api_key_here")

    # Create knowledge base
    kb = await client.knowledge_bases.acreate(
        name="Async KB",
        description="Created asynchronously",
        document_types=["pdf", "txt"]
    )

    # Upload multiple documents concurrently
    files = ["doc1.txt", "doc2.txt", "doc3.txt"]
    upload_tasks = [
        kb.aupload_document(files=[file], prefix=f"/docs/{file}")
        for file in files
    ]

    run_ids = await asyncio.gather(*upload_tasks)
    print(f"Started {len(run_ids)} upload processes")

    # Monitor processing status
    while True:
        kb = await kb.arefresh()
        if kb.status == "ready":
            break
        print(f"KB status: {kb.status}")
        await asyncio.sleep(5)

    print("Knowledge base ready!")
    return kb

# Usage
kb = asyncio.run(process_knowledge_base())

Error Handling in Async Code

Handling Individual Errors

import asyncio
import httpx
from noxus_sdk.client import Client

async def safe_async_operation():
    client = Client(api_key="your_api_key_here")

    try:
        workflows = await client.workflows.alist()
        return workflows

    except httpx.HTTPStatusError as e:
        print(f"HTTP error: {e.response.status_code}")
        return []

    except httpx.RequestError as e:
        print(f"Network error: {e}")
        return []

    except Exception as e:
        print(f"Unexpected error: {e}")
        return []

# Usage
workflows = asyncio.run(safe_async_operation())

Handling Errors in Concurrent Operations

import asyncio
import httpx
from noxus_sdk.client import Client

async def fetch_workflow_safe(client, workflow_id):
    """Safely fetch a workflow with error handling"""
    try:
        return await client.workflows.aget(workflow_id)
    except httpx.HTTPStatusError as e:
        print(f"Failed to fetch workflow {workflow_id}: {e.response.status_code}")
        return None
    except Exception as e:
        print(f"Error fetching workflow {workflow_id}: {e}")
        return None

async def fetch_multiple_workflows_safe():
    client = Client(api_key="your_api_key_here")

    # Get workflow IDs
    workflows = await client.workflows.alist()
    workflow_ids = [w.id for w in workflows[:5]]  # First 5

    # Fetch details concurrently with error handling
    tasks = [
        fetch_workflow_safe(client, workflow_id)
        for workflow_id in workflow_ids
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out None results and exceptions
    successful_results = [
        result for result in results
        if result is not None and not isinstance(result, Exception)
    ]

    print(f"Successfully fetched {len(successful_results)} workflows")
    return successful_results

# Usage
workflows = asyncio.run(fetch_multiple_workflows_safe())

Performance Tips

Control concurrency to avoid overwhelming the API:
async def rate_limited_operations():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent operations
    
    async def limited_operation(item):
        async with semaphore:
            return await process_item(item)
    
    tasks = [limited_operation(item) for item in items]
    return await asyncio.gather(*tasks)
Group related operations to reduce API calls:
async def batch_workflow_creation():
    # Create multiple workflows in one batch
    workflow_defs = [create_workflow_def(i) for i in range(10)]
    
    # Process in batches of 3
    batch_size = 3
    results = []
    
    for i in range(0, len(workflow_defs), batch_size):
        batch = workflow_defs[i:i + batch_size]
        batch_tasks = [client.workflows.asave(wf) for wf in batch]
        batch_results = await asyncio.gather(*batch_tasks)
        results.extend(batch_results)
        
        # Small delay between batches
        await asyncio.sleep(0.1)
    
    return results
Process results as they become available:
async def process_as_completed():
    client = Client(api_key="your_api_key_here")
    
    # Create tasks
    tasks = [
        client.workflows.aget(workflow_id)
        for workflow_id in workflow_ids
    ]
    
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        try:
            workflow = await coro
            print(f"Processed: {workflow.name}")
            # Do something with the workflow immediately
        except Exception as e:
            print(f"Error: {e}")

Integration with Web Frameworks

FastAPI Integration

from fastapi import FastAPI, HTTPException
from noxus_sdk.client import Client
import os

app = FastAPI()
client = Client(api_key=os.getenv("NOXUS_API_KEY"))

@app.get("/workflows")
async def list_workflows():
    try:
        workflows = await client.workflows.alist()
        return {"workflows": [{"id": w.id, "name": w.name} for w in workflows]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/workflows/{workflow_id}/run")
async def run_workflow(workflow_id: str, input_data: dict):
    try:
        workflow = await client.workflows.aget(workflow_id)
        run = await workflow.arun(body=input_data)
        result = await run.a_wait(interval=2)
        return {"result": result.output}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Django Async Views

from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from asgiref.sync import sync_to_async
from noxus_sdk.client import Client
import os

client = Client(api_key=os.getenv("NOXUS_API_KEY"))

@require_http_methods(["GET"])
async def list_workflows(request):
    try:
        workflows = await client.workflows.alist()
        data = {"workflows": [{"id": w.id, "name": w.name} for w in workflows]}
        return JsonResponse(data)
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)

Testing Async Code

Using pytest-asyncio

import pytest
import asyncio
from unittest.mock import AsyncMock, Mock
from noxus_sdk.client import Client

@pytest.mark.asyncio
async def test_async_workflow_creation():
    # Mock the client
    mock_client = Mock(spec=Client)
    mock_client.workflows.asave = AsyncMock()
    mock_client.workflows.asave.return_value = Mock(id="workflow_123")

    # Test async operation
    result = await mock_client.workflows.asave(Mock())

    assert result.id == "workflow_123"
    mock_client.workflows.asave.assert_called_once()

@pytest.mark.asyncio
async def test_concurrent_operations():
    mock_client = Mock(spec=Client)
    mock_client.workflows.alist = AsyncMock(return_value=[Mock(id="1"), Mock(id="2")])
    mock_client.aget_models = AsyncMock(return_value=[{"name": "gpt-4"}])

    # Test concurrent operations
    workflows, models = await asyncio.gather(
        mock_client.workflows.alist(),
        mock_client.aget_models()
    )

    assert len(workflows) == 2
    assert len(models) == 1

Next Steps