from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, status
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_
from typing import List, Optional
import os
import shutil
import json
import logging
from datetime import datetime

from database import get_db
from models.task import Task, ProductionNote, TaskAttachment, Submission, Review
from models.user import User, UserRole, DepartmentRole
from models.project import Project, ProjectMember
from models.asset import Asset
from models.shot import Shot
from models.episode import Episode
from schemas.task import (
    TaskCreate, TaskUpdate, TaskResponse, TaskListResponse, TaskStatusUpdate,
    TaskAssignment, ProductionNoteCreate, ProductionNoteUpdate, ProductionNoteResponse,
    TaskAttachmentCreate, TaskAttachmentResponse, SubmissionCreate, SubmissionResponse,
    BulkStatusUpdate, BulkAssignment, BulkActionResult
)
# NOTE(review): `require_role` imported here is shadowed by the local
# definition further down; the import is kept only to avoid changing the
# module's import surface.
from utils.auth import get_current_user_from_token, _get_user_from_db, require_role
from utils.notifications import notification_service
from utils.file_handler import file_handler

router = APIRouter()
logger = logging.getLogger(__name__)

# System task statuses (built-in, read-only)
SYSTEM_TASK_STATUSES = [
    {"id": "not_started", "name": "Not Started", "color": "#6B7280"},
    {"id": "in_progress", "name": "In Progress", "color": "#3B82F6"},
    {"id": "submitted", "name": "Submitted", "color": "#F59E0B"},
    {"id": "approved", "name": "Approved", "color": "#10B981"},
    {"id": "retake", "name": "Retake", "color": "#EF4444"}
]


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _custom_statuses(project) -> list:
    """Return the project's custom status entries as a list of dicts.

    ``project.custom_task_statuses`` may hold a list or a JSON string.
    Anything that cannot be decoded into a list yields an empty list, and
    non-dict entries are dropped.
    """
    raw = project.custom_task_statuses
    if not raw:
        return []
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []
    if not isinstance(raw, list):
        return []
    return [entry for entry in raw if isinstance(entry, dict)]


def _is_restricted_artist(user) -> bool:
    """Return True when *user* is an artist without the admin permission.

    The admin flag takes precedence over the role, matching
    ``require_admin_or_coordinator`` and the role handling in ``get_tasks``.
    """
    return user.role == UserRole.ARTIST and not user.is_admin


def _with_relations(query):
    """Eager-load the related entities needed to build task responses."""
    return query.options(
        joinedload(Task.project),
        joinedload(Task.episode),
        joinedload(Task.shot),
        joinedload(Task.asset),
        joinedload(Task.assigned_user)
    )


def _active_tasks(query):
    """Restrict a Task query to tasks that are still visible.

    Excludes soft-deleted tasks and tasks whose parent shot/asset has been
    soft deleted. Shot and Asset are outer-joined so tasks without a parent
    are kept.
    """
    return (
        query
        .outerjoin(Shot, Task.shot_id == Shot.id)
        .outerjoin(Asset, Task.asset_id == Asset.id)
        .filter(
            Task.deleted_at.is_(None),
            or_(
                and_(Task.shot_id.is_(None), Task.asset_id.is_(None)),  # Tasks without shot/asset
                and_(Task.shot_id.isnot(None), Shot.deleted_at.is_(None)),  # Tasks with non-deleted shot
                and_(Task.asset_id.isnot(None), Asset.deleted_at.is_(None))  # Tasks with non-deleted asset
            )
        )
    )


def _get_active_task_or_404(db: Session, task_id: int, with_relations: bool = False):
    """Fetch one visible task by id or raise a 404 ``HTTPException``."""
    query = db.query(Task)
    if with_relations:
        query = _with_relations(query)
    task = _active_tasks(query).filter(Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


def _reload_with_relations(db: Session, task_id: int):
    """Re-read a task with its related entities eager-loaded."""
    return _with_relations(db.query(Task)).filter(Task.id == task_id).first()


def _full_name(user) -> Optional[str]:
    """Return "<first> <last>" for *user*, or None when there is no user."""
    return f"{user.first_name} {user.last_name}" if user else None


def _task_list_item(task, project_fallback: Optional[str] = None) -> TaskListResponse:
    """Build the list-view response for *task*.

    *project_fallback* is the project name reported when the task has no
    project loaded.
    """
    return TaskListResponse(
        id=task.id,
        name=task.name,
        task_type=task.task_type,
        status=task.status,
        deadline=task.deadline,
        project_id=task.project_id,
        project_name=task.project.name if task.project else project_fallback,
        episode_id=task.episode_id,
        episode_name=task.episode.name if task.episode else None,
        shot_id=task.shot_id,
        shot_name=task.shot.name if task.shot else None,
        asset_id=task.asset_id,
        asset_name=task.asset.name if task.asset else None,
        assigned_user_id=task.assigned_user_id,
        assigned_user_name=_full_name(task.assigned_user),
        created_at=task.created_at,
        updated_at=task.updated_at
    )


def _task_detail_fields(task) -> dict:
    """Return the keyword arguments used to build a ``TaskResponse``."""
    assignee = task.assigned_user
    return {
        "id": task.id,
        "name": task.name,
        "description": task.description,
        "task_type": task.task_type,
        "status": task.status,
        "deadline": task.deadline,
        "project_id": task.project_id,
        "episode_id": task.episode_id,
        "shot_id": task.shot_id,
        "asset_id": task.asset_id,
        "assigned_user_id": task.assigned_user_id,
        "created_at": task.created_at,
        "updated_at": task.updated_at,
        "project_name": task.project.name if task.project else None,
        "episode_name": task.episode.name if task.episode else None,
        "shot_name": task.shot.name if task.shot else None,
        "asset_name": task.asset.name if task.asset else None,
        "assigned_user_name": _full_name(assignee),
        "assigned_user_email": assignee.email if assignee else None,
        "assigned_user_avatar_url": assignee.avatar_url if assignee else None
    }


def _require_project_member(db: Session, user_id: int, project_id: int, detail: str):
    """Return the ProjectMember row or raise a 400 with *detail*."""
    member = db.query(ProjectMember).filter(
        and_(
            ProjectMember.user_id == user_id,
            ProjectMember.project_id == project_id
        )
    ).first()
    if not member:
        raise HTTPException(status_code=400, detail=detail)
    return member


def _propagate_status_changes(db: Session, changes: list) -> None:
    """Propagate committed status changes to the data-consistency service.

    *changes* is a list of ``(task_id, old_status, new_status, issue_label)``
    tuples. This runs after the commit and is best effort: failures and
    consistency issues are logged and never fail the request.
    """
    try:
        # Imported lazily, as in the original endpoints.
        from services.data_consistency import create_data_consistency_service
        consistency_service = create_data_consistency_service(db)
    except Exception:
        logger.exception("Could not create the data consistency service")
        return

    for task_id, old_status, new_status, issue_label in changes:
        try:
            result = consistency_service.propagate_task_update(
                task_id=task_id,
                old_status=old_status,
                new_status=new_status
            )
            if not result.get('success') or not result.get('validation_result', {}).get('valid'):
                logger.warning("%s: %s", issue_label, result)
        except Exception:
            logger.exception("%s: propagation raised", issue_label)
            # Leave the session usable for the queries that follow.
            db.rollback()


# ---------------------------------------------------------------------------
# Status helpers and auth dependencies
# ---------------------------------------------------------------------------

def get_project_default_status(db: Session, project_id: int) -> str:
    """Get the default status for a project (custom or system).

    Returns the ``id`` of the first custom status flagged ``is_default``,
    or ``"not_started"`` when the project is missing or has no default.
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return "not_started"

    for entry in _custom_statuses(project):
        if entry.get('is_default', False):
            return entry.get('id', 'not_started')

    # Default to system status
    return "not_started"


def validate_task_status(db: Session, project_id: int, status_value: str) -> bool:
    """
    Validate that a status exists for a project (either system or custom).
    Returns True if valid, False otherwise.
    """
    # System statuses are valid for every project; no query needed.
    if any(entry["id"] == status_value for entry in SYSTEM_TASK_STATUSES):
        return True

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return False

    return any(entry.get('id') == status_value for entry in _custom_statuses(project))


def require_admin_or_coordinator(
    token_data: dict = Depends(get_current_user_from_token),
    db: Session = Depends(get_db)
):
    """Dependency to require admin permission or coordinator role.

    Returns the current user; raises 403 otherwise.
    """
    current_user = _get_user_from_db(db, token_data["user_id"])
    if not current_user.is_admin and current_user.role != UserRole.COORDINATOR:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin permission or Coordinator role required"
        )
    return current_user


def get_current_user(
    token_data: dict = Depends(get_current_user_from_token),
    db: Session = Depends(get_db)
):
    """Get current user with proper database dependency."""
    return _get_user_from_db(db, token_data["user_id"])


def require_role(required_roles: list):
    """Create a dependency that requires specific user roles.

    The returned dependency yields the current user, or raises 403 when the
    user's role is not in *required_roles*.
    """
    def role_checker(
        token_data: dict = Depends(get_current_user_from_token),
        db: Session = Depends(get_db)
    ):
        current_user = _get_user_from_db(db, token_data["user_id"])
        if current_user.role not in required_roles:
            raise HTTPException(
                status_code=403,
                detail="Insufficient permissions"
            )
        return current_user
    return role_checker


# ---------------------------------------------------------------------------
# Task listing
# ---------------------------------------------------------------------------

@router.get("/", response_model=List[TaskListResponse])
async def get_tasks(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    shot_id: Optional[int] = Query(None, description="Filter by shot ID"),
    asset_id: Optional[int] = Query(None, description="Filter by asset ID"),
    assigned_user_id: Optional[int] = Query(None, description="Filter by assigned user ID"),
    status: Optional[str] = Query(None, description="Filter by task status"),
    task_type: Optional[str] = Query(None, description="Filter by task type"),
    department_role: Optional[str] = Query(None, description="Filter by department role for assignment"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get tasks with filtering options.

    Admins, coordinators and directors see every matching task. Artists see
    only their assigned tasks, except when viewing a specific shot/asset of
    a project they are a member of.
    """
    try:
        query = _with_relations(db.query(Task))

        # Project context: explicit filter, else derived from the shot/asset.
        context_project_id = project_id

        if shot_id:
            # Only include tasks from non-deleted shots
            query = query.filter(Task.shot_id == shot_id).join(Shot).filter(Shot.deleted_at.is_(None))
            if not context_project_id:
                shot = db.query(Shot).join(Episode).filter(
                    Shot.id == shot_id,
                    Shot.deleted_at.is_(None)
                ).first()
                if shot and shot.episode:
                    context_project_id = shot.episode.project_id

        if asset_id:
            # Only include tasks from non-deleted assets
            query = query.filter(Task.asset_id == asset_id).join(Asset).filter(Asset.deleted_at.is_(None))
            if not context_project_id:
                asset = db.query(Asset).filter(
                    Asset.id == asset_id,
                    Asset.deleted_at.is_(None)
                ).first()
                if asset:
                    context_project_id = asset.project_id

        if not shot_id and not asset_id:
            # Not scoped to one parent: also hide tasks of deleted parents.
            query = _active_tasks(query)
        else:
            # Parent already constrained above; just hide soft-deleted tasks.
            query = query.filter(Task.deleted_at.is_(None))

        # The project filter applies to every role. Without it an artist
        # could pair a project they belong to with a shot/asset from another
        # project and pass the membership check below.
        if context_project_id:
            query = query.filter(Task.project_id == context_project_id)

        # Role-based filtering
        if _is_restricted_artist(current_user):
            is_member = False
            if (shot_id or asset_id) and context_project_id:
                is_member = db.query(ProjectMember).filter(
                    and_(
                        ProjectMember.project_id == context_project_id,
                        ProjectMember.user_id == current_user.id
                    )
                ).first() is not None
            if not is_member:
                # Browsing all tasks, a non-member, or membership cannot be
                # verified: only the artist's own tasks.
                query = query.filter(Task.assigned_user_id == current_user.id)

        if assigned_user_id:
            query = query.filter(Task.assigned_user_id == assigned_user_id)

        if status:
            query = query.filter(Task.status == status)

        if task_type:
            query = query.filter(Task.task_type == task_type)

        # Department role filtering for task assignment
        if department_role and (current_user.role == UserRole.COORDINATOR or current_user.is_admin):
            # Find users with matching department role in the project
            if context_project_id:
                subquery = db.query(ProjectMember.user_id).filter(
                    and_(
                        ProjectMember.project_id == context_project_id,
                        ProjectMember.department_role == department_role
                    )
                )
                query = query.filter(Task.assigned_user_id.in_(subquery))

        # OFFSET/LIMIT needs a stable ordering to page deterministically.
        tasks = query.order_by(Task.id).offset(skip).limit(limit).all()

        return [_task_list_item(task) for task in tasks]

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("ERROR in get_tasks")
        raise HTTPException(status_code=500, detail=f"Error fetching tasks: {str(e)}")


@router.get("/my-tasks", response_model=List[TaskListResponse])
async def get_my_tasks(
    project_id: Optional[int] = Query(None, description="Filter by project ID"),
    status: Optional[str] = Query(None, description="Filter by task status"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get all tasks assigned to the current user across all projects.
    This endpoint is designed for the My Tasks page.

    Results are ordered by deadline (nulls last), then most recently updated.
    """
    try:
        # Always filter by assignee (current user)
        query = _with_relations(db.query(Task)).filter(Task.assigned_user_id == current_user.id)

        # Exclude soft-deleted tasks and tasks from deleted parents
        query = _active_tasks(query)

        if project_id:
            query = query.filter(Task.project_id == project_id)

        if status:
            query = query.filter(Task.status == status)

        query = query.order_by(Task.deadline.asc().nullslast(), Task.updated_at.desc())
        tasks = query.offset(skip).limit(limit).all()

        return [_task_list_item(task, project_fallback="Unknown") for task in tasks]

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("ERROR in get_my_tasks")
        raise HTTPException(status_code=500, detail=f"Error fetching my tasks: {str(e)}")


# ---------------------------------------------------------------------------
# Task creation
# ---------------------------------------------------------------------------

@router.post("/", response_model=TaskResponse)
async def create_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_or_coordinator)
):
    """Create a new task. Only coordinators and users with admin permission can create tasks.

    Raises 404 for an unknown project, episode, shot, asset or assignee, and
    400 when they do not belong together or the status is invalid for the
    project.
    """
    # Verify project exists
    project = db.query(Project).filter(Project.id == task.project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Verify episode exists if specified
    if task.episode_id:
        episode = db.query(Episode).filter(Episode.id == task.episode_id).first()
        if not episode or episode.project_id != task.project_id:
            raise HTTPException(status_code=404, detail="Episode not found or not in specified project")

    # Verify shot exists if specified
    if task.shot_id:
        shot = db.query(Shot).filter(Shot.id == task.shot_id).first()
        if not shot:
            raise HTTPException(status_code=404, detail="Shot not found")
        if task.episode_id and shot.episode_id != task.episode_id:
            raise HTTPException(status_code=400, detail="Shot does not belong to specified episode")
        # A shot reaches its project through its episode; reject a shot
        # that belongs to a different project than the task.
        shot_episode = shot.episode
        if shot_episode is not None and shot_episode.project_id != task.project_id:
            raise HTTPException(status_code=400, detail="Shot does not belong to specified project")

    # Verify asset exists if specified
    if task.asset_id:
        asset = db.query(Asset).filter(Asset.id == task.asset_id).first()
        if not asset or asset.project_id != task.project_id:
            raise HTTPException(status_code=404, detail="Asset not found or not in specified project")

    # Verify assigned user exists and is a project member if specified
    if task.assigned_user_id:
        assigned_user = db.query(User).filter(User.id == task.assigned_user_id).first()
        if not assigned_user:
            raise HTTPException(status_code=404, detail="Assigned user not found")
        _require_project_member(
            db, task.assigned_user_id, task.project_id,
            "Assigned user is not a member of this project"
        )

    # A missing status or the generic 'not_started' resolves to the
    # project's default; any other status must be valid for the project.
    task_data = task.model_dump()
    if not task_data.get('status') or task_data['status'] == 'not_started':
        task_data['status'] = get_project_default_status(db, task.project_id)
    elif not validate_task_status(db, task.project_id, task_data['status']):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid status '{task_data['status']}' for this project"
        )

    # Create task
    db_task = Task(**task_data)
    db.add(db_task)
    db.commit()
    db.refresh(db_task)

    # Load related entities for response
    db_task = _reload_with_relations(db, db_task.id)

    # Stored values override the request payload so the response reports the
    # status actually persisted, not the one requested.
    return TaskResponse(**{**task.model_dump(), **_task_detail_fields(db_task)})


# ---------------------------------------------------------------------------
# Bulk action endpoints (must be before /{task_id} routes)
# ---------------------------------------------------------------------------

@router.put("/bulk/status", response_model=BulkActionResult)
async def bulk_update_task_status(
    bulk_update: BulkStatusUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Update status for multiple tasks atomically.
    Admins, coordinators and directors can update any tasks.
    Artists can only update their own assigned tasks.
    Validates that the status is valid for each task's project.

    If any task fails a check, nothing is changed and the per-task errors
    are returned with ``success_count=0``.
    """
    if not bulk_update.task_ids:
        raise HTTPException(status_code=400, detail="No task IDs provided")

    # Drop duplicate IDs (keeping order) so a task is updated once.
    task_ids = list(dict.fromkeys(bulk_update.task_ids))
    new_status = bulk_update.status
    errors = []
    changes = []

    try:
        # One query; deleted tasks are reported as not found, as in the
        # single-task endpoints.
        tasks = _active_tasks(db.query(Task)).filter(Task.id.in_(task_ids)).all()
        task_map = {task.id: task for task in tasks}

        may_update_any = current_user.is_admin or current_user.role in [
            UserRole.COORDINATOR, UserRole.DIRECTOR
        ]
        is_artist = current_user.role == UserRole.ARTIST
        status_valid_for_project = {}  # project_id -> bool, one lookup per project

        # Check every task before making any change.
        for task_id in task_ids:
            task = task_map.get(task_id)
            if not task:
                errors.append({"task_id": task_id, "error": "Task not found"})
                continue

            if not may_update_any:
                if not is_artist:
                    errors.append({"task_id": task_id, "error": "Insufficient permissions"})
                    continue
                if task.assigned_user_id != current_user.id:
                    errors.append({"task_id": task_id, "error": "Not authorized to update this task"})
                    continue

            if task.project_id not in status_valid_for_project:
                status_valid_for_project[task.project_id] = validate_task_status(
                    db, task.project_id, new_status
                )
            if not status_valid_for_project[task.project_id]:
                errors.append({
                    "task_id": task_id,
                    "error": f"Invalid status '{new_status}' for task's project"
                })

        # All-or-nothing: any failure aborts the whole batch.
        if errors:
            db.rollback()
            return BulkActionResult(
                success_count=0,
                failed_count=len(errors),
                errors=errors
            )

        for task_id in task_ids:
            task = task_map[task_id]
            changes.append((
                task_id, task.status, new_status,
                f"Bulk task update consistency issue for task {task_id}"
            ))
            task.status = new_status

        # Commit all changes atomically
        db.commit()

    except Exception as e:
        db.rollback()
        logger.exception("ERROR in bulk_update_task_status")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update tasks: {str(e)}"
        )

    # Runs after the commit and outside the try block, so a propagation
    # problem cannot turn a committed update into an error response.
    _propagate_status_changes(db, changes)

    return BulkActionResult(
        success_count=len(changes),
        failed_count=0,
        errors=None
    )


@router.put("/bulk/assign", response_model=BulkActionResult)
async def bulk_assign_tasks(
    bulk_assignment: BulkAssignment,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_or_coordinator)
):
    """
    Assign multiple tasks to a user atomically.
    Only coordinators and admins can perform bulk assignments.

    If any task is missing, or the user is not a member of a task's project,
    nothing is changed and the per-task errors are returned.
    """
    if not bulk_assignment.task_ids:
        raise HTTPException(status_code=400, detail="No task IDs provided")

    # Drop duplicate IDs (keeping order) so a task is assigned once.
    task_ids = list(dict.fromkeys(bulk_assignment.task_ids))
    assignee_id = bulk_assignment.assigned_user_id
    errors = []

    try:
        # Verify assigned user exists
        assigned_user = db.query(User).filter(User.id == assignee_id).first()
        if not assigned_user:
            raise HTTPException(status_code=404, detail="Assigned user not found")

        # One query; deleted tasks are reported as not found.
        tasks = _active_tasks(db.query(Task)).filter(Task.id.in_(task_ids)).all()
        task_map = {task.id: task for task in tasks}

        for task_id in task_ids:
            if task_id not in task_map:
                errors.append({"task_id": task_id, "error": "Task not found"})

        # Projects (among those involved) the assignee is a member of.
        project_ids = {task.project_id for task in tasks}
        member_project_ids = set()
        if project_ids:
            member_project_ids = {
                row[0]
                for row in db.query(ProjectMember.project_id).filter(
                    and_(
                        ProjectMember.user_id == assignee_id,
                        ProjectMember.project_id.in_(project_ids)
                    )
                ).all()
            }

        for task in tasks:
            if task.project_id not in member_project_ids:
                errors.append({
                    "task_id": task.id,
                    "error": f"User is not a member of project {task.project_id}"
                })

        # All-or-nothing: any failure aborts the whole batch.
        if errors:
            db.rollback()
            return BulkActionResult(
                success_count=0,
                failed_count=len(errors),
                errors=errors
            )

        assigned_tasks = [task_map[task_id] for task_id in task_ids]
        for task in assigned_tasks:
            task.assigned_user_id = assignee_id

        # Commit all changes atomically
        db.commit()

    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.exception("ERROR in bulk_assign_tasks")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to assign tasks: {str(e)}"
        )

    # Notify only after the commit, as assign_task does, so a failed commit
    # never notifies anyone.
    # NOTE(review): assumes notify_task_assigned persists its own work,
    # since assign_task also calls it after its commit - confirm.
    for task in assigned_tasks:
        notification_service.notify_task_assigned(db, task, assigned_user, current_user)

    return BulkActionResult(
        success_count=len(assigned_tasks),
        failed_count=0,
        errors=None
    )


# ---------------------------------------------------------------------------
# Single-task endpoints
# ---------------------------------------------------------------------------

@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get a specific task by ID.

    Raises 404 when the task is missing or deleted, 403 when an artist
    requests a task that is not assigned to them.
    """
    task = _get_active_task_or_404(db, task_id, with_relations=True)

    # Artists can only view their own tasks
    if _is_restricted_artist(current_user) and task.assigned_user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized to view this task")

    return TaskResponse(**_task_detail_fields(task))


@router.put("/{task_id}", response_model=TaskResponse)
async def update_task(
    task_id: int,
    task_update: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update a task. Coordinators and admins can update any field, artists can only update status.

    An ``assigned_user_id`` of 0 unassigns the task.
    """
    task = _get_active_task_or_404(db, task_id)

    # Permission check
    if _is_restricted_artist(current_user):
        if task.assigned_user_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to update this task")
        # Artists can only update status
        if task_update.model_dump(exclude_unset=True).keys() - {"status"}:
            raise HTTPException(status_code=403, detail="Artists can only update task status")
    elif current_user.role != UserRole.COORDINATOR and not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not authorized to update tasks")

    # Verify assigned user if being updated
    if task_update.assigned_user_id is not None:
        if task_update.assigned_user_id != 0:  # 0 means unassign
            assigned_user = db.query(User).filter(User.id == task_update.assigned_user_id).first()
            if not assigned_user:
                raise HTTPException(status_code=404, detail="Assigned user not found")
            _require_project_member(
                db, task_update.assigned_user_id, task.project_id,
                "Assigned user is not a member of this project"
            )
        else:
            task_update.assigned_user_id = None

    update_data = task_update.model_dump(exclude_unset=True)
    old_status = task.status  # Store old status for consistency tracking

    # Validate status if being updated
    if 'status' in update_data:
        if not validate_task_status(db, task.project_id, update_data['status']):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status '{update_data['status']}' for this project"
            )

    # Update task
    for field, value in update_data.items():
        setattr(task, field, value)

    db.commit()
    db.refresh(task)

    # DATA CONSISTENCY: propagate the status change (best effort).
    if 'status' in update_data:
        _propagate_status_changes(
            db, [(task.id, old_status, task.status, "Task update consistency issue")]
        )

    # Load related entities for response
    task = _reload_with_relations(db, task_id)
    return TaskResponse(**_task_detail_fields(task))


@router.put("/{task_id}/status", response_model=TaskResponse)
async def update_task_status(
    task_id: int,
    status_update: TaskStatusUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update task status. Artists can update their own tasks, coordinators and admins can update any."""
    task = _get_active_task_or_404(db, task_id)

    # Permission check
    if _is_restricted_artist(current_user):
        if task.assigned_user_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to update this task")
    elif current_user.role != UserRole.COORDINATOR and not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not authorized to update task status")

    # Validate the status for the task's project
    if not validate_task_status(db, task.project_id, status_update.status):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid status '{status_update.status}' for this project"
        )

    old_status = task.status  # Store old status for consistency tracking
    task.status = status_update.status
    db.commit()
    db.refresh(task)

    # DATA CONSISTENCY: propagate the status change (best effort).
    _propagate_status_changes(
        db, [(task.id, old_status, task.status, "Task status update consistency issue")]
    )

    # Load related entities for response
    task = _reload_with_relations(db, task_id)
    return TaskResponse(**_task_detail_fields(task))


@router.put("/{task_id}/assign", response_model=TaskResponse)
async def assign_task(
    task_id: int,
    assignment: TaskAssignment,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_or_coordinator)
):
    """Assign a task to a user.

    The user must exist and be a member of the task's project. A mismatch
    between the user's department role and the task type is logged as a
    warning but does not block the assignment.
    """
    task = _get_active_task_or_404(db, task_id)

    # Verify assigned user exists and is a project member
    assigned_user = db.query(User).filter(User.id == assignment.assigned_user_id).first()
    if not assigned_user:
        raise HTTPException(status_code=404, detail="Assigned user not found")

    project_member = _require_project_member(
        db, assignment.assigned_user_id, task.project_id,
        "User is not a member of this project"
    )

    # Check if user's department role matches task type (optional validation)
    task_to_department_mapping = {
        "layout": DepartmentRole.LAYOUT,
        "animation": DepartmentRole.ANIMATION,
        "lighting": DepartmentRole.LIGHTING,
        "compositing": DepartmentRole.COMPOSITE,
        "modeling": DepartmentRole.MODELING,
        "rigging": DepartmentRole.RIGGING,
        "surfacing": DepartmentRole.SURFACING,
        "simulation": None  # Simulation can be handled by multiple departments
    }

    # task_type may be a plain string or an enum
    task_type_str = task.task_type if isinstance(task.task_type, str) else task.task_type.value
    expected_department = task_to_department_mapping.get(task_type_str)

    if expected_department:
        # Compare by underlying value so a stored string and an enum member
        # for the same department are not reported as a mismatch.
        actual_value = getattr(project_member.department_role, "value", project_member.department_role)
        expected_value = getattr(expected_department, "value", expected_department)
        if actual_value != expected_value:
            # This is a warning, not an error - coordinators can override
            logger.warning(
                "Assigning %s task %s to user %s whose department role is %s",
                task_type_str, task_id, assignment.assigned_user_id, actual_value
            )

    task.assigned_user_id = assignment.assigned_user_id
    db.commit()
    db.refresh(task)

    # Send notification to assigned user
    notification_service.notify_task_assigned(db, task, assigned_user, current_user)

    # Load related entities for response
    task = _reload_with_relations(db, task_id)
    return TaskResponse(**_task_detail_fields(task))


@router.delete("/{task_id}")
async def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin_or_coordinator)
):
    """Delete a task. Only coordinators and users with admin permission can delete tasks."""
    task = _get_active_task_or_404(db, task_id)

    # NOTE(review): this removes the row, while the queries in this module
    # treat `deleted_at` as a soft-delete marker - confirm a hard delete is
    # intended here.
    db.delete(task)
    db.commit()

    return {"message": "Task deleted successfully"}


# Production Notes endpoints
@router.get("/{task_id}/notes", response_model=List[ProductionNoteResponse])
async def get_task_notes(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get all notes for a task, organized in threaded structure."""
    task = db.query(Task).outerjoin(Shot, Task.shot_id == Shot.id).outerjoin(Asset, Task.asset_id == Asset.id).filter(
        Task.id == task_id,
        Task.deleted_at.is_(None),
        or_(
            and_(Task.shot_id.is_(None), Task.asset_id.is_(None)),  # Tasks without shot/asset
            and_(Task.shot_id.isnot(None), Shot.deleted_at.is_(None)),  # Tasks with non-deleted shot
            and_(Task.asset_id.isnot(None), Asset.deleted_at.is_(None))  # Tasks with non-deleted asset
        )
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Artists can only view notes for their own tasks
    if current_user.role == UserRole.ARTIST and task.assigned_user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized to view notes for this task")

    # Get all active notes for the task with user information (exclude soft deleted)
    notes = db.query(ProductionNote).options(
        joinedload(ProductionNote.user)
    ).filter(
        ProductionNote.task_id == task_id,
        ProductionNote.deleted_at.is_(None)
    ).order_by(ProductionNote.created_at).all()

    # Build threaded
structure notes_dict = {} root_notes = [] for note in notes: note_data = { "id": note.id, "content": note.content, "parent_note_id": note.parent_note_id, "task_id": note.task_id, "user_id": note.user_id, "created_at": note.created_at, "updated_at": note.updated_at, "user_first_name": note.user.first_name, "user_last_name": note.user.last_name, "user_email": note.user.email, "user_avatar_url": note.user.avatar_url, "child_notes": [] } notes_dict[note.id] = note_data if note.parent_note_id is None: root_notes.append(note_data) else: if note.parent_note_id in notes_dict: notes_dict[note.parent_note_id]["child_notes"].append(note_data) return [ProductionNoteResponse(**note) for note in root_notes] @router.post("/{task_id}/notes", response_model=ProductionNoteResponse) async def create_task_note( task_id: int, note: ProductionNoteCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Create a new note for a task.""" task = db.query(Task).outerjoin(Shot, Task.shot_id == Shot.id).outerjoin(Asset, Task.asset_id == Asset.id).filter( Task.id == task_id, Task.deleted_at.is_(None), or_( and_(Task.shot_id.is_(None), Task.asset_id.is_(None)), # Tasks without shot/asset and_(Task.shot_id.isnot(None), Shot.deleted_at.is_(None)), # Tasks with non-deleted shot and_(Task.asset_id.isnot(None), Asset.deleted_at.is_(None)) # Tasks with non-deleted asset ) ).first() if not task: raise HTTPException(status_code=404, detail="Task not found") # Artists can only add notes to their own tasks if current_user.role == UserRole.ARTIST and task.assigned_user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to add notes to this task") # Verify parent note exists if specified if note.parent_note_id: parent_note = db.query(ProductionNote).filter( and_( ProductionNote.id == note.parent_note_id, ProductionNote.task_id == task_id, ProductionNote.deleted_at.is_(None) ) ).first() if not parent_note: raise HTTPException(status_code=404, 
detail="Parent note not found") # Create note db_note = ProductionNote( task_id=task_id, user_id=current_user.id, content=note.content, parent_note_id=note.parent_note_id ) db.add(db_note) db.commit() db.refresh(db_note) # Load user information for response db_note = db.query(ProductionNote).options( joinedload(ProductionNote.user) ).filter(ProductionNote.id == db_note.id).first() note_data = { "id": db_note.id, "content": db_note.content, "parent_note_id": db_note.parent_note_id, "task_id": db_note.task_id, "user_id": db_note.user_id, "created_at": db_note.created_at, "updated_at": db_note.updated_at, "user_first_name": db_note.user.first_name, "user_last_name": db_note.user.last_name, "user_email": db_note.user.email, "user_avatar_url": db_note.user.avatar_url, "child_notes": [] } return ProductionNoteResponse(**note_data) @router.put("/{task_id}/notes/{note_id}", response_model=ProductionNoteResponse) async def update_task_note( task_id: int, note_id: int, note_update: ProductionNoteUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Update a note. 
Users can only update their own notes.""" note = db.query(ProductionNote).options( joinedload(ProductionNote.user) ).filter( and_( ProductionNote.id == note_id, ProductionNote.task_id == task_id, ProductionNote.deleted_at.is_(None) ) ).first() if not note: raise HTTPException(status_code=404, detail="Note not found") # Users can only update their own notes, unless they have admin permission if note.user_id != current_user.id and not current_user.is_admin: raise HTTPException(status_code=403, detail="Not authorized to update this note") note.content = note_update.content db.commit() db.refresh(note) note_data = { "id": note.id, "content": note.content, "parent_note_id": note.parent_note_id, "task_id": note.task_id, "user_id": note.user_id, "created_at": note.created_at, "updated_at": note.updated_at, "user_first_name": note.user.first_name, "user_last_name": note.user.last_name, "user_email": note.user.email, "user_avatar_url": note.user.avatar_url, "child_notes": [] } return ProductionNoteResponse(**note_data) @router.delete("/{task_id}/notes/{note_id}") async def delete_task_note( task_id: int, note_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Delete a note. 
Users can only delete their own notes.""" note = db.query(ProductionNote).filter( and_( ProductionNote.id == note_id, ProductionNote.task_id == task_id, ProductionNote.deleted_at.is_(None) ) ).first() if not note: raise HTTPException(status_code=404, detail="Note not found") # Users can only delete their own notes, unless they have admin permission if note.user_id != current_user.id and not current_user.is_admin: raise HTTPException(status_code=403, detail="Not authorized to delete this note") db.delete(note) db.commit() return {"message": "Note deleted successfully"} # Task Attachments endpoints @router.get("/{task_id}/attachments", response_model=List[TaskAttachmentResponse]) async def get_task_attachments( task_id: int, attachment_type: Optional[str] = Query(None, description="Filter by attachment type"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get all attachments for a task.""" task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") # Artists can only view attachments for their own tasks if current_user.role == UserRole.ARTIST and task.assigned_user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to view attachments for this task") query = db.query(TaskAttachment).options( joinedload(TaskAttachment.user) ).filter( TaskAttachment.task_id == task_id, TaskAttachment.deleted_at.is_(None) ) if attachment_type: query = query.filter(TaskAttachment.attachment_type == attachment_type) attachments = query.order_by(TaskAttachment.uploaded_at.desc()).all() result = [] for attachment in attachments: attachment_data = { "id": attachment.id, "task_id": attachment.task_id, "user_id": attachment.user_id, "file_name": attachment.file_name, "file_path": attachment.file_path, "file_type": attachment.file_type, "file_size": attachment.file_size, "attachment_type": attachment.attachment_type, "description": attachment.description, 
"uploaded_at": attachment.uploaded_at, "user_first_name": attachment.user.first_name, "user_last_name": attachment.user.last_name, "download_url": f"/files/attachments/{attachment.id}", "thumbnail_url": f"/files/attachments/{attachment.id}?thumbnail=true" if file_handler.is_image_file(attachment.file_path) else None } result.append(TaskAttachmentResponse(**attachment_data)) return result @router.post("/{task_id}/attachments", response_model=TaskAttachmentResponse) async def upload_task_attachment( task_id: int, file: UploadFile = File(...), attachment_type: str = "reference", description: Optional[str] = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Upload an attachment for a task.""" task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") # Artists can only upload attachments to their own tasks if current_user.role == UserRole.ARTIST and task.assigned_user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to upload attachments to this task") # Validate file using file handler file_handler.validate_file(file, file_handler.MAX_ATTACHMENT_SIZE, db) # Save file using file handler file_path, file_size = await file_handler.save_file(file, task_id, "attachment", db=db) # Create thumbnail for images thumbnail_path = None if file_handler.is_image_file(file_path): thumbnail_path = file_handler.create_thumbnail(file_path) # Create attachment record db_attachment = TaskAttachment( task_id=task_id, user_id=current_user.id, file_name=file.filename or "unknown", file_path=file_path, file_type=file.content_type or "application/octet-stream", file_size=file_size, attachment_type=attachment_type, description=description ) db.add(db_attachment) db.commit() db.refresh(db_attachment) # Load user information for response db_attachment = db.query(TaskAttachment).options( joinedload(TaskAttachment.user) ).filter(TaskAttachment.id == 
db_attachment.id).first() attachment_data = { "id": db_attachment.id, "task_id": db_attachment.task_id, "user_id": db_attachment.user_id, "file_name": db_attachment.file_name, "file_path": db_attachment.file_path, "file_type": db_attachment.file_type, "file_size": db_attachment.file_size, "attachment_type": db_attachment.attachment_type, "description": db_attachment.description, "uploaded_at": db_attachment.uploaded_at, "user_first_name": db_attachment.user.first_name, "user_last_name": db_attachment.user.last_name, "download_url": f"/files/attachments/{db_attachment.id}", "thumbnail_url": f"/files/attachments/{db_attachment.id}?thumbnail=true" if file_handler.is_image_file(db_attachment.file_path) else None } return TaskAttachmentResponse(**attachment_data) @router.delete("/{task_id}/attachments/{attachment_id}") async def delete_task_attachment( task_id: int, attachment_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Delete a task attachment. 
Users can only delete their own attachments.""" attachment = db.query(TaskAttachment).filter( and_( TaskAttachment.id == attachment_id, TaskAttachment.task_id == task_id, TaskAttachment.deleted_at.is_(None) ) ).first() if not attachment: raise HTTPException(status_code=404, detail="Attachment not found") # Users can only delete their own attachments, unless they're admin/coordinator if (attachment.user_id != current_user.id and not current_user.is_admin and current_user.role != UserRole.COORDINATOR): raise HTTPException(status_code=403, detail="Not authorized to delete this attachment") # Delete file from filesystem using file handler file_handler.delete_file(attachment.file_path) db.delete(attachment) db.commit() return {"message": "Attachment deleted successfully"} # Submission endpoints @router.get("/{task_id}/submissions", response_model=List[SubmissionResponse]) async def get_task_submissions( task_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get all submissions for a task.""" task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") # Artists can only view submissions for their own tasks if current_user.role == UserRole.ARTIST and task.assigned_user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized to view submissions for this task") submissions = db.query(Submission).options( joinedload(Submission.user), joinedload(Submission.reviews).joinedload(Review.reviewer) ).filter( Submission.task_id == task_id, Submission.deleted_at.is_(None) ).order_by(Submission.submitted_at.desc()).all() result = [] for submission in submissions: # Get latest review latest_review = None if submission.reviews: latest_review_obj = max(submission.reviews, key=lambda r: r.reviewed_at) latest_review = { "id": latest_review_obj.id, "submission_id": latest_review_obj.submission_id, "reviewer_id": latest_review_obj.reviewer_id, 
"decision": latest_review_obj.decision, "feedback": latest_review_obj.feedback, "reviewed_at": latest_review_obj.reviewed_at, "reviewer_first_name": latest_review_obj.reviewer.first_name, "reviewer_last_name": latest_review_obj.reviewer.last_name } submission_data = { "id": submission.id, "task_id": submission.task_id, "user_id": submission.user_id, "file_path": submission.file_path, "file_name": submission.file_name, "version_number": submission.version_number, "notes": submission.notes, "submitted_at": submission.submitted_at, "user_first_name": submission.user.first_name, "user_last_name": submission.user.last_name, "latest_review": latest_review, "download_url": f"/files/submissions/{submission.id}", "thumbnail_url": f"/files/submissions/{submission.id}?thumbnail=true" if file_handler.is_image_file(submission.file_path) else None, "stream_url": f"/files/submissions/{submission.id}/stream" if file_handler.is_video_file(submission.file_path) else None } result.append(SubmissionResponse(**submission_data)) return result @router.post("/{task_id}/submit", response_model=SubmissionResponse) async def submit_work( task_id: int, file: UploadFile = File(...), notes: Optional[str] = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Submit work for a task. 
Only assigned artists can submit work.""" task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") # Only assigned artist can submit work if task.assigned_user_id != current_user.id: raise HTTPException(status_code=403, detail="Only the assigned artist can submit work for this task") # Validate file using file handler file_handler.validate_file(file, file_handler.MAX_SUBMISSION_SIZE, db) # Get next version number latest_submission = db.query(Submission).filter( Submission.task_id == task_id, Submission.deleted_at.is_(None) ).order_by(Submission.version_number.desc()).first() version_number = (latest_submission.version_number + 1) if latest_submission else 1 # Save file using file handler with version number file_path, file_size = await file_handler.save_file(file, task_id, "submission", version_number, db=db) # Create thumbnail for images thumbnail_path = None if file_handler.is_image_file(file_path): thumbnail_path = file_handler.create_thumbnail(file_path) # Create submission record db_submission = Submission( task_id=task_id, user_id=current_user.id, file_path=file_path, file_name=file.filename or "unknown", version_number=version_number, notes=notes ) db.add(db_submission) # Update task status to submitted task.status = "submitted" db.commit() db.refresh(db_submission) # Send notification to directors/coordinators notification_service.notify_work_submitted(db, db_submission, task) # Load user information for response db_submission = db.query(Submission).options( joinedload(Submission.user) ).filter(Submission.id == db_submission.id).first() submission_data = { "id": db_submission.id, "task_id": db_submission.task_id, "user_id": db_submission.user_id, "file_path": db_submission.file_path, "file_name": db_submission.file_name, "version_number": db_submission.version_number, "notes": db_submission.notes, "submitted_at": db_submission.submitted_at, "user_first_name": 
db_submission.user.first_name, "user_last_name": db_submission.user.last_name, "latest_review": None, "download_url": f"/files/submissions/{db_submission.id}", "thumbnail_url": f"/files/submissions/{db_submission.id}?thumbnail=true" if file_handler.is_image_file(db_submission.file_path) else None, "stream_url": f"/files/submissions/{db_submission.id}/stream" if file_handler.is_video_file(db_submission.file_path) else None } return SubmissionResponse(**submission_data)