138 lines
4.8 KiB
Python
138 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Direct database test to verify the asset query optimization performance.
|
|
This tests the database queries directly without HTTP overhead.
|
|
"""
|
|
|
|
import time
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from database import get_db
|
|
from models.asset import Asset
|
|
from models.task import Task
|
|
from models.project import Project
|
|
|
|
def test_old_vs_new_query_pattern():
|
|
"""Compare old N+1 pattern vs new optimized pattern."""
|
|
print("Asset Query Performance Comparison")
|
|
print("=" * 50)
|
|
|
|
# Get database session
|
|
db = next(get_db())
|
|
|
|
try:
|
|
# Test with project_id = 2 (has assets)
|
|
project_id = 2
|
|
limit = 50
|
|
|
|
print(f"Testing with project_id={project_id}, limit={limit}")
|
|
|
|
# OLD PATTERN (N+1 queries) - Simulated
|
|
print("\n1. OLD PATTERN (N+1 queries):")
|
|
start_time = time.time()
|
|
|
|
# Main query for assets
|
|
assets = db.query(Asset).filter(
|
|
Asset.deleted_at.is_(None),
|
|
Asset.project_id == project_id
|
|
).limit(limit).all()
|
|
|
|
# Simulate N+1 pattern - separate query for each asset's tasks
|
|
total_tasks = 0
|
|
for asset in assets:
|
|
tasks = db.query(Task).filter(
|
|
Task.asset_id == asset.id,
|
|
Task.deleted_at.is_(None)
|
|
).all()
|
|
total_tasks += len(tasks)
|
|
|
|
old_time = time.time() - start_time
|
|
print(f" Time: {old_time * 1000:.2f}ms")
|
|
print(f" Assets: {len(assets)}")
|
|
print(f" Total tasks: {total_tasks}")
|
|
print(f" Queries executed: {len(assets) + 1} (1 for assets + {len(assets)} for tasks)")
|
|
|
|
# NEW PATTERN (Single JOIN query)
|
|
print("\n2. NEW PATTERN (Single JOIN query):")
|
|
start_time = time.time()
|
|
|
|
# Single query with JOIN
|
|
assets_with_tasks = (
|
|
db.query(Asset)
|
|
.outerjoin(Task, (Task.asset_id == Asset.id) & (Task.deleted_at.is_(None)))
|
|
.filter(Asset.deleted_at.is_(None), Asset.project_id == project_id)
|
|
.options(joinedload(Asset.project))
|
|
.add_columns(
|
|
Task.id.label('task_id'),
|
|
Task.task_type,
|
|
Task.status.label('task_status'),
|
|
Task.assigned_user_id
|
|
)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
# Process results (simulate the aggregation logic)
|
|
assets_dict = {}
|
|
for row in assets_with_tasks:
|
|
asset = row[0]
|
|
task_id = row[1]
|
|
|
|
if asset.id not in assets_dict:
|
|
assets_dict[asset.id] = {
|
|
'asset': asset,
|
|
'tasks': []
|
|
}
|
|
|
|
if task_id is not None:
|
|
assets_dict[asset.id]['tasks'].append({
|
|
'task_id': task_id,
|
|
'task_type': row[2],
|
|
'status': row[3],
|
|
'assigned_user_id': row[4]
|
|
})
|
|
|
|
new_time = time.time() - start_time
|
|
total_tasks_new = sum(len(data['tasks']) for data in assets_dict.values())
|
|
|
|
print(f" Time: {new_time * 1000:.2f}ms")
|
|
print(f" Assets: {len(assets_dict)}")
|
|
print(f" Total tasks: {total_tasks_new}")
|
|
print(f" Queries executed: 1 (single JOIN query)")
|
|
|
|
# Performance comparison
|
|
print("\n3. PERFORMANCE COMPARISON:")
|
|
improvement = ((old_time - new_time) / old_time) * 100
|
|
print(f" Old pattern: {old_time * 1000:.2f}ms")
|
|
print(f" New pattern: {new_time * 1000:.2f}ms")
|
|
print(f" Improvement: {improvement:.1f}%")
|
|
|
|
if new_time < 0.5: # 500ms requirement
|
|
print(f" ✅ Performance requirement met (< 500ms)")
|
|
else:
|
|
print(f" ⚠️ Performance requirement not met ({new_time * 1000:.2f}ms >= 500ms)")
|
|
|
|
# Verify data consistency
|
|
print("\n4. DATA CONSISTENCY CHECK:")
|
|
if total_tasks == total_tasks_new:
|
|
print(" ✅ Task counts match between old and new patterns")
|
|
else:
|
|
print(f" ❌ Task count mismatch: old={total_tasks}, new={total_tasks_new}")
|
|
|
|
if len(assets) == len(assets_dict):
|
|
print(" ✅ Asset counts match between old and new patterns")
|
|
else:
|
|
print(f" ❌ Asset count mismatch: old={len(assets)}, new={len(assets_dict)}")
|
|
|
|
print("\n" + "=" * 50)
|
|
print("✅ Query Performance Test Completed!")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed with error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
if __name__ == "__main__":
|
|
test_old_vs_new_query_pattern() |