Python SDK Integration
The official Python SDK for AIM provides one-line agent registration with automatic identity verification, capability detection, and MCP server discovery. Built with the simplicity of Stripe.
System Requirements
Prerequisites
- Python 3.7+ (3.10+ recommended)
- keyring - Required for secure credential storage (will fail without it!)
- PyNaCl - For Ed25519 cryptographic operations
- requests - For HTTP communication
- cryptography - For additional security features
⚠️ Important: The SDK will fail with "ModuleNotFoundError: No module named 'keyring'" if keyring is not installed. This is a hard requirement, not optional.
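Because a missing dependency only surfaces as a ModuleNotFoundError at import time, it can help to check the prerequisites before importing the SDK. The following is a minimal sketch, not part of the SDK: the helper name missing_dependencies is hypothetical, and the mapping assumes the usual import names for these packages (PyNaCl installs the module nacl).

```python
from importlib.util import find_spec

# Maps each pip package name to the top-level module it installs.
# PyNaCl is imported as "nacl"; the other three share their pip name.
REQUIRED = {
    "keyring": "keyring",
    "PyNaCl": "nacl",
    "requests": "requests",
    "cryptography": "cryptography",
}


def missing_dependencies(required=REQUIRED):
    """Return the pip names of packages whose modules cannot be found."""
    return [
        pip_name
        for pip_name, module in required.items()
        if find_spec(module) is None
    ]


if __name__ == "__main__":
    missing = missing_dependencies()
    if missing:
        print("Missing packages, run: pip install " + " ".join(missing))
    else:
        print("All AIM SDK prerequisites are importable.")
```

Running this once in the target environment gives a clear list of what to install instead of a failure partway through startup.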
Installation
⚠️ No pip install available
# Download SDK from AIM Dashboard (Settings → SDK Download)
# There is NO pip package - SDK contains your pre-configured credentials
📦 Download SDK from Dashboard
The AIM SDK must be downloaded from your AIM dashboard as it contains your pre-configured credentials (zero config mode).
After downloading, install required dependencies:
pip install keyring PyNaCl requests cryptography
Note: keyring is required for secure credential storage. On Linux: sudo apt-get install python3-keyring
Why no pip package? The SDK contains your personal credentials and cannot be distributed via PyPI.
✅ Download Pre-Configured SDK (ONLY Option)
This is the ONLY way to get the AIM SDK. It comes with your embedded credentials - zero configuration required.
1. Log in to the AIM dashboard
2. Navigate to Settings → SDK Download
3. Click "Download Python SDK"
4. Extract the ZIP file to your project directory
5. Import and use:
from aim_sdk import secure
💡 Tip: The downloaded SDK already contains your API keys and organization credentials. No configuration needed!
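The extraction step can also be scripted. The sketch below is illustrative only: extract_sdk is a hypothetical helper, and it assumes the downloaded archive contains a top-level aim_sdk package (the actual archive name and layout are not specified in this guide).

```python
import sys
import zipfile
from pathlib import Path


def extract_sdk(zip_path, project_dir):
    """Extract the downloaded SDK archive into project_dir and make it importable."""
    project_dir = Path(project_dir).resolve()
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(project_dir)
    # Put the project directory on sys.path so `import aim_sdk` can resolve,
    # assuming the archive holds a top-level aim_sdk package.
    if str(project_dir) not in sys.path:
        sys.path.insert(0, str(project_dir))
    return project_dir


# Example call (hypothetical file name):
# extract_sdk("aim-sdk-python.zip", ".")
```

Since the archive embeds your credentials, consider keeping the extracted directory out of version control.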
Quick Start
⚡ The Stripe Moment - One Line Registration
from aim_sdk import secure, AgentType

# Full-featured agent registration
agent = secure(
    "my-ai-assistant",
    agent_type=AgentType.LANGCHAIN,  # CREWAI, AUTOGEN, GPT, CLAUDE, etc.
    capabilities=["db:read", "api:call"],
    version="1.0.0",
    description="Customer support AI agent",
    tags=["production", "customer-facing"],
    metadata={"model": "gpt-4", "department": "support"},
    mcp_servers=["github", "filesystem"],  # Omit to use MCP auto-detection
)

# Risk level auto-detected from capability name (db:read → low)
@agent.perform_action(capability="db:read")
def get_customer(customer_id: str):
    return {"id": customer_id, "name": "Jane Doe"}

result = get_customer("cust-123")

Important: Always use the secure() function. It is the recommended entry point and reflects the SDK's security-first design.
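To make the idea of risk levels derived from capability names concrete, here is a small sketch. It is not the SDK's detection logic, which this guide does not document: infer_risk and the ACTION_RISK table are hypothetical, chosen only to reproduce the examples given in this guide (db:read → low, db:write → medium, payment:refund → critical), and the fallback of "medium" for unknown actions is an assumption.

```python
# Hypothetical rule table keyed by the action part of "namespace:action".
# The values mirror the examples in this guide; real SDK rules may differ.
ACTION_RISK = {
    "read": "low",
    "write": "medium",
    "refund": "critical",
}


def infer_risk(capability, default="medium"):
    """Infer a risk level from a capability string in namespace:action format."""
    namespace, separator, action = capability.partition(":")
    if not separator or not namespace or not action:
        raise ValueError(f"expected namespace:action, got {capability!r}")
    # Unknown actions fall back to the default (an assumption of this sketch).
    return ACTION_RISK.get(action, default)


print(infer_risk("db:read"))         # low
print(infer_risk("db:write"))        # medium
print(infer_risk("payment:refund"))  # critical
```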
Configuration
Zero Configuration Mode
If you downloaded the SDK from AIM dashboard, everything is pre-configured.
from aim_sdk import secure

# Zero-configuration mode: the SDK downloaded from the AIM dashboard
# ships with embedded credentials, so no API key is needed
agent = secure("my-agent")

# Everything is auto-detected:
# - AIM server URL
# - Organization credentials
# - MCP servers the agent talks to
# - Agent capabilities
print(f"Agent ID: {agent.agent_id}")
print(f"Trust Score: {agent.get_trust_score()}")

Manual Configuration
For custom deployments that pass credentials explicitly instead of relying on the credentials embedded in the downloaded SDK.

from aim_sdk import secure

# Manual configuration with explicit credentials
agent = secure(
    "my-agent",
    api_key="aim_abc123...",  # Your AIM API key
    aim_url="https://api.aim.example.com",  # Your AIM server
)

# Optional: Override auto-detection
agent = secure(
    "my-agent",
    api_key="aim_abc123...",
    display_name="My Awesome Agent",
    description="Production agent for data processing",
    version="2.0.0",
    talks_to=["mcp-server-1", "mcp-server-2"],  # Manual MCP specification
    capabilities=["db:read", "file:write"],  # Manual capabilities (namespace:action format)
    auto_detect=False,  # Disable auto-detection
)

Core Features
Capability Verification with Decorators
Use decorators for automatic verification before sensitive operations.
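As background, the general pattern behind a verify-before-call decorator can be sketched in plain Python. This is an illustration of the pattern only, not the SDK's implementation: require, PermissionDenied, and the granted set are all hypothetical names introduced here.

```python
import functools


class PermissionDenied(Exception):
    """Hypothetical error raised when the check fails."""


def require(check, capability):
    """Build a decorator that runs check(capability) before each call."""
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            if not check(capability):
                raise PermissionDenied(capability)
            return func(*args, **kwargs)
        return wrapper
    return decorator


# A stand-in for a real permission lookup: a set of granted capabilities.
granted = {"db:read"}


@require(granted.__contains__, "db:read")
def get_user(user_id):
    return {"id": user_id}


print(get_user("u-1"))  # the check passes, so the function body runs
```

The wrapped function never executes when the check fails, which is why a decorator is a natural place to gate sensitive operations.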
from aim_sdk import secure

agent = secure("data-processor")

# Risk levels are AUTO-DETECTED from capability patterns!
@agent.perform_action(capability="config:read")  # Auto: low (read-only namespace)
def read_config():
    """Low-risk: logged, doesn't affect trust score"""
    return load_config()

@agent.perform_action(capability="db:read", resource="users")  # Auto: low
def get_user(user_id: str):
    """Low-risk: read operations are safe"""
    return db.get_user(user_id)

@agent.perform_action(capability="db:write", resource="users")  # Auto: medium
def update_user(user_id: str, data: dict):
    """Medium-risk: data modification detected"""
    return db.update_user(user_id, data)

# JIT access: waits for approval if needed
@agent.perform_action(capability="payment:refund", resource="stripe", jit_access=True)
def process_refund(order_id: str, amount: float):
    """Critical-risk + JIT: Waits for AIM approval before executing"""
    return stripe.refund(order_id, amount)

# See /docs/decorators for full documentation

MCP Server Auto-Detection
Automatically discovers MCP servers from Claude Desktop configuration.
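As a rough illustration of what such discovery involves, the sketch below reads MCP server entries from a Claude Desktop style configuration file. It is not the SDK's detection code: read_mcp_servers is a hypothetical helper, it assumes the config is JSON with a top-level "mcpServers" object mapping server names to "command" and "args", and the file path is passed in explicitly because its location varies by operating system.

```python
import json
from pathlib import Path


def read_mcp_servers(config_path):
    """Return [{'name': ..., 'command': ...}] from a Claude Desktop style config."""
    path = Path(config_path)
    if not path.is_file():
        return []  # no config file means nothing to detect
    config = json.loads(path.read_text(encoding="utf-8"))
    servers = []
    for name, spec in config.get("mcpServers", {}).items():
        # Join the executable and its arguments into one command string.
        parts = [spec.get("command", "")] + list(spec.get("args", []))
        servers.append({"name": name, "command": " ".join(p for p in parts if p)})
    return servers
```

The returned dictionaries use the same name and command keys that appear in the SDK examples in this section.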
from aim_sdk import secure

# Automatically detects MCP servers from Claude Desktop config
agent = secure("my-agent")

# View detected MCP servers
detected_mcps = agent.get_detected_mcps()
for mcp in detected_mcps:
    print(f"Found MCP: {mcp['name']} - {mcp['command']}")

# Manual MCP specification (overrides auto-detection)
agent = secure(
    "my-agent",
    talks_to=[
        {"name": "filesystem", "command": "npx @modelcontextprotocol/server-filesystem"},
        {"name": "github", "command": "npx @modelcontextprotocol/server-github"},
    ],
)

# Report runtime MCP detection
agent.report_detected_mcps([
    {"name": "new-mcp", "command": "python mcp_server.py", "detected_at": "runtime"}
])

Capability Management
Fine-grained permission control with automatic capability detection.
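To show what a capability check against a resource can look like, here is a self-contained sketch of an allowlist with wildcard resource patterns. It is a conceptual model, not the SDK's permission engine: CapabilityPolicy and its methods are hypothetical, and glob-style matching via fnmatch is an assumption modelled on the "/test/*" pattern used in the testing example of this guide.

```python
from fnmatch import fnmatchcase


class CapabilityPolicy:
    """Hypothetical in-memory allowlist of (capability, resource pattern) rules."""

    def __init__(self):
        self._rules = []

    def allow(self, capability, resource_pattern="*"):
        """Permit a capability on every resource matching the glob pattern."""
        self._rules.append((capability, resource_pattern))

    def can_perform(self, capability, resource=""):
        """Return True if any rule grants this capability on this resource."""
        return any(
            capability == allowed and fnmatchcase(resource, pattern)
            for allowed, pattern in self._rules
        )


policy = CapabilityPolicy()
policy.allow("file:read", "/test/*")

print(policy.can_perform("file:read", "/test/data.txt"))   # True
print(policy.can_perform("file:read", "/etc/passwd"))      # False
print(policy.can_perform("file:write", "/test/data.txt"))  # False
```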
from aim_sdk import secure

agent = secure("capability-demo")

# ═══════════════════════════════════════════════════════════
# CAPABILITY REGISTRATION
# ═══════════════════════════════════════════════════════════
# Auto-register: capabilities are registered automatically on first use
# when using @agent.perform_action(auto_register=True)  # default

# Explicitly register a capability with metadata
agent.register_capability(
    "api:search",
    description="Search public APIs",
    risk_level="low",
)
agent.register_capability(
    "db:read",
    description="Read from database",
    risk_level="medium",
)

# ═══════════════════════════════════════════════════════════
# REQUEST NEW CAPABILITIES (requires admin approval)
# ═══════════════════════════════════════════════════════════
# Request a capability that needs admin approval
agent.request_capability(
    capability="db:admin",  # namespace:action format
    reason="Need admin access for data migration task",
)

# Auto-detected capabilities from code analysis
capabilities = agent.get_capabilities()
print(f"Agent has {len(capabilities)} capabilities")

# Check if capability is allowed
if agent.can_perform("users:delete", resource="users_table"):
    # Safe to proceed
    delete_user(user_id)
else:
    print("Capability not permitted - insufficient permissions")

# Get capability violations
violations = agent.get_violations()
if violations:
    print(f"Warning: {len(violations)} capability violations detected")

Trust Score Monitoring
Real-time trust scoring with ML-powered risk assessment.
from aim_sdk import secure

agent = secure("trust-monitor")

# Get current trust score
trust_score = agent.get_trust_score()
print(f"Current trust score: {trust_score['score']}/100")
print(f"Risk level: {trust_score['risk_level']}")

# Get trust score factors
factors = trust_score['factors']
for factor, value in factors.items():
    print(f"  {factor}: {value}")

# Monitor trust score changes
@agent.on_trust_change
def handle_trust_change(old_score, new_score):
    if new_score < 70:
        alert_admin(f"Trust score dropped to {new_score}")

# Get trust score history
history = agent.get_trust_history(days=30)
for entry in history:
    print(f"{entry['timestamp']}: {entry['score']}")

Error Handling
from aim_sdk import secure
from aim_sdk.exceptions import (
    AIMError,
    AuthenticationError,
    VerificationError,
    CapabilityDeniedError,
)

try:
    agent = secure("my-agent")
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Check API key or credentials
    raise  # Re-raise: the agent is unusable if registration failed
except AIMError as e:
    print(f"AIM error: {e}")
    # Handle general AIM errors
    raise

# Handle verification errors
try:
    @agent.perform_action(capability="data:delete", resource="production")  # namespace:action format
    def dangerous_operation():
        # This might be denied
        pass

    dangerous_operation()
except CapabilityDeniedError as e:
    print(f"Capability denied: {e.reason}")
    print(f"Required capability: {e.required_capability}")
except VerificationError as e:
    print(f"Verification failed: {e}")

Advanced Usage
from aim_sdk import secure

# Advanced configuration
agent = secure(
    "advanced-agent",
    api_key="aim_abc123...",
    # Retry configuration
    max_retries=3,
    retry_delay=1.0,
    # Timeout settings
    request_timeout=30,
    verification_timeout=5,
    # Caching
    enable_cache=True,
    cache_ttl=3600,
    # Debugging
    debug=True,
    log_level="DEBUG",
)

# Custom verification handler
@agent.verification_handler
def custom_verification(action, resource, context):
    """Custom logic before standard verification."""
    if context.get("emergency_override"):
        return True  # Skip verification in emergencies
    # Add custom checks
    if action == "delete" and "production" in resource:
        require_two_factor_auth()
    return None  # Continue with standard verification

# Batch operations
actions = [
    ("read_file", "/data/file1.csv"),
    ("read_file", "/data/file2.csv"),
    ("write_file", "/output/result.csv"),
]
results = agent.verify_batch(actions)
for action, resource, allowed in results:
    print(f"{action} on {resource}: {'✓' if allowed else '✗'}")

# Performance monitoring
with agent.measure_performance():
    # Your code here
    process_large_dataset()

# Get performance metrics
metrics = agent.get_metrics()
print(f"Avg verification time: {metrics['avg_verification_time']}ms")
print(f"Total verifications: {metrics['total_verifications']}")

Testing
The SDK includes testing utilities for unit and integration tests.
import pytest
from aim_sdk import secure
from aim_sdk.testing import MockAIMClient

def test_agent_registration():
    """Test agent registration with AIM."""
    agent = secure("test-agent", api_key="test_key")
    assert agent.agent_id is not None
    assert agent.is_verified is True

def test_with_mock():
    """Test with mocked AIM client."""
    with MockAIMClient() as mock_agent:
        mock_agent.set_trust_score(85)
        mock_agent.allow_capability("file:read", "/test/*")

        # Test your code
        @mock_agent.perform_action(capability="file:read", resource="/test/data.txt")
        def read_test_file():
            return "test data"

        result = read_test_file()
        assert result == "test data"
        assert mock_agent.verify_called_with("file:read", "/test/data.txt")

@pytest.mark.integration
def test_real_aim_connection():
    """Integration test with real AIM server."""
    agent = secure("integration-test")
    # Test real verification
    verification = agent.verify_capability(
        capability="test:action",
        resource="test_resource",
    )
    assert verification.get("status") == "approved"
    # Cleanup
    agent.cleanup()

Environment Variables
| Variable | Description | Default |
|---|---|---|
| AIM_API_KEY | AIM API key for authentication | None (required) |
| AIM_URL | AIM server URL | http://localhost:8080 |
| AIM_AGENT_NAME | Default agent name | None |
| AIM_AUTO_DETECT | Enable auto-detection | true |
| AIM_DEBUG | Enable debug logging | false |
| AIM_CACHE_TTL | Cache TTL in seconds | 3600 |
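The defaults in the table can be mirrored in application code when you want to inspect or log the effective settings. The sketch below is an illustration, not SDK code: load_aim_config is a hypothetical helper, and the rule for parsing boolean values ("1", "true", "yes", "on", case-insensitive) is an assumption, since this guide does not state which spellings the SDK accepts.

```python
import os


def _as_bool(value):
    """Interpret common truthy spellings; this parsing rule is an assumption."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_aim_config(env=None):
    """Read the AIM_* variables, applying the defaults from the table above."""
    env = os.environ if env is None else env
    return {
        "api_key": env.get("AIM_API_KEY"),  # no default: required
        "url": env.get("AIM_URL", "http://localhost:8080"),
        "agent_name": env.get("AIM_AGENT_NAME"),
        "auto_detect": _as_bool(env.get("AIM_AUTO_DETECT", "true")),
        "debug": _as_bool(env.get("AIM_DEBUG", "false")),
        "cache_ttl": int(env.get("AIM_CACHE_TTL", "3600")),
    }


config = load_aim_config({"AIM_API_KEY": "aim_abc123...", "AIM_DEBUG": "true"})
print(config["url"], config["debug"], config["cache_ttl"])
```

Passing a plain dictionary instead of os.environ keeps the helper easy to test.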
SDK Methods Reference
| Method | Description |
|---|---|
| secure(name, **kwargs) | One-line agent registration with automatic security |
| @agent.perform_action(capability, risk_level, resource, auto_register=True) | NEW: Decorator for capability verification. auto_register=True (default) registers capability on first use. |
| @agent.perform_action(..., jit_access=True) | Decorator for JIT access requiring admin approval before execution |
| agent.register_capability(capability_type, description, risk_level) | NEW: Explicitly register a capability with metadata. Used when auto_register is disabled. |
| agent.verify_capability(capability, resource) | Manual capability verification method |
| agent.get_trust_score() | Get current trust score and risk assessment |
| agent.get_detected_mcps() | Get list of detected MCP servers |
| agent.get_capabilities() | Get list of agent capabilities |
| agent.request_capability(capability, reason) | Request additional capability (requires admin approval, creates JIT request) |
Enforcement Modes
The Global Enforcement Mode (configured in Settings → Security Policies) determines how capability violations are handled:
MONITORING Mode
- ✓ All actions allowed
- ✓ Violations logged as alerts
- ✓ Great for development/testing
STRICT Mode
- ✓ Unauthorized actions blocked
- ✓ Policies fully enforced
- ✓ Recommended for production
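The difference between the two modes can be summarized in a few lines of code. This is a conceptual sketch only: enforce and CapabilityBlocked are hypothetical names, and actual enforcement is configured in the dashboard rather than in client code.

```python
import logging

logger = logging.getLogger("aim_enforcement_sketch")


class CapabilityBlocked(Exception):
    """Hypothetical error representing an action blocked in STRICT mode."""


def enforce(mode, capability, granted):
    """Return True if the action may proceed under the given enforcement mode."""
    if capability in granted:
        return True  # no violation in either mode
    if mode == "MONITORING":
        # The violation is recorded, but the action is still allowed.
        logger.warning("capability violation (allowed): %s", capability)
        return True
    if mode == "STRICT":
        # The unauthorized action is blocked.
        raise CapabilityBlocked(capability)
    raise ValueError(f"unknown enforcement mode: {mode!r}")
```

Under MONITORING an ungranted capability still returns True after logging, while under STRICT the same call raises, which is why MONITORING suits development and STRICT suits production.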
Requirements
- Python 3.8 or higher
- cryptography >= 41.0.0 (Ed25519 support)
- requests >= 2.31.0 (API communication)
- pydantic >= 2.0.0 (data validation)
- aiohttp >= 3.8.0 (async support, optional)