178 lines
7.5 KiB
Python
178 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Comprehensive test to validate that the asset router optimization meets all requirements.
|
||
This tests Requirements 2.1, 2.3, and 3.1 from the specification.
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
import time
|
||
from typing import List, Dict
|
||
|
||
def test_requirements_validation():
    """Validate the asset router optimization against a running API server.

    Logs in to the server at ``http://localhost:8000`` with the hard-coded
    admin credentials, picks the first project returned by ``/projects/``
    (falling back to project id 2), and then runs five checks, printing a
    human-readable report as it goes:

    1. Requirement 2.1 - the asset listing includes ``task_status`` and
       ``task_details`` in a single API call.
    2. Requirement 2.3 - an asset with more than one task carries
       aggregated task data.
    3. Requirement 3.1 - a listing limited to 50 assets responds and every
       returned asset carries task data (response time is reported).
    4. Requirement 2.4 - standard and custom task types are reported.
    5. Single-asset retrieval includes ``task_count``.

    Items reported with a warning or info marker do not fail the run; only
    items reported with a cross marker do.

    Returns:
        bool: ``True`` when every check passed; ``False`` when login failed
        or at least one check failed.

    Raises:
        requests.RequestException: connection errors and timeouts are not
            caught here, so an unreachable server surfaces as an exception
            rather than as a ``False`` result.
    """
    print("Asset Router Optimization - Requirements Validation")
    print("=" * 60)

    BASE_URL = 'http://localhost:8000'
    # requests waits indefinitely unless a timeout is given; bound every call
    # so a stalled server cannot hang the validation run.
    REQUEST_TIMEOUT = 30  # seconds
    standard_types = ['modeling', 'surfacing', 'rigging']

    # Login
    login_response = requests.post(
        f'{BASE_URL}/auth/login',
        json={'email': 'admin@vfx.com', 'password': 'admin123'},
        timeout=REQUEST_TIMEOUT,
    )
    if login_response.status_code != 200:
        print("❌ Login failed")
        return False

    token = login_response.json()['access_token']
    headers = {'Authorization': f'Bearer {token}'}

    # Get project for testing. Only trust the payload when the call succeeded
    # and returned a non-empty list; otherwise keep the fallback project id.
    project_id = 2
    projects_response = requests.get(
        f'{BASE_URL}/projects/', headers=headers, timeout=REQUEST_TIMEOUT
    )
    if projects_response.status_code == 200:
        projects = projects_response.json()
        if isinstance(projects, list) and projects:
            project_id = projects[0]['id']

    all_tests_passed = True
    # Bound up front so the later sections can test `if assets:` even when
    # both asset listing requests fail.
    assets = []

    # REQUIREMENT 2.1: Single query operation for asset data fetching
    print("\n1. TESTING REQUIREMENT 2.1:")
    print(" 'WHEN the system fetches asset data for table display,")
    print(" THE system SHALL include all associated task statuses in a single query operation'")

    # perf_counter is a monotonic clock intended for measuring short durations.
    start_time = time.perf_counter()
    response = requests.get(
        f'{BASE_URL}/assets/?project_id={project_id}',
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    query_time = (time.perf_counter() - start_time) * 1000

    if response.status_code == 200:
        assets = response.json()

        # Verify that task status data is included
        if assets and 'task_status' in assets[0] and 'task_details' in assets[0]:
            print(" ✅ Task status data included in single API call")
            print(f" ✅ Query completed in {query_time:.2f}ms")
        else:
            print(" ❌ Task status data not included in response")
            all_tests_passed = False
    else:
        print(f" ❌ Asset listing failed: {response.status_code}")
        all_tests_passed = False

    # REQUIREMENT 2.3: Task status aggregation for assets with multiple tasks
    print("\n2. TESTING REQUIREMENT 2.3:")
    print(" 'WHEN an asset has multiple tasks,")
    print(" THE system SHALL aggregate all task statuses into the asset data response'")

    if response.status_code == 200 and assets:
        # Find an asset with multiple tasks
        multi_task_asset = next(
            (asset for asset in assets if asset.get('task_count', 0) > 1),
            None,
        )

        if multi_task_asset:
            task_status = multi_task_asset.get('task_status', {})
            task_details = multi_task_asset.get('task_details', [])

            print(f" ✅ Found asset '{multi_task_asset['name']}' with {multi_task_asset['task_count']} tasks")
            print(f" ✅ Task status aggregated: {len(task_status)} status entries")
            print(f" ✅ Task details aggregated: {len(task_details)} detail entries")

            # Verify task status includes all expected types
            missing_types = [t for t in standard_types if t not in task_status]
            if not missing_types:
                print(" ✅ All standard task types included in aggregation")
            else:
                print(f" ⚠️ Some standard task types missing: {missing_types}")
        else:
            print(" ⚠️ No assets with multiple tasks found for testing")
    else:
        print(" ❌ Cannot test aggregation - no asset data available")
        all_tests_passed = False

    # REQUIREMENT 3.1: Optimized SQL joins for single database round trip
    print("\n3. TESTING REQUIREMENT 3.1:")
    print(" 'WHEN querying assets, THE system SHALL use optimized SQL joins")
    print(" to fetch task status data in a single database round trip'")

    # Test performance to verify optimization
    start_time = time.perf_counter()
    response = requests.get(
        f'{BASE_URL}/assets/?project_id={project_id}&limit=50',
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    query_time = (time.perf_counter() - start_time) * 1000

    if response.status_code == 200:
        assets = response.json()

        # Performance should be reasonable for optimized queries
        if query_time < 5000:  # 5 seconds is very generous for HTTP
            print(f" ✅ Query performance acceptable: {query_time:.2f}ms")
        else:
            print(f" ⚠️ Query performance may need improvement: {query_time:.2f}ms")

        # Verify data completeness (all assets should have task data)
        assets_with_task_data = sum(
            1 for a in assets if 'task_status' in a and 'task_details' in a
        )
        if assets_with_task_data == len(assets):
            print(f" ✅ All {len(assets)} assets have complete task data")
        else:
            print(f" ❌ Only {assets_with_task_data}/{len(assets)} assets have complete task data")
            all_tests_passed = False
    else:
        print(f" ❌ Performance test failed: {response.status_code}")
        all_tests_passed = False

    # ADDITIONAL TEST: Custom task status support (Requirement 2.4)
    print("\n4. TESTING CUSTOM TASK STATUS SUPPORT:")
    print(" 'WHEN custom task statuses exist for a project,")
    print(" THE system SHALL include both default and custom status information'")

    if assets:
        sample_asset = assets[0]
        task_status = sample_asset.get('task_status', {})

        # Check for both standard and custom task types
        all_types = list(task_status.keys())
        custom_types = [t for t in all_types if t not in standard_types]

        print(f" ✅ Standard task types found: {[t for t in standard_types if t in all_types]}")
        if custom_types:
            print(f" ✅ Custom task types found: {custom_types}")
        else:
            print(" ℹ️ No custom task types in this project")

        print(f" ✅ Total task types supported: {len(all_types)}")

    # ADDITIONAL TEST: Single asset retrieval optimization
    print("\n5. TESTING SINGLE ASSET RETRIEVAL OPTIMIZATION:")

    if assets:
        asset_id = assets[0]['id']
        start_time = time.perf_counter()
        detail_response = requests.get(
            f'{BASE_URL}/assets/{asset_id}',
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )
        query_time = (time.perf_counter() - start_time) * 1000

        if detail_response.status_code == 200:
            asset = detail_response.json()
            if 'task_count' in asset:
                print(f" ✅ Single asset retrieval optimized: {query_time:.2f}ms")
                print(f" ✅ Task count included: {asset['task_count']}")
            else:
                print(" ❌ Task count not included in single asset response")
                all_tests_passed = False
        else:
            print(f" ❌ Single asset retrieval failed: {detail_response.status_code}")
            all_tests_passed = False

    # SUMMARY
    print("\n" + "=" * 60)
    if all_tests_passed:
        print("✅ ALL REQUIREMENTS VALIDATION TESTS PASSED!")
        print("✅ Asset router optimization successfully implemented")
    else:
        print("❌ SOME REQUIREMENTS VALIDATION TESTS FAILED!")
        print("❌ Asset router optimization needs attention")
    print("=" * 60)

    return all_tests_passed
|
||
|
||
if __name__ == "__main__":
|
||
test_requirements_validation() |