LinkDesk/backend/routers/projects.py

1958 lines
67 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, Query, UploadFile
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy import func
from typing import List, Optional
import json
from database import get_db
from models.project import Project, ProjectMember
from models.user import User, UserRole
from models.episode import Episode
from models.asset import Asset
from schemas.project import (
ProjectCreate, ProjectUpdate, ProjectResponse, ProjectListResponse,
ProjectMemberCreate, ProjectMemberUpdate, ProjectMemberResponse,
ProjectTechnicalSpecs, DeliveryMovieSpec, DEFAULT_DELIVERY_MOVIE_SPECS,
ProjectSettings, ProjectSettingsUpdate, DEFAULT_ASSET_TASKS, DEFAULT_SHOT_TASKS
)
from utils.auth import get_current_user, require_role, get_current_user_from_token
# Router instance that every endpoint in this module registers itself on
# through the @router.<method> decorators below.
router = APIRouter()
def get_current_user_with_db(
    token_data: dict = Depends(get_current_user_from_token),
    db: Session = Depends(get_db)
):
    """Resolve the user record for the current request.

    The decoded token supplies ``user_id``; the matching user is looked up
    through the injected database session.
    """
    from utils.auth import _get_user_from_db

    user_id = token_data["user_id"]
    return _get_user_from_db(db, user_id)
def require_coordinator_or_admin(
    token_data: dict = Depends(get_current_user_from_token),
    db: Session = Depends(get_db)
):
    """Dependency that only lets coordinators and admins through.

    Returns:
        The user looked up from the token's ``user_id``.

    Raises:
        HTTPException: 403 when the user is neither a coordinator nor an admin.
    """
    from utils.auth import _get_user_from_db

    current_user = _get_user_from_db(db, token_data["user_id"])
    is_coordinator = current_user.role == UserRole.COORDINATOR
    if is_coordinator or current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient permissions"
    )
def require_admin(
    token_data: dict = Depends(get_current_user_from_token),
    db: Session = Depends(get_db)
):
    """Dependency that only lets admins through.

    Returns:
        The user looked up from the token's ``user_id``.

    Raises:
        HTTPException: 403 when the user is not an admin.
    """
    from utils.auth import _get_user_from_db

    current_user = _get_user_from_db(db, token_data["user_id"])
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient permissions"
    )
@router.get("/", response_model=List[ProjectListResponse])
async def list_projects(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """List projects with summary information.

    Artists only see projects they are members of; every other role sees all
    projects. Each entry carries member, episode and asset counts, plus a
    thumbnail URL when the project has a thumbnail path stored.

    Args:
        skip: Number of projects to skip (paging offset).
        limit: Maximum number of projects to return.
        db: Database session.
        current_user: Authenticated user making the request.

    Returns:
        A list of ``ProjectListResponse`` objects for the requested page.
    """
    query = db.query(Project)
    if current_user.role == UserRole.ARTIST:
        query = query.join(ProjectMember).filter(
            ProjectMember.user_id == current_user.id
        )
    # Order explicitly so offset/limit paging is stable between requests.
    projects = query.order_by(Project.id).offset(skip).limit(limit).all()
    if not projects:
        return []

    project_ids = [project.id for project in projects]

    def _counts_by_project(model):
        """Return {project_id: row count} for ``model`` using one grouped query."""
        rows = (
            db.query(model.project_id, func.count())
            .filter(model.project_id.in_(project_ids))
            .group_by(model.project_id)
            .all()
        )
        return {project_id: count for project_id, count in rows}

    # Three grouped queries for the whole page instead of three per project.
    member_counts = _counts_by_project(ProjectMember)
    episode_counts = _counts_by_project(Episode)
    asset_counts = _counts_by_project(Asset)

    result = []
    for project in projects:
        project_data = ProjectListResponse.model_validate(project)
        # Projects without any rows are absent from the grouped result.
        project_data.member_count = member_counts.get(project.id, 0)
        project_data.episode_count = episode_counts.get(project.id, 0)
        project_data.asset_count = asset_counts.get(project.id, 0)
        if project.thumbnail_path:
            project_data.thumbnail_url = f"/files/projects/{project.id}/thumbnail"
        result.append(project_data)
    return result
@router.post("/", response_model=ProjectResponse, status_code=status.HTTP_201_CREATED)
async def create_project(
    project: ProjectCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Create a new project.

    Args:
        project: Payload describing the project to create.
        db: Database session.
        current_user: Coordinator or admin performing the request.

    Returns:
        The created project as a ``ProjectResponse``.

    Raises:
        HTTPException: 400 when the code name is already taken, 500 when the
            project cannot be persisted for any other reason.
    """
    from sqlalchemy.exc import IntegrityError

    def _code_name_taken():
        """Return True when a stored project already uses the requested code name."""
        return db.query(Project).filter(
            Project.code_name == project.code_name
        ).first() is not None

    duplicate_detail = f"Project with code name '{project.code_name}' already exists"

    if _code_name_taken():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=duplicate_detail
        )

    db_project = Project(**project.model_dump())
    db.add(db_project)
    try:
        db.commit()
        db.refresh(db_project)
    except IntegrityError as exc:
        db.rollback()
        # A concurrent request may have inserted the same code name after the
        # pre-check. Re-query instead of matching driver-specific error text,
        # so the duplicate is reported correctly on any database backend.
        if _code_name_taken():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=duplicate_detail
            ) from exc
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create project"
        ) from exc
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create project"
        ) from exc
    return ProjectResponse.model_validate(db_project)
@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
    project_id: int,
    include_members: bool = False,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Fetch one project by ID, optionally with its member list.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when an
            artist requests a project they are not a member of.
    """
    lookup = db.query(Project)
    if include_members:
        # Eager-load members together with their user rows.
        lookup = lookup.options(
            joinedload(Project.project_members).joinedload(ProjectMember.user)
        )
    project = lookup.filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Artists may only read projects they belong to.
    if current_user.role == UserRole.ARTIST:
        membership = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not membership:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    # Start from the plain column values with an empty member list.
    copied_fields = (
        'id',
        'name',
        'code_name',
        'client_name',
        'description',
        'project_type',
        'status',
        'start_date',
        'end_date',
        'created_at',
        'updated_at',
        'thumbnail_path',
        'custom_asset_task_types',
        'custom_shot_task_types',
    )
    payload = {field: getattr(project, field) for field in copied_fields}
    payload['project_members'] = []
    project_data = ProjectResponse(**payload)

    if project.thumbnail_path:
        project_data.thumbnail_url = f"/files/projects/{project.id}/thumbnail"

    if include_members and project.project_members:
        project_data.project_members = [
            ProjectMemberResponse(
                id=entry.id,
                user_id=entry.user_id,
                project_id=entry.project_id,
                department_role=entry.department_role,
                joined_at=entry.joined_at,
                user_email=entry.user.email,
                user_first_name=entry.user.first_name,
                user_last_name=entry.user.last_name
            )
            for entry in project.project_members
        ]
    return project_data
@router.put("/{project_id}", response_model=ProjectResponse)
async def update_project(
    project_id: int,
    project_update: ProjectUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Update the fields of a project that were supplied in the request.

    Args:
        project_id: ID of the project to update.
        project_update: Partial update; only explicitly set fields are applied.
        db: Database session.
        current_user: Coordinator or admin performing the request.

    Returns:
        The updated project as a ``ProjectResponse`` with ``project_members``
        set to ``None``.

    Raises:
        HTTPException: 404 when the project does not exist, 400 when the new
            code name belongs to another project, 500 when the update cannot
            be persisted for any other reason.
    """
    from sqlalchemy.exc import IntegrityError

    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    update_data = project_update.model_dump(exclude_unset=True)

    def _code_name_clash():
        """Return True when another project already uses the requested code name."""
        if 'code_name' not in update_data:
            return False
        return db.query(Project).filter(
            Project.code_name == update_data['code_name'],
            Project.id != project_id
        ).first() is not None

    if _code_name_clash():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Project with code name '{update_data['code_name']}' already exists"
        )

    # Apply only the fields that were provided.
    for field, value in update_data.items():
        setattr(db_project, field, value)

    try:
        db.commit()
        db.refresh(db_project)
    except IntegrityError as exc:
        db.rollback()
        # Re-check for a concurrent duplicate instead of matching
        # driver-specific error text, so this works on any database backend.
        if _code_name_clash():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Project with code name '{update_data['code_name']}' already exists"
            ) from exc
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update project"
        ) from exc
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update project"
        ) from exc

    # delivery_movie_specs_by_department may come back as a JSON string or as
    # a dict; normalise it to a dict of DeliveryMovieSpec objects.
    delivery_specs = {}
    specs_data = db_project.delivery_movie_specs_by_department
    if specs_data:
        if isinstance(specs_data, str):
            try:
                specs_data = json.loads(specs_data)
            except (json.JSONDecodeError, TypeError):
                specs_data = {}
        if isinstance(specs_data, dict):
            # Entries that are not dicts are skipped as invalid.
            delivery_specs = {
                dept: DeliveryMovieSpec(**spec_data)
                for dept, spec_data in specs_data.items()
                if isinstance(spec_data, dict)
            }

    # Build the response without project members to avoid validation issues;
    # the frontend doesn't need member data for project updates.
    project_dict = {
        'id': db_project.id,
        'name': db_project.name,
        'code_name': db_project.code_name,
        'client_name': db_project.client_name,
        'project_type': db_project.project_type,
        'description': db_project.description,
        'status': db_project.status,
        'start_date': db_project.start_date,
        'end_date': db_project.end_date,
        'frame_rate': db_project.frame_rate,
        'data_drive_path': db_project.data_drive_path,
        'publish_storage_path': db_project.publish_storage_path,
        'delivery_image_resolution': db_project.delivery_image_resolution,
        'delivery_movie_specs_by_department': delivery_specs,
        'created_at': db_project.created_at,
        'updated_at': db_project.updated_at,
        'thumbnail_url': f"/files/projects/{db_project.id}/thumbnail" if db_project.thumbnail_path else None,
        'project_members': None  # Explicitly set to None to avoid validation issues
    }
    return ProjectResponse.model_validate(project_dict)
@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_project(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin)
):
    """Remove a project; restricted to admins.

    Raises:
        HTTPException: 404 when no project has the given ID.
    """
    target = db.query(Project).filter(Project.id == project_id).first()
    if target:
        db.delete(target)
        db.commit()
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Project not found"
    )
# Project Member Management Endpoints
@router.get("/{project_id}/members", response_model=List[ProjectMemberResponse])
async def list_project_members(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Return every member of a project together with basic user details.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when an
            artist requests a project they are not a member of.
    """
    project_exists = db.query(Project).filter(Project.id == project_id).first()
    if not project_exists:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Artists may only list members of projects they belong to.
    if current_user.role == UserRole.ARTIST:
        own_membership = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not own_membership:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    memberships = (
        db.query(ProjectMember)
        .options(joinedload(ProjectMember.user))
        .filter(ProjectMember.project_id == project_id)
        .all()
    )
    return [
        ProjectMemberResponse(
            id=entry.id,
            user_id=entry.user_id,
            project_id=entry.project_id,
            department_role=entry.department_role,
            joined_at=entry.joined_at,
            user_email=entry.user.email,
            user_first_name=entry.user.first_name,
            user_last_name=entry.user.last_name,
            user_avatar_url=entry.user.avatar_url
        )
        for entry in memberships
    ]
@router.post("/{project_id}/members", response_model=ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
async def add_project_member(
    project_id: int,
    member: ProjectMemberCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Attach a user to a project with the given department role.

    Raises:
        HTTPException: 404 when the project or the user does not exist,
            400 when the user already belongs to the project.
    """
    if not db.query(Project).filter(Project.id == project_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    if not db.query(User).filter(User.id == member.user_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    already_member = db.query(ProjectMember).filter(
        ProjectMember.project_id == project_id,
        ProjectMember.user_id == member.user_id
    ).first()
    if already_member:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User is already a member of this project"
        )

    new_member = ProjectMember(
        project_id=project_id,
        user_id=member.user_id,
        department_role=member.department_role
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)

    # Re-read the row with its user eagerly loaded for the response.
    stored = (
        db.query(ProjectMember)
        .options(joinedload(ProjectMember.user))
        .filter(ProjectMember.id == new_member.id)
        .first()
    )
    linked_user = stored.user
    return ProjectMemberResponse(
        id=stored.id,
        user_id=stored.user_id,
        project_id=stored.project_id,
        department_role=stored.department_role,
        joined_at=stored.joined_at,
        user_email=linked_user.email,
        user_first_name=linked_user.first_name,
        user_last_name=linked_user.last_name
    )
@router.put("/{project_id}/members/{member_id}", response_model=ProjectMemberResponse)
async def update_project_member(
    project_id: int,
    member_id: int,
    member_update: ProjectMemberUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Change the department role of an existing project member.

    Raises:
        HTTPException: 404 when the member does not exist in this project.
    """
    membership = (
        db.query(ProjectMember)
        .options(joinedload(ProjectMember.user))
        .filter(
            ProjectMember.id == member_id,
            ProjectMember.project_id == project_id
        )
        .first()
    )
    if not membership:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project member not found"
        )

    # A missing role in the payload leaves the stored role untouched.
    requested_role = member_update.department_role
    if requested_role is not None:
        membership.department_role = requested_role

    db.commit()
    db.refresh(membership)

    return ProjectMemberResponse(
        id=membership.id,
        user_id=membership.user_id,
        project_id=membership.project_id,
        department_role=membership.department_role,
        joined_at=membership.joined_at,
        user_email=membership.user.email,
        user_first_name=membership.user.first_name,
        user_last_name=membership.user.last_name
    )
@router.delete("/{project_id}/members/{member_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_project_member(
    project_id: int,
    member_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Detach a member from a project.

    Raises:
        HTTPException: 404 when the member does not exist in this project.
    """
    membership = db.query(ProjectMember).filter(
        ProjectMember.id == member_id,
        ProjectMember.project_id == project_id
    ).first()
    if membership:
        db.delete(membership)
        db.commit()
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Project member not found"
    )
# Technical Specifications Endpoints
@router.get("/{project_id}/technical-specs", response_model=ProjectTechnicalSpecs)
async def get_project_technical_specs(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Return the technical specifications stored on a project.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when an
            artist requests a project they are not a member of.
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Artists may only read projects they belong to.
    if current_user.role == UserRole.ARTIST:
        membership = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not membership:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    # The stored value may be a JSON string or a dict; turn it into a dict of
    # DeliveryMovieSpec objects, skipping entries that are not dicts.
    delivery_specs = {}
    raw_specs = project.delivery_movie_specs_by_department
    if raw_specs:
        if isinstance(raw_specs, str):
            try:
                raw_specs = json.loads(raw_specs)
            except (json.JSONDecodeError, TypeError):
                raw_specs = {}
        if isinstance(raw_specs, dict):
            delivery_specs = {
                dept: DeliveryMovieSpec(**payload)
                for dept, payload in raw_specs.items()
                if isinstance(payload, dict)
            }

    return ProjectTechnicalSpecs(
        frame_rate=project.frame_rate,
        data_drive_path=project.data_drive_path,
        publish_storage_path=project.publish_storage_path,
        delivery_image_resolution=project.delivery_image_resolution,
        delivery_movie_specs_by_department=delivery_specs
    )
@router.put("/{project_id}/technical-specs", response_model=ProjectTechnicalSpecs)
async def update_project_technical_specs(
    project_id: int,
    tech_specs: ProjectTechnicalSpecs,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Replace the technical specifications of a project.

    Args:
        project_id: ID of the project to update.
        tech_specs: New technical specifications; every field is written.
        db: Database session.
        current_user: Coordinator or admin performing the request.

    Returns:
        The specifications as stored after the update.

    Raises:
        HTTPException: 404 when the project does not exist, 500 when the
            update cannot be persisted.
    """
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    db_project.frame_rate = tech_specs.frame_rate
    db_project.data_drive_path = tech_specs.data_drive_path
    db_project.publish_storage_path = tech_specs.publish_storage_path
    db_project.delivery_image_resolution = tech_specs.delivery_image_resolution

    # Store the per-department specs as plain dicts. model_dump() is the
    # Pydantic v2 API used elsewhere in this module; .dict() is deprecated.
    submitted_specs = tech_specs.delivery_movie_specs_by_department
    if submitted_specs:
        db_project.delivery_movie_specs_by_department = {
            dept: spec.model_dump() for dept, spec in submitted_specs.items()
        }
    else:
        db_project.delivery_movie_specs_by_department = {}

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update technical specifications"
        ) from exc

    # Rebuild the response from what was stored; the column may come back as
    # a JSON string or as a dict.
    delivery_specs = {}
    specs_data = db_project.delivery_movie_specs_by_department
    if specs_data:
        if isinstance(specs_data, str):
            try:
                specs_data = json.loads(specs_data)
            except (json.JSONDecodeError, TypeError):
                specs_data = {}
        if isinstance(specs_data, dict):
            # Entries that are not dicts are skipped as invalid.
            delivery_specs = {
                dept: DeliveryMovieSpec(**spec_data)
                for dept, spec_data in specs_data.items()
                if isinstance(spec_data, dict)
            }

    return ProjectTechnicalSpecs(
        frame_rate=db_project.frame_rate,
        data_drive_path=db_project.data_drive_path,
        publish_storage_path=db_project.publish_storage_path,
        delivery_image_resolution=db_project.delivery_image_resolution,
        delivery_movie_specs_by_department=delivery_specs
    )
@router.post("/{project_id}/technical-specs/defaults", response_model=ProjectTechnicalSpecs)
async def set_default_technical_specs(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Reset a project's technical specifications to the defaults.

    Sets the frame rate to 24.0, the delivery image resolution to
    "1920x1080" and the per-department movie specs to
    ``DEFAULT_DELIVERY_MOVIE_SPECS``. The storage paths are left unchanged.

    Args:
        project_id: ID of the project to reset.
        db: Database session.
        current_user: Coordinator or admin performing the request.

    Returns:
        The project's technical specifications after the reset.

    Raises:
        HTTPException: 404 when the project does not exist, 500 when the
            defaults cannot be persisted.
    """
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    db_project.frame_rate = 24.0  # Default frame rate
    db_project.delivery_image_resolution = "1920x1080"  # Default HD resolution

    # Store the default specs as plain dicts. model_dump() is the Pydantic v2
    # API used elsewhere in this module; .dict() is deprecated.
    db_project.delivery_movie_specs_by_department = {
        dept: spec.model_dump()
        for dept, spec in DEFAULT_DELIVERY_MOVIE_SPECS.items()
    }

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to set default technical specifications"
        ) from exc

    return ProjectTechnicalSpecs(
        frame_rate=db_project.frame_rate,
        data_drive_path=db_project.data_drive_path,
        publish_storage_path=db_project.publish_storage_path,
        delivery_image_resolution=db_project.delivery_image_resolution,
        delivery_movie_specs_by_department=DEFAULT_DELIVERY_MOVIE_SPECS
    )
# Project Settings Endpoints
@router.get("/{project_id}/settings", response_model=ProjectSettings)
async def get_project_settings(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Return a project's upload location and task template settings.

    Unset (falsy) values are replaced by defaults in the response.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when an
            artist requests a project they are not a member of.
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Artists may only read projects they belong to.
    if current_user.role == UserRole.ARTIST:
        membership = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not membership:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    # Fall back to the defaults for anything that has not been configured.
    asset_templates = project.asset_task_templates or DEFAULT_ASSET_TASKS.copy()
    shot_templates = project.shot_task_templates or DEFAULT_SHOT_TASKS.copy()
    enabled_asset = project.enabled_asset_tasks or {}
    enabled_shot = project.enabled_shot_tasks or DEFAULT_SHOT_TASKS.copy()

    return ProjectSettings(
        upload_data_location=project.upload_data_location,
        asset_task_templates=asset_templates,
        shot_task_templates=shot_templates,
        enabled_asset_tasks=enabled_asset,
        enabled_shot_tasks=enabled_shot
    )
@router.put("/{project_id}/settings", response_model=ProjectSettings)
async def update_project_settings(
    project_id: int,
    settings: ProjectSettingsUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Update a project's upload location and task template settings.

    Only fields whose value is not ``None`` are written; the rest keep their
    stored value.

    Args:
        project_id: ID of the project to update.
        settings: Partial settings update.
        db: Database session.
        current_user: Coordinator or admin performing the request.

    Returns:
        The project's settings after the update, with defaults substituted
        for unset (falsy) values.

    Raises:
        HTTPException: 404 when the project does not exist, 500 when the
            update cannot be persisted.
    """
    import logging

    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    updatable_fields = (
        "upload_data_location",
        "asset_task_templates",
        "shot_task_templates",
        "enabled_asset_tasks",
        "enabled_shot_tasks",
    )
    for field in updatable_fields:
        value = getattr(settings, field)
        if value is not None:
            setattr(db_project, field, value)

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as exc:
        db.rollback()
        # Keep the raw database error in the server log only; returning it to
        # the client would expose internal details.
        logging.getLogger(__name__).exception(
            "Failed to update settings for project %s", project_id
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update project settings"
        ) from exc

    return ProjectSettings(
        upload_data_location=db_project.upload_data_location,
        asset_task_templates=db_project.asset_task_templates or DEFAULT_ASSET_TASKS.copy(),
        shot_task_templates=db_project.shot_task_templates or DEFAULT_SHOT_TASKS.copy(),
        enabled_asset_tasks=db_project.enabled_asset_tasks or {},
        enabled_shot_tasks=db_project.enabled_shot_tasks or DEFAULT_SHOT_TASKS.copy()
    )
# Custom Task Type Management Endpoints
# Standard task types (read-only)
# Built-in asset task types. Custom asset task types may not reuse these
# names, and they are listed ahead of the custom ones in responses.
STANDARD_ASSET_TASK_TYPES = ["modeling", "surfacing", "rigging"]
# Built-in shot task types; same rules as the asset list above.
STANDARD_SHOT_TASK_TYPES = ["layout", "animation", "simulation", "lighting", "compositing"]
def _build_all_task_types_response(db_project: Project):
    """Assemble the combined standard + custom task type listing for a project."""
    from schemas.custom_task_type import AllTaskTypesResponse

    # Projects without custom types store a falsy value; treat that as empty.
    asset_extras = db_project.custom_asset_task_types or []
    shot_extras = db_project.custom_shot_task_types or []

    return AllTaskTypesResponse(
        asset_task_types=STANDARD_ASSET_TASK_TYPES + asset_extras,
        shot_task_types=STANDARD_SHOT_TASK_TYPES + shot_extras,
        standard_asset_types=STANDARD_ASSET_TASK_TYPES,
        standard_shot_types=STANDARD_SHOT_TASK_TYPES,
        custom_asset_types=asset_extras,
        custom_shot_types=shot_extras
    )
@router.get("/{project_id}/custom-task-types")
async def get_all_task_types(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Get all task types (standard + custom) for a project.

    Args:
        project_id: ID of the project whose task types are requested.
        db: Database session.
        current_user: Authenticated user making the request.

    Returns:
        The combined task type listing built by
        ``_build_all_task_types_response``.

    Raises:
        HTTPException: 404 when the project does not exist, 403 when an
            artist requests a project they are not a member of.
    """
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Same access rule as the other read endpoints in this module: artists
    # may only read projects they belong to.
    if current_user.role == UserRole.ARTIST:
        member = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not member:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    return _build_all_task_types_response(db_project)
@router.post("/{project_id}/custom-task-types", status_code=status.HTTP_201_CREATED)
async def add_custom_task_type(
    project_id: int,
    task_type_data: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Register a new custom task type on a project.

    Raises:
        HTTPException: 422 when the payload fails validation, 404 when the
            project does not exist, 409 when the name is a standard type or
            already registered, 500 when the change cannot be persisted.
    """
    from schemas.custom_task_type import CustomTaskTypeCreate

    try:
        payload = CustomTaskTypeCreate(**task_type_data)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        )

    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Anything other than "asset" is handled as a shot task type.
    if payload.category == "asset":
        column_name = 'custom_asset_task_types'
        reserved_names = STANDARD_ASSET_TASK_TYPES
    else:
        column_name = 'custom_shot_task_types'
        reserved_names = STANDARD_SHOT_TASK_TYPES
    registered = getattr(db_project, column_name) or []

    new_name = payload.task_type
    if new_name in reserved_names:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Task type '{new_name}' is a standard task type and cannot be added as custom"
        )
    if new_name in registered:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Task type '{new_name}' already exists"
        )

    registered.append(new_name)
    setattr(db_project, column_name, registered)
    # JSON columns need an explicit dirty flag for the change to be saved.
    flag_modified(db_project, column_name)

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to add custom task type"
        )
    return _build_all_task_types_response(db_project)
@router.put("/{project_id}/custom-task-types/{task_type}")
async def update_custom_task_type(
    project_id: int,
    task_type: str,
    update_data: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Rename a custom task type and the project's tasks that use it.

    Raises:
        HTTPException: 422 when the payload fails validation, 400 when the
            URL task type differs from ``old_name``, 404 when the project or
            the custom type does not exist, 409 when the new name clashes
            with a standard or existing custom type, 500 when the change
            cannot be persisted.
    """
    from schemas.custom_task_type import CustomTaskTypeUpdate
    from models.task import Task

    try:
        rename = CustomTaskTypeUpdate(**update_data)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        )

    old_name = rename.old_name
    new_name = rename.new_name

    # The path parameter and the body must refer to the same task type.
    if task_type != old_name:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Task type in URL does not match old_name in request body"
        )

    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Anything other than "asset" is handled as a shot task type.
    is_asset = rename.category == "asset"
    if is_asset:
        column_name = 'custom_asset_task_types'
        reserved_names = STANDARD_ASSET_TASK_TYPES
    else:
        column_name = 'custom_shot_task_types'
        reserved_names = STANDARD_SHOT_TASK_TYPES
    registered = getattr(db_project, column_name) or []

    if old_name not in registered:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Custom task type '{old_name}' not found"
        )
    if new_name in reserved_names:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Task type '{new_name}' is a standard task type"
        )
    if new_name != old_name and new_name in registered:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Task type '{new_name}' already exists"
        )

    renamed = [new_name if entry == old_name else entry for entry in registered]
    setattr(db_project, column_name, renamed)
    # JSON columns need an explicit dirty flag for the change to be saved.
    flag_modified(db_project, column_name)

    # Carry the rename over to this project's tasks that use the old name.
    if is_asset:
        owner_model = Asset
    else:
        from models.shot import Shot
        owner_model = Shot
    affected_tasks = db.query(Task).join(owner_model).filter(
        owner_model.project_id == project_id,
        Task.task_type == old_name
    ).all()
    for affected in affected_tasks:
        affected.task_type = new_name

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update custom task type"
        )
    return _build_all_task_types_response(db_project)
@router.delete("/{project_id}/custom-task-types/{task_type}")
async def delete_custom_task_type(
    project_id: int,
    task_type: str,
    category: str = Query(..., description="Category: 'asset' or 'shot'"),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Remove a custom task type that no task in the project still uses.

    Raises:
        HTTPException: 400 when ``category`` is not 'asset' or 'shot', 404
            when the project or the custom type does not exist, 422 when
            tasks still use the type, 500 when the change cannot be
            persisted.
    """
    from models.task import Task
    from schemas.custom_task_type import TaskTypeInUseError

    if category not in ("asset", "shot"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Category must be 'asset' or 'shot'"
        )

    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    is_asset = category == "asset"
    column_name = 'custom_asset_task_types' if is_asset else 'custom_shot_task_types'
    registered = getattr(db_project, column_name) or []

    if task_type not in registered:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Custom task type '{task_type}' not found"
        )

    # Refuse to delete a type that tasks in this project still reference.
    if is_asset:
        owner_model = Asset
    else:
        from models.shot import Shot
        owner_model = Shot
    blocking_tasks = db.query(Task).join(owner_model).filter(
        owner_model.project_id == project_id,
        Task.task_type == task_type
    ).all()
    if blocking_tasks:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail={
                "error": f"Cannot delete task type '{task_type}' because it is currently in use",
                "task_type": task_type,
                "task_count": len(blocking_tasks),
                "task_ids": [blocking.id for blocking in blocking_tasks]
            }
        )

    registered.remove(task_type)
    setattr(db_project, column_name, registered)
    # JSON columns need an explicit dirty flag for the change to be saved.
    flag_modified(db_project, column_name)

    try:
        db.commit()
        db.refresh(db_project)
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to delete custom task type"
        )
    return _build_all_task_types_response(db_project)
# Project Thumbnail Management Endpoints
@router.post("/{project_id}/thumbnail", status_code=status.HTTP_201_CREATED)
async def upload_project_thumbnail(
    project_id: int,
    file: UploadFile,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Upload a thumbnail image for a project.

    The upload is checked for an allowed extension and a 10MB size limit,
    flattened to RGB (transparent modes are pasted onto a white background),
    scaled down to fit within 800x600 while keeping its aspect ratio, and
    written as JPEG data into the FileHandler's project thumbnails directory.
    The project's ``thumbnail_path`` is then set to the stored relative path.

    The previously stored thumbnail file, if any, is removed only after the
    new one has been written and committed, so a rejected upload or a failed
    commit leaves the existing thumbnail in place.

    Returns:
        A dict with a ``message`` and the ``thumbnail_url`` for the project.

    Raises:
        HTTPException: 404 if the project does not exist; 400 if the file
            has no/unsupported extension or the image cannot be processed;
            413 if the file exceeds 10MB; 500 if the database update fails.
    """
    from PIL import Image
    import io
    from pathlib import Path
    import hashlib
    from datetime import datetime
    from utils.file_handler import file_handler

    # Verify project exists
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Validate file type. A missing filename becomes an empty suffix and is
    # rejected with the 400 below instead of failing inside Path(None).
    allowed_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
    original_filename = file.filename or ""
    file_extension = Path(original_filename).suffix.lower()
    if file_extension not in allowed_formats:
        # Sorted so the message is stable (set iteration order is not)
        allowed_list = ', '.join(sorted(allowed_formats))
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid file format. Allowed formats: {allowed_list}"
        )

    # Read file content
    file_content = await file.read()

    # Validate file size (10MB max)
    max_size = 10 * 1024 * 1024  # 10MB
    if len(file_content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="File too large. Maximum size is 10MB"
        )

    # Use FileHandler's project thumbnails directory
    project_thumbnails_dir = file_handler.project_thumbnails_dir

    # Remember the current thumbnail but do not delete it yet: it must
    # survive if the new upload turns out to be unusable.
    previous_relative_path = db_project.thumbnail_path
    old_thumbnail = None
    if previous_relative_path:
        old_thumbnail = Path(file_handler.resolve_absolute_path(previous_relative_path))

    # Generate unique filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    hash_input = f"{original_filename}{timestamp}".encode()
    short_hash = hashlib.md5(hash_input).hexdigest()[:8]
    # NOTE(review): the file is written as JPEG data below but keeps the
    # uploaded extension (e.g. ".png") in its name - confirm whether anything
    # that serves these files relies on the extension.
    unique_filename = f"project_{project_id}_{timestamp}_{short_hash}{file_extension}"

    # Process image
    try:
        image = Image.open(io.BytesIO(file_content))

        # Convert to RGB if necessary
        if image.mode in ('RGBA', 'LA', 'P'):
            # Create white background for transparency
            background = Image.new('RGB', image.size, (255, 255, 255))
            if image.mode == 'RGBA':
                background.paste(image, mask=image.split()[3])  # Use alpha channel as mask
            else:
                background.paste(image)
            image = background
        elif image.mode != 'RGB':
            image = image.convert('RGB')

        # Resize image maintaining aspect ratio (maximum dimensions: 800x600)
        max_width = 800
        max_height = 600
        width, height = image.size
        if width > max_width or height > max_height:
            aspect_ratio = width / height
            if aspect_ratio > max_width / max_height:
                # Width is the limiting factor
                new_width = max_width
                new_height = max(1, int(max_width / aspect_ratio))
            else:
                # Height is the limiting factor
                new_height = max_height
                new_width = max(1, int(max_height * aspect_ratio))
            # max(1, ...) keeps extreme aspect ratios from rounding to 0px
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Save processed image
        thumbnail_path = project_thumbnails_dir / unique_filename
        image.save(thumbnail_path, 'JPEG', quality=90, optimize=True)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Failed to process image: {str(e)}"
        ) from e

    # Update project with thumbnail path (store as relative path)
    new_relative_path = file_handler.store_relative_path(str(thumbnail_path))
    db_project.thumbnail_path = new_relative_path
    try:
        db.commit()
        db.refresh(db_project)
    except Exception:
        db.rollback()
        # Clean up uploaded file
        if thumbnail_path.exists():
            thumbnail_path.unlink()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update project thumbnail"
        )

    # The new thumbnail is committed; now remove the file it replaced.
    # Skipped when old and new names are the same file (same upload name
    # within the same second), since that file now holds the new image.
    if (
        old_thumbnail is not None
        and previous_relative_path != new_relative_path
        and old_thumbnail != thumbnail_path
    ):
        try:
            if old_thumbnail.exists():
                old_thumbnail.unlink()
        except OSError:
            pass  # Best-effort cleanup: a stale file must not fail the upload

    return {
        "message": "Thumbnail uploaded successfully",
        "thumbnail_url": f"/files/projects/{project_id}/thumbnail"
    }
@router.delete("/{project_id}/thumbnail", status_code=status.HTTP_204_NO_CONTENT)
async def delete_project_thumbnail(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Remove a project's thumbnail file and clear its stored path.

    Deletion is first attempted through the FileHandler; if that reports
    failure, the resolved file is unlinked directly when it still exists.
    The project's ``thumbnail_path`` is then set to None and committed.

    Raises:
        HTTPException: 404 if the project does not exist or has no
            thumbnail; 500 if the fallback file deletion or the database
            update fails.
    """
    from pathlib import Path
    from utils.file_handler import file_handler

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    stored_path = project.thumbnail_path
    if not stored_path:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project has no thumbnail"
        )

    # Preferred route: let the FileHandler remove the file
    handled = file_handler.delete_file(stored_path)
    if not handled:
        # Fallback: unlink the resolved path ourselves if it is still there
        leftover = Path(file_handler.resolve_absolute_path(stored_path))
        if leftover.exists():
            try:
                leftover.unlink()
            except Exception as exc:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Failed to delete thumbnail file: {exc}"
                )

    # Clear the reference on the project record
    project.thumbnail_path = None
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update project"
        )
# Custom Task Status Management Endpoints

# Built-in (system) task statuses: read-only entries, each a dict with
# "id", "name" and "color" keys, listed in display order.
SYSTEM_TASK_STATUSES = [
    {"id": status_key, "name": label, "color": hex_color}
    for status_key, label, hex_color in (
        ("not_started", "Not Started", "#6B7280"),
        ("in_progress", "In Progress", "#3B82F6"),
        ("submitted", "Submitted", "#F59E0B"),
        ("approved", "Approved", "#10B981"),
        ("retake", "Retake", "#EF4444"),
    )
]
@router.get("/{project_id}/task-statuses")
async def get_all_task_statuses(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user_with_db)
):
    """Get all task statuses (system + custom) for a project.

    Users with the ARTIST role must be members of the project. Custom
    statuses are read from the project's ``custom_task_statuses`` value,
    which may be a list or a JSON string; anything else is treated as
    having no custom statuses.

    The reported default is the first custom status flagged ``is_default``
    (the same rule the create/update/delete endpoints apply), falling back
    to the system status "not_started".

    Raises:
        HTTPException: 404 if the project does not exist; 403 if an artist
            is not a member of the project.
    """
    from schemas.custom_task_status import AllTaskStatusesResponse, CustomTaskStatus, SystemTaskStatus

    # Verify project exists
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Check access for artists
    if current_user.role == UserRole.ARTIST:
        member = db.query(ProjectMember).filter(
            ProjectMember.project_id == project_id,
            ProjectMember.user_id == current_user.id
        ).first()
        if not member:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this project"
            )

    # Build system statuses list (loop variable deliberately not named
    # "status", which is the imported fastapi status module)
    system_statuses = [
        SystemTaskStatus(
            id=system_entry["id"],
            name=system_entry["name"],
            color=system_entry["color"],
            is_system=True
        )
        for system_entry in SYSTEM_TASK_STATUSES
    ]

    # Get custom statuses from project
    custom_statuses_data = db_project.custom_task_statuses or []
    if isinstance(custom_statuses_data, str):
        try:
            custom_statuses_data = json.loads(custom_statuses_data)
        except (json.JSONDecodeError, TypeError):
            custom_statuses_data = []
    if not isinstance(custom_statuses_data, list):
        custom_statuses_data = []

    custom_statuses = []
    default_status_id = "not_started"  # Default to system status
    default_found = False
    for status_data in custom_statuses_data:
        if not isinstance(status_data, dict):
            continue
        custom_statuses.append(CustomTaskStatus(**status_data))
        # First flagged entry wins, matching the mutation endpoints
        if not default_found and status_data.get('is_default', False):
            default_status_id = status_data.get('id')
            default_found = True

    return AllTaskStatusesResponse(
        statuses=custom_statuses,
        system_statuses=system_statuses,
        default_status_id=default_status_id
    )
# Default color palette for custom task statuses, as hex strings.
# Each pair below is (hex value, human-readable label); only the hex
# values end up in the palette, in the order listed.
DEFAULT_STATUS_COLOR_PALETTE = [
    hex_value
    for hex_value, _label in (
        ("#8B5CF6", "Purple"),
        ("#EC4899", "Pink"),
        ("#14B8A6", "Teal"),
        ("#F97316", "Orange"),
        ("#06B6D4", "Cyan"),
        ("#84CC16", "Lime"),
        ("#A855F7", "Violet"),
        ("#F43F5E", "Rose"),
        ("#22D3EE", "Sky"),
        ("#FACC15", "Yellow"),
    )
]
@router.post("/{project_id}/task-statuses", status_code=status.HTTP_201_CREATED)
async def create_custom_task_status(
    project_id: int,
    status_create: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Create a new custom task status for a project.

    The request body is validated with ``CustomTaskStatusCreate``. The name
    must not match (case-insensitively) any existing custom status of the
    project or any system status. When no color is supplied, the first
    palette color not already used by the project is assigned, wrapping
    around once the palette is exhausted. The new status is appended with
    the next ``order`` value and is never the default.

    Returns:
        CustomTaskStatusResponse with the created status and the full
        system + custom status listing.

    Raises:
        HTTPException: 422 if the body fails validation; 404 if the project
            does not exist; 409 on a name conflict; 500 if the database
            update fails.
    """
    from schemas.custom_task_status import (
        CustomTaskStatusCreate,
        CustomTaskStatus,
        CustomTaskStatusResponse,
        AllTaskStatusesResponse,
        SystemTaskStatus
    )
    import uuid

    # Validate input
    try:
        status_create = CustomTaskStatusCreate(**status_create)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        )

    # Verify project exists
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Get existing custom statuses
    custom_statuses_data = db_project.custom_task_statuses or []
    if isinstance(custom_statuses_data, str):
        try:
            custom_statuses_data = json.loads(custom_statuses_data)
        except (json.JSONDecodeError, TypeError):
            custom_statuses_data = []
    # A non-list payload cannot be appended to; treat it as "no statuses",
    # the same way the listing endpoint ignores it.
    if not isinstance(custom_statuses_data, list):
        custom_statuses_data = []
    existing_statuses = [s for s in custom_statuses_data if isinstance(s, dict)]

    # Validate status name uniqueness within project. "or ''" tolerates
    # stored entries whose name is missing or null.
    requested_name = status_create.name.lower()
    existing_names = {str(s.get('name') or '').lower() for s in existing_statuses}
    system_names = {s['name'].lower() for s in SYSTEM_TASK_STATUSES}
    if requested_name in existing_names:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Status with name '{status_create.name}' already exists in this project"
        )
    if requested_name in system_names:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Status name '{status_create.name}' conflicts with a system status"
        )

    # Auto-assign color from palette if not provided
    if status_create.color is None:
        used_colors = {str(s.get('color') or '').upper() for s in existing_statuses}
        # First palette color not yet used by this project
        assigned_color = next(
            (color for color in DEFAULT_STATUS_COLOR_PALETTE if color.upper() not in used_colors),
            None
        )
        # If all palette colors are used, cycle through the palette
        if assigned_color is None:
            assigned_color = DEFAULT_STATUS_COLOR_PALETTE[
                len(custom_statuses_data) % len(DEFAULT_STATUS_COLOR_PALETTE)
            ]
    else:
        assigned_color = status_create.color

    # Generate a status ID and make sure it really is unique in this project:
    # only 8 hex characters of the UUID are kept, so retry on a collision.
    existing_ids = {s.get('id') for s in existing_statuses}
    status_id = f"custom_{uuid.uuid4().hex[:8]}"
    while status_id in existing_ids:
        status_id = f"custom_{uuid.uuid4().hex[:8]}"

    # Determine order (append to end); entries without a numeric order are ignored
    max_order = max(
        (s['order'] for s in existing_statuses if isinstance(s.get('order'), (int, float))),
        default=-1
    )
    new_order = max_order + 1

    # Create new status object
    new_status = {
        "id": status_id,
        "name": status_create.name,
        "color": assigned_color,
        "order": new_order,
        "is_default": False  # New statuses are never default by default
    }

    # Add status to project's custom_task_statuses JSON array
    custom_statuses_data.append(new_status)
    db_project.custom_task_statuses = custom_statuses_data
    # Use flag_modified for JSON column updates
    flag_modified(db_project, 'custom_task_statuses')
    try:
        db.commit()
        db.refresh(db_project)
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create custom task status: {str(e)}"
        )

    # Build response with created status and all statuses
    created_status = CustomTaskStatus(**new_status)
    system_statuses = [
        SystemTaskStatus(
            id=system_entry["id"],
            name=system_entry["name"],
            color=system_entry["color"],
            is_system=True
        )
        for system_entry in SYSTEM_TASK_STATUSES
    ]
    custom_statuses = [CustomTaskStatus(**s) for s in custom_statuses_data if isinstance(s, dict)]

    # Find default status (first flagged custom status, else the system one)
    default_status_id = "not_started"
    for status_data in custom_statuses_data:
        if isinstance(status_data, dict) and status_data.get('is_default', False):
            default_status_id = status_data.get('id')
            break

    all_statuses = AllTaskStatusesResponse(
        statuses=custom_statuses,
        system_statuses=system_statuses,
        default_status_id=default_status_id
    )
    return CustomTaskStatusResponse(
        message=f"Custom task status '{status_create.name}' created successfully",
        status=created_status,
        all_statuses=all_statuses
    )
@router.put("/{project_id}/task-statuses/{status_id}")
async def update_custom_task_status(
    project_id: int,
    status_id: str,
    status_update: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Update the name, color and/or default flag of a custom task status.

    The body is validated with ``CustomTaskStatusUpdate``. A changed name
    must not collide (case-insensitively) with another custom status of the
    project or with a system status. Marking a status as default clears the
    flag on every other custom status first.

    Returns:
        CustomTaskStatusResponse with the updated status and the full
        system + custom status listing.

    Raises:
        HTTPException: 422 if the body fails validation; 404 if the project
            or the status does not exist; 409 on a name conflict; 500 if
            the database update fails.
    """
    from schemas.custom_task_status import (
        CustomTaskStatusUpdate,
        CustomTaskStatus,
        CustomTaskStatusResponse,
        AllTaskStatusesResponse,
        SystemTaskStatus
    )

    # Parse and validate the request body
    try:
        status_update = CustomTaskStatusUpdate(**status_update)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(exc)
        )

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Stored statuses may arrive as a JSON string; undecodable text means none
    stored_statuses = project.custom_task_statuses or []
    if isinstance(stored_statuses, str):
        try:
            stored_statuses = json.loads(stored_statuses)
        except (json.JSONDecodeError, TypeError):
            stored_statuses = []

    # Locate the first entry carrying the requested ID
    target_position, target = next(
        (
            (position, entry)
            for position, entry in enumerate(stored_statuses)
            if isinstance(entry, dict) and entry.get('id') == status_id
        ),
        (None, None)
    )
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Custom task status with ID '{status_id}' not found"
        )

    # Rename, but only when the name actually changes and stays unique
    new_name = status_update.name
    if new_name is not None and new_name != target.get('name'):
        sibling_names = {
            entry.get('name', '').lower()
            for position, entry in enumerate(stored_statuses)
            if isinstance(entry, dict) and position != target_position
        }
        reserved_names = {entry['name'].lower() for entry in SYSTEM_TASK_STATUSES}
        lowered_name = new_name.lower()
        if lowered_name in sibling_names:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Status with name '{status_update.name}' already exists in this project"
            )
        if lowered_name in reserved_names:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Status name '{status_update.name}' conflicts with a system status"
            )
        target['name'] = new_name

    # Recolor when a color was supplied
    if status_update.color is not None:
        target['color'] = status_update.color

    # Apply the default flag; only one custom status may hold it
    wants_default = status_update.is_default
    if wants_default is not None:
        if wants_default:
            for entry in stored_statuses:
                if isinstance(entry, dict):
                    entry['is_default'] = False
        target['is_default'] = bool(wants_default)

    # Persist; the entries were edited in place, so flag the JSON column
    project.custom_task_statuses = stored_statuses
    flag_modified(project, 'custom_task_statuses')
    try:
        db.commit()
        db.refresh(project)
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update custom task status: {exc}"
        )

    # Assemble the response payload
    updated_status = CustomTaskStatus(**target)
    system_statuses = [
        SystemTaskStatus(
            id=builtin["id"],
            name=builtin["name"],
            color=builtin["color"],
            is_system=True
        )
        for builtin in SYSTEM_TASK_STATUSES
    ]
    custom_statuses = [
        CustomTaskStatus(**entry)
        for entry in stored_statuses
        if isinstance(entry, dict)
    ]
    # First custom status flagged as default, else the system fallback
    default_status_id = next(
        (
            entry.get('id')
            for entry in stored_statuses
            if isinstance(entry, dict) and entry.get('is_default', False)
        ),
        "not_started"
    )
    return CustomTaskStatusResponse(
        message="Custom task status updated successfully",
        status=updated_status,
        all_statuses=AllTaskStatusesResponse(
            statuses=custom_statuses,
            system_statuses=system_statuses,
            default_status_id=default_status_id
        )
    )
@router.delete("/{project_id}/task-statuses/{status_id}")
async def delete_custom_task_status(
    project_id: int,
    status_id: str,
    reassign_to_status_id: Optional[str] = Query(None, description="Status ID to reassign tasks to"),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Delete a custom task status, optionally reassigning its tasks.

    The only remaining custom status of a project cannot be deleted. When
    tasks still use the status, ``reassign_to_status_id`` must name a system
    status or a different custom status of the project; those tasks are
    moved to it. If the deleted status was the default, the first remaining
    custom status becomes the default.

    Returns:
        CustomTaskStatusResponse with ``status=None`` and the remaining
        system + custom status listing.

    Raises:
        HTTPException: 404 if the project or status does not exist; 422 if
            it is the last custom status, or if it is in use and no
            reassignment target was given; 400 if the reassignment target
            is invalid; 500 if the database update fails.
    """
    from schemas.custom_task_status import (
        CustomTaskStatus,
        CustomTaskStatusResponse,
        AllTaskStatusesResponse,
        SystemTaskStatus,
        TaskStatusInUseError
    )
    from models.task import Task

    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Stored statuses may arrive as a JSON string; undecodable text means none
    stored_statuses = project.custom_task_statuses or []
    if isinstance(stored_statuses, str):
        try:
            stored_statuses = json.loads(stored_statuses)
        except (json.JSONDecodeError, TypeError):
            stored_statuses = []

    # Locate the first entry carrying the requested ID
    target_position, doomed_status = next(
        (
            (position, entry)
            for position, entry in enumerate(stored_statuses)
            if isinstance(entry, dict) and entry.get('id') == status_id
        ),
        (None, None)
    )
    if doomed_status is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Custom task status with ID '{status_id}' not found"
        )

    # At least one custom status has to survive
    if len(stored_statuses) == 1:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Cannot delete the last custom status. At least one custom status must remain."
        )

    # NOTE(review): this filters on Task.project_id directly, whereas the
    # task-type deletion endpoint earlier in this file joins Asset/Shot to
    # scope tasks by project - confirm Task exposes project_id.
    affected_tasks = db.query(Task).filter(
        Task.project_id == project_id,
        Task.status == status_id
    ).all()

    if affected_tasks:
        # In use and nowhere to move the tasks: report which tasks block it
        if reassign_to_status_id is None:
            in_use = TaskStatusInUseError(
                error=f"Cannot delete status '{doomed_status.get('name')}' because it is currently in use by {len(affected_tasks)} task(s)",
                status_id=status_id,
                status_name=doomed_status.get('name', 'Unknown'),
                task_count=len(affected_tasks),
                task_ids=[task.id for task in affected_tasks]
            )
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=in_use.dict()
            )

        # The target must be a system status, or a custom status other
        # than the one being removed
        target_is_valid = (
            reassign_to_status_id in [builtin['id'] for builtin in SYSTEM_TASK_STATUSES]
            or (
                reassign_to_status_id != status_id
                and any(
                    isinstance(entry, dict) and entry.get('id') == reassign_to_status_id
                    for entry in stored_statuses
                )
            )
        )
        if not target_is_valid:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Reassignment status ID '{reassign_to_status_id}' not found or is the same as the status being deleted"
            )

        # Move every affected task over to the replacement status
        for task in affected_tasks:
            task.status = reassign_to_status_id

    # Drop the entry; hand the default flag to the first survivor if needed
    was_default = doomed_status.get('is_default', False)
    stored_statuses.pop(target_position)
    if was_default and stored_statuses:
        stored_statuses[0]['is_default'] = True

    # Persist; the list was edited in place, so flag the JSON column
    project.custom_task_statuses = stored_statuses
    flag_modified(project, 'custom_task_statuses')
    try:
        db.commit()
        db.refresh(project)
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete custom task status: {exc}"
        )

    # Assemble the response payload from what is left
    system_statuses = [
        SystemTaskStatus(
            id=builtin["id"],
            name=builtin["name"],
            color=builtin["color"],
            is_system=True
        )
        for builtin in SYSTEM_TASK_STATUSES
    ]
    custom_statuses = [
        CustomTaskStatus(**entry)
        for entry in stored_statuses
        if isinstance(entry, dict)
    ]
    # First custom status flagged as default, else the system fallback
    default_status_id = next(
        (
            entry.get('id')
            for entry in stored_statuses
            if isinstance(entry, dict) and entry.get('is_default', False)
        ),
        "not_started"
    )
    all_statuses = AllTaskStatusesResponse(
        statuses=custom_statuses,
        system_statuses=system_statuses,
        default_status_id=default_status_id
    )

    message = f"Custom task status '{doomed_status.get('name')}' deleted successfully"
    if affected_tasks:
        message += f" and {len(affected_tasks)} task(s) reassigned"
    return CustomTaskStatusResponse(
        message=message,
        status=None,
        all_statuses=all_statuses
    )
@router.patch("/{project_id}/task-statuses/reorder")
async def reorder_custom_task_statuses(
    project_id: int,
    reorder_data: dict,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_coordinator_or_admin)
):
    """Reorder the custom task statuses of a project.

    The body is validated with ``CustomTaskStatusReorder``. Its
    ``status_ids`` must list every existing custom status ID exactly once;
    the statuses are stored in that sequence with ``order`` rewritten to
    each one's position.

    Returns:
        CustomTaskStatusResponse with ``status=None`` and the reordered
        system + custom status listing.

    Raises:
        HTTPException: 422 if the body fails validation; 404 if the project
            does not exist; 400 if the ID list contains unknown IDs, omits
            existing IDs, or repeats an ID; 500 if the database update
            fails.
    """
    from schemas.custom_task_status import (
        CustomTaskStatusReorder,
        CustomTaskStatus,
        CustomTaskStatusResponse,
        AllTaskStatusesResponse,
        SystemTaskStatus
    )

    # Validate input
    try:
        reorder_request = CustomTaskStatusReorder(**reorder_data)
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        )

    # Verify project exists
    db_project = db.query(Project).filter(Project.id == project_id).first()
    if not db_project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Get existing custom statuses
    custom_statuses_data = db_project.custom_task_statuses or []
    if isinstance(custom_statuses_data, str):
        try:
            custom_statuses_data = json.loads(custom_statuses_data)
        except (json.JSONDecodeError, TypeError):
            custom_statuses_data = []

    # Create a mapping of status_id to status data
    status_map = {
        s.get('id'): s
        for s in custom_statuses_data
        if isinstance(s, dict)
    }
    existing_status_ids = set(status_map)
    requested_ids = list(reorder_request.status_ids)
    provided_status_ids = set(requested_ids)

    # Check if all provided IDs exist (str() + sorted keeps the message
    # stable and safe for non-string IDs)
    missing_ids = provided_status_ids - existing_status_ids
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Status IDs not found: {', '.join(sorted(map(str, missing_ids)))}"
        )

    # Check if all existing IDs are provided
    extra_ids = existing_status_ids - provided_status_ids
    if extra_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Missing status IDs in reorder request: {', '.join(sorted(map(str, extra_ids)))}"
        )

    # The set comparisons above cannot see repeats: without this check a
    # request like [a, b, a] would store status "a" twice.
    seen_ids = set()
    duplicate_ids = set()
    for requested_id in requested_ids:
        if requested_id in seen_ids:
            duplicate_ids.add(requested_id)
        seen_ids.add(requested_id)
    if duplicate_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Duplicate status IDs in reorder request: {', '.join(sorted(map(str, duplicate_ids)))}"
        )

    # Reorder statuses and update order field (copies, so the stored
    # entries are untouched until the new list is assigned)
    reordered_statuses = []
    for order, requested_id in enumerate(requested_ids):
        status_data = status_map[requested_id].copy()
        status_data['order'] = order
        reordered_statuses.append(status_data)

    # Update the project with reordered statuses
    db_project.custom_task_statuses = reordered_statuses
    # Use flag_modified for JSON column updates
    flag_modified(db_project, 'custom_task_statuses')
    try:
        db.commit()
        db.refresh(db_project)
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to reorder custom task statuses: {str(e)}"
        )

    # Build response with reordered statuses
    system_statuses = [
        SystemTaskStatus(
            id=system_entry["id"],
            name=system_entry["name"],
            color=system_entry["color"],
            is_system=True
        )
        for system_entry in SYSTEM_TASK_STATUSES
    ]
    custom_statuses = [CustomTaskStatus(**s) for s in reordered_statuses]

    # Find default status (first flagged custom status, else the system one)
    default_status_id = "not_started"  # Default to system status
    for status_data in reordered_statuses:
        if status_data.get('is_default', False):
            default_status_id = status_data.get('id')
            break

    all_statuses = AllTaskStatusesResponse(
        statuses=custom_statuses,
        system_statuses=system_statuses,
        default_status_id=default_status_id
    )
    return CustomTaskStatusResponse(
        message="Custom task statuses reordered successfully",
        status=None,
        all_statuses=all_statuses
    )