# LinkDesk/backend/test_asset_query_performance.py
#!/usr/bin/env python3
"""
Direct database test to verify the asset query optimization performance.
This tests the database queries directly without HTTP overhead.
"""
import time
from sqlalchemy.orm import Session, joinedload
from database import get_db
from models.asset import Asset
from models.task import Task
from models.project import Project
def test_old_vs_new_query_pattern():
    """Compare the old N+1 query pattern against the optimized JOIN pattern.

    Runs both patterns against the live database, reports wall-clock timings,
    checks the new pattern against the 500ms requirement, and cross-checks
    that both patterns return the same asset and task counts.
    """
    print("Asset Query Performance Comparison")
    print("=" * 50)

    # Get a database session from the application's session factory.
    db = next(get_db())
    try:
        # Test with project_id = 2 (has assets) — TODO confirm this fixture
        # still exists in the target database.
        project_id = 2
        limit = 50
        print(f"Testing with project_id={project_id}, limit={limit}")

        # --- 1. OLD PATTERN (N+1 queries), simulated -----------------------
        print("\n1. OLD PATTERN (N+1 queries):")
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump with wall-clock adjustments and is unsuitable for intervals.
        start_time = time.perf_counter()
        # Main query for assets.
        assets = db.query(Asset).filter(
            Asset.deleted_at.is_(None),
            Asset.project_id == project_id
        ).limit(limit).all()
        # Simulate the N+1 pattern: one separate query per asset for its tasks.
        total_tasks = 0
        for asset in assets:
            tasks = db.query(Task).filter(
                Task.asset_id == asset.id,
                Task.deleted_at.is_(None)
            ).all()
            total_tasks += len(tasks)
        old_time = time.perf_counter() - start_time
        print(f" Time: {old_time * 1000:.2f}ms")
        print(f" Assets: {len(assets)}")
        print(f" Total tasks: {total_tasks}")
        print(f" Queries executed: {len(assets) + 1} (1 for assets + {len(assets)} for tasks)")

        # --- 2. NEW PATTERN (single JOIN query) ----------------------------
        print("\n2. NEW PATTERN (Single JOIN query):")
        start_time = time.perf_counter()
        # BUG FIX: applying .limit() directly to the joined query limits
        # joined ROWS (asset x task), not distinct assets, so any asset with
        # multiple tasks would shrink the asset set and break the consistency
        # check below.  Limit the asset ids in a subquery first, then join.
        asset_ids = (
            db.query(Asset.id)
            .filter(Asset.deleted_at.is_(None), Asset.project_id == project_id)
            .limit(limit)
            .subquery()
        )
        assets_with_tasks = (
            db.query(Asset)
            .join(asset_ids, Asset.id == asset_ids.c.id)
            .outerjoin(Task, (Task.asset_id == Asset.id) & (Task.deleted_at.is_(None)))
            .options(joinedload(Asset.project))
            .add_columns(
                Task.id.label('task_id'),
                Task.task_type,
                Task.status.label('task_status'),
                Task.assigned_user_id
            )
            .all()
        )
        # Aggregate the joined rows back into one entry per asset.
        assets_dict = {}
        for row in assets_with_tasks:
            asset = row[0]
            task_id = row[1]
            if asset.id not in assets_dict:
                assets_dict[asset.id] = {
                    'asset': asset,
                    'tasks': []
                }
            # The outer join yields NULL task columns for task-less assets.
            if task_id is not None:
                assets_dict[asset.id]['tasks'].append({
                    'task_id': task_id,
                    'task_type': row[2],
                    'status': row[3],
                    'assigned_user_id': row[4]
                })
        new_time = time.perf_counter() - start_time
        total_tasks_new = sum(len(data['tasks']) for data in assets_dict.values())
        print(f" Time: {new_time * 1000:.2f}ms")
        print(f" Assets: {len(assets_dict)}")
        print(f" Total tasks: {total_tasks_new}")
        print(f" Queries executed: 1 (single JOIN query)")

        # --- 3. Performance comparison -------------------------------------
        print("\n3. PERFORMANCE COMPARISON:")
        print(f" Old pattern: {old_time * 1000:.2f}ms")
        print(f" New pattern: {new_time * 1000:.2f}ms")
        # Guard against ZeroDivisionError when the old pattern completes
        # below the timer's resolution (e.g. an empty project).
        if old_time > 0:
            improvement = ((old_time - new_time) / old_time) * 100
            print(f" Improvement: {improvement:.1f}%")
        else:
            print(" Improvement: n/a (old pattern below timer resolution)")
        if new_time < 0.5:  # 500ms requirement
            print(f" ✅ Performance requirement met (< 500ms)")
        else:
            print(f" ⚠️ Performance requirement not met ({new_time * 1000:.2f}ms >= 500ms)")

        # --- 4. Verify data consistency ------------------------------------
        print("\n4. DATA CONSISTENCY CHECK:")
        if total_tasks == total_tasks_new:
            print(" ✅ Task counts match between old and new patterns")
        else:
            print(f" ❌ Task count mismatch: old={total_tasks}, new={total_tasks_new}")
        if len(assets) == len(assets_dict):
            print(" ✅ Asset counts match between old and new patterns")
        else:
            print(f" ❌ Asset count mismatch: old={len(assets)}, new={len(assets_dict)}")

        print("\n" + "=" * 50)
        print("✅ Query Performance Test Completed!")
    except Exception as e:
        # Benchmark script: report the failure instead of crashing.
        print(f"❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Always return the session, even on failure.
        db.close()
# Entry point: run the benchmark when executed directly as a script.
if __name__ == "__main__":
    test_old_vs_new_query_pattern()