# File: LinkDesk/backend/test_backend_performance_va... (filename truncated in extraction)
# 502 lines, 18 KiB, Python
#!/usr/bin/env python3
"""
Backend Performance Validation Test
This test validates the performance requirements for the shot-asset-task-status-optimization:
- Test optimized queries with datasets of 100+ shots/assets
- Measure query execution time and ensure sub-500ms performance
- Validate database-level aggregation is being used
Requirements: 1.5, 2.5, 3.5
"""
import requests
import time
import sqlite3
import statistics
from typing import List, Dict, Tuple
from contextlib import contextmanager
# Base URL of the locally running backend under test.
BASE_URL = "http://localhost:8000"
# Test credentials
# NOTE(review): assumes a seeded admin account exists — confirm against the
# project's fixture/seed script (e.g. create_example_data.py).
LOGIN_DATA = {
    "email": "admin@vfx.com",
    "password": "admin123"
}
@contextmanager
def timer():
    """Context manager that measures elapsed time.

    Yields:
        A zero-argument callable returning the milliseconds elapsed since
        the ``with`` block was entered (it keeps working after the block
        exits, since the start time is captured in the closure).

    Uses ``time.perf_counter`` instead of ``time.time``: it is monotonic
    and high-resolution, so measurements cannot go negative or jump if the
    system clock is adjusted mid-benchmark.
    """
    start = time.perf_counter()
    yield lambda: (time.perf_counter() - start) * 1000
def login() -> str:
    """Authenticate against the backend.

    Returns:
        The access token string on success, or ``None`` when the login
        request is rejected (a diagnostic is printed in that case).
    """
    resp = requests.post(f"{BASE_URL}/auth/login", json=LOGIN_DATA)
    if resp.status_code != 200:
        print(f"❌ Login failed: {resp.status_code}")
        return None
    return resp.json()["access_token"]
def get_database_stats(db_path: str = 'database.db') -> Dict:
    """Collect row counts for shots, assets, and tasks.

    Args:
        db_path: Path to the SQLite database file. Defaults to the
            backend's ``database.db`` so existing callers are unchanged;
            parameterized so the function can be pointed at any database
            (e.g. a test fixture).

    Returns:
        Dict with keys ``shots``, ``assets``, ``tasks`` (counts of active
        rows, i.e. ``deleted_at IS NULL``) plus ``shot_tasks`` and
        ``asset_tasks`` splitting active tasks by which foreign key is set.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Count active shots
        cursor.execute("SELECT COUNT(*) FROM shots WHERE deleted_at IS NULL")
        shot_count = cursor.fetchone()[0]
        # Count active assets
        cursor.execute("SELECT COUNT(*) FROM assets WHERE deleted_at IS NULL")
        asset_count = cursor.fetchone()[0]
        # Count active tasks
        cursor.execute("SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL")
        task_count = cursor.fetchone()[0]
        # Single pass over tasks: bucket active tasks by populated FK.
        cursor.execute("""
            SELECT
                COUNT(CASE WHEN shot_id IS NOT NULL THEN 1 END) as shot_tasks,
                COUNT(CASE WHEN asset_id IS NOT NULL THEN 1 END) as asset_tasks
            FROM tasks WHERE deleted_at IS NULL
        """)
        shot_tasks, asset_tasks = cursor.fetchone()
        return {
            'shots': shot_count,
            'assets': asset_count,
            'tasks': task_count,
            'shot_tasks': shot_tasks,
            'asset_tasks': asset_tasks
        }
    finally:
        conn.close()
def validate_database_aggregation() -> bool:
    """Validate that database-level aggregation is being used (Requirement 3.5).

    Times the LEFT JOIN query shapes used by the optimized shot/asset
    routers directly against SQLite, then uses EXPLAIN QUERY PLAN to check
    that task lookups by ``shot_id`` / ``asset_id`` are index-backed.

    Returns:
        True when both task indexes are used; False on missing indexes or
        any database error.
    """
    print("\n=== Validating Database-Level Aggregation (Requirement 3.5) ===")
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    try:
        # Test the actual optimized query patterns used in the routers.
        print("Testing shot aggregation query pattern...")
        # This is the pattern used in the optimized shots router: one JOIN
        # pulls shots and their active tasks in a single round trip.
        with timer() as get_time:
            cursor.execute("""
                SELECT
                    s.id, s.name, s.status,
                    t.id as task_id, t.task_type, t.status as task_status,
                    t.assigned_user_id
                FROM shots s
                LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL
                WHERE s.deleted_at IS NULL
                LIMIT 10
            """)
            results = cursor.fetchall()
        db_time = get_time()
        print(f" Database aggregation time: {db_time:.2f}ms")
        print(f" Results returned: {len(results)} rows")
        # Test asset aggregation query pattern
        print("\nTesting asset aggregation query pattern...")
        with timer() as get_time:
            cursor.execute("""
                SELECT
                    a.id, a.name, a.status,
                    t.id as task_id, t.task_type, t.status as task_status,
                    t.assigned_user_id
                FROM assets a
                LEFT JOIN tasks t ON a.id = t.asset_id AND t.deleted_at IS NULL
                WHERE a.deleted_at IS NULL
                LIMIT 10
            """)
            results = cursor.fetchall()
        db_time = get_time()
        print(f" Database aggregation time: {db_time:.2f}ms")
        print(f" Results returned: {len(results)} rows")
        # Verify indexes are being used
        print("\nVerifying index usage...")
        # Check shot task index usage
        cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE shot_id = 1 AND deleted_at IS NULL")
        plan = cursor.fetchall()
        shot_index_used = any("USING INDEX" in str(row) for row in plan)
        # Check asset task index usage
        cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE asset_id = 1 AND deleted_at IS NULL")
        plan = cursor.fetchall()
        asset_index_used = any("USING INDEX" in str(row) for row in plan)
        # BUG FIX: both branches of these conditionals previously rendered
        # the empty string ('' if used else ''), so the pass/fail marker
        # was invisible in the output.
        print(f" Shot task index used: {'✅' if shot_index_used else '❌'}")
        print(f" Asset task index used: {'✅' if asset_index_used else '❌'}")
        if shot_index_used and asset_index_used:
            print("✅ Database-level aggregation validation passed!")
            return True
        print("❌ Database indexes not being used properly")
        return False
    except Exception as e:
        # Broad catch is deliberate at this test boundary: any DB failure
        # is reported as a failed validation rather than crashing the run.
        print(f"❌ Database aggregation validation failed: {e}")
        return False
    finally:
        conn.close()
def test_shot_performance_with_large_dataset(token: str, target_count: int = 100) -> Tuple[bool, List[float]]:
    """Test shot query performance with 100+ shots (Requirements 1.5).

    Iterates over episodes, requesting shot lists at increasing page sizes,
    until at least ``target_count`` shots have been covered, timing each
    request. NOTE(review): the measured time is the full HTTP round trip
    (network + serialization), used as a proxy for query time — confirm
    that is the intended measurement.

    Args:
        token: Bearer token returned by ``login()``.
        target_count: Minimum number of shots to cover before stopping.

    Returns:
        Tuple ``(performance_passed, times)`` where ``times`` is the list
        of per-request durations in ms and ``performance_passed`` is True
        only when both the average and the maximum are under 500ms.
        Returns ``(False, [])`` or ``(False, times)`` on setup/validation
        failures.
    """
    print(f"\n=== Testing Shot Performance with {target_count}+ Shots (Requirement 1.5) ===")
    headers = {"Authorization": f"Bearer {token}"}
    # Get available episodes/projects for testing
    response = requests.get(f"{BASE_URL}/episodes/", headers=headers)
    if response.status_code != 200:
        print("❌ Failed to get episodes for testing")
        return False, []
    episodes = response.json()
    if not episodes:
        print("❌ No episodes available for testing")
        return False, []
    times = []
    shots_tested = 0
    # Test multiple episodes to get 100+ shots
    for episode in episodes:
        episode_id = episode['id']
        # Test with different limits to measure scaling
        for limit in [50, 100, 150]:
            print(f" Testing episode {episode_id} with limit {limit}...")
            with timer() as get_time:
                response = requests.get(
                    f"{BASE_URL}/shots/?episode_id={episode_id}&limit={limit}",
                    headers=headers
                )
            query_time = get_time()
            # Failed requests are skipped, not fatal — keep sampling.
            if response.status_code != 200:
                print(f" ❌ Request failed: {response.status_code}")
                continue
            shots = response.json()
            shots_count = len(shots)
            shots_tested += shots_count
            print(f" Query time: {query_time:.2f}ms for {shots_count} shots")
            if shots_count > 0:
                # Only non-empty responses count as valid measurements.
                times.append(query_time)
                # Validate response structure: task data must be embedded
                # in each shot (the point of the optimization under test).
                sample_shot = shots[0]
                required_fields = ['task_status', 'task_details', 'task_count']
                missing_fields = [field for field in required_fields if field not in sample_shot]
                if missing_fields:
                    print(f" ❌ Missing fields: {missing_fields}")
                    return False, times
                # Validate task data aggregation
                task_status_count = len(sample_shot['task_status'])
                task_details_count = len(sample_shot['task_details'])
                print(f" Task status types: {task_status_count}")
                print(f" Task details: {task_details_count}")
            # Stop if we've tested enough shots. The inner break only exits
            # the limit loop; the duplicate check below exits the episode
            # loop as well.
            if shots_tested >= target_count:
                break
        if shots_tested >= target_count:
            break
    if not times:
        print("❌ No valid performance measurements obtained")
        return False, []
    avg_time = statistics.mean(times)
    max_time = max(times)
    min_time = min(times)
    print(f"\nPerformance Summary:")
    print(f" Total shots tested: {shots_tested}")
    print(f" Average query time: {avg_time:.2f}ms")
    print(f" Min query time: {min_time:.2f}ms")
    print(f" Max query time: {max_time:.2f}ms")
    # Requirement 1.5: sub-500ms performance (both average and worst case)
    performance_passed = avg_time < 500 and max_time < 500
    if performance_passed:
        print("✅ Shot performance requirement met (< 500ms)")
    else:
        print(f"❌ Shot performance requirement failed (avg: {avg_time:.2f}ms, max: {max_time:.2f}ms)")
    return performance_passed, times
def test_asset_performance_with_large_dataset(token: str, target_count: int = 100) -> Tuple[bool, List[float]]:
    """Test asset query performance with 100+ assets (Requirements 2.5).

    Mirror of the shot performance test: iterates over projects,
    requesting asset lists at increasing page sizes, until at least
    ``target_count`` assets have been covered, timing each request.
    NOTE(review): the measured time is the full HTTP round trip, used as a
    proxy for query time — confirm that is the intended measurement.

    Args:
        token: Bearer token returned by ``login()``.
        target_count: Minimum number of assets to cover before stopping.

    Returns:
        Tuple ``(performance_passed, times)`` where ``times`` is the list
        of per-request durations in ms and ``performance_passed`` is True
        only when both the average and the maximum are under 500ms.
        Returns ``(False, [])`` or ``(False, times)`` on setup/validation
        failures.
    """
    print(f"\n=== Testing Asset Performance with {target_count}+ Assets (Requirement 2.5) ===")
    headers = {"Authorization": f"Bearer {token}"}
    # Get available projects for testing
    response = requests.get(f"{BASE_URL}/projects/", headers=headers)
    if response.status_code != 200:
        print("❌ Failed to get projects for testing")
        return False, []
    projects = response.json()
    if not projects:
        print("❌ No projects available for testing")
        return False, []
    times = []
    assets_tested = 0
    # Test multiple projects to get 100+ assets
    for project in projects:
        project_id = project['id']
        # Test with different limits to measure scaling
        for limit in [50, 100, 150]:
            print(f" Testing project {project_id} with limit {limit}...")
            with timer() as get_time:
                response = requests.get(
                    f"{BASE_URL}/assets/?project_id={project_id}&limit={limit}",
                    headers=headers
                )
            query_time = get_time()
            # Failed requests are skipped, not fatal — keep sampling.
            if response.status_code != 200:
                print(f" ❌ Request failed: {response.status_code}")
                continue
            assets = response.json()
            assets_count = len(assets)
            assets_tested += assets_count
            print(f" Query time: {query_time:.2f}ms for {assets_count} assets")
            if assets_count > 0:
                # Only non-empty responses count as valid measurements.
                times.append(query_time)
                # Validate response structure: task data must be embedded
                # in each asset (the point of the optimization under test).
                sample_asset = assets[0]
                required_fields = ['task_status', 'task_details', 'task_count']
                missing_fields = [field for field in required_fields if field not in sample_asset]
                if missing_fields:
                    print(f" ❌ Missing fields: {missing_fields}")
                    return False, times
                # Validate task data aggregation
                task_status_count = len(sample_asset['task_status'])
                task_details_count = len(sample_asset['task_details'])
                print(f" Task status types: {task_status_count}")
                print(f" Task details: {task_details_count}")
            # Stop if we've tested enough assets. The inner break only
            # exits the limit loop; the duplicate check below exits the
            # project loop as well.
            if assets_tested >= target_count:
                break
        if assets_tested >= target_count:
            break
    if not times:
        print("❌ No valid performance measurements obtained")
        return False, []
    avg_time = statistics.mean(times)
    max_time = max(times)
    min_time = min(times)
    print(f"\nPerformance Summary:")
    print(f" Total assets tested: {assets_tested}")
    print(f" Average query time: {avg_time:.2f}ms")
    print(f" Min query time: {min_time:.2f}ms")
    print(f" Max query time: {max_time:.2f}ms")
    # Requirement 2.5: sub-500ms performance (both average and worst case)
    performance_passed = avg_time < 500 and max_time < 500
    if performance_passed:
        print("✅ Asset performance requirement met (< 500ms)")
    else:
        print(f"❌ Asset performance requirement failed (avg: {avg_time:.2f}ms, max: {max_time:.2f}ms)")
    return performance_passed, times
def test_single_query_optimization(token: str) -> bool:
    """Test that single query optimization is working (Requirements 1.1, 2.1).

    Fetches a small page of shots and of assets and verifies each record
    arrives with its task data already embedded (a ``task_status`` dict
    and a ``task_details`` list), i.e. no per-entity follow-up queries.

    Returns:
        True when both entity types pass the embedded-data check.
    """
    print("\n=== Testing Single Query Optimization (Requirements 1.1, 2.1) ===")
    auth_headers = {"Authorization": f"Bearer {token}"}

    def _fetch_and_verify(endpoint: str, noun: str, lead: str) -> bool:
        # Shared check for shots and assets: time one small page and make
        # sure the first record carries embedded task data.
        print(f"{lead}Testing {noun.lower()} single query optimization...")
        with timer() as elapsed:
            resp = requests.get(f"{BASE_URL}/{endpoint}/?limit=10", headers=auth_headers)
        took = elapsed()
        if resp.status_code != 200:
            print(f"❌ {noun} request failed: {resp.status_code}")
            return False
        records = resp.json()
        print(f" {noun} query time: {took:.2f}ms for {len(records)} {noun.lower()}s")
        # Verify task data is embedded (skip check on an empty page).
        if records:
            first = records[0]
            status_embedded = isinstance(first.get('task_status'), dict)
            details_embedded = isinstance(first.get('task_details'), list)
            if not (status_embedded and details_embedded):
                print(f"❌ {noun} task data not properly embedded")
                return False
            print(f" ✅ {noun} has {len(first['task_status'])} task status types")
            print(f" ✅ {noun} has {len(first['task_details'])} task details")
        return True

    if not _fetch_and_verify("shots", "Shot", ""):
        return False
    if not _fetch_and_verify("assets", "Asset", "\n"):
        return False
    print("✅ Single query optimization validation passed!")
    return True
def main():
    """Run all backend performance validation tests.

    Checks server availability, authenticates, prints database statistics,
    then runs the four validations and prints a pass/fail summary naming
    every requirement that failed.
    """
    print("Backend Performance Validation Test")
    print("=" * 60)
    print("Testing Requirements: 1.5, 2.5, 3.5")
    print("=" * 60)
    # Check if server is running. BUG FIX: the health check previously had
    # no timeout and could hang forever on an unreachable host; the wider
    # RequestException also covers the Timeout that can now be raised.
    try:
        response = requests.get(f"{BASE_URL}/docs", timeout=10)
        if response.status_code != 200:
            print("❌ Server is not running. Please start the backend server first.")
            return
    except requests.exceptions.RequestException:
        print("❌ Cannot connect to server. Please start the backend server first.")
        return
    # Login
    token = login()
    if not token:
        print("❌ Authentication failed")
        return
    # Get database statistics
    stats = get_database_stats()
    print(f"\nDatabase Statistics:")
    print(f" Active shots: {stats['shots']}")
    print(f" Active assets: {stats['assets']}")
    print(f" Active tasks: {stats['tasks']}")
    print(f" Shot tasks: {stats['shot_tasks']}")
    print(f" Asset tasks: {stats['asset_tasks']}")
    if stats['shots'] < 10 or stats['assets'] < 10:
        print("\n⚠️ Warning: Limited test data available.")
        print(" Consider running create_example_data.py for more comprehensive testing.")
    # Run all validation tests, keeping each outcome so the failure summary
    # can name exactly which requirement fell over.
    agg_passed = validate_database_aggregation()
    single_query_passed = test_single_query_optimization(token)
    shot_perf_passed, shot_times = test_shot_performance_with_large_dataset(token)
    asset_perf_passed, asset_times = test_asset_performance_with_large_dataset(token)
    results = [agg_passed, single_query_passed, shot_perf_passed, asset_perf_passed]
    tests_passed = sum(results)
    total_tests = len(results)
    # Final summary
    print("\n" + "=" * 60)
    print("BACKEND PERFORMANCE VALIDATION SUMMARY")
    print("=" * 60)
    print(f"Tests passed: {tests_passed}/{total_tests}")
    if shot_times:
        print(f"Shot query performance: avg {statistics.mean(shot_times):.2f}ms")
    if asset_times:
        print(f"Asset query performance: avg {statistics.mean(asset_times):.2f}ms")
    if tests_passed == total_tests:
        print("✅ ALL PERFORMANCE REQUIREMENTS VALIDATED!")
        print("\nRequirements Status:")
        print(" ✅ 1.5: Shot performance < 500ms")
        print(" ✅ 2.5: Asset performance < 500ms")
        print(" ✅ 3.5: Database-level aggregation used")
    else:
        print(f"{total_tests - tests_passed} performance requirements failed")
        if not shot_perf_passed:
            print(" ❌ 1.5: Shot performance requirement failed")
        if not asset_perf_passed:
            print(" ❌ 2.5: Asset performance requirement failed")
        # BUG FIX: the original summary never reported these two failures,
        # so a 2/4 or 3/4 run could show no failed requirement at all.
        if not agg_passed:
            print(" ❌ 3.5: Database-level aggregation validation failed")
        if not single_query_passed:
            print(" ❌ 1.1/2.1: Single query optimization validation failed")
    print("=" * 60)
# Entry point guard: lets the module be imported (e.g. by other test
# tooling) without running the whole validation suite.
if __name__ == "__main__":
    main()