LinkDesk/backend/test_requirement_3_4_valida...

310 lines
14 KiB
Python

"""
Requirement 3.4 Validation Test
This test specifically validates requirement 3.4:
"WHEN new task types are added to a project, THE system SHALL automatically include them in the aggregated task status queries"
This test demonstrates that the optimization system correctly handles dynamic task types
by automatically including them in aggregated queries without requiring system restarts
or manual configuration changes.
"""
import requests
import json
import time
from typing import Dict, List, Any
BASE_URL = "http://localhost:8000"
def login() -> str:
"""Login and return access token"""
response = requests.post(f"{BASE_URL}/auth/login", json={
"email": "admin@vfx.com",
"password": "admin123"
})
if response.status_code == 200:
return response.json()["access_token"]
else:
raise Exception(f"Login failed: {response.status_code} - {response.text}")
def create_test_project(token: str) -> int:
"""Create a test project and return its ID"""
headers = {"Authorization": f"Bearer {token}"}
project_data = {
"name": f"Requirement 3.4 Test Project {int(time.time())}",
"code_name": f"req_3_4_test_{int(time.time())}",
"client_name": "Test Client",
"project_type": "tv",
"description": "Test project for requirement 3.4 validation"
}
response = requests.post(f"{BASE_URL}/projects/", headers=headers, json=project_data)
if response.status_code == 201:
return response.json()["id"]
else:
raise Exception(f"Failed to create project: {response.status_code} - {response.text}")
def create_test_episode(token: str, project_id: int) -> int:
"""Create a test episode and return its ID"""
headers = {"Authorization": f"Bearer {token}"}
episode_data = {
"name": "Test Episode",
"episode_number": 1,
"description": "Test episode for requirement 3.4"
}
response = requests.post(f"{BASE_URL}/episodes/projects/{project_id}/episodes", headers=headers, json=episode_data)
if response.status_code == 201:
return response.json()["id"]
else:
raise Exception(f"Failed to create episode: {response.status_code} - {response.text}")
def create_test_shot(token: str, episode_id: int, shot_name: str) -> int:
"""Create a test shot and return its ID"""
headers = {"Authorization": f"Bearer {token}"}
shot_data = {
"name": shot_name,
"description": f"Test shot {shot_name}",
"frame_start": 1001,
"frame_end": 1100
}
response = requests.post(f"{BASE_URL}/shots/?episode_id={episode_id}", headers=headers, json=shot_data)
if response.status_code == 201:
return response.json()["id"]
else:
raise Exception(f"Failed to create shot: {response.status_code} - {response.text}")
def create_test_asset(token: str, project_id: int, asset_name: str) -> int:
"""Create a test asset and return its ID"""
headers = {"Authorization": f"Bearer {token}"}
asset_data = {
"name": asset_name,
"category": "characters",
"description": f"Test asset {asset_name}"
}
response = requests.post(f"{BASE_URL}/assets/?project_id={project_id}", headers=headers, json=asset_data)
if response.status_code == 201:
return response.json()["id"]
else:
raise Exception(f"Failed to create asset: {response.status_code} - {response.text}")
def add_custom_task_type(token: str, project_id: int, task_type: str, category: str) -> bool:
"""Add a custom task type to the project"""
headers = {"Authorization": f"Bearer {token}"}
data = {
"task_type": task_type,
"category": category
}
response = requests.post(f"{BASE_URL}/projects/{project_id}/custom-task-types", headers=headers, json=data)
return response.status_code in [201, 409] # 201 = created, 409 = already exists
def get_shots_with_task_status(token: str, project_id: int) -> List[Dict[str, Any]]:
"""Get shots with their task status information using optimized aggregated query"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/shots/?project_id={project_id}", headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get shots: {response.status_code} - {response.text}")
def get_assets_with_task_status(token: str, project_id: int) -> List[Dict[str, Any]]:
"""Get assets with their task status information using optimized aggregated query"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/assets/?project_id={project_id}", headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get assets: {response.status_code} - {response.text}")
def get_single_shot(token: str, shot_id: int) -> Dict[str, Any]:
"""Get a single shot with optimized task data"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/shots/{shot_id}", headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get shot: {response.status_code} - {response.text}")
def get_single_asset(token: str, asset_id: int) -> Dict[str, Any]:
"""Get a single asset with optimized task data"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/assets/{asset_id}", headers=headers)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get asset: {response.status_code} - {response.text}")
def validate_requirement_3_4():
"""
Validate Requirement 3.4:
WHEN new task types are added to a project, THE system SHALL automatically include them in the aggregated task status queries
"""
print("=" * 100)
print("REQUIREMENT 3.4 VALIDATION TEST")
print("=" * 100)
print("Requirement: WHEN new task types are added to a project,")
print(" THE system SHALL automatically include them in the aggregated task status queries")
print("=" * 100)
try:
# Step 1: Setup test environment
print("\n1. Setting up test environment...")
token = login()
project_id = create_test_project(token)
episode_id = create_test_episode(token, project_id)
shot_id = create_test_shot(token, episode_id, "REQ34_001")
asset_id = create_test_asset(token, project_id, "Req34Character")
print(f"✓ Created project {project_id}, episode {episode_id}, shot {shot_id}, asset {asset_id}")
# Step 2: Verify baseline - only standard task types present
print("\n2. Verifying baseline state (standard task types only)...")
initial_shots = get_shots_with_task_status(token, project_id)
initial_assets = get_assets_with_task_status(token, project_id)
initial_shot_types = set(initial_shots[0].get('task_status', {}).keys()) if initial_shots else set()
initial_asset_types = set(initial_assets[0].get('task_status', {}).keys()) if initial_assets else set()
expected_shot_types = {"layout", "animation", "simulation", "lighting", "compositing"}
expected_asset_types = {"modeling", "surfacing", "rigging"}
baseline_shot_correct = initial_shot_types == expected_shot_types
baseline_asset_correct = initial_asset_types == expected_asset_types
print(f" Shot task types: {sorted(initial_shot_types)}")
print(f" Asset task types: {sorted(initial_asset_types)}")
print(f" Baseline shot types correct: {'' if baseline_shot_correct else ''}")
print(f" Baseline asset types correct: {'' if baseline_asset_correct else ''}")
# Step 3: Add new task types to the project
print("\n3. Adding new task types to the project...")
new_shot_type = "dynamic_fx"
new_asset_type = "dynamic_grooming"
shot_type_added = add_custom_task_type(token, project_id, new_shot_type, "shot")
asset_type_added = add_custom_task_type(token, project_id, new_asset_type, "asset")
print(f" Added shot task type '{new_shot_type}': {'' if shot_type_added else ''}")
print(f" Added asset task type '{new_asset_type}': {'' if asset_type_added else ''}")
# Step 4: Verify new task types are automatically included in aggregated queries
print("\n4. Verifying new task types are automatically included in aggregated queries...")
# Test list endpoints (aggregated queries)
updated_shots = get_shots_with_task_status(token, project_id)
updated_assets = get_assets_with_task_status(token, project_id)
updated_shot_types = set(updated_shots[0].get('task_status', {}).keys()) if updated_shots else set()
updated_asset_types = set(updated_assets[0].get('task_status', {}).keys()) if updated_assets else set()
# Test single item endpoints (also use optimized queries)
single_shot = get_single_shot(token, shot_id)
single_asset = get_single_asset(token, asset_id)
# Note: Single item endpoints don't return task_status, but they use optimized queries internally
# Verify new types are included
shot_type_included = new_shot_type in updated_shot_types
asset_type_included = new_asset_type in updated_asset_types
print(f" Updated shot task types: {sorted(updated_shot_types)}")
print(f" Updated asset task types: {sorted(updated_asset_types)}")
print(f" New shot type '{new_shot_type}' included: {'' if shot_type_included else ''}")
print(f" New asset type '{new_asset_type}' included: {'' if asset_type_included else ''}")
# Step 5: Verify all standard types are still present
print("\n5. Verifying standard task types are still present...")
standard_shot_types_present = expected_shot_types.issubset(updated_shot_types)
standard_asset_types_present = expected_asset_types.issubset(updated_asset_types)
print(f" All standard shot types present: {'' if standard_shot_types_present else ''}")
print(f" All standard asset types present: {'' if standard_asset_types_present else ''}")
# Step 6: Add another task type to verify continuous dynamic behavior
print("\n6. Adding another task type to verify continuous dynamic behavior...")
second_shot_type = "procedural_animation"
second_asset_type = "advanced_shading"
add_custom_task_type(token, project_id, second_shot_type, "shot")
add_custom_task_type(token, project_id, second_asset_type, "asset")
# Verify both new types are included
final_shots = get_shots_with_task_status(token, project_id)
final_assets = get_assets_with_task_status(token, project_id)
final_shot_types = set(final_shots[0].get('task_status', {}).keys()) if final_shots else set()
final_asset_types = set(final_assets[0].get('task_status', {}).keys()) if final_assets else set()
both_shot_types_included = new_shot_type in final_shot_types and second_shot_type in final_shot_types
both_asset_types_included = new_asset_type in final_asset_types and second_asset_type in final_asset_types
print(f" Final shot task types: {sorted(final_shot_types)}")
print(f" Final asset task types: {sorted(final_asset_types)}")
print(f" Both new shot types included: {'' if both_shot_types_included else ''}")
print(f" Both new asset types included: {'' if both_asset_types_included else ''}")
# Step 7: Validation Results
print("\n" + "=" * 100)
print("REQUIREMENT 3.4 VALIDATION RESULTS")
print("=" * 100)
validation_criteria = [
("Baseline state correct", baseline_shot_correct and baseline_asset_correct),
("New task types added successfully", shot_type_added and asset_type_added),
("New task types automatically included in aggregated queries", shot_type_included and asset_type_included),
("Standard task types preserved", standard_shot_types_present and standard_asset_types_present),
("Continuous dynamic behavior works", both_shot_types_included and both_asset_types_included)
]
passed_criteria = 0
total_criteria = len(validation_criteria)
for i, (criterion, passed) in enumerate(validation_criteria, 1):
status = "✓ PASS" if passed else "✗ FAIL"
print(f"{i}. {criterion}: {status}")
if passed:
passed_criteria += 1
print(f"\nOverall Validation Result: {passed_criteria}/{total_criteria} criteria passed")
if passed_criteria == total_criteria:
print("\n🎉 REQUIREMENT 3.4 VALIDATION SUCCESSFUL!")
print("The system correctly automatically includes new task types in aggregated task status queries.")
return True
else:
print("\n❌ REQUIREMENT 3.4 VALIDATION FAILED!")
print("The system does not properly handle dynamic task types in aggregated queries.")
return False
except Exception as e:
print(f"\n❌ VALIDATION ERROR: {str(e)}")
return False
if __name__ == "__main__":
success = validate_requirement_3_4()
exit(0 if success else 1)