OAuth Token Management

AIM uses OAuth 2.0 refresh token flows to manage SDK authentication automatically. The SDK handles token refresh seamlessly, ensuring agents maintain continuous access without manual intervention.

Important: OAuth in AIM

OAuth in AIM is used exclusively for SDK token management (refresh tokens), not for user authentication. Users authenticate via JWT tokens or API keys. The OAuth implementation ensures that SDK agents can maintain long-lived sessions with automatic token refresh.

How OAuth Token Management Works

1

Agent Registration

SDK registers agent with AIM and receives initial access token (15 min) and refresh token (30 days)

2

Automatic Monitoring

SDK monitors token expiration and refreshes 5 minutes before expiry

3

Token Rotation

Each refresh returns NEW access and refresh tokens (rotation for security)

4

Seamless Operation

Agent continues operating without interruption - all handled by SDK

Token Refresh Flow

How the SDK Manages Tokens

# How AIM SDK Uses OAuth for Token Management

# 1. Initial Agent Registration (SDK handles automatically)
from aim_sdk import secure

# SDK automatically:
# - Registers agent with AIM server
# - Receives access_token and refresh_token
# - Stores tokens securely
agent = secure("my-agent")

# 2. Behind the scenes - Token refresh flow
class AIMTokenManager:
    def __init__(self):
        self.access_token = None
        self.refresh_token = None
        self.expires_at = None

    async def ensure_valid_token(self):
        """SDK automatically refreshes tokens before expiry"""
        if self._is_token_expired():
            await self._refresh_access_token()
        return self.access_token

    async def _refresh_access_token(self):
        """OAuth 2.0 refresh token grant"""
        response = await http.post(
            "https://api.aim.example.com/v1/auth/refresh",
            json={
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
                "client_id": self.agent_id
            }
        )

        # Update tokens (refresh token rotation)
        data = response.json()
        self.access_token = data["access_token"]
        self.refresh_token = data["refresh_token"]  # New refresh token
        self.expires_at = time.time() + data["expires_in"]

    def _is_token_expired(self):
        """Check if token needs refresh (5 min buffer)"""
        return time.time() > (self.expires_at - 300)

Token Lifecycle

// Token Lifecycle in AIM SDK

// 1. Initial Registration Response
{
  "agent_id": "agent_abc123",
  "access_token": "eyJhbGciOiJSUzI1NiIs...",  // JWT, expires in 15 minutes
  "refresh_token": "rt_xyzabc789...",          // Opaque, expires in 30 days
  "expires_in": 900,                           // 15 minutes
  "token_type": "Bearer"
}

// 2. Automatic Refresh (before access token expires)
POST /v1/auth/refresh
{
  "grant_type": "refresh_token",
  "refresh_token": "rt_xyzabc789...",
  "client_id": "agent_abc123"
}

// 3. Refresh Response (token rotation)
{
  "access_token": "eyJhbGciOiJSUzI1NiIs...",  // New access token
  "refresh_token": "rt_newtoken456...",        // NEW refresh token (rotation)
  "expires_in": 900,                           // 15 minutes
  "token_type": "Bearer"
}

// 4. Using the Access Token
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...

Manual Implementation (Non-SDK)

Implementing Token Management Without SDK

If you're not using the Python SDK, here's how to implement OAuth token management manually:

# Manual OAuth Token Management (for non-SDK users)

import requests
import time
import json
from typing import Optional, Dict
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519

class ManualAIMClient:
    """Manual implementation of AIM OAuth token management"""

    def __init__(self, agent_name: str, private_key_path: str):
        self.agent_name = agent_name
        self.base_url = "https://api.aim.example.com"
        self.access_token = None
        self.refresh_token = None
        self.expires_at = None

        # Load Ed25519 private key for signing
        with open(private_key_path, 'rb') as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(), password=None
            )

    def register_agent(self) -> Dict:
        """Initial agent registration with signed request"""
        # Create registration payload
        payload = {
            "name": self.agent_name,
            "public_key": self._get_public_key(),
            "capabilities": ["read", "write", "execute"],
            "timestamp": int(time.time())
        }

        # Sign the payload
        signature = self._sign_payload(payload)

        # Register agent
        response = requests.post(
            f"{self.base_url}/v1/agents/register",
            json=payload,
            headers={
                "X-Agent-Signature": signature
            }
        )

        if response.status_code == 201:
            data = response.json()
            self.access_token = data["access_token"]
            self.refresh_token = data["refresh_token"]
            self.expires_at = time.time() + data["expires_in"]
            return data
        else:
            raise Exception(f"Registration failed: {response.text}")

    def refresh_tokens(self) -> Dict:
        """Refresh expired access token using refresh token"""
        if not self.refresh_token:
            raise Exception("No refresh token available")

        response = requests.post(
            f"{self.base_url}/v1/auth/refresh",
            json={
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
                "client_id": self.agent_name
            }
        )

        if response.status_code == 200:
            data = response.json()
            # Update tokens (rotation)
            self.access_token = data["access_token"]
            self.refresh_token = data["refresh_token"]  # New refresh token!
            self.expires_at = time.time() + data["expires_in"]
            return data
        else:
            # Refresh token expired or revoked
            # Need to re-register the agent
            return self.register_agent()

    def make_api_call(self, endpoint: str, method: str = "GET", **kwargs) -> Dict:
        """Make an API call with automatic token refresh"""
        # Check if token needs refresh
        if self._needs_refresh():
            self.refresh_tokens()

        # Make the API call
        response = requests.request(
            method,
            f"{self.base_url}{endpoint}",
            headers={
                "Authorization": f"Bearer {self.access_token}"
            },
            **kwargs
        )

        # Handle 401 Unauthorized (token expired)
        if response.status_code == 401:
            self.refresh_tokens()
            # Retry the request
            response = requests.request(
                method,
                f"{self.base_url}{endpoint}",
                headers={
                    "Authorization": f"Bearer {self.access_token}"
                },
                **kwargs
            )

        return response.json()

    def _needs_refresh(self) -> bool:
        """Check if access token needs refresh (5 min buffer)"""
        if not self.expires_at:
            return True
        return time.time() > (self.expires_at - 300)

    def _get_public_key(self) -> str:
        """Get public key in PEM format"""
        public_key = self.private_key.public_key()
        return public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode('utf-8')

    def _sign_payload(self, payload: Dict) -> str:
        """Sign payload with Ed25519 private key"""
        message = json.dumps(payload, sort_keys=True).encode()
        signature = self.private_key.sign(message)
        return signature.hex()

# Usage example
client = ManualAIMClient("my-agent", "/path/to/private_key.pem")
client.register_agent()

# Make API calls - tokens refresh automatically
agents = client.make_api_call("/v1/agents")
trust_score = client.make_api_call("/v1/agents/my-agent/trust")

Server-Side Implementation

AIM Server Token Refresh Handler

// Server-side OAuth Implementation (Go)

// Token refresh endpoint
func (h *AuthHandler) RefreshToken(c *fiber.Ctx) error {
    var req RefreshTokenRequest
    if err := c.BodyParser(&req); err != nil {
        return c.Status(400).JSON(fiber.Map{
            "error": "Invalid request body",
        })
    }

    // Validate refresh token
    claims, err := h.validateRefreshToken(req.RefreshToken)
    if err != nil {
        return c.Status(401).JSON(fiber.Map{
            "error": "Invalid or expired refresh token",
        })
    }

    // Check if refresh token has been revoked
    if h.isTokenRevoked(req.RefreshToken) {
        return c.Status(401).JSON(fiber.Map{
            "error": "Refresh token has been revoked",
        })
    }

    // Generate new tokens (rotation)
    newAccessToken, err := h.generateAccessToken(claims.AgentID)
    if err != nil {
        return c.Status(500).JSON(fiber.Map{
            "error": "Failed to generate access token",
        })
    }

    newRefreshToken, err := h.generateRefreshToken(claims.AgentID)
    if err != nil {
        return c.Status(500).JSON(fiber.Map{
            "error": "Failed to generate refresh token",
        })
    }

    // Revoke old refresh token
    h.revokeToken(req.RefreshToken)

    // Store new refresh token
    h.storeRefreshToken(newRefreshToken, claims.AgentID)

    // Log token refresh event
    h.auditLog.Log(AuditEvent{
        Type:     "token_refresh",
        AgentID:  claims.AgentID,
        IP:       c.IP(),
        Success:  true,
    })

    return c.JSON(fiber.Map{
        "access_token":  newAccessToken,
        "refresh_token": newRefreshToken,
        "expires_in":    900, // 15 minutes
        "token_type":    "Bearer",
    })
}

// Token generation with proper claims
func (h *AuthHandler) generateAccessToken(agentID string) (string, error) {
    claims := jwt.MapClaims{
        "sub":      agentID,
        "type":     "access",
        "iat":      time.Now().Unix(),
        "exp":      time.Now().Add(15 * time.Minute).Unix(),
        "iss":      "aim.example.com",
        "aud":      "aim-api",
        "jti":      uuid.New().String(), // Unique token ID
    }

    token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
    return token.SignedString(h.privateKey)
}

Security Features

# OAuth Security Features in AIM

## 1. Refresh Token Rotation
- Every refresh generates NEW access AND refresh tokens
- Old refresh token immediately revoked
- Prevents token replay attacks

## 2. Token Expiration Strategy
Access Token: 15 minutes (short-lived)
Refresh Token: 30 days (configurable)
SDK Buffer: Refreshes 5 minutes before expiry

## 3. Token Storage (SDK)
# Secure storage per platform
- Linux: ~/.aim/tokens (chmod 600)
- macOS: Keychain
- Windows: Credential Manager

## 4. Automatic Recovery
# SDK handles all failure scenarios
- Expired tokens: Auto-refresh
- Revoked tokens: Re-register agent
- Network errors: Exponential backoff
- Invalid tokens: Clear and re-authenticate

## 5. Rate Limiting
# Prevent abuse
- Refresh: 10 requests per minute per agent
- Register: 5 requests per hour per IP
- API calls: 1000 requests per hour per agent

Token Types in AIM

Token TypePurposeLifetimeFormatRotation
Access TokenAPI authentication15 minutesJWT (RS256)On refresh
Refresh TokenGet new access token30 daysOpaque (UUID)Every use
API KeyDirect API accessConfigurableSHA-256 hashManual
JWT (User)User authentication24 hoursJWT (RS256)On refresh

Best Practices

For SDK Users

  • Let SDK handle all token management
  • Use secure() function for registration
  • Store credentials in environment variables
  • Monitor SDK logs for token errors

For Manual Implementation

  • Implement token refresh 5 min before expiry
  • Handle 401 responses gracefully
  • Store refresh tokens securely
  • Implement exponential backoff for retries

OAuth API Endpoints

POST/v1/auth/refresh

Refresh access token using refresh token (token rotation)

POST/v1/auth/revoke

Revoke a refresh token (logout agent)

GET/v1/auth/validate

Validate current access token