#!/usr/bin/env python3
"""
Backend Performance Validation Test

This test validates the performance requirements for the shot-asset-task-status-optimization:
- Test optimized queries with datasets of 100+ shots/assets
- Measure query execution time and ensure sub-500ms performance
- Validate database-level aggregation is being used

Requirements: 1.5, 2.5, 3.5
"""
import requests
|
|
import time
|
|
import sqlite3
|
|
import statistics
|
|
from typing import List, Dict, Tuple
|
|
from contextlib import contextmanager
|
|
|
|
# Base URL of the backend API under test.
BASE_URL = "http://localhost:8000"

# Credentials for the admin account seeded in the dev environment.
LOGIN_DATA = {
    "email": "admin@vfx.com",
    "password": "admin123",
}
@contextmanager
def timer():
    """Context manager for measuring elapsed time.

    Yields a zero-argument callable that returns the milliseconds elapsed
    since the ``with`` block was entered; it may be called any number of
    times, including after the block exits.

    Uses ``time.perf_counter()`` — a monotonic, high-resolution clock —
    instead of ``time.time()``, whose wall-clock value can jump under NTP
    adjustments and would corrupt the timing measurements.
    """
    start = time.perf_counter()
    yield lambda: (time.perf_counter() - start) * 1000
|
def login() -> str:
    """Authenticate against the backend and return a bearer token.

    Returns the JWT access-token string on success; on any non-200
    response a diagnostic is printed and None is returned.
    """
    resp = requests.post(f"{BASE_URL}/auth/login", json=LOGIN_DATA)
    if resp.status_code != 200:
        print(f"❌ Login failed: {resp.status_code}")
        return None
    return resp.json()["access_token"]
|
def get_database_stats() -> Dict:
    """Collect row counts for shots, assets, and tasks from the local DB.

    Reads ``database.db`` directly (not via the API) and returns a dict
    with keys 'shots', 'assets', 'tasks' — counts of active rows, i.e.
    rows whose ``deleted_at`` is NULL — plus 'shot_tasks'/'asset_tasks',
    the active tasks attached to a shot or an asset respectively.
    """
    conn = sqlite3.connect('database.db')
    try:
        cursor = conn.cursor()
        counts = {}

        # Active (non-soft-deleted) rows, one COUNT per table.
        for key in ('shots', 'assets', 'tasks'):
            cursor.execute(f"SELECT COUNT(*) FROM {key} WHERE deleted_at IS NULL")
            counts[key] = cursor.fetchone()[0]

        # Split the active tasks by the entity they are linked to.
        cursor.execute("""
            SELECT
                COUNT(CASE WHEN shot_id IS NOT NULL THEN 1 END) as shot_tasks,
                COUNT(CASE WHEN asset_id IS NOT NULL THEN 1 END) as asset_tasks
            FROM tasks WHERE deleted_at IS NULL
        """)
        counts['shot_tasks'], counts['asset_tasks'] = cursor.fetchone()

        return counts
    finally:
        conn.close()
|
def validate_database_aggregation() -> bool:
    """Check that task aggregation happens in the database (Requirement 3.5).

    Runs the LEFT JOIN patterns used by the optimized shot/asset routers
    to confirm they execute, then inspects EXPLAIN QUERY PLAN output to
    verify the task-lookup indexes are actually exercised.  Returns True
    only when both the shot and asset task indexes are used.
    """
    print("\n=== Validating Database-Level Aggregation (Requirement 3.5) ===")

    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()

    try:
        # Replicate the exact JOIN shape the optimized shots router issues.
        print("Testing shot aggregation query pattern...")

        with timer() as get_time:
            cursor.execute("""
                SELECT
                    s.id, s.name, s.status,
                    t.id as task_id, t.task_type, t.status as task_status,
                    t.assigned_user_id
                FROM shots s
                LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL
                WHERE s.deleted_at IS NULL
                LIMIT 10
            """)
            rows = cursor.fetchall()

        db_time = get_time()
        print(f" Database aggregation time: {db_time:.2f}ms")
        print(f" Results returned: {len(rows)} rows")

        # Same check for the optimized assets router's JOIN shape.
        print("\nTesting asset aggregation query pattern...")

        with timer() as get_time:
            cursor.execute("""
                SELECT
                    a.id, a.name, a.status,
                    t.id as task_id, t.task_type, t.status as task_status,
                    t.assigned_user_id
                FROM assets a
                LEFT JOIN tasks t ON a.id = t.asset_id AND t.deleted_at IS NULL
                WHERE a.deleted_at IS NULL
                LIMIT 10
            """)
            rows = cursor.fetchall()

        db_time = get_time()
        print(f" Database aggregation time: {db_time:.2f}ms")
        print(f" Results returned: {len(rows)} rows")

        def index_hit(sql: str) -> bool:
            # True when SQLite's query plan reports an index lookup.
            cursor.execute(sql)
            return any("USING INDEX" in str(step) for step in cursor.fetchall())

        print("\nVerifying index usage...")
        shot_indexed = index_hit(
            "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE shot_id = 1 AND deleted_at IS NULL")
        asset_indexed = index_hit(
            "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE asset_id = 1 AND deleted_at IS NULL")

        print(f" Shot task index used: {'✅' if shot_indexed else '❌'}")
        print(f" Asset task index used: {'✅' if asset_indexed else '❌'}")

        if not (shot_indexed and asset_indexed):
            print("❌ Database indexes not being used properly")
            return False

        print("✅ Database-level aggregation validation passed!")
        return True

    except Exception as e:
        print(f"❌ Database aggregation validation failed: {e}")
        return False
    finally:
        conn.close()
|
def test_shot_performance_with_large_dataset(token: str, target_count: int = 100) -> Tuple[bool, List[float]]:
    """Test shot query performance with 100+ shots (Requirements 1.5).

    Walks the available episodes, requesting /shots/ at several page sizes
    until at least ``target_count`` shots have been covered, and times each
    request.  Also spot-checks that the optimized endpoint embeds aggregated
    task data (``task_status``/``task_details``/``task_count``) in every shot.

    Args:
        token: Bearer token obtained from ``login()``.
        target_count: minimum number of shots to cover before stopping.

    Returns:
        ``(passed, times)`` — whether the sub-500ms requirement held for
        both the average and the slowest request, and the list of
        per-request timings in milliseconds.
    """
    print(f"\n=== Testing Shot Performance with {target_count}+ Shots (Requirement 1.5) ===")

    headers = {"Authorization": f"Bearer {token}"}

    # Get available episodes/projects for testing
    response = requests.get(f"{BASE_URL}/episodes/", headers=headers)
    if response.status_code != 200:
        print("❌ Failed to get episodes for testing")
        return False, []

    episodes = response.json()
    if not episodes:
        print("❌ No episodes available for testing")
        return False, []

    times = []        # per-request wall-clock timings (ms); only non-empty responses count
    shots_tested = 0  # running total of shots returned across all requests

    # Test multiple episodes to get 100+ shots
    for episode in episodes:
        episode_id = episode['id']

        # Test with different limits to measure scaling
        for limit in [50, 100, 150]:
            print(f" Testing episode {episode_id} with limit {limit}...")

            # Time the full round-trip of the list endpoint.
            with timer() as get_time:
                response = requests.get(
                    f"{BASE_URL}/shots/?episode_id={episode_id}&limit={limit}",
                    headers=headers
                )

            query_time = get_time()

            # Failed requests are skipped rather than failing the whole test.
            if response.status_code != 200:
                print(f" ❌ Request failed: {response.status_code}")
                continue

            shots = response.json()
            shots_count = len(shots)
            shots_tested += shots_count

            print(f" Query time: {query_time:.2f}ms for {shots_count} shots")

            if shots_count > 0:
                times.append(query_time)

                # Validate response structure: the optimized endpoint must
                # embed the aggregated task fields directly in each shot.
                sample_shot = shots[0]
                required_fields = ['task_status', 'task_details', 'task_count']
                missing_fields = [field for field in required_fields if field not in sample_shot]

                if missing_fields:
                    print(f" ❌ Missing fields: {missing_fields}")
                    return False, times

                # Validate task data aggregation
                task_status_count = len(sample_shot['task_status'])
                task_details_count = len(sample_shot['task_details'])

                print(f" Task status types: {task_status_count}")
                print(f" Task details: {task_details_count}")

            # Stop if we've tested enough shots
            if shots_tested >= target_count:
                break

        if shots_tested >= target_count:
            break

    if not times:
        print("❌ No valid performance measurements obtained")
        return False, []

    avg_time = statistics.mean(times)
    max_time = max(times)
    min_time = min(times)

    print(f"\nPerformance Summary:")
    print(f" Total shots tested: {shots_tested}")
    print(f" Average query time: {avg_time:.2f}ms")
    print(f" Min query time: {min_time:.2f}ms")
    print(f" Max query time: {max_time:.2f}ms")

    # Requirement 1.5: sub-500ms performance — both the mean and the
    # single worst request must stay under the threshold.
    performance_passed = avg_time < 500 and max_time < 500

    if performance_passed:
        print("✅ Shot performance requirement met (< 500ms)")
    else:
        print(f"❌ Shot performance requirement failed (avg: {avg_time:.2f}ms, max: {max_time:.2f}ms)")

    return performance_passed, times
|
def test_asset_performance_with_large_dataset(token: str, target_count: int = 100) -> Tuple[bool, List[float]]:
    """Test asset query performance with 100+ assets (Requirements 2.5).

    Asset-side counterpart of the shot performance test: walks the
    available projects, requesting /assets/ at several page sizes until at
    least ``target_count`` assets have been covered, timing each request
    and spot-checking that aggregated task data is embedded per asset.

    Args:
        token: Bearer token obtained from ``login()``.
        target_count: minimum number of assets to cover before stopping.

    Returns:
        ``(passed, times)`` — whether the sub-500ms requirement held for
        both the average and the slowest request, and the list of
        per-request timings in milliseconds.
    """
    print(f"\n=== Testing Asset Performance with {target_count}+ Assets (Requirement 2.5) ===")

    headers = {"Authorization": f"Bearer {token}"}

    # Get available projects for testing
    response = requests.get(f"{BASE_URL}/projects/", headers=headers)
    if response.status_code != 200:
        print("❌ Failed to get projects for testing")
        return False, []

    projects = response.json()
    if not projects:
        print("❌ No projects available for testing")
        return False, []

    times = []         # per-request wall-clock timings (ms); only non-empty responses count
    assets_tested = 0  # running total of assets returned across all requests

    # Test multiple projects to get 100+ assets
    for project in projects:
        project_id = project['id']

        # Test with different limits to measure scaling
        for limit in [50, 100, 150]:
            print(f" Testing project {project_id} with limit {limit}...")

            # Time the full round-trip of the list endpoint.
            with timer() as get_time:
                response = requests.get(
                    f"{BASE_URL}/assets/?project_id={project_id}&limit={limit}",
                    headers=headers
                )

            query_time = get_time()

            # Failed requests are skipped rather than failing the whole test.
            if response.status_code != 200:
                print(f" ❌ Request failed: {response.status_code}")
                continue

            assets = response.json()
            assets_count = len(assets)
            assets_tested += assets_count

            print(f" Query time: {query_time:.2f}ms for {assets_count} assets")

            if assets_count > 0:
                times.append(query_time)

                # Validate response structure: the optimized endpoint must
                # embed the aggregated task fields directly in each asset.
                sample_asset = assets[0]
                required_fields = ['task_status', 'task_details', 'task_count']
                missing_fields = [field for field in required_fields if field not in sample_asset]

                if missing_fields:
                    print(f" ❌ Missing fields: {missing_fields}")
                    return False, times

                # Validate task data aggregation
                task_status_count = len(sample_asset['task_status'])
                task_details_count = len(sample_asset['task_details'])

                print(f" Task status types: {task_status_count}")
                print(f" Task details: {task_details_count}")

            # Stop if we've tested enough assets
            if assets_tested >= target_count:
                break

        if assets_tested >= target_count:
            break

    if not times:
        print("❌ No valid performance measurements obtained")
        return False, []

    avg_time = statistics.mean(times)
    max_time = max(times)
    min_time = min(times)

    print(f"\nPerformance Summary:")
    print(f" Total assets tested: {assets_tested}")
    print(f" Average query time: {avg_time:.2f}ms")
    print(f" Min query time: {min_time:.2f}ms")
    print(f" Max query time: {max_time:.2f}ms")

    # Requirement 2.5: sub-500ms performance — both the mean and the
    # single worst request must stay under the threshold.
    performance_passed = avg_time < 500 and max_time < 500

    if performance_passed:
        print("✅ Asset performance requirement met (< 500ms)")
    else:
        print(f"❌ Asset performance requirement failed (avg: {avg_time:.2f}ms, max: {max_time:.2f}ms)")

    return performance_passed, times
|
def test_single_query_optimization(token: str) -> bool:
    """Test that single query optimization is working (Requirements 1.1, 2.1).

    Fetches a small page of shots and of assets and verifies that each row
    already carries its aggregated task data (``task_status`` dict and
    ``task_details`` list) — i.e. the endpoints did not require N+1
    follow-up requests per entity.

    Args:
        token: Bearer token obtained from ``login()``.

    Returns:
        True when both endpoints respond 200 and embed the task data;
        False otherwise (with a diagnostic printed).
    """
    print("\n=== Testing Single Query Optimization (Requirements 1.1, 2.1) ===")

    headers = {"Authorization": f"Bearer {token}"}

    # Test shot single query optimization
    print("Testing shot single query optimization...")

    with timer() as get_time:
        response = requests.get(f"{BASE_URL}/shots/?limit=10", headers=headers)

    shot_time = get_time()

    if response.status_code != 200:
        print(f"❌ Shot request failed: {response.status_code}")
        return False

    shots = response.json()
    print(f" Shot query time: {shot_time:.2f}ms for {len(shots)} shots")

    # Verify task data is embedded (spot-check the first shot only;
    # an empty result list skips the structural check).
    if shots:
        shot = shots[0]
        has_task_status = 'task_status' in shot and isinstance(shot['task_status'], dict)
        has_task_details = 'task_details' in shot and isinstance(shot['task_details'], list)

        if not (has_task_status and has_task_details):
            print("❌ Shot task data not properly embedded")
            return False

        print(f" ✅ Shot has {len(shot['task_status'])} task status types")
        print(f" ✅ Shot has {len(shot['task_details'])} task details")

    # Test asset single query optimization
    print("\nTesting asset single query optimization...")

    with timer() as get_time:
        response = requests.get(f"{BASE_URL}/assets/?limit=10", headers=headers)

    asset_time = get_time()

    if response.status_code != 200:
        print(f"❌ Asset request failed: {response.status_code}")
        return False

    assets = response.json()
    print(f" Asset query time: {asset_time:.2f}ms for {len(assets)} assets")

    # Verify task data is embedded (same spot-check for assets).
    if assets:
        asset = assets[0]
        has_task_status = 'task_status' in asset and isinstance(asset['task_status'], dict)
        has_task_details = 'task_details' in asset and isinstance(asset['task_details'], list)

        if not (has_task_status and has_task_details):
            print("❌ Asset task data not properly embedded")
            return False

        print(f" ✅ Asset has {len(asset['task_status'])} task status types")
        print(f" ✅ Asset has {len(asset['task_details'])} task details")

    print("✅ Single query optimization validation passed!")
    return True
|
def main():
    """Run all backend performance validation tests.

    Orchestration: verify the server is reachable, authenticate, print
    database statistics, then run the four validation tests and summarize
    which of Requirements 1.5 / 2.5 / 3.5 passed.  Exits early (with a
    diagnostic) if the server is down or authentication fails.
    """
    print("Backend Performance Validation Test")
    print("=" * 60)
    print("Testing Requirements: 1.5, 2.5, 3.5")
    print("=" * 60)

    # Check if server is running (the /docs endpoint needs no auth)
    try:
        response = requests.get(f"{BASE_URL}/docs")
        if response.status_code != 200:
            print("❌ Server is not running. Please start the backend server first.")
            return
    except requests.exceptions.ConnectionError:
        print("❌ Cannot connect to server. Please start the backend server first.")
        return

    # Login
    token = login()
    if not token:
        print("❌ Authentication failed")
        return

    # Get database statistics (read directly from SQLite, not via the API)
    stats = get_database_stats()
    print(f"\nDatabase Statistics:")
    print(f" Active shots: {stats['shots']}")
    print(f" Active assets: {stats['assets']}")
    print(f" Active tasks: {stats['tasks']}")
    print(f" Shot tasks: {stats['shot_tasks']}")
    print(f" Asset tasks: {stats['asset_tasks']}")

    # A tiny dataset still runs, but the timing results are less meaningful.
    if stats['shots'] < 10 or stats['assets'] < 10:
        print("\n⚠️ Warning: Limited test data available.")
        print(" Consider running create_example_data.py for more comprehensive testing.")

    # Run all validation tests
    tests_passed = 0
    total_tests = 4

    # Test 1: Database-level aggregation validation
    if validate_database_aggregation():
        tests_passed += 1

    # Test 2: Single query optimization
    if test_single_query_optimization(token):
        tests_passed += 1

    # Test 3: Shot performance with large dataset
    shot_perf_passed, shot_times = test_shot_performance_with_large_dataset(token)
    if shot_perf_passed:
        tests_passed += 1

    # Test 4: Asset performance with large dataset
    asset_perf_passed, asset_times = test_asset_performance_with_large_dataset(token)
    if asset_perf_passed:
        tests_passed += 1

    # Final summary
    print("\n" + "=" * 60)
    print("BACKEND PERFORMANCE VALIDATION SUMMARY")
    print("=" * 60)

    print(f"Tests passed: {tests_passed}/{total_tests}")

    if shot_times:
        print(f"Shot query performance: avg {statistics.mean(shot_times):.2f}ms")
    if asset_times:
        print(f"Asset query performance: avg {statistics.mean(asset_times):.2f}ms")

    if tests_passed == total_tests:
        print("✅ ALL PERFORMANCE REQUIREMENTS VALIDATED!")
        print("\nRequirements Status:")
        print(" ✅ 1.5: Shot performance < 500ms")
        print(" ✅ 2.5: Asset performance < 500ms")
        print(" ✅ 3.5: Database-level aggregation used")
    else:
        print(f"❌ {total_tests - tests_passed} performance requirements failed")

        if not shot_perf_passed:
            print(" ❌ 1.5: Shot performance requirement failed")
        if not asset_perf_passed:
            print(" ❌ 2.5: Asset performance requirement failed")

    print("=" * 60)
|
if __name__ == "__main__":
|
|
main() |