#!/usr/bin/env python3
"""
Direct database test to verify the asset query optimization performance.
This tests the database queries directly without HTTP overhead.
"""

import time

from sqlalchemy.orm import Session, joinedload

from database import get_db
from models.asset import Asset
from models.task import Task
from models.project import Project


def test_old_vs_new_query_pattern():
    """Compare old N+1 pattern vs new optimized pattern.

    Runs both query strategies against the same project, times each with a
    monotonic clock, prints a human-readable performance report, and
    cross-checks that both strategies agree on asset/task counts.

    Side effects: opens and closes a DB session from ``get_db()``; prints to
    stdout. Returns nothing.
    """
    print("Asset Query Performance Comparison")
    print("=" * 50)

    # Get database session
    db = next(get_db())

    try:
        # Test with project_id = 2 (has assets)
        project_id = 2
        limit = 50

        print(f"Testing with project_id={project_id}, limit={limit}")

        # OLD PATTERN (N+1 queries) - Simulated
        print("\n1. OLD PATTERN (N+1 queries):")
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump (NTP adjustments) and is unsuitable for interval measurement.
        start_time = time.perf_counter()

        # Main query for assets
        assets = db.query(Asset).filter(
            Asset.deleted_at.is_(None),
            Asset.project_id == project_id
        ).limit(limit).all()

        # Simulate N+1 pattern - separate query for each asset's tasks
        total_tasks = 0
        for asset in assets:
            tasks = db.query(Task).filter(
                Task.asset_id == asset.id,
                Task.deleted_at.is_(None)
            ).all()
            total_tasks += len(tasks)

        old_time = time.perf_counter() - start_time

        print(f" Time: {old_time * 1000:.2f}ms")
        print(f" Assets: {len(assets)}")
        print(f" Total tasks: {total_tasks}")
        print(f" Queries executed: {len(assets) + 1} (1 for assets + {len(assets)} for tasks)")

        # NEW PATTERN (Single JOIN query)
        print("\n2. NEW PATTERN (Single JOIN query):")
        start_time = time.perf_counter()

        # Single query with JOIN.
        # NOTE(review): .limit() here caps JOINED rows, not distinct assets —
        # an asset with many tasks consumes several of the `limit` rows, so
        # this pattern can return fewer assets than the old one. The
        # consistency check below will surface any such mismatch.
        assets_with_tasks = (
            db.query(Asset)
            .outerjoin(Task, (Task.asset_id == Asset.id) & (Task.deleted_at.is_(None)))
            .filter(Asset.deleted_at.is_(None), Asset.project_id == project_id)
            .options(joinedload(Asset.project))
            .add_columns(
                Task.id.label('task_id'),
                Task.task_type,
                Task.status.label('task_status'),
                Task.assigned_user_id
            )
            .limit(limit)
            .all()
        )

        # Process results (simulate the aggregation logic): group the flat
        # (asset, task-columns) rows back into one entry per asset.
        assets_dict = {}
        for row in assets_with_tasks:
            asset = row[0]
            task_id = row[1]
            if asset.id not in assets_dict:
                assets_dict[asset.id] = {
                    'asset': asset,
                    'tasks': []
                }
            # task_id is None for assets with no (non-deleted) tasks,
            # because of the OUTER join.
            if task_id is not None:
                assets_dict[asset.id]['tasks'].append({
                    'task_id': task_id,
                    'task_type': row[2],
                    'status': row[3],
                    'assigned_user_id': row[4]
                })

        new_time = time.perf_counter() - start_time

        total_tasks_new = sum(len(data['tasks']) for data in assets_dict.values())

        print(f" Time: {new_time * 1000:.2f}ms")
        print(f" Assets: {len(assets_dict)}")
        print(f" Total tasks: {total_tasks_new}")
        print(f" Queries executed: 1 (single JOIN query)")

        # Performance comparison
        print("\n3. PERFORMANCE COMPARISON:")
        # Guard against ZeroDivisionError when the old pattern completes
        # below clock resolution (e.g. empty project, warm cache).
        if old_time > 0:
            improvement = ((old_time - new_time) / old_time) * 100
        else:
            improvement = 0.0
        print(f" Old pattern: {old_time * 1000:.2f}ms")
        print(f" New pattern: {new_time * 1000:.2f}ms")
        print(f" Improvement: {improvement:.1f}%")

        if new_time < 0.5:  # 500ms requirement
            print(f" ✅ Performance requirement met (< 500ms)")
        else:
            print(f" ⚠️ Performance requirement not met ({new_time * 1000:.2f}ms >= 500ms)")

        # Verify data consistency
        print("\n4. DATA CONSISTENCY CHECK:")
        if total_tasks == total_tasks_new:
            print(" ✅ Task counts match between old and new patterns")
        else:
            print(f" ❌ Task count mismatch: old={total_tasks}, new={total_tasks_new}")

        if len(assets) == len(assets_dict):
            print(" ✅ Asset counts match between old and new patterns")
        else:
            print(f" ❌ Asset count mismatch: old={len(assets)}, new={len(assets_dict)}")

        print("\n" + "=" * 50)
        print("✅ Query Performance Test Completed!")

    except Exception as e:
        # Top-level boundary of a standalone script: report and dump the
        # traceback rather than crashing with a bare stack trace.
        print(f"❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Always release the session obtained from get_db().
        db.close()


if __name__ == "__main__":
    test_old_vs_new_query_pattern()