LinkDesk/backend/test_comprehensive_custom_s...

709 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive test to verify that the optimized shot and asset routers fully support custom task statuses.
This test covers all requirements from task 4:
- Ensure optimized queries include both default and custom task statuses
- Test with projects that have custom task statuses defined
- Verify aggregated data includes all status types
- Test edge cases like empty custom statuses, malformed JSON, etc.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import json
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database import Base
from models.user import User, UserRole
from models.project import Project, ProjectType, ProjectStatus, ProjectMember
from models.task import Task
from models.asset import Asset, AssetCategory, AssetStatus
from models.episode import Episode, EpisodeStatus
from models.shot import Shot, ShotStatus
# Create test database
TEST_DB = "test_comprehensive_custom_status.db"

# Start from a clean file. If a leftover file from an earlier run cannot be
# deleted, remember that so its tables can be reset once the engine exists;
# otherwise old rows would silently leak into this run's queries.
_stale_db_kept = False
if os.path.exists(TEST_DB):
    try:
        os.remove(TEST_DB)
    except PermissionError as exc:
        _stale_db_kept = True
        print(f"⚠️ Could not remove stale test database {TEST_DB}: {exc}")

engine = create_engine(f"sqlite:///{TEST_DB}")
if _stale_db_kept:
    # The old file is still in place: drop whatever tables it contains so
    # create_all() below builds an empty schema.
    Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
def _add_tasks(db, name_prefix, task_statuses, **task_fields):
    """Queue one Task per (task_type, status) pair on the session.

    Each task is named "<name_prefix>_<task_type>". ``task_fields`` carries the
    keyword arguments shared by every task of the group (project_id plus the
    shot/episode or asset ids).
    """
    for task_type, status in task_statuses:
        db.add(Task(
            name=f"{name_prefix}_{task_type}",
            task_type=task_type,
            status=status,
            **task_fields,
        ))


def _add_shot_with_tasks(db, project, episode, shot_name, task_statuses):
    """Add an in-progress shot to the episode plus one task per status pair."""
    shot = Shot(
        project_id=project.id,
        episode_id=episode.id,
        name=shot_name,
        frame_start=1001,
        frame_end=1100,
        status=ShotStatus.IN_PROGRESS,
    )
    db.add(shot)
    db.flush()  # flush first so shot.id can be used by the tasks below
    _add_tasks(
        db,
        shot.name,
        task_statuses,
        project_id=project.id,
        episode_id=episode.id,
        shot_id=shot.id,
    )
    return shot


def _add_asset_with_tasks(db, project, asset_name, category, task_statuses):
    """Add an in-progress asset to the project plus one task per status pair."""
    asset = Asset(
        project_id=project.id,
        name=asset_name,
        category=category,
        status=AssetStatus.IN_PROGRESS,
    )
    db.add(asset)
    db.flush()  # flush first so asset.id can be used by the tasks below
    _add_tasks(
        db,
        asset.name,
        task_statuses,
        project_id=project.id,
        asset_id=asset.id,
    )
    return asset


def setup_comprehensive_test_data():
    """Set up comprehensive test data covering all edge cases.

    Creates one coordinator user, four projects (full custom statuses, empty
    custom status list, NULL custom statuses, malformed JSON), a membership
    and an episode per project, and shots/assets whose tasks use a mix of
    custom and default status strings.

    Returns:
        A ``(user, projects, episodes)`` tuple where ``projects`` and
        ``episodes`` are lists ordered project 1..4. The objects are refreshed
        before the session is closed so their column attributes stay readable
        on the detached instances.

    Raises:
        Exception: any error during setup is re-raised after rolling back.
    """
    print("Setting up comprehensive test data...")
    db = SessionLocal()
    try:
        # Create test user
        user = User(
            email="coordinator@test.com",
            password_hash="test",
            first_name="Test",
            last_name="Coordinator",
            role=UserRole.COORDINATOR,
            is_approved=True,
        )
        db.add(user)
        db.commit()

        # Test Case 1: Project with comprehensive custom statuses
        comprehensive_custom = [
            {
                "id": "custom_todo",
                "name": "To Do",
                "color": "#FF0000",
                "order": 0,
                "is_default": True,
            },
            {
                "id": "custom_in_review",
                "name": "In Review",
                "color": "#FFA500",
                "order": 1,
                "is_default": False,
            },
            {
                "id": "custom_approved_final",
                "name": "Final Approved",
                "color": "#00FF00",
                "order": 2,
                "is_default": False,
            },
            {
                "id": "custom_on_hold",
                "name": "On Hold",
                "color": "#808080",
                "order": 3,
                "is_default": False,
            },
        ]
        project1 = Project(
            name="Comprehensive Custom Status Project",
            code_name="CCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps(comprehensive_custom),
        )
        db.add(project1)

        # Test Case 2: Project with empty custom statuses (should use defaults)
        project2 = Project(
            name="Empty Custom Status Project",
            code_name="ECSP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps([]),
        )
        db.add(project2)

        # Test Case 3: Project with NULL custom statuses
        project3 = Project(
            name="Null Custom Status Project",
            code_name="NCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=None,
        )
        db.add(project3)

        # Test Case 4: Project with malformed JSON (should fallback gracefully)
        project4 = Project(
            name="Malformed JSON Project",
            code_name="MJP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses='{"invalid": json}',  # Malformed JSON
        )
        db.add(project4)
        db.commit()

        projects = [project1, project2, project3, project4]

        # Add user as member of all projects
        for project in projects:
            db.add(ProjectMember(user_id=user.id, project_id=project.id))
        db.commit()

        # Create one episode per project (episodes[i] belongs to projects[i])
        episodes = []
        for i, project in enumerate(projects, 1):
            episode = Episode(
                project_id=project.id,
                name=f"Episode {i}",
                episode_number=i,
                status=EpisodeStatus.IN_PROGRESS,
            )
            db.add(episode)
            episodes.append(episode)
        db.commit()

        # Project 1: every custom status plus a default status mixed in
        _add_shot_with_tasks(
            db, project1, episodes[0], "comprehensive_shot_001",
            [
                ("layout", "custom_todo"),
                ("animation", "custom_in_review"),
                ("lighting", "custom_approved_final"),
                ("compositing", "custom_on_hold"),
                ("simulation", "in_progress"),  # Default status mixed in
            ],
        )
        _add_asset_with_tasks(
            db, project1, "comprehensive_character", AssetCategory.CHARACTERS,
            [
                ("modeling", "custom_todo"),
                ("surfacing", "custom_approved_final"),
                ("rigging", "approved"),  # Default status
            ],
        )

        # Project 2: only default statuses (empty custom statuses)
        _add_shot_with_tasks(
            db, project2, episodes[1], "empty_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "in_progress"),
                ("lighting", "submitted"),
                ("compositing", "approved"),
            ],
        )
        _add_asset_with_tasks(
            db, project2, "empty_custom_prop", AssetCategory.PROPS,
            [
                ("modeling", "not_started"),
                ("surfacing", "retake"),
            ],
        )

        # Project 3: NULL custom statuses (should use defaults)
        _add_shot_with_tasks(
            db, project3, episodes[2], "null_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "submitted"),
            ],
        )

        # Project 4: malformed JSON (custom status parsing should fail
        # gracefully, so only default statuses are used)
        _add_shot_with_tasks(
            db, project4, episodes[3], "malformed_shot_001",
            [
                ("layout", "approved"),
                ("animation", "retake"),
            ],
        )
        db.commit()

        # With the default expire-on-commit behaviour the commit above expires
        # every loaded attribute, and the session is closed in `finally`.
        # Reload the returned objects now so callers can still read their
        # columns instead of hitting DetachedInstanceError.
        for obj in [user, *projects, *episodes]:
            db.refresh(obj)

        print("✅ Comprehensive test data setup complete")
        return user, projects, episodes
    except Exception as e:
        db.rollback()
        print(f"❌ Failed to setup comprehensive test data: {e}")
        raise
    finally:
        db.close()
def _parse_custom_status_ids(raw_custom_statuses):
    """Return the custom status ids stored in a project's JSON column.

    NULL/empty values, malformed JSON and non-list payloads all yield an empty
    list, so only default statuses are accepted for such projects.
    """
    if not raw_custom_statuses:
        return []
    try:
        parsed = json.loads(raw_custom_statuses)
    except (json.JSONDecodeError, TypeError):
        return []  # Malformed JSON, should fallback to defaults
    if not isinstance(parsed, list):
        return []
    return [entry.get('id') for entry in parsed if isinstance(entry, dict)]


def _assert_task_statuses_valid(task_status, custom_status_ids, project_name):
    """Assert every status in ``task_status`` is a default or allowed custom id."""
    default_statuses = ['not_started', 'in_progress', 'submitted', 'approved', 'retake']
    for _task_type, status in task_status.items():
        is_default = status in default_statuses
        is_custom = status in custom_status_ids
        assert is_default or is_custom, f"Invalid status '{status}' for project {project_name}"
        print(f" ✅ Status '{status}' is valid ({'default' if is_default else 'custom'})")


def test_comprehensive_custom_status_support():
    """Test comprehensive custom status support in optimized queries.

    For every project, lists its shots and assets through the routers and
    checks that each aggregated task status is either a default status or one
    of the project's custom status ids, and that each shot's task_details
    agree with its task_status mapping.

    Returns:
        True if every check passed, False otherwise. Failures are printed with
        a traceback instead of being raised, so the result is meant to be
        consumed by main().
    """
    print("\n=== Test 1: Comprehensive Custom Status Support ===")
    db = SessionLocal()
    try:
        user = db.query(User).first()
        projects = db.query(Project).all()
        from routers.shots import list_shots
        from routers.assets import list_assets
        import asyncio

        # Test each project's shots and assets
        for project in projects:
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()
            print(f"\n--- Testing Project: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            # Parse the project's custom statuses once instead of per task.
            custom_status_ids = _parse_custom_status_ids(project.custom_task_statuses)

            # Test shots
            async def get_project_shots():
                return await list_shots(
                    episode_id=episode.id,
                    project_id=project.id,
                    task_status_filter=None,
                    sort_by=None,
                    sort_direction="asc",
                    skip=0,
                    limit=100,
                    db=db,
                    current_user=user
                )

            shots = list(asyncio.run(get_project_shots()))
            # The fixture creates one shot per project; an empty result would
            # otherwise let every check below pass without running.
            assert shots, f"No shots returned for project {project.name}"
            for shot in shots:
                print(f"Shot: {shot.name}")
                print(f" Task Status: {shot.task_status}")
                print(f" Task Details: {len(shot.task_details)} tasks")
                # Verify all task statuses are valid
                _assert_task_statuses_valid(shot.task_status, custom_status_ids, project.name)
                # Verify task_details match task_status
                for task_detail in shot.task_details:
                    expected_status = shot.task_status.get(task_detail.task_type)
                    assert task_detail.status == expected_status, f"Task detail status mismatch: {task_detail.status} != {expected_status}"

            # Test assets (not every project in the fixture has assets, so an
            # empty result is acceptable here)
            async def get_project_assets():
                return await list_assets(
                    project_id=project.id,
                    category=None,
                    task_status_filter=None,
                    sort_by=None,
                    sort_direction="asc",
                    skip=0,
                    limit=100,
                    db=db,
                    current_user=user
                )

            assets = asyncio.run(get_project_assets())
            for asset in assets:
                print(f"Asset: {asset.name}")
                print(f" Task Status: {asset.task_status}")
                print(f" Task Details: {len(asset.task_details)} tasks")
                # Verify all task statuses are valid
                _assert_task_statuses_valid(asset.task_status, custom_status_ids, project.name)

        print("✅ Test 1 PASSED: Comprehensive custom status support works correctly")
        return True
    except Exception as e:
        print(f"❌ Test 1 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
def test_custom_status_aggregation_completeness():
    """Test that aggregated data includes ALL status types (default + custom).

    Uses the "Comprehensive Custom Status Project": its shots must expose all
    four custom statuses plus the default 'in_progress', and its assets must
    expose at least one custom and one default status.

    Returns:
        True when all assertions hold, False otherwise (the failure is printed
        with a traceback rather than raised).
    """
    print("\n=== Test 2: Custom Status Aggregation Completeness ===")
    db = SessionLocal()
    try:
        user = db.query(User).first()
        # Get the comprehensive project (project 1)
        project = db.query(Project).filter(Project.name == "Comprehensive Custom Status Project").first()
        episode = db.query(Episode).filter(Episode.project_id == project.id).first()
        from routers.shots import list_shots
        from routers.assets import list_assets
        import asyncio

        known_defaults = ['not_started', 'in_progress', 'submitted', 'approved', 'retake']
        expected_custom_statuses = {'custom_todo', 'custom_in_review', 'custom_approved_final', 'custom_on_hold'}
        expected_default_statuses = {'in_progress'}  # We know we have this one

        async def call_endpoint(endpoint, **params):
            # Await the router function from inside the event loop.
            return await endpoint(**params)

        def tally(entity, custom_seen, default_seen):
            # Sort every status found on the entity (task_status values first,
            # then task_details) into the custom or default bucket.
            observed = [status for _task_type, status in entity.task_status.items()]
            observed.extend(detail.status for detail in entity.task_details)
            for status in observed:
                if status in expected_custom_statuses:
                    custom_seen.add(status)
                elif status in known_defaults:
                    default_seen.add(status)

        # Test shots aggregation completeness
        shots = asyncio.run(call_endpoint(
            list_shots,
            episode_id=episode.id,
            project_id=project.id,
            task_status_filter=None,
            sort_by=None,
            sort_direction="asc",
            skip=0,
            limit=100,
            db=db,
            current_user=user,
        ))
        print(f"Testing aggregation completeness for project: {project.name}")

        found_custom_statuses = set()
        found_default_statuses = set()
        for shot in shots:
            print(f"Shot: {shot.name}")
            tally(shot, found_custom_statuses, found_default_statuses)
        print(f"Found custom statuses: {found_custom_statuses}")
        print(f"Found default statuses: {found_default_statuses}")

        # Every expected custom status must have shown up
        missing_custom = expected_custom_statuses - found_custom_statuses
        assert not missing_custom, f"Missing custom statuses in aggregation: {missing_custom}"
        # ...and so must the expected default ones
        missing_default = expected_default_statuses - found_default_statuses
        assert not missing_default, f"Missing default statuses in aggregation: {missing_default}"

        # Test assets aggregation completeness
        assets = asyncio.run(call_endpoint(
            list_assets,
            project_id=project.id,
            category=None,
            task_status_filter=None,
            sort_by=None,
            sort_direction="asc",
            skip=0,
            limit=100,
            db=db,
            current_user=user,
        ))

        asset_found_custom = set()
        asset_found_default = set()
        for asset in assets:
            print(f"Asset: {asset.name}")
            tally(asset, asset_found_custom, asset_found_default)
        print(f"Asset found custom statuses: {asset_found_custom}")
        print(f"Asset found default statuses: {asset_found_default}")

        # Assets must carry both kinds of status
        assert asset_found_custom, "No custom statuses found in asset aggregation"
        assert asset_found_default, "No default statuses found in asset aggregation"

        print("✅ Test 2 PASSED: Custom status aggregation completeness verified")
        return True
    except Exception as e:
        print(f"❌ Test 2 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
def test_edge_case_handling():
    """Test edge cases like NULL, empty, and malformed custom statuses.

    For the three edge-case projects, lists the project's shots through the
    shots router and asserts that every aggregated status (in task_status and
    in task_details) is one of the default statuses.

    Returns:
        True when all assertions hold, False otherwise (the failure is printed
        with a traceback rather than raised).
    """
    print("\n=== Test 3: Edge Case Handling ===")
    db = SessionLocal()
    try:
        user = db.query(User).first()
        from routers.shots import list_shots
        import asyncio

        default_statuses = ['not_started', 'in_progress', 'submitted', 'approved', 'retake']

        # Test projects with edge cases
        edge_case_projects = [
            ("Empty Custom Status Project", "empty custom statuses"),
            ("Null Custom Status Project", "NULL custom statuses"),
            ("Malformed JSON Project", "malformed JSON custom statuses")
        ]
        for project_name, description in edge_case_projects:
            project = db.query(Project).filter(Project.name == project_name).first()
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()
            print(f"\n--- Testing {description}: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            async def get_edge_case_shots():
                return await list_shots(
                    episode_id=episode.id,
                    project_id=project.id,
                    task_status_filter=None,
                    sort_by=None,
                    sort_direction="asc",
                    skip=0,
                    limit=100,
                    db=db,
                    current_user=user
                )

            shots = list(asyncio.run(get_edge_case_shots()))
            # The fixture creates one shot per edge-case project; without this
            # check an empty result would make the test pass without verifying
            # anything.
            assert shots, f"No shots returned for edge case project {project.name}"
            for shot in shots:
                print(f"Shot: {shot.name}")
                print(f" Task Status: {shot.task_status}")
                # Verify that all statuses are valid defaults (since custom statuses should be empty/invalid)
                for _task_type, status in shot.task_status.items():
                    is_default = status in default_statuses
                    assert is_default, f"Expected default status, got '{status}' for edge case project {project.name}"
                    print(f" ✅ Status '{status}' is valid default status")
                # Verify task_details are consistent
                for task_detail in shot.task_details:
                    is_default = task_detail.status in default_statuses
                    assert is_default, f"Expected default status in task details, got '{task_detail.status}'"

        print("✅ Test 3 PASSED: Edge case handling works correctly")
        return True
    except Exception as e:
        print(f"❌ Test 3 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
def main():
    """Run all comprehensive custom status tests.

    Builds the fixture data, runs each test function, prints a summary and
    finally removes the SQLite test database file.

    Returns:
        True if every test reported success, False if any test failed or the
        fixture setup raised.
    """
    print("🚀 Starting Comprehensive Custom Status Verification Tests")
    print("=" * 70)
    try:
        # Setup test data
        setup_comprehensive_test_data()

        # Run tests
        tests = [
            test_comprehensive_custom_status_support,
            test_custom_status_aggregation_completeness,
            test_edge_case_handling
        ]
        passed = 0
        failed = 0
        for test in tests:
            try:
                if test():
                    passed += 1
                else:
                    failed += 1
            except Exception as e:
                print(f"❌ Test failed with exception: {e}")
                failed += 1

        print("\n" + "=" * 70)
        print(f"📊 Test Results: {passed} passed, {failed} failed")
        if failed == 0:
            print("🎉 All comprehensive tests passed!")
            print("✅ Custom status support is fully implemented and working correctly.")
            print("✅ Optimized queries include both default and custom task statuses.")
            print("✅ Projects with custom task statuses are properly supported.")
            print("✅ Aggregated data includes all status types.")
            return True
        else:
            print("❌ Some tests failed. Custom status support needs fixes.")
            return False
    except Exception as e:
        print(f"❌ Test setup failed: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Cleanup: release pooled connections first so the database file is no
        # longer held open by this process when it is deleted.
        engine.dispose()
        try:
            if os.path.exists(TEST_DB):
                os.remove(TEST_DB)
        except PermissionError as e:
            # Best effort only: report the leftover file instead of failing.
            print(f"⚠️ Could not remove test database {TEST_DB}: {e}")
if __name__ == "__main__":
    # Map main()'s truthy/falsy outcome onto the process exit status.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)