180 lines
6.8 KiB
Python
180 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to verify the asset router optimization is working correctly.
|
|
This tests the single query pattern vs the old N+1 pattern.
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
from typing import List, Dict
|
|
|
|
# Configuration
|
|
BASE_URL = "http://localhost:8000"
|
|
LOGIN_URL = f"{BASE_URL}/auth/login"
|
|
ASSETS_URL = f"{BASE_URL}/assets/"
|
|
|
|
def get_auth_headers():
|
|
"""Get authentication headers."""
|
|
login_data = {
|
|
"email": "admin@vfx.com",
|
|
"password": "admin123"
|
|
}
|
|
|
|
response = requests.post(LOGIN_URL, json=login_data)
|
|
if response.status_code != 200:
|
|
raise Exception(f"Login failed: {response.status_code}")
|
|
|
|
token_data = response.json()
|
|
token = token_data["access_token"]
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
def test_asset_optimization():
|
|
"""Test the optimized asset router."""
|
|
print("Testing Asset Router Optimization")
|
|
print("=" * 50)
|
|
|
|
try:
|
|
headers = get_auth_headers()
|
|
print("✅ Authentication successful")
|
|
|
|
# Get projects
|
|
projects_response = requests.get(f"{BASE_URL}/projects/", headers=headers)
|
|
projects = projects_response.json()
|
|
if not projects:
|
|
print("❌ No projects found")
|
|
return
|
|
|
|
project_id = projects[0]["id"]
|
|
print(f"✅ Using project ID: {project_id}")
|
|
|
|
# Test 1: Basic asset listing with task status
|
|
print("\n1. Testing basic asset listing with task status...")
|
|
start_time = time.time()
|
|
|
|
params = {"project_id": project_id, "limit": 50}
|
|
response = requests.get(ASSETS_URL, params=params, headers=headers)
|
|
|
|
end_time = time.time()
|
|
query_time = (end_time - start_time) * 1000 # Convert to milliseconds
|
|
|
|
if response.status_code != 200:
|
|
print(f"❌ Failed to get assets: {response.status_code}")
|
|
print(response.text)
|
|
return
|
|
|
|
assets = response.json()
|
|
print(f"✅ Retrieved {len(assets)} assets in {query_time:.2f}ms")
|
|
|
|
# Verify data structure
|
|
if assets:
|
|
asset = assets[0]
|
|
required_fields = ['id', 'name', 'category', 'task_count', 'task_status', 'task_details']
|
|
missing_fields = [field for field in required_fields if field not in asset]
|
|
|
|
if missing_fields:
|
|
print(f"❌ Missing fields in response: {missing_fields}")
|
|
return
|
|
|
|
print(f"✅ Response structure is correct")
|
|
print(f" Sample asset: {asset['name']}")
|
|
print(f" Task count: {asset['task_count']}")
|
|
print(f" Task status keys: {list(asset['task_status'].keys())}")
|
|
print(f" Task details count: {len(asset['task_details'])}")
|
|
|
|
# Test 2: Individual asset retrieval
|
|
print("\n2. Testing individual asset retrieval...")
|
|
if assets:
|
|
asset_id = assets[0]['id']
|
|
start_time = time.time()
|
|
|
|
response = requests.get(f"{ASSETS_URL}{asset_id}", headers=headers)
|
|
|
|
end_time = time.time()
|
|
query_time = (end_time - start_time) * 1000
|
|
|
|
if response.status_code == 200:
|
|
asset_detail = response.json()
|
|
print(f"✅ Retrieved asset details in {query_time:.2f}ms")
|
|
print(f" Asset: {asset_detail['name']}")
|
|
print(f" Task count: {asset_detail['task_count']}")
|
|
else:
|
|
print(f"❌ Failed to get asset details: {response.status_code}")
|
|
|
|
# Test 3: Task status filtering
|
|
print("\n3. Testing task status filtering...")
|
|
params = {
|
|
"project_id": project_id,
|
|
"task_status_filter": "modeling:not_started"
|
|
}
|
|
|
|
start_time = time.time()
|
|
response = requests.get(ASSETS_URL, params=params, headers=headers)
|
|
end_time = time.time()
|
|
query_time = (end_time - start_time) * 1000
|
|
|
|
if response.status_code == 200:
|
|
filtered_assets = response.json()
|
|
print(f"✅ Task status filtering works in {query_time:.2f}ms")
|
|
print(f" Found {len(filtered_assets)} assets with modeling:not_started")
|
|
else:
|
|
print(f"❌ Task status filtering failed: {response.status_code}")
|
|
|
|
# Test 4: Task status sorting
|
|
print("\n4. Testing task status sorting...")
|
|
params = {
|
|
"project_id": project_id,
|
|
"sort_by": "modeling_status",
|
|
"sort_direction": "desc"
|
|
}
|
|
|
|
start_time = time.time()
|
|
response = requests.get(ASSETS_URL, params=params, headers=headers)
|
|
end_time = time.time()
|
|
query_time = (end_time - start_time) * 1000
|
|
|
|
if response.status_code == 200:
|
|
sorted_assets = response.json()
|
|
print(f"✅ Task status sorting works in {query_time:.2f}ms")
|
|
print(f" Retrieved {len(sorted_assets)} sorted assets")
|
|
|
|
# Show first few assets to verify sorting
|
|
if len(sorted_assets) >= 3:
|
|
print(" Sorting verification (first 3 assets):")
|
|
for i, asset in enumerate(sorted_assets[:3]):
|
|
modeling_status = asset['task_status'].get('modeling', 'not_started')
|
|
print(f" {i+1}. {asset['name']}: modeling={modeling_status}")
|
|
else:
|
|
print(f"❌ Task status sorting failed: {response.status_code}")
|
|
|
|
# Test 5: Performance with larger dataset
|
|
print("\n5. Testing performance with larger dataset...")
|
|
params = {"project_id": project_id, "limit": 100}
|
|
|
|
start_time = time.time()
|
|
response = requests.get(ASSETS_URL, params=params, headers=headers)
|
|
end_time = time.time()
|
|
query_time = (end_time - start_time) * 1000
|
|
|
|
if response.status_code == 200:
|
|
large_assets = response.json()
|
|
print(f"✅ Large dataset query completed in {query_time:.2f}ms")
|
|
print(f" Retrieved {len(large_assets)} assets")
|
|
|
|
# Check if performance meets requirements (< 500ms for up to 100 assets)
|
|
if query_time < 500:
|
|
print(f"✅ Performance requirement met (< 500ms)")
|
|
else:
|
|
print(f"⚠️ Performance requirement not met ({query_time:.2f}ms >= 500ms)")
|
|
else:
|
|
print(f"❌ Large dataset query failed: {response.status_code}")
|
|
|
|
print("\n" + "=" * 50)
|
|
print("✅ Asset Router Optimization Tests Completed!")
|
|
print("=" * 50)
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed with error: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
test_asset_optimization() |