310 lines
14 KiB
Python
310 lines
14 KiB
Python
"""
|
|
Requirement 3.4 Validation Test

This test specifically validates requirement 3.4:
"WHEN new task types are added to a project, THE system SHALL automatically include them in the aggregated task status queries"

This test demonstrates that the optimization system correctly handles dynamic task types
by automatically including them in aggregated queries without requiring system restarts
or manual configuration changes.
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
from typing import Dict, List, Any
|
|
|
|
# Root URL of the API server under test; the request helpers in this file
# build every endpoint URL by prefixing the path with this value.
BASE_URL = "http://localhost:8000"
|
|
|
|
|
|
def login(timeout: float = 30.0) -> str:
    """Log in with the fixed admin test account and return its access token.

    Posts the credentials to ``{BASE_URL}/auth/login``.

    Args:
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so an unresponsive server would otherwise
            hang the whole validation run.

    Returns:
        The ``access_token`` field of the JSON login response.

    Raises:
        Exception: If the server answers with any status other than 200.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    # NOTE(review): credentials are hard-coded test fixtures; consider reading
    # them from the environment if this ever runs against a shared server.
    response = requests.post(
        f"{BASE_URL}/auth/login",
        json={
            "email": "admin@vfx.com",
            "password": "admin123",
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Login failed: {response.status_code} - {response.text}")
    return response.json()["access_token"]
|
|
|
|
|
|
def create_test_project(token: str, timeout: float = 30.0) -> int:
    """Create a uniquely named test project and return its ID.

    Posts to ``{BASE_URL}/projects/`` with a name and code name suffixed by
    the current Unix timestamp (whole seconds).

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        Exception: If the server answers with any status other than 201.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    # Read the clock once so ``name`` and ``code_name`` always carry the same
    # suffix, even when the call straddles a second boundary.
    stamp = int(time.time())
    project_data = {
        "name": f"Requirement 3.4 Test Project {stamp}",
        "code_name": f"req_3_4_test_{stamp}",
        "client_name": "Test Client",
        "project_type": "tv",
        "description": "Test project for requirement 3.4 validation",
    }

    response = requests.post(
        f"{BASE_URL}/projects/",
        headers=headers,
        json=project_data,
        timeout=timeout,
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create project: {response.status_code} - {response.text}")
    return response.json()["id"]
|
|
|
|
|
|
def create_test_episode(token: str, project_id: int, timeout: float = 30.0) -> int:
    """Create episode number 1 under a project and return its ID.

    Posts to ``{BASE_URL}/episodes/projects/{project_id}/episodes``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        project_id: ID of the project that will own the episode.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        Exception: If the server answers with any status other than 201.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    episode_data = {
        "name": "Test Episode",
        "episode_number": 1,
        "description": "Test episode for requirement 3.4",
    }

    response = requests.post(
        f"{BASE_URL}/episodes/projects/{project_id}/episodes",
        headers=headers,
        json=episode_data,
        timeout=timeout,
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create episode: {response.status_code} - {response.text}")
    return response.json()["id"]
|
|
|
|
|
|
def create_test_shot(token: str, episode_id: int, shot_name: str, timeout: float = 30.0) -> int:
    """Create a shot in an episode and return its ID.

    Posts to ``{BASE_URL}/shots/?episode_id={episode_id}`` with a fixed frame
    range of 1001-1100.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        episode_id: ID of the episode that will own the shot.
        shot_name: Name for the new shot; also embedded in its description.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        Exception: If the server answers with any status other than 201.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    shot_data = {
        "name": shot_name,
        "description": f"Test shot {shot_name}",
        "frame_start": 1001,
        "frame_end": 1100,
    }

    response = requests.post(
        f"{BASE_URL}/shots/?episode_id={episode_id}",
        headers=headers,
        json=shot_data,
        timeout=timeout,
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create shot: {response.status_code} - {response.text}")
    return response.json()["id"]
|
|
|
|
|
|
def create_test_asset(token: str, project_id: int, asset_name: str, timeout: float = 30.0) -> int:
    """Create a ``characters``-category asset in a project and return its ID.

    Posts to ``{BASE_URL}/assets/?project_id={project_id}``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        project_id: ID of the project that will own the asset.
        asset_name: Name for the new asset; also embedded in its description.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        Exception: If the server answers with any status other than 201.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    asset_data = {
        "name": asset_name,
        "category": "characters",
        "description": f"Test asset {asset_name}",
    }

    response = requests.post(
        f"{BASE_URL}/assets/?project_id={project_id}",
        headers=headers,
        json=asset_data,
        timeout=timeout,
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create asset: {response.status_code} - {response.text}")
    return response.json()["id"]
|
|
|
|
|
|
def add_custom_task_type(token: str, project_id: int, task_type: str, category: str, timeout: float = 30.0) -> bool:
    """Register a custom task type on a project.

    Posts to ``{BASE_URL}/projects/{project_id}/custom-task-types``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        project_id: ID of the project to extend.
        task_type: Name of the task type to add.
        category: Category sent with the task type (callers in this file pass
            ``"shot"`` or ``"asset"``).
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        True if the server answered 201 (created) or 409 (already exists),
        False for any other status.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    data = {
        "task_type": task_type,
        "category": category,
    }

    response = requests.post(
        f"{BASE_URL}/projects/{project_id}/custom-task-types",
        headers=headers,
        json=data,
        timeout=timeout,
    )
    # 409 counts as success: the type is present on the project either way.
    return response.status_code in (201, 409)
|
|
|
|
|
|
def get_shots_with_task_status(token: str, project_id: int, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Fetch the shot list for a project.

    Issues ``GET {BASE_URL}/shots/?project_id={project_id}``. The server is
    expected to answer this from its aggregated task-status query (that is
    what the requirement under test is about); this client cannot verify it.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        project_id: ID of the project whose shots are listed.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: If the server answers with any status other than 200.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    response = requests.get(
        f"{BASE_URL}/shots/?project_id={project_id}",
        headers=headers,
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to get shots: {response.status_code} - {response.text}")
    return response.json()
|
|
|
|
|
|
def get_assets_with_task_status(token: str, project_id: int, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Fetch the asset list for a project.

    Issues ``GET {BASE_URL}/assets/?project_id={project_id}``. The server is
    expected to answer this from its aggregated task-status query (that is
    what the requirement under test is about); this client cannot verify it.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        project_id: ID of the project whose assets are listed.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: If the server answers with any status other than 200.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    response = requests.get(
        f"{BASE_URL}/assets/?project_id={project_id}",
        headers=headers,
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to get assets: {response.status_code} - {response.text}")
    return response.json()
|
|
|
|
|
|
def get_single_shot(token: str, shot_id: int, timeout: float = 30.0) -> Dict[str, Any]:
    """Fetch one shot by ID.

    Issues ``GET {BASE_URL}/shots/{shot_id}``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        shot_id: ID of the shot to fetch.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: If the server answers with any status other than 200.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    response = requests.get(
        f"{BASE_URL}/shots/{shot_id}",
        headers=headers,
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to get shot: {response.status_code} - {response.text}")
    return response.json()
|
|
|
|
|
|
def get_single_asset(token: str, asset_id: int, timeout: float = 30.0) -> Dict[str, Any]:
    """Fetch one asset by ID.

    Issues ``GET {BASE_URL}/assets/{asset_id}``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        asset_id: ID of the asset to fetch.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot hang the run indefinitely.

    Returns:
        The parsed JSON response body.

    Raises:
        Exception: If the server answers with any status other than 200.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {"Authorization": f"Bearer {token}"}

    response = requests.get(
        f"{BASE_URL}/assets/{asset_id}",
        headers=headers,
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to get asset: {response.status_code} - {response.text}")
    return response.json()
|
|
|
|
|
|
def validate_requirement_3_4():
    """Run the end-to-end check for requirement 3.4 and report the outcome.

    Requirement 3.4: WHEN new task types are added to a project, THE system
    SHALL automatically include them in the aggregated task status queries.

    The run creates a fresh project, episode, shot and asset, records the
    task types reported by the shot and asset list endpoints, adds custom
    task types in two rounds, and re-reads the list endpoints after each
    round. Progress and a five-criterion summary are printed to stdout.

    Returns:
        True when all five criteria pass. False when any criterion fails or
        when any step raises; in that case the error message is printed and
        the exception is not propagated.
    """
    print("=" * 100)
    print("REQUIREMENT 3.4 VALIDATION TEST")
    print("=" * 100)
    print("Requirement: WHEN new task types are added to a project,")
    print(" THE system SHALL automatically include them in the aggregated task status queries")
    print("=" * 100)

    try:
        # Step 1: Setup test environment
        print("\n1. Setting up test environment...")
        token = login()
        project_id = create_test_project(token)
        episode_id = create_test_episode(token, project_id)
        shot_id = create_test_shot(token, episode_id, "REQ34_001")
        asset_id = create_test_asset(token, project_id, "Req34Character")
        print(f"✓ Created project {project_id}, episode {episode_id}, shot {shot_id}, asset {asset_id}")

        # Step 2: Verify baseline - only standard task types present
        print("\n2. Verifying baseline state (standard task types only)...")
        initial_shot_types = _task_types_of_first(get_shots_with_task_status(token, project_id))
        initial_asset_types = _task_types_of_first(get_assets_with_task_status(token, project_id))

        expected_shot_types = {"layout", "animation", "simulation", "lighting", "compositing"}
        expected_asset_types = {"modeling", "surfacing", "rigging"}

        baseline_shot_correct = initial_shot_types == expected_shot_types
        baseline_asset_correct = initial_asset_types == expected_asset_types

        print(f" Shot task types: {sorted(initial_shot_types)}")
        print(f" Asset task types: {sorted(initial_asset_types)}")
        print(f" Baseline shot types correct: {_mark(baseline_shot_correct)}")
        print(f" Baseline asset types correct: {_mark(baseline_asset_correct)}")

        # Step 3: Add new task types to the project
        print("\n3. Adding new task types to the project...")
        new_shot_type = "dynamic_fx"
        new_asset_type = "dynamic_grooming"

        shot_type_added = add_custom_task_type(token, project_id, new_shot_type, "shot")
        asset_type_added = add_custom_task_type(token, project_id, new_asset_type, "asset")

        print(f" Added shot task type '{new_shot_type}': {_mark(shot_type_added)}")
        print(f" Added asset task type '{new_asset_type}': {_mark(asset_type_added)}")

        # Step 4: Verify new task types are automatically included in aggregated queries
        print("\n4. Verifying new task types are automatically included in aggregated queries...")

        # Test list endpoints (aggregated queries)
        updated_shot_types = _task_types_of_first(get_shots_with_task_status(token, project_id))
        updated_asset_types = _task_types_of_first(get_assets_with_task_status(token, project_id))

        # Single-item endpoints are exercised as a smoke check only: the
        # helpers raise on any non-200 answer, and the payloads themselves
        # are not inspected here.
        get_single_shot(token, shot_id)
        get_single_asset(token, asset_id)

        # Verify new types are included
        shot_type_included = new_shot_type in updated_shot_types
        asset_type_included = new_asset_type in updated_asset_types

        print(f" Updated shot task types: {sorted(updated_shot_types)}")
        print(f" Updated asset task types: {sorted(updated_asset_types)}")
        print(f" New shot type '{new_shot_type}' included: {_mark(shot_type_included)}")
        print(f" New asset type '{new_asset_type}' included: {_mark(asset_type_included)}")

        # Step 5: Verify all standard types are still present
        print("\n5. Verifying standard task types are still present...")

        standard_shot_types_present = expected_shot_types.issubset(updated_shot_types)
        standard_asset_types_present = expected_asset_types.issubset(updated_asset_types)

        print(f" All standard shot types present: {_mark(standard_shot_types_present)}")
        print(f" All standard asset types present: {_mark(standard_asset_types_present)}")

        # Step 6: Add another task type to verify continuous dynamic behavior
        print("\n6. Adding another task type to verify continuous dynamic behavior...")

        second_shot_type = "procedural_animation"
        second_asset_type = "advanced_shading"

        # Return values are not checked here: a failed add shows up as a
        # missing type in the membership checks that follow.
        add_custom_task_type(token, project_id, second_shot_type, "shot")
        add_custom_task_type(token, project_id, second_asset_type, "asset")

        # Verify both new types are included
        final_shot_types = _task_types_of_first(get_shots_with_task_status(token, project_id))
        final_asset_types = _task_types_of_first(get_assets_with_task_status(token, project_id))

        both_shot_types_included = new_shot_type in final_shot_types and second_shot_type in final_shot_types
        both_asset_types_included = new_asset_type in final_asset_types and second_asset_type in final_asset_types

        print(f" Final shot task types: {sorted(final_shot_types)}")
        print(f" Final asset task types: {sorted(final_asset_types)}")
        print(f" Both new shot types included: {_mark(both_shot_types_included)}")
        print(f" Both new asset types included: {_mark(both_asset_types_included)}")

        # Step 7: Validation Results
        print("\n" + "=" * 100)
        print("REQUIREMENT 3.4 VALIDATION RESULTS")
        print("=" * 100)

        validation_criteria = [
            ("Baseline state correct", baseline_shot_correct and baseline_asset_correct),
            ("New task types added successfully", shot_type_added and asset_type_added),
            ("New task types automatically included in aggregated queries", shot_type_included and asset_type_included),
            ("Standard task types preserved", standard_shot_types_present and standard_asset_types_present),
            ("Continuous dynamic behavior works", both_shot_types_included and both_asset_types_included),
        ]

        for i, (criterion, passed) in enumerate(validation_criteria, 1):
            status = "✓ PASS" if passed else "✗ FAIL"
            print(f"{i}. {criterion}: {status}")

        total_criteria = len(validation_criteria)
        passed_criteria = sum(1 for _, passed in validation_criteria if passed)

        print(f"\nOverall Validation Result: {passed_criteria}/{total_criteria} criteria passed")

        if passed_criteria == total_criteria:
            print("\n🎉 REQUIREMENT 3.4 VALIDATION SUCCESSFUL!")
            print("The system correctly automatically includes new task types in aggregated task status queries.")
            return True

        print("\n❌ REQUIREMENT 3.4 VALIDATION FAILED!")
        print("The system does not properly handle dynamic task types in aggregated queries.")
        return False

    except Exception as e:
        # Top-level boundary of the script: report the failure and turn it
        # into a False result so the caller can map it to an exit status.
        print(f"\n❌ VALIDATION ERROR: {e}")
        return False


def _task_types_of_first(items: List[Dict[str, Any]]) -> set:
    """Return the ``task_status`` keys of the first entry in *items*.

    An empty list, a missing ``task_status`` key, or a null ``task_status``
    value all yield an empty set.
    """
    if not items:
        return set()
    # ``or {}`` also covers an explicit null, which ``.get(key, {})`` would
    # pass through and then fail on.
    return set(items[0].get('task_status') or {})


def _mark(ok: bool) -> str:
    """Return the check or cross glyph used in the progress output."""
    return '✓' if ok else '✗'
|
|
|
|
|
|
if __name__ == "__main__":
    # Map the validation result to the process exit status (0 = success).
    # SystemExit is raised directly because the ``exit`` helper is added by
    # the ``site`` module for interactive use and is not guaranteed to exist
    # (for example under ``python -S``).
    success = validate_requirement_3_4()
    raise SystemExit(0 if success else 1)