OAuth Token Management
AIM uses OAuth 2.0 refresh token flows to manage SDK authentication automatically. The SDK handles token refresh seamlessly, ensuring agents maintain continuous access without manual intervention.
Important: OAuth in AIM
OAuth in AIM is used exclusively for SDK token management (refresh tokens), not for user authentication. Users authenticate via JWT tokens or API keys. The OAuth implementation ensures that SDK agents can maintain long-lived sessions with automatic token refresh.
How OAuth Token Management Works
Agent Registration
SDK registers agent with AIM and receives initial access token (15 min) and refresh token (30 days)
Automatic Monitoring
SDK monitors token expiration and refreshes 5 minutes before expiry
Token Rotation
Each refresh returns NEW access and refresh tokens (rotation for security)
Seamless Operation
Agent continues operating without interruption - all handled by SDK
Token Refresh Flow
How the SDK Manages Tokens
# How AIM SDK Uses OAuth for Token Management
# 1. Initial Agent Registration (SDK handles automatically)
from aim_sdk import secure
# SDK automatically:
# - Registers agent with AIM server
# - Receives access_token and refresh_token
# - Stores tokens securely
agent = secure("my-agent")
# 2. Behind the scenes - Token refresh flow
class AIMTokenManager:
def __init__(self):
self.access_token = None
self.refresh_token = None
self.expires_at = None
async def ensure_valid_token(self):
"""SDK automatically refreshes tokens before expiry"""
if self._is_token_expired():
await self._refresh_access_token()
return self.access_token
async def _refresh_access_token(self):
"""OAuth 2.0 refresh token grant"""
response = await http.post(
"https://api.aim.example.com/v1/auth/refresh",
json={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.agent_id
}
)
# Update tokens (refresh token rotation)
data = response.json()
self.access_token = data["access_token"]
self.refresh_token = data["refresh_token"] # New refresh token
self.expires_at = time.time() + data["expires_in"]
def _is_token_expired(self):
"""Check if token needs refresh (5 min buffer)"""
return time.time() > (self.expires_at - 300)Token Lifecycle
// Token Lifecycle in AIM SDK
// 1. Initial Registration Response
{
"agent_id": "agent_abc123",
"access_token": "eyJhbGciOiJSUzI1NiIs...", // JWT, expires in 15 minutes
"refresh_token": "rt_xyzabc789...", // Opaque, expires in 30 days
"expires_in": 900, // 15 minutes
"token_type": "Bearer"
}
// 2. Automatic Refresh (before access token expires)
POST /v1/auth/refresh
{
"grant_type": "refresh_token",
"refresh_token": "rt_xyzabc789...",
"client_id": "agent_abc123"
}
// 3. Refresh Response (token rotation)
{
"access_token": "eyJhbGciOiJSUzI1NiIs...", // New access token
"refresh_token": "rt_newtoken456...", // NEW refresh token (rotation)
"expires_in": 900, // 15 minutes
"token_type": "Bearer"
}
// 4. Using the Access Token
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...Manual Implementation (Non-SDK)
Implementing Token Management Without SDK
If you're not using the Python SDK, here's how to implement OAuth token management manually:
# Manual OAuth Token Management (for non-SDK users)
import requests
import time
import json
from typing import Optional, Dict
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
class ManualAIMClient:
"""Manual implementation of AIM OAuth token management"""
def __init__(self, agent_name: str, private_key_path: str):
self.agent_name = agent_name
self.base_url = "https://api.aim.example.com"
self.access_token = None
self.refresh_token = None
self.expires_at = None
# Load Ed25519 private key for signing
with open(private_key_path, 'rb') as f:
self.private_key = serialization.load_pem_private_key(
f.read(), password=None
)
def register_agent(self) -> Dict:
"""Initial agent registration with signed request"""
# Create registration payload
payload = {
"name": self.agent_name,
"public_key": self._get_public_key(),
"capabilities": ["read", "write", "execute"],
"timestamp": int(time.time())
}
# Sign the payload
signature = self._sign_payload(payload)
# Register agent
response = requests.post(
f"{self.base_url}/v1/agents/register",
json=payload,
headers={
"X-Agent-Signature": signature
}
)
if response.status_code == 201:
data = response.json()
self.access_token = data["access_token"]
self.refresh_token = data["refresh_token"]
self.expires_at = time.time() + data["expires_in"]
return data
else:
raise Exception(f"Registration failed: {response.text}")
def refresh_tokens(self) -> Dict:
"""Refresh expired access token using refresh token"""
if not self.refresh_token:
raise Exception("No refresh token available")
response = requests.post(
f"{self.base_url}/v1/auth/refresh",
json={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.agent_name
}
)
if response.status_code == 200:
data = response.json()
# Update tokens (rotation)
self.access_token = data["access_token"]
self.refresh_token = data["refresh_token"] # New refresh token!
self.expires_at = time.time() + data["expires_in"]
return data
else:
# Refresh token expired or revoked
# Need to re-register the agent
return self.register_agent()
def make_api_call(self, endpoint: str, method: str = "GET", **kwargs) -> Dict:
"""Make an API call with automatic token refresh"""
# Check if token needs refresh
if self._needs_refresh():
self.refresh_tokens()
# Make the API call
response = requests.request(
method,
f"{self.base_url}{endpoint}",
headers={
"Authorization": f"Bearer {self.access_token}"
},
**kwargs
)
# Handle 401 Unauthorized (token expired)
if response.status_code == 401:
self.refresh_tokens()
# Retry the request
response = requests.request(
method,
f"{self.base_url}{endpoint}",
headers={
"Authorization": f"Bearer {self.access_token}"
},
**kwargs
)
return response.json()
def _needs_refresh(self) -> bool:
"""Check if access token needs refresh (5 min buffer)"""
if not self.expires_at:
return True
return time.time() > (self.expires_at - 300)
def _get_public_key(self) -> str:
"""Get public key in PEM format"""
public_key = self.private_key.public_key()
return public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
def _sign_payload(self, payload: Dict) -> str:
"""Sign payload with Ed25519 private key"""
message = json.dumps(payload, sort_keys=True).encode()
signature = self.private_key.sign(message)
return signature.hex()
# Usage example
client = ManualAIMClient("my-agent", "/path/to/private_key.pem")
client.register_agent()
# Make API calls - tokens refresh automatically
agents = client.make_api_call("/v1/agents")
trust_score = client.make_api_call("/v1/agents/my-agent/trust")Server-Side Implementation
AIM Server Token Refresh Handler
// Server-side OAuth Implementation (Go)
// Token refresh endpoint
func (h *AuthHandler) RefreshToken(c *fiber.Ctx) error {
var req RefreshTokenRequest
if err := c.BodyParser(&req); err != nil {
return c.Status(400).JSON(fiber.Map{
"error": "Invalid request body",
})
}
// Validate refresh token
claims, err := h.validateRefreshToken(req.RefreshToken)
if err != nil {
return c.Status(401).JSON(fiber.Map{
"error": "Invalid or expired refresh token",
})
}
// Check if refresh token has been revoked
if h.isTokenRevoked(req.RefreshToken) {
return c.Status(401).JSON(fiber.Map{
"error": "Refresh token has been revoked",
})
}
// Generate new tokens (rotation)
newAccessToken, err := h.generateAccessToken(claims.AgentID)
if err != nil {
return c.Status(500).JSON(fiber.Map{
"error": "Failed to generate access token",
})
}
newRefreshToken, err := h.generateRefreshToken(claims.AgentID)
if err != nil {
return c.Status(500).JSON(fiber.Map{
"error": "Failed to generate refresh token",
})
}
// Revoke old refresh token
h.revokeToken(req.RefreshToken)
// Store new refresh token
h.storeRefreshToken(newRefreshToken, claims.AgentID)
// Log token refresh event
h.auditLog.Log(AuditEvent{
Type: "token_refresh",
AgentID: claims.AgentID,
IP: c.IP(),
Success: true,
})
return c.JSON(fiber.Map{
"access_token": newAccessToken,
"refresh_token": newRefreshToken,
"expires_in": 900, // 15 minutes
"token_type": "Bearer",
})
}
// Token generation with proper claims
func (h *AuthHandler) generateAccessToken(agentID string) (string, error) {
claims := jwt.MapClaims{
"sub": agentID,
"type": "access",
"iat": time.Now().Unix(),
"exp": time.Now().Add(15 * time.Minute).Unix(),
"iss": "aim.example.com",
"aud": "aim-api",
"jti": uuid.New().String(), // Unique token ID
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
return token.SignedString(h.privateKey)
}Security Features
# OAuth Security Features in AIM
## 1. Refresh Token Rotation
- Every refresh generates NEW access AND refresh tokens
- Old refresh token immediately revoked
- Prevents token replay attacks
## 2. Token Expiration Strategy
Access Token: 15 minutes (short-lived)
Refresh Token: 30 days (configurable)
SDK Buffer: Refreshes 5 minutes before expiry
## 3. Token Storage (SDK)
# Secure storage per platform
- Linux: ~/.aim/tokens (chmod 600)
- macOS: Keychain
- Windows: Credential Manager
## 4. Automatic Recovery
# SDK handles all failure scenarios
- Expired tokens: Auto-refresh
- Revoked tokens: Re-register agent
- Network errors: Exponential backoff
- Invalid tokens: Clear and re-authenticate
## 5. Rate Limiting
# Prevent abuse
- Refresh: 10 requests per minute per agent
- Register: 5 requests per hour per IP
- API calls: 1000 requests per hour per agentToken Types in AIM
| Token Type | Purpose | Lifetime | Format | Rotation |
|---|---|---|---|---|
| Access Token | API authentication | 15 minutes | JWT (RS256) | On refresh |
| Refresh Token | Get new access token | 30 days | Opaque (UUID) | Every use |
| API Key | Direct API access | Configurable | SHA-256 hash | Manual |
| JWT (User) | User authentication | 24 hours | JWT (RS256) | On refresh |
Best Practices
For SDK Users
- Let SDK handle all token management
- Use secure() function for registration
- Store credentials in environment variables
- Monitor SDK logs for token errors
For Manual Implementation
- Implement token refresh 5 min before expiry
- Handle 401 responses gracefully
- Store refresh tokens securely
- Implement exponential backoff for retries
OAuth API Endpoints
/v1/auth/refreshRefresh access token using refresh token (token rotation)
/v1/auth/revokeRevoke a refresh token (logout agent)
/v1/auth/validateValidate current access token