from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.security import HTTPAuthorizationCredentials from sqlalchemy.orm import Session from datetime import timedelta from typing import List import json from database import get_db from models.user import User, UserRole from models.api_key import APIKey, APIKeyScope from models.api_key_usage import APIKeyUsage from schemas.auth import UserLogin, UserRegister, Token, RefreshToken from schemas.api_key import APIKeyCreate, APIKeyResponse, APIKeyWithToken, APIKeyUpdate, APIKeyUsageLog from utils.auth import ( verify_password, get_password_hash, create_access_token, create_refresh_token, verify_token, security, ACCESS_TOKEN_EXPIRE_MINUTES, REFRESH_TOKEN_EXPIRE_DAYS, generate_api_key, hash_api_key, get_current_user_flexible, require_role ) router = APIRouter() @router.post("/register", response_model=dict) async def register(user_data: UserRegister, db: Session = Depends(get_db)): """Register a new user account.""" # Check if user already exists existing_user = db.query(User).filter(User.email == user_data.email).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # Create new user hashed_password = get_password_hash(user_data.password) new_user = User( email=user_data.email, password_hash=hashed_password, first_name=user_data.first_name, last_name=user_data.last_name, role=UserRole.ARTIST, # Default role is_approved=False # Requires admin approval ) db.add(new_user) db.commit() db.refresh(new_user) return { "message": "User registered successfully. Awaiting admin approval.", "user_id": new_user.id } @router.post("/login", response_model=Token) async def login(user_credentials: UserLogin, db: Session = Depends(get_db)): """Authenticate user and return JWT tokens.""" # Find user by email print(user_credentials.email) user = db.query(User).filter(User.email == user_credentials.email).first() if not user or not verify_password(user_credentials.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password" ) # Check if user is approved if not user.is_approved: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Account not approved by administrator" ) # Create tokens access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) token_data = {"sub": str(user.id), "email": user.email, "role": user.role} access_token = create_access_token( data=token_data, expires_delta=access_token_expires ) refresh_token = create_refresh_token( data=token_data, expires_delta=refresh_token_expires ) return Token( access_token=access_token, refresh_token=refresh_token ) @router.post("/refresh", response_model=Token) async def refresh_token(refresh_data: RefreshToken, db: Session = Depends(get_db)): """Refresh access token using refresh token.""" # Verify refresh token payload = verify_token(refresh_data.refresh_token, "refresh") if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) # Get user from database user_id = payload.get("sub") user = db.query(User).filter(User.id == user_id).first() if not user or not user.is_approved: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or not approved" ) # Create new tokens access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) token_data = {"sub": str(user.id), "email": user.email, "role": user.role} new_access_token = create_access_token( data=token_data, expires_delta=access_token_expires ) new_refresh_token = create_refresh_token( data=token_data, expires_delta=refresh_token_expires ) return Token( access_token=new_access_token, refresh_token=new_refresh_token ) @router.post("/logout") async def logout(): """Logout user (client should discard tokens).""" return {"message": "Successfully logged out"} # API Key Management Endpoints @router.post("/api-keys", response_model=APIKeyWithToken) async def create_api_key( api_key_data: APIKeyCreate, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible) ): """Create a new API key. Only developers and admins can create API keys.""" # Check if user has permission to create API keys if current_user.role != UserRole.DEVELOPER and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only developers and users with admin permission can create API keys" ) # Determine target user for the API key target_user_id = current_user.id # Default to current user target_user = current_user # If user_id is specified and current user has admin permission, allow creating for other users if api_key_data.user_id is not None: if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only users with admin permission can create API keys for other users" ) # Verify target user exists and is approved target_user = db.query(User).filter(User.id == api_key_data.user_id).first() if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Target user not found" ) if not target_user.is_approved: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot create API key for unapproved user" ) target_user_id = target_user.id # Generate API key api_key_token = generate_api_key() key_hash = hash_api_key(api_key_token) # Convert scopes to JSON string scopes_json = json.dumps([scope.value for scope in api_key_data.scopes]) # Create API key record new_api_key = APIKey( user_id=target_user_id, key_hash=key_hash, name=api_key_data.name, scopes=scopes_json, expires_at=api_key_data.expires_at ) db.add(new_api_key) db.commit() db.refresh(new_api_key) # Convert scopes back to list for response scopes_list = json.loads(new_api_key.scopes) api_key_response = APIKeyResponse( id=new_api_key.id, user_id=new_api_key.user_id, name=new_api_key.name, scopes=scopes_list, is_active=new_api_key.is_active, expires_at=new_api_key.expires_at, last_used_at=new_api_key.last_used_at, created_at=new_api_key.created_at, user_email=target_user.email ) return APIKeyWithToken( api_key=api_key_response, token=api_key_token ) @router.get("/api-keys", response_model=List[APIKeyResponse]) async def list_api_keys( request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible), user_id: int = None ): """List API keys. Developers see their own, admins can see all or filter by user.""" # Developers and users with admin permission can see API keys if current_user.role != UserRole.DEVELOPER and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only developers and users with admin permission can manage API keys" ) # Build query based on user permissions and parameters if current_user.is_admin: if user_id is not None: # Admin requesting specific user's API keys api_keys = db.query(APIKey).filter(APIKey.user_id == user_id).all() else: # Admin requesting all API keys api_keys = db.query(APIKey).all() else: # Developer can only see their own API keys if user_id is not None and user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Developers can only view their own API keys" ) api_keys = db.query(APIKey).filter(APIKey.user_id == current_user.id).all() result = [] for api_key in api_keys: scopes_list = json.loads(api_key.scopes) # Get user email for admin view user_email = None if current_user.is_admin: user = db.query(User).filter(User.id == api_key.user_id).first() if user: user_email = user.email result.append(APIKeyResponse( id=api_key.id, user_id=api_key.user_id, name=api_key.name, scopes=scopes_list, is_active=api_key.is_active, expires_at=api_key.expires_at, last_used_at=api_key.last_used_at, created_at=api_key.created_at, user_email=user_email )) return result @router.put("/api-keys/{api_key_id}", response_model=APIKeyResponse) async def update_api_key( api_key_id: int, api_key_data: APIKeyUpdate, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible) ): """Update an API key.""" # Check if user has permission if current_user.role != UserRole.DEVELOPER and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only developers and users with admin permission can manage API keys" ) # Get API key with different access rules for admin vs developer if current_user.is_admin: # Users with admin permission can update any API key api_key = db.query(APIKey).filter(APIKey.id == api_key_id).first() else: # Developers can only update their own API keys api_key = db.query(APIKey).filter( APIKey.id == api_key_id, APIKey.user_id == current_user.id ).first() if not api_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found" ) # Update fields if api_key_data.name is not None: api_key.name = api_key_data.name if api_key_data.scopes is not None: scopes_json = json.dumps([scope.value for scope in api_key_data.scopes]) api_key.scopes = scopes_json if api_key_data.is_active is not None: api_key.is_active = api_key_data.is_active if api_key_data.expires_at is not None: api_key.expires_at = api_key_data.expires_at db.commit() db.refresh(api_key) # Convert scopes back to list for response scopes_list = json.loads(api_key.scopes) # Get user email for admin view user_email = None if current_user.is_admin: user = db.query(User).filter(User.id == api_key.user_id).first() if user: user_email = user.email return APIKeyResponse( id=api_key.id, user_id=api_key.user_id, name=api_key.name, scopes=scopes_list, is_active=api_key.is_active, expires_at=api_key.expires_at, last_used_at=api_key.last_used_at, created_at=api_key.created_at, user_email=user_email ) @router.delete("/api-keys/{api_key_id}") async def delete_api_key( api_key_id: int, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible) ): """Delete (revoke) an API key.""" # Check if user has permission if current_user.role != UserRole.DEVELOPER and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only developers and users with admin permission can manage API keys" ) # Get API key with different access rules for admin vs developer if current_user.is_admin: # Users with admin permission can delete any API key api_key = db.query(APIKey).filter(APIKey.id == api_key_id).first() else: # Developers can only delete their own API keys api_key = db.query(APIKey).filter( APIKey.id == api_key_id, APIKey.user_id == current_user.id ).first() if not api_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found" ) # Delete the API key db.delete(api_key) db.commit() return {"message": "API key revoked successfully"} @router.get("/api-keys/{api_key_id}/usage", response_model=List[APIKeyUsageLog]) async def get_api_key_usage( api_key_id: int, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible), limit: int = 100 ): """Get usage logs for an API key.""" # Check if user has permission if current_user.role != UserRole.DEVELOPER and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only developers and users with admin permission can view API key usage" ) # Verify API key access with different rules for admin vs developer if current_user.is_admin: # Users with admin permission can view usage for any API key api_key = db.query(APIKey).filter(APIKey.id == api_key_id).first() else: # Developers can only view usage for their own API keys api_key = db.query(APIKey).filter( APIKey.id == api_key_id, APIKey.user_id == current_user.id ).first() if not api_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found" ) # Get usage logs usage_logs = db.query(APIKeyUsage).filter( APIKeyUsage.api_key_id == api_key_id ).order_by(APIKeyUsage.timestamp.desc()).limit(limit).all() return [ APIKeyUsageLog( api_key_id=log.api_key_id, endpoint=log.endpoint, method=log.method, timestamp=log.timestamp, ip_address=log.ip_address, user_agent=log.user_agent ) for log in usage_logs ] # Admin-only endpoints for API key management @router.get("/admin/users/{user_id}/api-keys", response_model=List[APIKeyResponse]) async def list_user_api_keys_admin( user_id: int, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible) ): """Admin endpoint to list API keys for a specific user.""" # Only users with admin permission can access this endpoint if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required to access this endpoint" ) # Verify target user exists target_user = db.query(User).filter(User.id == user_id).first() if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Get API keys for the specified user api_keys = db.query(APIKey).filter(APIKey.user_id == user_id).all() result = [] for api_key in api_keys: scopes_list = json.loads(api_key.scopes) result.append(APIKeyResponse( id=api_key.id, user_id=api_key.user_id, name=api_key.name, scopes=scopes_list, is_active=api_key.is_active, expires_at=api_key.expires_at, last_used_at=api_key.last_used_at, created_at=api_key.created_at, user_email=target_user.email )) return result @router.post("/admin/users/{user_id}/api-keys", response_model=APIKeyWithToken) async def create_api_key_for_user_admin( user_id: int, api_key_data: APIKeyCreate, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_flexible) ): """Admin endpoint to create an API key for a specific user.""" # Only users with admin permission can access this endpoint if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin permission required to access this endpoint" ) # Verify target user exists and is approved target_user = db.query(User).filter(User.id == user_id).first() if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if not target_user.is_approved: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot create API key for unapproved user" ) # Generate API key api_key_token = generate_api_key() key_hash = hash_api_key(api_key_token) # Convert scopes to JSON string scopes_json = json.dumps([scope.value for scope in api_key_data.scopes]) # Create API key record new_api_key = APIKey( user_id=user_id, key_hash=key_hash, name=api_key_data.name, scopes=scopes_json, expires_at=api_key_data.expires_at ) db.add(new_api_key) db.commit() db.refresh(new_api_key) # Convert scopes back to list for response scopes_list = json.loads(new_api_key.scopes) api_key_response = APIKeyResponse( id=new_api_key.id, user_id=new_api_key.user_id, name=new_api_key.name, scopes=scopes_list, is_active=new_api_key.is_active, expires_at=new_api_key.expires_at, last_used_at=new_api_key.last_used_at, created_at=new_api_key.created_at, user_email=target_user.email ) return APIKeyWithToken( api_key=api_key_response, token=api_key_token )