LinkDesk/backend/test_asset_optimization.py

180 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
Test script to verify the asset router optimization is working correctly.
This tests the single query pattern vs the old N+1 pattern.
"""
import requests
import json
import time
from typing import List, Dict
# Endpoints of the locally running backend that this script talks to.
# The login and asset URLs are derived from the base URL so that only
# BASE_URL has to change when the server lives somewhere else.
BASE_URL = "http://localhost:8000"
LOGIN_URL = BASE_URL + "/auth/login"
ASSETS_URL = BASE_URL + "/assets/"
def get_auth_headers(timeout: float = 10.0) -> Dict[str, str]:
    """Log in to the API and return headers for authenticated requests.

    Posts the hard-coded admin credentials as JSON to ``LOGIN_URL`` and wraps
    the ``access_token`` from the response in a bearer ``Authorization``
    header.

    Args:
        timeout: Seconds to wait for the login request. Without a timeout
            ``requests`` waits indefinitely, so a stalled backend would hang
            the whole script.

    Returns:
        A dict of the form ``{"Authorization": "Bearer <token>"}``.

    Raises:
        Exception: If the login endpoint answers with a status other than 200.
        requests.RequestException: If the request fails or times out.
        KeyError: If the JSON response has no ``"access_token"`` entry.
    """
    login_data = {
        "email": "admin@vfx.com",
        "password": "admin123",
    }
    response = requests.post(LOGIN_URL, json=login_data, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Login failed: {response.status_code}")

    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
# Upper bound, in seconds, for any single HTTP call made by the checks below.
# Without it ``requests`` would wait forever on a stalled backend.
_REQUEST_TIMEOUT_SECONDS = 30

# Response-time budget, in milliseconds, for listing up to 100 assets.
_LARGE_QUERY_BUDGET_MS = 500


def _timed_get(url, headers, params=None):
    """Send a GET request and return ``(response, elapsed_ms)``.

    Only the HTTP round trip is timed; decoding the body is left to the
    caller. ``time.perf_counter`` is used because it is monotonic and meant
    for measuring short intervals, unlike the wall clock.
    """
    started = time.perf_counter()
    response = requests.get(
        url,
        params=params,
        headers=headers,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    elapsed_ms = (time.perf_counter() - started) * 1000
    return response, elapsed_ms


def test_asset_optimization():
    """Exercise the asset endpoints and print a timing/status report.

    Runs five checks against the first project returned by ``/projects/``:
    basic asset listing, single-asset retrieval, task-status filtering,
    task-status sorting, and a 100-asset listing compared against the 500 ms
    budget.

    Results are reported on stdout only. The function returns ``None`` and
    does not raise: any exception is caught at the bottom and printed.
    """
    print("Testing Asset Router Optimization")
    print("=" * 50)
    try:
        headers = get_auth_headers()
        print("✅ Authentication successful")

        # Get projects
        projects_response = requests.get(
            f"{BASE_URL}/projects/",
            headers=headers,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        # Check the status before touching the payload: an error body would
        # otherwise fail later with a misleading KeyError/TypeError.
        if projects_response.status_code != 200:
            print(f"❌ Failed to get projects: {projects_response.status_code}")
            print(projects_response.text)
            return
        projects = projects_response.json()
        if not projects:
            print("❌ No projects found")
            return
        project_id = projects[0]["id"]
        print(f"✅ Using project ID: {project_id}")

        # Test 1: Basic asset listing with task status
        print("\n1. Testing basic asset listing with task status...")
        response, query_time = _timed_get(
            ASSETS_URL,
            headers,
            params={"project_id": project_id, "limit": 50},
        )
        if response.status_code != 200:
            print(f"❌ Failed to get assets: {response.status_code}")
            print(response.text)
            return
        assets = response.json()
        print(f"✅ Retrieved {len(assets)} assets in {query_time:.2f}ms")

        # Verify data structure on the first asset only
        if assets:
            asset = assets[0]
            required_fields = [
                'id', 'name', 'category',
                'task_count', 'task_status', 'task_details',
            ]
            missing_fields = [
                field for field in required_fields if field not in asset
            ]
            if missing_fields:
                print(f"❌ Missing fields in response: {missing_fields}")
                return
            print("✅ Response structure is correct")
            print(f" Sample asset: {asset['name']}")
            print(f" Task count: {asset['task_count']}")
            print(f" Task status keys: {list(asset['task_status'].keys())}")
            print(f" Task details count: {len(asset['task_details'])}")

        # Test 2: Individual asset retrieval
        print("\n2. Testing individual asset retrieval...")
        if assets:
            asset_id = assets[0]['id']
            response, query_time = _timed_get(f"{ASSETS_URL}{asset_id}", headers)
            if response.status_code == 200:
                asset_detail = response.json()
                print(f"✅ Retrieved asset details in {query_time:.2f}ms")
                print(f" Asset: {asset_detail['name']}")
                print(f" Task count: {asset_detail['task_count']}")
            else:
                print(f"❌ Failed to get asset details: {response.status_code}")

        # Test 3: Task status filtering
        print("\n3. Testing task status filtering...")
        response, query_time = _timed_get(
            ASSETS_URL,
            headers,
            params={
                "project_id": project_id,
                "task_status_filter": "modeling:not_started",
            },
        )
        if response.status_code == 200:
            filtered_assets = response.json()
            print(f"✅ Task status filtering works in {query_time:.2f}ms")
            print(f" Found {len(filtered_assets)} assets with modeling:not_started")
        else:
            print(f"❌ Task status filtering failed: {response.status_code}")

        # Test 4: Task status sorting
        print("\n4. Testing task status sorting...")
        response, query_time = _timed_get(
            ASSETS_URL,
            headers,
            params={
                "project_id": project_id,
                "sort_by": "modeling_status",
                "sort_direction": "desc",
            },
        )
        if response.status_code == 200:
            sorted_assets = response.json()
            print(f"✅ Task status sorting works in {query_time:.2f}ms")
            print(f" Retrieved {len(sorted_assets)} sorted assets")
            # Show first few assets so the ordering can be checked by eye
            if len(sorted_assets) >= 3:
                print(" Sorting verification (first 3 assets):")
                for i, asset in enumerate(sorted_assets[:3]):
                    modeling_status = asset['task_status'].get('modeling', 'not_started')
                    print(f" {i+1}. {asset['name']}: modeling={modeling_status}")
        else:
            print(f"❌ Task status sorting failed: {response.status_code}")

        # Test 5: Performance with larger dataset
        print("\n5. Testing performance with larger dataset...")
        response, query_time = _timed_get(
            ASSETS_URL,
            headers,
            params={"project_id": project_id, "limit": 100},
        )
        if response.status_code == 200:
            large_assets = response.json()
            print(f"✅ Large dataset query completed in {query_time:.2f}ms")
            print(f" Retrieved {len(large_assets)} assets")
            # Check if performance meets requirements (< 500ms for up to 100 assets)
            if query_time < _LARGE_QUERY_BUDGET_MS:
                print(f"✅ Performance requirement met (< {_LARGE_QUERY_BUDGET_MS}ms)")
            else:
                print(
                    f"⚠️ Performance requirement not met "
                    f"({query_time:.2f}ms >= {_LARGE_QUERY_BUDGET_MS}ms)"
                )
        else:
            print(f"❌ Large dataset query failed: {response.status_code}")

        print("\n" + "=" * 50)
        print("✅ Asset Router Optimization Tests Completed!")
        print("=" * 50)
    except Exception as e:
        # Top-level boundary of the script: report the failure instead of
        # letting a traceback escape.
        print(f"❌ Test failed with error: {e}")
# Run the checks only when this file is executed directly as a script;
# importing the module must not trigger any HTTP traffic.
if __name__ == "__main__":
    test_asset_optimization()