#!/usr/bin/env python3
"""
Comprehensive test to verify that the optimized shot and asset routers fully
support custom task statuses.

This test covers all requirements from task 4:
- Ensure optimized queries include both default and custom task statuses
- Test with projects that have custom task statuses defined
- Verify aggregated data includes all status types
- Test edge cases like empty custom statuses, malformed JSON, etc.

Run as a script: it builds a throw-away SQLite database, seeds it, calls the
router coroutines directly, and exits with status 0 only if every test passed.
"""

import asyncio
import json
import os
import sys
import traceback

# Must run before the project imports below so that sibling modules
# (database, models, routers) resolve when this file is executed directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from database import Base
from models.user import User, UserRole
from models.project import Project, ProjectType, ProjectStatus, ProjectMember
from models.task import Task
from models.asset import Asset, AssetCategory, AssetStatus
from models.episode import Episode, EpisodeStatus
from models.shot import Shot, ShotStatus

# Create test database
TEST_DB = "test_comprehensive_custom_status.db"

# Status ids accepted for every project regardless of its custom statuses.
# Kept as a tuple so membership tests behave exactly like the original lists.
DEFAULT_TASK_STATUSES = (
    "not_started",
    "in_progress",
    "submitted",
    "approved",
    "retake",
)


def _remove_test_db():
    """Delete the SQLite test file if present (best effort, never raises on a lock)."""
    try:
        os.remove(TEST_DB)
    except FileNotFoundError:
        pass  # Nothing to clean up.
    except PermissionError as exc:
        # Deliberately non-fatal, but no longer silent: a leftover file means
        # the next run may start from stale data.
        print(f"⚠️ Could not remove {TEST_DB}: {exc}")


_remove_test_db()

engine = create_engine(f"sqlite:///{TEST_DB}")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


def _add_shot_with_tasks(db, project, episode, shot_name, task_statuses):
    """Add one in-progress shot plus one task per (task_type, status) pair."""
    shot = Shot(
        project_id=project.id,
        episode_id=episode.id,
        name=shot_name,
        frame_start=1001,
        frame_end=1100,
        status=ShotStatus.IN_PROGRESS,
    )
    db.add(shot)
    db.flush()  # Assigns shot.id, needed by the tasks below.

    for task_type, status in task_statuses:
        db.add(
            Task(
                project_id=project.id,
                episode_id=episode.id,
                shot_id=shot.id,
                name=f"{shot.name}_{task_type}",
                task_type=task_type,
                status=status,
            )
        )
    return shot


def _add_asset_with_tasks(db, project, asset_name, category, task_statuses):
    """Add one in-progress asset plus one task per (task_type, status) pair."""
    asset = Asset(
        project_id=project.id,
        name=asset_name,
        category=category,
        status=AssetStatus.IN_PROGRESS,
    )
    db.add(asset)
    db.flush()  # Assigns asset.id, needed by the tasks below.

    for task_type, status in task_statuses:
        db.add(
            Task(
                project_id=project.id,
                asset_id=asset.id,
                name=f"{asset.name}_{task_type}",
                task_type=task_type,
                status=status,
            )
        )
    return asset


def setup_comprehensive_test_data():
    """Set up comprehensive test data covering all edge cases.

    Seeds one coordinator user and four projects (full custom statuses, empty
    list, NULL, malformed JSON), each with an episode, a shot and tasks; the
    first two projects also get an asset with tasks.

    Returns:
        Tuple ``(user, [project1..project4], episodes)``. The session is
        closed before returning, so the objects are detached.

    Raises:
        Exception: any error during seeding is re-raised after a rollback.
    """
    print("Setting up comprehensive test data...")

    db = SessionLocal()
    try:
        # Create test user
        user = User(
            email="coordinator@test.com",
            password_hash="test",
            first_name="Test",
            last_name="Coordinator",
            role=UserRole.COORDINATOR,
            is_approved=True,
        )
        db.add(user)
        db.commit()

        # Test Case 1: Project with comprehensive custom statuses
        comprehensive_custom = [
            {
                "id": "custom_todo",
                "name": "To Do",
                "color": "#FF0000",
                "order": 0,
                "is_default": True,
            },
            {
                "id": "custom_in_review",
                "name": "In Review",
                "color": "#FFA500",
                "order": 1,
                "is_default": False,
            },
            {
                "id": "custom_approved_final",
                "name": "Final Approved",
                "color": "#00FF00",
                "order": 2,
                "is_default": False,
            },
            {
                "id": "custom_on_hold",
                "name": "On Hold",
                "color": "#808080",
                "order": 3,
                "is_default": False,
            },
        ]

        project1 = Project(
            name="Comprehensive Custom Status Project",
            code_name="CCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps(comprehensive_custom),
        )
        db.add(project1)

        # Test Case 2: Project with empty custom statuses (should use defaults)
        project2 = Project(
            name="Empty Custom Status Project",
            code_name="ECSP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps([]),
        )
        db.add(project2)

        # Test Case 3: Project with NULL custom statuses
        project3 = Project(
            name="Null Custom Status Project",
            code_name="NCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=None,
        )
        db.add(project3)

        # Test Case 4: Project with malformed JSON (should fallback gracefully)
        project4 = Project(
            name="Malformed JSON Project",
            code_name="MJP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses='{"invalid": json}',  # Malformed JSON
        )
        db.add(project4)

        db.commit()

        projects = [project1, project2, project3, project4]

        # Add user as member of all projects
        for project in projects:
            db.add(ProjectMember(user_id=user.id, project_id=project.id))
        db.commit()

        # Create episodes for each project
        episodes = []
        for number, project in enumerate(projects, 1):
            episode = Episode(
                project_id=project.id,
                name=f"Episode {number}",
                episode_number=number,
                status=EpisodeStatus.IN_PROGRESS,
            )
            db.add(episode)
            episodes.append(episode)
        db.commit()

        # Project 1: mix of all custom statuses and default statuses
        _add_shot_with_tasks(
            db,
            project1,
            episodes[0],
            "comprehensive_shot_001",
            [
                ("layout", "custom_todo"),
                ("animation", "custom_in_review"),
                ("lighting", "custom_approved_final"),
                ("compositing", "custom_on_hold"),
                ("simulation", "in_progress"),  # Default status mixed in
            ],
        )
        _add_asset_with_tasks(
            db,
            project1,
            "comprehensive_character",
            AssetCategory.CHARACTERS,
            [
                ("modeling", "custom_todo"),
                ("surfacing", "custom_approved_final"),
                ("rigging", "approved"),  # Default status
            ],
        )

        # Project 2: only default statuses (empty custom statuses)
        _add_shot_with_tasks(
            db,
            project2,
            episodes[1],
            "empty_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "in_progress"),
                ("lighting", "submitted"),
                ("compositing", "approved"),
            ],
        )
        _add_asset_with_tasks(
            db,
            project2,
            "empty_custom_prop",
            AssetCategory.PROPS,
            [
                ("modeling", "not_started"),
                ("surfacing", "retake"),
            ],
        )

        # Project 3: NULL custom statuses (should use defaults)
        _add_shot_with_tasks(
            db,
            project3,
            episodes[2],
            "null_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "submitted"),
            ],
        )

        # Project 4: malformed JSON (custom status parsing should fail
        # gracefully, so only default statuses are used)
        _add_shot_with_tasks(
            db,
            project4,
            episodes[3],
            "malformed_shot_001",
            [
                ("layout", "approved"),
                ("animation", "retake"),
            ],
        )

        db.commit()
        print("✅ Comprehensive test data setup complete")

        return user, projects, episodes

    except Exception as e:
        db.rollback()
        print(f"❌ Failed to setup comprehensive test data: {e}")
        raise
    finally:
        db.close()


def _custom_status_ids(project):
    """Return the custom status ids declared on *project*.

    Returns an empty list when the column is empty/NULL, is not valid JSON,
    or does not decode to a list; non-dict entries in the list are skipped.
    """
    raw = project.custom_task_statuses
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []  # Malformed JSON, should fallback to defaults
    if not isinstance(parsed, list):
        return []
    return [entry.get('id') for entry in parsed if isinstance(entry, dict)]


def _check_status_valid(status, project, custom_ids):
    """Assert *status* is a default status or one of *custom_ids*, then report it."""
    is_default = status in DEFAULT_TASK_STATUSES
    is_custom = status in custom_ids
    assert is_default or is_custom, f"Invalid status '{status}' for project {project.name}"
    print(f" ✅ Status '{status}' is valid ({'default' if is_default else 'custom'})")


def _list_project_shots(db, user, project, episode):
    """Call the shots router coroutine for one episode and return its result."""
    # Imported lazily, as in the original tests, so an import failure is
    # reported as a test failure rather than aborting the script at load time.
    from routers.shots import list_shots

    async def _call():
        return await list_shots(
            episode_id=episode.id,
            project_id=project.id,
            task_status_filter=None,
            sort_by=None,
            sort_direction="asc",
            skip=0,
            limit=100,
            db=db,
            current_user=user,
        )

    return asyncio.run(_call())


def _list_project_assets(db, user, project):
    """Call the assets router coroutine for one project and return its result."""
    from routers.assets import list_assets

    async def _call():
        return await list_assets(
            project_id=project.id,
            category=None,
            task_status_filter=None,
            sort_by=None,
            sort_direction="asc",
            skip=0,
            limit=100,
            db=db,
            current_user=user,
        )

    return asyncio.run(_call())


def _collect_statuses(items, label, expected_custom):
    """Gather statuses seen in ``task_status`` and ``task_details`` of *items*.

    Returns:
        ``(found_custom, found_default)`` sets. A status counts as custom when
        it is in *expected_custom*, otherwise as default when it is in
        ``DEFAULT_TASK_STATUSES``; anything else is ignored.
    """
    found_custom = set()
    found_default = set()

    for item in items:
        print(f"{label}: {item.name}")

        seen = list(item.task_status.values())
        seen.extend(detail.status for detail in item.task_details)

        for status in seen:
            if status in expected_custom:
                found_custom.add(status)
            elif status in DEFAULT_TASK_STATUSES:
                found_default.add(status)

    return found_custom, found_default


def test_comprehensive_custom_status_support():
    """Test comprehensive custom status support in optimized queries.

    For every project, checks that each status returned for shots and assets
    is either a default status or a custom status declared on that project,
    and that each shot's task_details agree with its task_status mapping.

    Returns:
        True if all checks passed, False otherwise (the failure is printed).
    """
    print("\n=== Test 1: Comprehensive Custom Status Support ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()
        projects = db.query(Project).all()

        # Test each project's shots
        for project in projects:
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()

            print(f"\n--- Testing Project: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            # Parsed once per project instead of once per status.
            custom_ids = _custom_status_ids(project)

            # Test shots
            for shot in _list_project_shots(db, user, project, episode):
                print(f"Shot: {shot.name}")
                print(f" Task Status: {shot.task_status}")
                print(f" Task Details: {len(shot.task_details)} tasks")

                # Verify all task statuses are valid
                for status in shot.task_status.values():
                    _check_status_valid(status, project, custom_ids)

                # Verify task_details match task_status
                for task_detail in shot.task_details:
                    expected_status = shot.task_status.get(task_detail.task_type)
                    assert task_detail.status == expected_status, f"Task detail status mismatch: {task_detail.status} != {expected_status}"

            # Test assets
            for asset in _list_project_assets(db, user, project):
                print(f"Asset: {asset.name}")
                print(f" Task Status: {asset.task_status}")
                print(f" Task Details: {len(asset.task_details)} tasks")

                # Verify all task statuses are valid
                for status in asset.task_status.values():
                    _check_status_valid(status, project, custom_ids)

        print("✅ Test 1 PASSED: Comprehensive custom status support works correctly")
        return True

    except Exception as e:
        print(f"❌ Test 1 FAILED: {e}")
        traceback.print_exc()
        return False
    finally:
        db.close()


def test_custom_status_aggregation_completeness():
    """Test that aggregated data includes ALL status types (default + custom).

    Uses the project seeded with four custom statuses and checks that every
    one of them, plus the seeded default status, shows up in the shot data,
    and that asset data contains at least one custom and one default status.

    Returns:
        True if all checks passed, False otherwise (the failure is printed).
    """
    print("\n=== Test 2: Custom Status Aggregation Completeness ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()

        # Get the comprehensive project (project 1)
        project = db.query(Project).filter(Project.name == "Comprehensive Custom Status Project").first()
        episode = db.query(Episode).filter(Episode.project_id == project.id).first()

        # Test shots aggregation completeness
        shots = _list_project_shots(db, user, project, episode)

        print(f"Testing aggregation completeness for project: {project.name}")

        # Verify that all custom statuses are present in aggregated data
        expected_custom_statuses = {'custom_todo', 'custom_in_review', 'custom_approved_final', 'custom_on_hold'}
        expected_default_statuses = {'in_progress'}  # We know we have this one

        found_custom_statuses, found_default_statuses = _collect_statuses(
            shots, "Shot", expected_custom_statuses
        )

        print(f"Found custom statuses: {found_custom_statuses}")
        print(f"Found default statuses: {found_default_statuses}")

        # Verify we found all expected custom statuses
        missing_custom = expected_custom_statuses - found_custom_statuses
        assert not missing_custom, f"Missing custom statuses in aggregation: {missing_custom}"

        # Verify we found expected default statuses
        missing_default = expected_default_statuses - found_default_statuses
        assert not missing_default, f"Missing default statuses in aggregation: {missing_default}"

        # Test assets aggregation completeness
        assets = _list_project_assets(db, user, project)

        asset_found_custom, asset_found_default = _collect_statuses(
            assets, "Asset", expected_custom_statuses
        )

        print(f"Asset found custom statuses: {asset_found_custom}")
        print(f"Asset found default statuses: {asset_found_default}")

        # Verify assets have both custom and default statuses
        assert asset_found_custom, "No custom statuses found in asset aggregation"
        assert asset_found_default, "No default statuses found in asset aggregation"

        print("✅ Test 2 PASSED: Custom status aggregation completeness verified")
        return True

    except Exception as e:
        print(f"❌ Test 2 FAILED: {e}")
        traceback.print_exc()
        return False
    finally:
        db.close()


def test_edge_case_handling():
    """Test edge cases like NULL, empty, and malformed custom statuses.

    For the three edge-case projects, every status returned for their shots
    (in both task_status and task_details) must be a default status.

    Returns:
        True if all checks passed, False otherwise (the failure is printed).
    """
    print("\n=== Test 3: Edge Case Handling ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()

        # Test projects with edge cases
        edge_case_projects = [
            ("Empty Custom Status Project", "empty custom statuses"),
            ("Null Custom Status Project", "NULL custom statuses"),
            ("Malformed JSON Project", "malformed JSON custom statuses"),
        ]

        for project_name, description in edge_case_projects:
            project = db.query(Project).filter(Project.name == project_name).first()
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()

            print(f"\n--- Testing {description}: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            for shot in _list_project_shots(db, user, project, episode):
                print(f"Shot: {shot.name}")
                print(f" Task Status: {shot.task_status}")

                # Verify that all statuses are valid defaults (since custom
                # statuses should be empty/invalid)
                for status in shot.task_status.values():
                    is_default = status in DEFAULT_TASK_STATUSES
                    assert is_default, f"Expected default status, got '{status}' for edge case project {project.name}"
                    print(f" ✅ Status '{status}' is valid default status")

                # Verify task_details are consistent
                for task_detail in shot.task_details:
                    is_default = task_detail.status in DEFAULT_TASK_STATUSES
                    assert is_default, f"Expected default status in task details, got '{task_detail.status}'"

        print("✅ Test 3 PASSED: Edge case handling works correctly")
        return True

    except Exception as e:
        print(f"❌ Test 3 FAILED: {e}")
        traceback.print_exc()
        return False
    finally:
        db.close()


def main():
    """Run all comprehensive custom status tests.

    Seeds the database, runs each test, prints a summary and always attempts
    to delete the test database afterwards.

    Returns:
        True when every test passed; False when any test failed or the data
        setup raised.
    """
    print("🚀 Starting Comprehensive Custom Status Verification Tests")
    print("=" * 70)

    try:
        # Setup test data
        setup_comprehensive_test_data()

        # Run tests
        tests = [
            test_comprehensive_custom_status_support,
            test_custom_status_aggregation_completeness,
            test_edge_case_handling,
        ]

        passed = 0
        failed = 0

        for test in tests:
            try:
                if test():
                    passed += 1
                else:
                    failed += 1
            except Exception as e:
                print(f"❌ Test failed with exception: {e}")
                failed += 1

        print("\n" + "=" * 70)
        print(f"📊 Test Results: {passed} passed, {failed} failed")

        if failed == 0:
            print("🎉 All comprehensive tests passed!")
            print("✅ Custom status support is fully implemented and working correctly.")
            print("✅ Optimized queries include both default and custom task statuses.")
            print("✅ Projects with custom task statuses are properly supported.")
            print("✅ Aggregated data includes all status types.")
            return True

        print("❌ Some tests failed. Custom status support needs fixes.")
        return False

    except Exception as e:
        print(f"❌ Test setup failed: {e}")
        traceback.print_exc()
        return False

    finally:
        # Cleanup: release pooled connections first; an open SQLite handle can
        # keep the file locked (notably on Windows) and make removal fail.
        engine.dispose()
        _remove_test_db()


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)