Python SDK Reference
Complete API reference for the AIM Python SDK. The SDK provides a high-level, Pythonic interface for agent registration, capability management, MCP integration, and secure operations.
v1.21.0StablePython 3.8+
Installation
Install via pip: The AIM SDK is available on PyPI. Use pip install aim-sdk then aim-sdk login to authenticate.
# Install from PyPI
pip install aim-sdk
# Login to AIM (opens browser for secure OAuth authentication)
aim-sdk login # AIM Cloud (aim.opena2a.org)
aim-sdk login --url http://localhost:8080 # Self-hosted
# Check authentication status
aim-sdk status
# Start using the SDK
python -c "from aim_sdk import secure; agent = secure('my-agent')"
# The aim-sdk login command:
# - Opens your browser for OAuth authentication (Google, etc.)
# - Uses PKCE (Proof Key for Code Exchange) for security
# - Saves credentials to ~/.aim/sdk_credentials.json
# - No password prompts or permission dialogsAuthentication:
- 1. Run
aim-sdk login - 2. Browser opens for secure OAuth authentication
- 3. Sign in with Google (or other provider)
- 4. Credentials saved automatically - start coding!
Quick Start
from aim_sdk import secure
# Zero-configuration mode (uses embedded credentials)
agent = secure("my-agent")
# The SDK automatically:
# - Registers the agent with AIM
# - Generates Ed25519 keypair
# - Manages authentication tokens
# - Auto-detects agent type from imports
# - Auto-detects capabilities
# Secure a function with capability verification
@agent.perform_action("db:read", resource="users_table")
def get_users():
return database.query("SELECT * FROM users")
# The decorator verifies capability with AIM before execution
result = get_users()Core Classes & Functions
Module Imports
# Core Classes and Functions
from aim_sdk import (
secure, # One-line agent registration (alias for register_agent)
AIMClient, # Main client class (returned by secure())
AgentType, # Agent type constants (LANGCHAIN, CLAUDE, GPT, etc.)
# Exceptions
AIMError, # Base exception class
AuthenticationError, # Authentication failures
VerificationError, # Verification failures
ActionDeniedError, # Capability denied by AIM
# Auto-detection utilities
auto_detect_capabilities, # Detect capabilities from code
auto_detect_agent_type, # Detect agent type from imports
# Security logging (SOC/SIEM integration)
SecurityLogger, # Structured security event logger
configure_security_logging, # Configure from environment
# Credential management
load_sdk_credentials, # Load saved OAuth credentials
save_sdk_credentials, # Save OAuth credentials
# A2A (Agent-to-Agent) protocol
A2AClient, # Agent-to-agent communication
A2AAgentCard, # Agent identity card for A2A
)AIMClient Class Methods
# AIMClient Class Methods (returned by secure())
class AIMClient:
"""Main client class for interacting with AIM"""
# Capability Verification
def verify_capability(
self,
capability: str,
resource: str = None,
context: Dict = None,
timeout_seconds: int = 300
) -> Dict:
"""Request verification for a capability from AIM.
Returns dict with: verified, verification_id, approved_by, expires_at"""
def perform_action(
self,
capability: str = None,
resource: str = None,
context: Dict = None,
timeout_seconds: int = 300,
jit_access: bool = False,
risk_level: str = None,
auto_register: bool = True
):
"""Decorator for automatic capability verification and action tracking.
Verifies with AIM before execution, logs results after."""
# Capability Management
def register_capability(
self,
capability_type: str,
description: str = "",
risk_level: str = "medium"
) -> Dict:
"""Register a capability this agent intends to use."""
def request_capability(
self,
capability_type: str,
reason: str,
metadata: Dict = None
) -> Dict:
"""Request an additional capability (requires admin approval)."""
# Agent Management (programmatic)
def create_new_agent(
self,
name: str,
display_name: str = None,
description: str = None,
agent_type: str = "ai_agent",
capabilities: List[str] = None,
mcp_servers: List[str] = None
) -> Dict:
"""Create/register a new agent through an authenticated client."""
def list_agents(self) -> Dict:
"""List all agents in the organization."""
def get_agent_details(self, agent_id: str) -> Dict:
"""Get detailed information about a specific agent."""
def update_agent(self, display_name: str = None, ...) -> Dict:
"""Update agent properties."""
def delete_agent(self, agent_id: str) -> Dict:
"""Delete an agent."""Decorators & Clean Code
Decorator-Based API
# @perform_action Decorator for Clean Code
from aim_sdk import secure
agent = secure("data-processor")
# Simple usage - function name becomes the capability
@agent.perform_action()
def search_flights(destination):
"""Capability auto-detected as 'search_flights'"""
return api.search(destination)
# Explicit capability with resource tracking
@agent.perform_action("db:read", resource="users_table")
def get_users():
"""Verified against AIM before execution"""
return database.query("SELECT * FROM users")
# JIT Access - requires real-time admin approval
@agent.perform_action(
capability="payment:refund",
jit_access=True,
risk_level="critical"
)
def process_refund(order_id: str, amount: float):
"""Waits for admin approval in AIM dashboard before executing"""
return stripe.refund(order_id, amount)
# Risk levels: "low", "medium", "high", "critical"
# Auto-detected from capability name if not specified
@agent.perform_action("file:write", risk_level="medium")
def save_report(path: str, data: dict):
"""Risk level determines approval requirements"""
with open(path, "w") as f:
json.dump(data, f)
# Usage - decorators handle verification transparently
result = get_users() # Verified, then executed
refund = process_refund("ord_123", 50.00) # Waits for approvalConfiguration
Configuration Options
# Configuration Options
from aim_sdk import secure, AIMClient, AgentType
# -----------------------------------------------
# 1. Zero-Config Mode (Recommended - uses OAuth)
# -----------------------------------------------
# Run "aim-sdk login" first, then:
agent = secure("my-agent")
# Credentials loaded from ~/.aim/sdk_credentials.json
# -----------------------------------------------
# 2. With API Key
# -----------------------------------------------
agent = secure("my-agent", api_key="aim_abc123")
# -----------------------------------------------
# 3. Explicit Agent Type (skip auto-detection)
# -----------------------------------------------
agent = secure(
"my-agent",
agent_type=AgentType.LANGCHAIN,
auto_detect=False,
capabilities=["db:read", "api:call"]
)
# -----------------------------------------------
# 4. With MCP Servers (from Claude Desktop config)
# -----------------------------------------------
agent = secure(
"my-agent",
mcp_servers=["github", "filesystem"] # Must be in Claude Desktop config
)
# -----------------------------------------------
# 5. Full Manual Configuration (AIMClient directly)
# -----------------------------------------------
client = AIMClient(
agent_id="550e8400-e29b-41d4-a716-446655440000",
public_key="base64-ed25519-public-key",
private_key="base64-ed25519-private-key",
aim_url="https://aim.example.com",
timeout=30, # HTTP request timeout (seconds)
auto_retry=True, # Auto-retry failed requests
max_retries=3 # Max retry attempts
)
# -----------------------------------------------
# 6. Custom AIM Server URL
# -----------------------------------------------
agent = secure(
"my-agent",
aim_url="http://localhost:8080" # Self-hosted AIM
)Async/Await Support
# Using perform_action with Async Functions
from aim_sdk import secure
agent = secure("my-agent")
# The @perform_action decorator works with regular functions.
# Capability verification is synchronous (HTTP request to AIM),
# but your decorated function executes normally after approval.
@agent.perform_action("db:read", resource="users_table")
def get_users():
"""Verification happens before execution"""
return database.query("SELECT * FROM users")
# For async workflows, use verify_capability() directly
import asyncio
async def process_data_async():
# Verify capability first (synchronous call to AIM)
result = agent.verify_capability(
capability="data:process",
resource="dataset_v2"
)
if result["verified"]:
# Run your async logic after verification
data = await fetch_data()
processed = await transform(data)
return processed
# Agent management methods work in any context
agents = agent.list_agents()
for a in agents.get("agents", []):
print(f"{a['name']}: {a['status']}")
# JIT access with timeout (waits for admin approval)
@agent.perform_action(
capability="system:admin",
jit_access=True,
timeout_seconds=600 # Wait up to 10 minutes for approval
)
def admin_operation():
return system.execute_admin_task()Error Handling
Exception Hierarchy
# Error Handling
from aim_sdk import (
secure,
AIMError, # Base exception for all AIM errors
AuthenticationError, # Auth token or credential issues
VerificationError, # Verification request failures
ActionDeniedError, # Capability explicitly denied by AIM
)
from aim_sdk.exceptions import ConfigurationError # SDK misconfiguration
# Registration errors
try:
agent = secure("my-agent")
except ConfigurationError as e:
# SDK is misconfigured (missing credentials, invalid URL)
print(f"Configuration error: {e}")
except AuthenticationError as e:
# Authentication failed (bad credentials, expired OAuth token)
# Fix: run "aim-sdk login" to refresh OAuth credentials
print(f"Auth error: {e}")
# Capability verification errors
try:
@agent.perform_action("db:write", resource="users_table")
def update_user(user_id, data):
return database.update(user_id, data)
update_user("user-123", {"name": "Alice"})
except ActionDeniedError as e:
# AIM explicitly denied the capability
# Agent does not have permission for this action
print(f"Action denied: {e}")
except VerificationError as e:
# Verification request failed (network, timeout, server error)
print(f"Verification failed: {e}")
except AIMError as e:
# Base exception - catches all AIM SDK errors
print(f"AIM error: {e}")
# Direct capability verification with error handling
try:
result = agent.verify_capability(
capability="email:send",
resource="admin@example.com"
)
if result["verified"]:
send_email("admin@example.com", "Report ready")
except ActionDeniedError:
print("Not authorized to send emails")MCP Server Integration
# MCP Server Integration
from aim_sdk import secure
# Register agent with MCP servers from Claude Desktop config
# The SDK auto-discovers server URLs and capabilities from Claude config
agent = secure(
"mcp-agent",
mcp_servers=["github", "filesystem", "data-processor"]
)
# Or auto-detect all MCP servers from Claude Desktop config
agent = secure("mcp-agent", auto_detect_mcp=True)
# Manual MCP server registration via the integration module
from aim_sdk.integrations.mcp.registration import register_mcp_server
server_info = register_mcp_server(
aim_client=agent,
server_name="research-mcp",
server_url="http://localhost:3000",
public_key="base64-ed25519-public-key",
capabilities=["tools", "resources"],
description="Research assistant MCP server",
version="1.0.0"
)
print(f"Server registered: {server_info['id']}")
print(f"Trust score: {server_info['trust_score']}")
# MCP auto-detection utilities
from aim_sdk import auto_detect_mcps, MCPDetector
# Detect MCP servers from environment
detected = auto_detect_mcps()
for server in detected:
print(f"Found MCP server: {server['name']}")
print(f" Command: {server.get('command', 'N/A')}")
print(f" Capabilities: {server.get('capabilities', [])}")
# Track MCP tool calls for audit logging
from aim_sdk import track_mcp_call
track_mcp_call("github", "create_issue", {"title": "Bug fix"})CLI Tools
Command-Line Interface
# CLI Tools (aim-sdk command)
# Login to AIM (opens browser for OAuth authentication)
$ aim-sdk login
Opening browser for authentication...
Login successful! Credentials saved to ~/.aim/sdk_credentials.json
# Login to self-hosted AIM server
$ aim-sdk login --url http://localhost:8080
Opening browser for authentication at http://localhost:8080...
Login successful!
# Force re-authentication
$ aim-sdk login --force
# Check authentication status
$ aim-sdk status
AIM SDK Status:
Server: https://aim.opena2a.org
User: user@example.com
Organization: My Org
Token: Valid (expires in 23h)
SDK Token: Active
# Show SDK version
$ aim-sdk version
aim-sdk 1.21.0
# Logout and clear credentials
$ aim-sdk logout
Credentials cleared from ~/.aim/sdk_credentials.json
# After login, use the SDK in Python:
$ python -c "
from aim_sdk import secure
agent = secure('my-agent')
print(f'Agent registered: {agent.agent_id}')
"Testing & Development
# Testing & Development
import pytest
from unittest.mock import patch, MagicMock
from aim_sdk import AIMClient, AIMError, ActionDeniedError
# Unit testing - mock the AIM client methods
def test_perform_action_approved():
"""Test that a decorated function executes when AIM approves"""
client = MagicMock(spec=AIMClient)
client.verify_capability.return_value = {
"verified": True,
"verification_id": "ver-123",
"approved_by": "admin",
"expires_at": "2026-01-01T00:00:00Z"
}
# Test your business logic independently
result = get_users_from_db()
assert len(result) > 0
def test_perform_action_denied():
"""Test behavior when AIM denies a capability"""
client = MagicMock(spec=AIMClient)
client.verify_capability.side_effect = ActionDeniedError(
"Agent not authorized for db:write"
)
with pytest.raises(ActionDeniedError):
client.verify_capability("db:write", resource="users")
# Integration testing with a real AIM server
@pytest.fixture
def aim_client():
"""Create a test client against local AIM"""
from aim_sdk import secure
return secure(
"test-agent",
aim_url="http://localhost:8080",
force_new=True
)
def test_agent_registration(aim_client):
assert aim_client.agent_id is not None
def test_capability_verification(aim_client):
result = aim_client.verify_capability("db:read", resource="test_table")
assert result["verified"] is True
def test_request_capability(aim_client):
result = aim_client.request_capability(
capability_type="db:write",
reason="Need write access for integration tests"
)
assert result["status"] in ["pending", "approved"]Advanced Features
# Advanced Features
from aim_sdk import secure, AgentType
# 1. Agent Management (create agents programmatically)
admin = secure("admin-agent")
new_agent = admin.create_new_agent(
name="data-processor",
display_name="Data Processor Agent",
description="Processes incoming data feeds",
agent_type=AgentType.LANGCHAIN,
capabilities=["db:read", "file:write", "api:call"]
)
print(f"Created agent: {new_agent['id']}")
# List all agents in the organization
agents = admin.list_agents()
for a in agents.get("agents", []):
print(f"{a['name']}: {a['status']} (trust: {a.get('trust_score', 'N/A')})")
# Get/update/delete agents
details = admin.get_agent_details(new_agent["id"])
admin.update_agent(display_name="Updated Processor")
# admin.delete_agent(agent_id)
# 2. Security Logging (SOC/SIEM Integration)
from aim_sdk import (
SecurityLogger, configure_security_logging,
EventSeverity, AuthzEventType
)
# Configure from environment (SECURITY_LOG_FILE, SECURITY_LOG_FORMAT, etc.)
configure_security_logging()
# Or configure programmatically
logger = SecurityLogger(log_file="/var/log/aim/security.log")
# 3. A2A (Agent-to-Agent) Protocol
from aim_sdk import A2AClient, A2AAgentCard
# Create an A2A client for agent-to-agent communication
a2a = A2AClient.from_env()
# Create an agent card (identity for A2A)
card = A2AAgentCard(
agent_id=admin.agent_id,
name="admin-agent",
capabilities=["db:read", "api:call"]
)
# 4. Credential Management
from aim_sdk import (
load_agent_credentials, save_agent_credentials,
list_agent_credentials, delete_agent_credentials
)
# List all saved agent credentials
creds = list_agent_credentials()
for name, info in creds.items():
print(f"Agent: {name}, Server: {info.get('aim_url', 'N/A')}")
# 5. Attestation Caching
from aim_sdk import AttestationCache
cache = AttestationCache()
# Caches verification results to reduce AIM server round-tripsSDK Features
Security
- • Ed25519 signatures
- • Automatic token refresh
- • Secure credential storage
- • SSL/TLS verification
- • Rate limiting
Reliability
- • Automatic retries
- • Attestation caching
- • Connection pooling
- • Graceful degradation
- • Credential persistence
Performance
- • Auto-detection (type, capabilities, MCP)
- • Connection reuse
- • JIT access (just-in-time approval)
- • Risk-level auto-detection
- • SOC/SIEM security logging
Version Compatibility
| SDK Version | Python Version | AIM API Version |
|---|---|---|
| 1.0.x | 3.8+ | v1 |
| 0.9.x | 3.7+ | v1-beta |
| 0.8.x | 3.7+ | v1-alpha |