LinkDesk/backend/services/recovery_service.py

1444 lines
55 KiB
Python

"""
Recovery Service
This service handles the recovery of soft-deleted shots and assets, restoring them
and all their related data to active status.
All operations are performed within database transactions to ensure atomicity.
"""
from datetime import datetime
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import func
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from models.shot import Shot
from models.asset import Asset
from models.task import Task, Submission, Review, ProductionNote, TaskAttachment
from models.user import User
from models.activity import Activity, ActivityType
from utils.activity import ActivityService
from utils.file_handler import file_handler
class DeletedShot:
    """Listing entry describing one soft-deleted shot.

    A plain attribute container: every field starts at a neutral default
    (``0`` or ``""``) and is filled in by the code that builds the listing.
    """

    def __init__(self):
        # --- identity and location ---
        self.id: int = 0
        self.name: str = ""
        self.episode_name: str = ""
        self.project_id: int = 0
        self.project_name: str = ""

        # --- deletion audit trail (timestamp kept as a string) ---
        self.deleted_at: str = ""
        self.deleted_by: int = 0
        self.deleted_by_name: str = ""

        # --- counters for related items ---
        self.task_count: int = 0
        self.submission_count: int = 0
        self.attachment_count: int = 0
        self.note_count: int = 0
        self.review_count: int = 0
class DeletedAsset:
    """Listing entry describing one soft-deleted asset.

    A plain attribute container: every field starts at a neutral default
    (``0`` or ``""``) and is filled in by the code that builds the listing.
    """

    def __init__(self):
        # --- identity and location ---
        self.id: int = 0
        self.name: str = ""
        self.category: str = ""
        self.project_id: int = 0
        self.project_name: str = ""

        # --- deletion audit trail (timestamp kept as a string) ---
        self.deleted_at: str = ""
        self.deleted_by: int = 0
        self.deleted_by_name: str = ""

        # --- counters for related items ---
        self.task_count: int = 0
        self.submission_count: int = 0
        self.attachment_count: int = 0
        self.note_count: int = 0
        self.review_count: int = 0
class RecoveryInfo:
    """Preview of what restoring a deleted shot or asset would bring back.

    Either ``shot_id`` or ``asset_id`` is expected to be set, depending on
    which kind of entity the preview describes; both default to ``None``.
    """

    def __init__(self):
        # --- which entity the preview is about ---
        self.shot_id: Optional[int] = None
        self.asset_id: Optional[int] = None
        self.name: str = ""
        self.episode_name: Optional[str] = None
        self.project_id: Optional[int] = None
        self.project_name: str = ""

        # --- how many related items a recovery would restore ---
        self.task_count: int = 0
        self.submission_count: int = 0
        self.attachment_count: int = 0
        self.note_count: int = 0
        self.review_count: int = 0

        # --- who deleted the entity and when ---
        self.deleted_at: str = ""
        self.deleted_by: int = 0
        self.deleted_by_name: str = ""

        # --- state of the files on disk ---
        self.files_preserved: bool = True
        self.file_count: int = 0
class RecoveryResult:
    """Outcome report of a single recovery operation.

    ``success`` starts as ``False`` and is flipped by the operation once it
    has completed; ``errors`` and ``warnings`` are fresh lists per instance.
    """

    def __init__(self):
        # --- overall status and target ---
        self.success: bool = False
        self.shot_id: Optional[int] = None
        self.asset_id: Optional[int] = None
        self.name: str = ""

        # --- number of rows restored per kind ---
        self.recovered_tasks: int = 0
        self.recovered_submissions: int = 0
        self.recovered_attachments: int = 0
        self.recovered_notes: int = 0
        self.recovered_reviews: int = 0

        # --- timing and actor ---
        self.operation_duration: float = 0.0
        self.recovered_at: str = ""
        self.recovered_by: int = 0

        # --- problems encountered ---
        self.errors: List[str] = []
        self.warnings: List[str] = []
class PermanentDeleteResult:
    """Outcome report of a single permanent deletion.

    ``success`` starts as ``False`` and is flipped by the operation once it
    has completed; ``errors`` and ``warnings`` are fresh lists per instance.
    """

    def __init__(self):
        # --- overall status and target ---
        self.success: bool = False
        self.shot_id: Optional[int] = None
        self.asset_id: Optional[int] = None
        self.name: str = ""

        # --- number of items removed per kind ---
        self.deleted_tasks: int = 0
        self.deleted_submissions: int = 0
        self.deleted_attachments: int = 0
        self.deleted_notes: int = 0
        self.deleted_reviews: int = 0
        self.deleted_files: int = 0
        self.database_records_deleted: int = 0

        # --- timing and actor ---
        self.operation_duration: float = 0.0
        self.deleted_at: str = ""
        self.deleted_by: int = 0

        # --- problems encountered ---
        self.errors: List[str] = []
        self.warnings: List[str] = []
class BulkPermanentDeleteResult:
    """Aggregated outcome of permanently deleting several items in one call.

    ``deleted_items`` and ``errors`` are fresh lists per instance; the
    numeric fields are running totals that start at zero.
    """

    def __init__(self):
        # --- how many items were requested and how they fared ---
        self.total_items: int = 0
        self.successful_deletions: int = 0
        self.failed_deletions: int = 0

        # --- per-item details ---
        self.deleted_items: List[Dict[str, Any]] = []
        self.errors: List[Dict[str, str]] = []

        # --- totals accumulated over the successful items ---
        self.files_deleted: int = 0
        self.database_records_deleted: int = 0
class RecoveryService:
"""Service for handling recovery of soft-deleted shots and assets."""
def __init__(self):
    """Create the service together with its own ActivityService instance."""
    self.activity_service = ActivityService()
def get_deleted_shots(self, project_id: Optional[int], db: Session) -> List[DeletedShot]:
    """
    Get list of deleted shots, optionally filtered by project.

    Args:
        project_id: Optional project ID to filter by; a falsy value
            (``None`` or ``0``) disables the filter
        db: Database session

    Returns:
        List of DeletedShot objects, each including its related item counts

    Raises:
        Exception: If the database query fails; the originating
            SQLAlchemyError is chained as the cause
    """
    try:
        query = db.query(Shot).filter(Shot.deleted_at.isnot(None))
        if project_id:
            # The project is reached through the shot's episode. has()
            # already tests the related episode, so an additional explicit
            # join on the same relationship is not needed.
            query = query.filter(Shot.episode.has(project_id=project_id))

        result = []
        for shot in query.all():
            deleted_shot = DeletedShot()
            deleted_shot.id = shot.id
            deleted_shot.name = shot.name
            deleted_shot.episode_name = shot.episode.name if shot.episode else ""
            deleted_shot.project_id = shot.project_id
            deleted_shot.project_name = shot.project.name if shot.project else ""
            deleted_shot.deleted_at = shot.deleted_at.isoformat() if shot.deleted_at else ""
            deleted_shot.deleted_by = shot.deleted_by or 0
            deleted_shot.deleted_by_name = (
                f"{shot.deleted_by_user.first_name} {shot.deleted_by_user.last_name}"
                if shot.deleted_by_user else "Unknown"
            )
            # Count related deleted items
            self._count_deleted_items_for_shot(shot.id, deleted_shot, db)
            result.append(deleted_shot)
        return result
    except SQLAlchemyError as e:
        # Same exception type and message as before; "from e" keeps the
        # database error attached as the explicit cause.
        raise Exception(f"Database error while getting deleted shots: {str(e)}") from e
def get_deleted_assets(self, project_id: Optional[int], db: Session) -> List[DeletedAsset]:
    """
    Get list of deleted assets, optionally filtered by project.

    Args:
        project_id: Optional project ID to filter by; a falsy value
            (``None`` or ``0``) disables the filter
        db: Database session

    Returns:
        List of DeletedAsset objects, each including its related item counts

    Raises:
        Exception: If the database query fails; the originating
            SQLAlchemyError is chained as the cause
    """
    try:
        query = db.query(Asset).filter(Asset.deleted_at.isnot(None))
        if project_id:
            query = query.filter(Asset.project_id == project_id)

        result = []
        for asset in query.all():
            deleted_asset = DeletedAsset()
            deleted_asset.id = asset.id
            deleted_asset.name = asset.name
            deleted_asset.category = asset.category.value if asset.category else ""
            deleted_asset.project_id = asset.project_id
            deleted_asset.project_name = asset.project.name if asset.project else ""
            deleted_asset.deleted_at = asset.deleted_at.isoformat() if asset.deleted_at else ""
            deleted_asset.deleted_by = asset.deleted_by or 0
            deleted_asset.deleted_by_name = (
                f"{asset.deleted_by_user.first_name} {asset.deleted_by_user.last_name}"
                if asset.deleted_by_user else "Unknown"
            )
            # Count related deleted items
            self._count_deleted_items_for_asset(asset.id, deleted_asset, db)
            result.append(deleted_asset)
        return result
    except SQLAlchemyError as e:
        # Same exception type and message as before; "from e" keeps the
        # database error attached as the explicit cause.
        raise Exception(f"Database error while getting deleted assets: {str(e)}") from e
def preview_shot_recovery(self, shot_id: int, db: Session) -> Optional[RecoveryInfo]:
    """
    Preview what will be recovered when a shot is restored.

    Args:
        shot_id: ID of the deleted shot
        db: Database session

    Returns:
        RecoveryInfo object, or None if no soft-deleted shot has this ID

    Raises:
        Exception: If the database query fails; the originating
            SQLAlchemyError is chained as the cause
    """
    try:
        # Only shots that are currently soft-deleted qualify
        shot = db.query(Shot).filter(
            Shot.id == shot_id,
            Shot.deleted_at.isnot(None)
        ).first()
        if not shot:
            return None

        info = RecoveryInfo()
        info.shot_id = shot.id
        info.name = shot.name
        info.episode_name = shot.episode.name if shot.episode else None
        info.project_id = shot.project_id
        info.project_name = shot.project.name if shot.project else ""
        info.deleted_at = shot.deleted_at.isoformat() if shot.deleted_at else ""
        info.deleted_by = shot.deleted_by or 0
        info.deleted_by_name = (
            f"{shot.deleted_by_user.first_name} {shot.deleted_by_user.last_name}"
            if shot.deleted_by_user else "Unknown"
        )
        # Count deleted items that will be recovered
        self._count_recoverable_items_for_shot(shot_id, info, db)
        return info
    except SQLAlchemyError as e:
        # Same exception type and message as before; "from e" keeps the
        # database error attached as the explicit cause.
        raise Exception(f"Database error while previewing shot recovery: {str(e)}") from e
def preview_asset_recovery(self, asset_id: int, db: Session) -> Optional[RecoveryInfo]:
    """
    Preview what will be recovered when an asset is restored.

    Args:
        asset_id: ID of the deleted asset
        db: Database session

    Returns:
        RecoveryInfo object, or None if no soft-deleted asset has this ID

    Raises:
        Exception: If the database query fails; the originating
            SQLAlchemyError is chained as the cause
    """
    try:
        # Only assets that are currently soft-deleted qualify
        asset = db.query(Asset).filter(
            Asset.id == asset_id,
            Asset.deleted_at.isnot(None)
        ).first()
        if not asset:
            return None

        info = RecoveryInfo()
        info.asset_id = asset.id
        info.name = asset.name
        # Populate project_id as the shot preview does, so the field is not
        # left at None for assets.
        info.project_id = asset.project_id
        info.project_name = asset.project.name if asset.project else ""
        info.deleted_at = asset.deleted_at.isoformat() if asset.deleted_at else ""
        info.deleted_by = asset.deleted_by or 0
        info.deleted_by_name = (
            f"{asset.deleted_by_user.first_name} {asset.deleted_by_user.last_name}"
            if asset.deleted_by_user else "Unknown"
        )
        # Count deleted items that will be recovered
        self._count_recoverable_items_for_asset(asset_id, info, db)
        return info
    except SQLAlchemyError as e:
        # Same exception type and message as before; "from e" keeps the
        # database error attached as the explicit cause.
        raise Exception(f"Database error while previewing asset recovery: {str(e)}") from e
def recover_shot(self, shot_id: int, db: Session, current_user: User) -> RecoveryResult:
    """
    Recover a soft-deleted shot and all its related data.

    Changes are flushed but not committed here. If anything fails, the
    session is rolled back so that a partially applied recovery cannot be
    committed later, matching the permanent-delete operations.

    Args:
        shot_id: ID of the shot to recover
        db: Database session
        current_user: User performing the recovery

    Returns:
        RecoveryResult with operation details; ``success`` stays False and
        ``errors`` is populated when the shot is not found or an error occurs
    """
    start_time = datetime.utcnow()
    result = RecoveryResult()
    result.shot_id = shot_id
    result.recovered_by = current_user.id

    try:
        # Get the deleted shot
        shot = db.query(Shot).filter(
            Shot.id == shot_id,
            Shot.deleted_at.isnot(None)
        ).first()
        if not shot:
            result.errors.append("Shot not found or not deleted")
            return result

        result.name = shot.name
        result.recovered_at = datetime.utcnow().isoformat()

        # Take the counts before anything is restored; afterwards the
        # related rows no longer match the "deleted" filters.
        recovery_info = self.preview_shot_recovery(shot_id, db)

        # Recover related data
        self._recover_related_data_for_shot(shot_id, db, result)

        # Recover the shot
        shot.deleted_at = None
        shot.deleted_by = None
        db.flush()

        # Log the recovery activity
        self._log_shot_recovery(shot, current_user, recovery_info, db)
        result.success = True
    except SQLAlchemyError as e:
        result.errors.append(f"Database error during recovery: {str(e)}")
        # Discard the partially applied updates
        db.rollback()
    except Exception as e:
        result.errors.append(f"Unexpected error during recovery: {str(e)}")
        # Discard the partially applied updates
        db.rollback()

    # Calculate operation duration
    end_time = datetime.utcnow()
    result.operation_duration = (end_time - start_time).total_seconds()
    return result
def recover_asset(self, asset_id: int, db: Session, current_user: User) -> RecoveryResult:
    """
    Recover a soft-deleted asset and all its related data.

    Changes are flushed but not committed here. If anything fails, the
    session is rolled back so that a partially applied recovery cannot be
    committed later, matching the permanent-delete operations.

    Args:
        asset_id: ID of the asset to recover
        db: Database session
        current_user: User performing the recovery

    Returns:
        RecoveryResult with operation details; ``success`` stays False and
        ``errors`` is populated when the asset is not found or an error occurs
    """
    start_time = datetime.utcnow()
    result = RecoveryResult()
    result.asset_id = asset_id
    result.recovered_by = current_user.id

    try:
        # Get the deleted asset
        asset = db.query(Asset).filter(
            Asset.id == asset_id,
            Asset.deleted_at.isnot(None)
        ).first()
        if not asset:
            result.errors.append("Asset not found or not deleted")
            return result

        result.name = asset.name
        result.recovered_at = datetime.utcnow().isoformat()

        # Take the counts before anything is restored; afterwards the
        # related rows no longer match the "deleted" filters.
        recovery_info = self.preview_asset_recovery(asset_id, db)

        # Recover related data
        self._recover_related_data_for_asset(asset_id, db, result)

        # Recover the asset
        asset.deleted_at = None
        asset.deleted_by = None
        db.flush()

        # Log the recovery activity
        self._log_asset_recovery(asset, current_user, recovery_info, db)
        result.success = True
    except SQLAlchemyError as e:
        result.errors.append(f"Database error during recovery: {str(e)}")
        # Discard the partially applied updates
        db.rollback()
    except Exception as e:
        result.errors.append(f"Unexpected error during recovery: {str(e)}")
        # Discard the partially applied updates
        db.rollback()

    # Calculate operation duration
    end_time = datetime.utcnow()
    result.operation_duration = (end_time - start_time).total_seconds()
    return result
def _count_deleted_items_for_shot(self, shot_id: int, deleted_shot: DeletedShot, db: Session) -> None:
    """Count soft-deleted items related to a shot.

    Fills ``task_count``, ``submission_count``, ``attachment_count``,
    ``note_count`` and ``review_count`` on ``deleted_shot`` in place.
    Counting is best-effort: on a database error the counters that were
    not reached keep their current values and a warning is logged.

    Args:
        shot_id: ID of the shot whose related items are counted
        deleted_shot: Object receiving the counts
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        # One ID query serves both as the task count and as the key list
        # for the child lookups.
        task_ids = [row.id for row in db.query(Task.id).filter(
            Task.shot_id == shot_id,
            Task.deleted_at.isnot(None)
        ).all()]
        deleted_shot.task_count = len(task_ids)

        if task_ids:
            # Same idea for submissions: their IDs are needed for reviews.
            submission_ids = [row.id for row in db.query(Submission.id).filter(
                Submission.task_id.in_(task_ids),
                Submission.deleted_at.isnot(None)
            ).all()]
            deleted_shot.submission_count = len(submission_ids)

            # Count deleted attachments
            deleted_shot.attachment_count = db.query(TaskAttachment).filter(
                TaskAttachment.task_id.in_(task_ids),
                TaskAttachment.deleted_at.isnot(None)
            ).count()

            # Count deleted notes
            deleted_shot.note_count = db.query(ProductionNote).filter(
                ProductionNote.task_id.in_(task_ids),
                ProductionNote.deleted_at.isnot(None)
            ).count()

            # Count deleted reviews
            if submission_ids:
                deleted_shot.review_count = db.query(Review).filter(
                    Review.submission_id.in_(submission_ids),
                    Review.deleted_at.isnot(None)
                ).count()
    except SQLAlchemyError as exc:
        # Still ignored on purpose, but no longer invisible.
        logging.getLogger(__name__).warning(
            "Could not count deleted items for shot %s: %s", shot_id, exc
        )
def _count_deleted_items_for_asset(self, asset_id: int, deleted_asset: DeletedAsset, db: Session) -> None:
    """Count soft-deleted items related to an asset.

    Fills ``task_count``, ``submission_count``, ``attachment_count``,
    ``note_count`` and ``review_count`` on ``deleted_asset`` in place.
    Counting is best-effort: on a database error the counters that were
    not reached keep their current values and a warning is logged.

    Args:
        asset_id: ID of the asset whose related items are counted
        deleted_asset: Object receiving the counts
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        # One ID query serves both as the task count and as the key list
        # for the child lookups.
        task_ids = [row.id for row in db.query(Task.id).filter(
            Task.asset_id == asset_id,
            Task.deleted_at.isnot(None)
        ).all()]
        deleted_asset.task_count = len(task_ids)

        if task_ids:
            # Same idea for submissions: their IDs are needed for reviews.
            submission_ids = [row.id for row in db.query(Submission.id).filter(
                Submission.task_id.in_(task_ids),
                Submission.deleted_at.isnot(None)
            ).all()]
            deleted_asset.submission_count = len(submission_ids)

            # Count deleted attachments
            deleted_asset.attachment_count = db.query(TaskAttachment).filter(
                TaskAttachment.task_id.in_(task_ids),
                TaskAttachment.deleted_at.isnot(None)
            ).count()

            # Count deleted notes
            deleted_asset.note_count = db.query(ProductionNote).filter(
                ProductionNote.task_id.in_(task_ids),
                ProductionNote.deleted_at.isnot(None)
            ).count()

            # Count deleted reviews
            if submission_ids:
                deleted_asset.review_count = db.query(Review).filter(
                    Review.submission_id.in_(submission_ids),
                    Review.deleted_at.isnot(None)
                ).count()
    except SQLAlchemyError as exc:
        # Still ignored on purpose, but no longer invisible.
        logging.getLogger(__name__).warning(
            "Could not count deleted items for asset %s: %s", asset_id, exc
        )
def _count_recoverable_items_for_shot(self, shot_id: int, info: RecoveryInfo, db: Session) -> None:
    """Count items that will be recovered for a shot.

    Fills the five item counters and ``file_count`` on ``info`` in place.
    Counting is best-effort: on a database error the fields that were not
    reached keep their current values and a warning is logged.

    Args:
        shot_id: ID of the shot whose recoverable items are counted
        info: RecoveryInfo receiving the counts
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        # One ID query serves both as the task count and as the key list
        # for the child lookups.
        task_ids = [row.id for row in db.query(Task.id).filter(
            Task.shot_id == shot_id,
            Task.deleted_at.isnot(None)
        ).all()]
        info.task_count = len(task_ids)

        if task_ids:
            # Same idea for submissions: their IDs are needed for reviews.
            submission_ids = [row.id for row in db.query(Submission.id).filter(
                Submission.task_id.in_(task_ids),
                Submission.deleted_at.isnot(None)
            ).all()]
            info.submission_count = len(submission_ids)

            # Count deleted attachments
            info.attachment_count = db.query(TaskAttachment).filter(
                TaskAttachment.task_id.in_(task_ids),
                TaskAttachment.deleted_at.isnot(None)
            ).count()

            # Count deleted notes
            info.note_count = db.query(ProductionNote).filter(
                ProductionNote.task_id.in_(task_ids),
                ProductionNote.deleted_at.isnot(None)
            ).count()

            # Count deleted reviews
            if submission_ids:
                info.review_count = db.query(Review).filter(
                    Review.submission_id.in_(submission_ids),
                    Review.deleted_at.isnot(None)
                ).count()

        # Count files (attachments + submissions)
        info.file_count = info.attachment_count + info.submission_count
    except SQLAlchemyError as exc:
        # Still ignored on purpose, but no longer invisible.
        logging.getLogger(__name__).warning(
            "Could not count recoverable items for shot %s: %s", shot_id, exc
        )
def _count_recoverable_items_for_asset(self, asset_id: int, info: RecoveryInfo, db: Session) -> None:
    """Count items that will be recovered for an asset.

    Fills the five item counters and ``file_count`` on ``info`` in place.
    Counting is best-effort: on a database error the fields that were not
    reached keep their current values and a warning is logged.

    Args:
        asset_id: ID of the asset whose recoverable items are counted
        info: RecoveryInfo receiving the counts
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        # One ID query serves both as the task count and as the key list
        # for the child lookups.
        task_ids = [row.id for row in db.query(Task.id).filter(
            Task.asset_id == asset_id,
            Task.deleted_at.isnot(None)
        ).all()]
        info.task_count = len(task_ids)

        if task_ids:
            # Same idea for submissions: their IDs are needed for reviews.
            submission_ids = [row.id for row in db.query(Submission.id).filter(
                Submission.task_id.in_(task_ids),
                Submission.deleted_at.isnot(None)
            ).all()]
            info.submission_count = len(submission_ids)

            # Count deleted attachments
            info.attachment_count = db.query(TaskAttachment).filter(
                TaskAttachment.task_id.in_(task_ids),
                TaskAttachment.deleted_at.isnot(None)
            ).count()

            # Count deleted notes
            info.note_count = db.query(ProductionNote).filter(
                ProductionNote.task_id.in_(task_ids),
                ProductionNote.deleted_at.isnot(None)
            ).count()

            # Count deleted reviews
            if submission_ids:
                info.review_count = db.query(Review).filter(
                    Review.submission_id.in_(submission_ids),
                    Review.deleted_at.isnot(None)
                ).count()

        # Count files (attachments + submissions)
        info.file_count = info.attachment_count + info.submission_count
    except SQLAlchemyError as exc:
        # Still ignored on purpose, but no longer invisible.
        logging.getLogger(__name__).warning(
            "Could not count recoverable items for asset %s: %s", asset_id, exc
        )
def _recover_related_data_for_shot(self, shot_id: int, db: Session, result: RecoveryResult) -> None:
    """Clear the soft-delete markers on everything that hangs off a shot.

    Restores, in this order, the shot's deleted tasks, then the deleted
    submissions, attachments and production notes of those tasks, and
    finally the deleted reviews of the tasks' submissions. The number of
    rows touched by each bulk update is stored on ``result``. Returns
    without doing anything when the shot has no deleted tasks.
    """
    deleted_task_rows = db.query(Task.id).filter(
        Task.shot_id == shot_id,
        Task.deleted_at.isnot(None),
    ).all()
    task_ids = [row.id for row in deleted_task_rows]
    if not task_ids:
        return

    # Tasks are matched by their parent shot
    result.recovered_tasks = db.query(Task).filter(
        Task.shot_id == shot_id,
        Task.deleted_at.isnot(None),
    ).update(
        {Task.deleted_at: None, Task.deleted_by: None},
        synchronize_session=False,
    )

    # The three task-owned models are restored with the same statement shape
    task_owned = (
        (Submission, "recovered_submissions"),
        (TaskAttachment, "recovered_attachments"),
        (ProductionNote, "recovered_notes"),
    )
    for model, counter_name in task_owned:
        restored = db.query(model).filter(
            model.task_id.in_(task_ids),
            model.deleted_at.isnot(None),
        ).update(
            {model.deleted_at: None, model.deleted_by: None},
            synchronize_session=False,
        )
        setattr(result, counter_name, restored)

    # No deleted_at filter here: reviews are matched through every
    # submission of the recovered tasks.
    submission_rows = db.query(Submission.id).filter(
        Submission.task_id.in_(task_ids)
    ).all()
    submission_ids = [row.id for row in submission_rows]
    if submission_ids:
        result.recovered_reviews = db.query(Review).filter(
            Review.submission_id.in_(submission_ids),
            Review.deleted_at.isnot(None),
        ).update(
            {Review.deleted_at: None, Review.deleted_by: None},
            synchronize_session=False,
        )

    db.flush()
def _recover_related_data_for_asset(self, asset_id: int, db: Session, result: RecoveryResult) -> None:
    """Clear the soft-delete markers on everything that hangs off an asset.

    Restores, in this order, the asset's deleted tasks, then the deleted
    submissions, attachments and production notes of those tasks, and
    finally the deleted reviews of the tasks' submissions. The number of
    rows touched by each bulk update is stored on ``result``. Returns
    without doing anything when the asset has no deleted tasks.
    """
    deleted_task_rows = db.query(Task.id).filter(
        Task.asset_id == asset_id,
        Task.deleted_at.isnot(None),
    ).all()
    task_ids = [row.id for row in deleted_task_rows]
    if not task_ids:
        return

    # Tasks are matched by their parent asset
    result.recovered_tasks = db.query(Task).filter(
        Task.asset_id == asset_id,
        Task.deleted_at.isnot(None),
    ).update(
        {Task.deleted_at: None, Task.deleted_by: None},
        synchronize_session=False,
    )

    # The three task-owned models are restored with the same statement shape
    task_owned = (
        (Submission, "recovered_submissions"),
        (TaskAttachment, "recovered_attachments"),
        (ProductionNote, "recovered_notes"),
    )
    for model, counter_name in task_owned:
        restored = db.query(model).filter(
            model.task_id.in_(task_ids),
            model.deleted_at.isnot(None),
        ).update(
            {model.deleted_at: None, model.deleted_by: None},
            synchronize_session=False,
        )
        setattr(result, counter_name, restored)

    # No deleted_at filter here: reviews are matched through every
    # submission of the recovered tasks.
    submission_rows = db.query(Submission.id).filter(
        Submission.task_id.in_(task_ids)
    ).all()
    submission_ids = [row.id for row in submission_rows]
    if submission_ids:
        result.recovered_reviews = db.query(Review).filter(
            Review.submission_id.in_(submission_ids),
            Review.deleted_at.isnot(None),
        ).update(
            {Review.deleted_at: None, Review.deleted_by: None},
            synchronize_session=False,
        )

    db.flush()
def _log_shot_recovery(self, shot: Shot, current_user: User, recovery_info: Optional[RecoveryInfo], db: Session) -> None:
    """Log the shot recovery activity.

    Adds an Activity row describing the recovery and flushes it. Logging
    is best-effort: a SQLAlchemyError is reported as a warning and then
    swallowed so that the recovery itself is not failed by it.

    Args:
        shot: The recovered shot
        current_user: User who performed the recovery
        recovery_info: Counts captured before recovery, or None (all
            counts are then recorded as 0)
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        activity = Activity(
            type=ActivityType.SHOT_UPDATED,  # We'll use SHOT_UPDATED for now, could add SHOT_RECOVERED later
            user_id=current_user.id,
            project_id=shot.project_id,
            shot_id=shot.id,
            description=f"Shot '{shot.name}' was recovered by {current_user.first_name} {current_user.last_name}",
            activity_metadata={
                'action': 'recover',
                'shot_name': shot.name,
                'episode_name': shot.episode.name if shot.episode else None,
                'project_id': shot.project_id,
                'project_name': shot.project.name if shot.project else None,
                'recovered_counts': {
                    'tasks': recovery_info.task_count if recovery_info else 0,
                    'submissions': recovery_info.submission_count if recovery_info else 0,
                    'attachments': recovery_info.attachment_count if recovery_info else 0,
                    'notes': recovery_info.note_count if recovery_info else 0,
                    'reviews': recovery_info.review_count if recovery_info else 0
                }
            }
        )
        db.add(activity)
        db.flush()
    except SQLAlchemyError as exc:
        # Don't fail recovery if logging fails, but leave a trace of it.
        # NOTE(review): a failed flush may leave the session needing a
        # rollback before further use; confirm the caller copes with that.
        logging.getLogger(__name__).warning(
            "Could not log recovery of shot %s: %s", shot.id, exc
        )
def _log_asset_recovery(self, asset: Asset, current_user: User, recovery_info: Optional[RecoveryInfo], db: Session) -> None:
    """Log the asset recovery activity.

    Adds an Activity row describing the recovery and flushes it. Logging
    is best-effort: a SQLAlchemyError is reported as a warning and then
    swallowed so that the recovery itself is not failed by it.

    Args:
        asset: The recovered asset
        current_user: User who performed the recovery
        recovery_info: Counts captured before recovery, or None (all
            counts are then recorded as 0)
        db: Database session
    """
    import logging  # local import keeps this method self-contained

    try:
        activity = Activity(
            type=ActivityType.ASSET_UPDATED,  # We'll use ASSET_UPDATED for now, could add ASSET_RECOVERED later
            user_id=current_user.id,
            project_id=asset.project_id,
            asset_id=asset.id,
            description=f"Asset '{asset.name}' was recovered by {current_user.first_name} {current_user.last_name}",
            activity_metadata={
                'action': 'recover',
                'asset_name': asset.name,
                'asset_category': asset.category.value if asset.category else None,
                # Included so asset entries carry the same project
                # reference as shot recovery entries.
                'project_id': asset.project_id,
                'project_name': asset.project.name if asset.project else None,
                'recovered_counts': {
                    'tasks': recovery_info.task_count if recovery_info else 0,
                    'submissions': recovery_info.submission_count if recovery_info else 0,
                    'attachments': recovery_info.attachment_count if recovery_info else 0,
                    'notes': recovery_info.note_count if recovery_info else 0,
                    'reviews': recovery_info.review_count if recovery_info else 0
                }
            }
        )
        db.add(activity)
        db.flush()
    except SQLAlchemyError as exc:
        # Don't fail recovery if logging fails, but leave a trace of it.
        # NOTE(review): a failed flush may leave the session needing a
        # rollback before further use; confirm the caller copes with that.
        logging.getLogger(__name__).warning(
            "Could not log recovery of asset %s: %s", asset.id, exc
        )
def permanent_delete_shot(self, shot_id: int, db: Session, current_user: User) -> PermanentDeleteResult:
    """
    Permanently remove a soft-deleted shot together with everything attached to it.

    Rows are removed children-first (reviews, production notes, attachments,
    submissions, tasks, activity records, then the shot itself). The pending
    deletes are flushed, the files referenced by submissions and attachments
    are removed from storage, and finally the deletion is logged.

    Args:
        shot_id: ID of the shot to permanently delete
        db: Database session
        current_user: User performing the deletion

    Returns:
        PermanentDeleteResult with operation details; ``success`` stays
        False and ``errors`` is populated when the shot is not found or an
        exception occurred (in which case the session is rolled back)
    """
    start_time = datetime.utcnow()
    result = PermanentDeleteResult()
    result.shot_id = shot_id
    result.deleted_by = current_user.id

    try:
        # Only a shot that is already soft-deleted may be purged
        shot = db.query(Shot).filter(
            Shot.id == shot_id,
            Shot.deleted_at.isnot(None),
        ).first()
        if not shot:
            result.errors.append("Shot not found or not deleted")
            return result

        result.name = shot.name
        result.deleted_at = datetime.utcnow().isoformat()

        # Load the tasks up front; their IDs key every child lookup below
        tasks = db.query(Task).filter(Task.shot_id == shot_id).all()
        task_ids = [task.id for task in tasks]

        # Paths of files to remove once the rows are gone
        file_paths = []

        if task_ids:
            submissions = db.query(Submission).filter(Submission.task_id.in_(task_ids)).all()
            file_paths.extend(s.file_path for s in submissions if s.file_path)

            attachments = db.query(TaskAttachment).filter(TaskAttachment.task_id.in_(task_ids)).all()
            file_paths.extend(a.file_path for a in attachments if a.file_path)

            submission_ids = [s.id for s in submissions]

            # Record the counts while the rows still exist
            result.deleted_tasks = len(tasks)
            result.deleted_submissions = len(submissions)
            result.deleted_attachments = len(attachments)
            result.deleted_notes = db.query(ProductionNote).filter(ProductionNote.task_id.in_(task_ids)).count()

            # Reviews reference submissions, so they are counted and removed first
            if submission_ids:
                result.deleted_reviews = db.query(Review).filter(Review.submission_id.in_(submission_ids)).count()
                db.query(Review).filter(Review.submission_id.in_(submission_ids)).delete(synchronize_session=False)

            # Remaining task children, then the tasks themselves
            db.query(ProductionNote).filter(ProductionNote.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(TaskAttachment).filter(TaskAttachment.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(Submission).filter(Submission.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(Task).filter(Task.shot_id == shot_id).delete(synchronize_session=False)

        # Activity records related to this shot, then the shot itself
        db.query(Activity).filter(Activity.shot_id == shot_id).delete(synchronize_session=False)
        db.query(Shot).filter(Shot.id == shot_id).delete(synchronize_session=False)

        # Total of the counted rows; the trailing 1 is the shot itself
        counted_rows = (
            result.deleted_tasks,
            result.deleted_submissions,
            result.deleted_attachments,
            result.deleted_notes,
            result.deleted_reviews,
        )
        result.database_records_deleted = sum(counted_rows) + 1

        # Push the deletes to the database. This is a flush, not a commit.
        # NOTE(review): files are removed below while the transaction is
        # still open; a later rollback would restore rows whose files are
        # already gone. Confirm this ordering is acceptable.
        db.flush()

        # Clean up files from filesystem; failures only produce warnings
        files_deleted = 0
        for file_path in file_paths:
            try:
                if file_handler.delete_file(file_path):
                    files_deleted += 1
            except Exception as e:
                result.warnings.append(f"Failed to delete file {file_path}: {str(e)}")
        result.deleted_files = files_deleted

        # Log the permanent deletion activity
        self._log_shot_permanent_deletion(shot, current_user, result, db)
        result.success = True
    except SQLAlchemyError as e:
        result.errors.append(f"Database error during permanent deletion: {str(e)}")
        db.rollback()
    except Exception as e:
        result.errors.append(f"Unexpected error during permanent deletion: {str(e)}")
        db.rollback()

    # Calculate operation duration
    result.operation_duration = (datetime.utcnow() - start_time).total_seconds()
    return result
def permanent_delete_asset(self, asset_id: int, db: Session, current_user: User) -> PermanentDeleteResult:
    """
    Permanently remove a soft-deleted asset together with everything attached to it.

    Rows are removed children-first (reviews, production notes, attachments,
    submissions, tasks, activity records, then the asset itself). The pending
    deletes are flushed, the files referenced by submissions and attachments
    are removed from storage, and finally the deletion is logged.

    Args:
        asset_id: ID of the asset to permanently delete
        db: Database session
        current_user: User performing the deletion

    Returns:
        PermanentDeleteResult with operation details; ``success`` stays
        False and ``errors`` is populated when the asset is not found or an
        exception occurred (in which case the session is rolled back)
    """
    start_time = datetime.utcnow()
    result = PermanentDeleteResult()
    result.asset_id = asset_id
    result.deleted_by = current_user.id

    try:
        # Only an asset that is already soft-deleted may be purged
        asset = db.query(Asset).filter(
            Asset.id == asset_id,
            Asset.deleted_at.isnot(None),
        ).first()
        if not asset:
            result.errors.append("Asset not found or not deleted")
            return result

        result.name = asset.name
        result.deleted_at = datetime.utcnow().isoformat()

        # Load the tasks up front; their IDs key every child lookup below
        tasks = db.query(Task).filter(Task.asset_id == asset_id).all()
        task_ids = [task.id for task in tasks]

        # Paths of files to remove once the rows are gone
        file_paths = []

        if task_ids:
            submissions = db.query(Submission).filter(Submission.task_id.in_(task_ids)).all()
            file_paths.extend(s.file_path for s in submissions if s.file_path)

            attachments = db.query(TaskAttachment).filter(TaskAttachment.task_id.in_(task_ids)).all()
            file_paths.extend(a.file_path for a in attachments if a.file_path)

            submission_ids = [s.id for s in submissions]

            # Record the counts while the rows still exist
            result.deleted_tasks = len(tasks)
            result.deleted_submissions = len(submissions)
            result.deleted_attachments = len(attachments)
            result.deleted_notes = db.query(ProductionNote).filter(ProductionNote.task_id.in_(task_ids)).count()

            # Reviews reference submissions, so they are counted and removed first
            if submission_ids:
                result.deleted_reviews = db.query(Review).filter(Review.submission_id.in_(submission_ids)).count()
                db.query(Review).filter(Review.submission_id.in_(submission_ids)).delete(synchronize_session=False)

            # Remaining task children, then the tasks themselves
            db.query(ProductionNote).filter(ProductionNote.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(TaskAttachment).filter(TaskAttachment.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(Submission).filter(Submission.task_id.in_(task_ids)).delete(synchronize_session=False)
            db.query(Task).filter(Task.asset_id == asset_id).delete(synchronize_session=False)

        # Activity records related to this asset, then the asset itself
        db.query(Activity).filter(Activity.asset_id == asset_id).delete(synchronize_session=False)
        db.query(Asset).filter(Asset.id == asset_id).delete(synchronize_session=False)

        # Total of the counted rows; the trailing 1 is the asset itself
        counted_rows = (
            result.deleted_tasks,
            result.deleted_submissions,
            result.deleted_attachments,
            result.deleted_notes,
            result.deleted_reviews,
        )
        result.database_records_deleted = sum(counted_rows) + 1

        # Push the deletes to the database. This is a flush, not a commit.
        # NOTE(review): files are removed below while the transaction is
        # still open; a later rollback would restore rows whose files are
        # already gone. Confirm this ordering is acceptable.
        db.flush()

        # Clean up files from filesystem; failures only produce warnings
        files_deleted = 0
        for file_path in file_paths:
            try:
                if file_handler.delete_file(file_path):
                    files_deleted += 1
            except Exception as e:
                result.warnings.append(f"Failed to delete file {file_path}: {str(e)}")
        result.deleted_files = files_deleted

        # Log the permanent deletion activity
        self._log_asset_permanent_deletion(asset, current_user, result, db)
        result.success = True
    except SQLAlchemyError as e:
        result.errors.append(f"Database error during permanent deletion: {str(e)}")
        db.rollback()
    except Exception as e:
        result.errors.append(f"Unexpected error during permanent deletion: {str(e)}")
        db.rollback()

    # Calculate operation duration
    result.operation_duration = (datetime.utcnow() - start_time).total_seconds()
    return result
def bulk_permanent_delete_shots(self, shot_ids: List[int], db: Session, current_user: User) -> BulkPermanentDeleteResult:
    """
    Permanently delete several shots, one after another.

    Each ID is handed to ``permanent_delete_shot``; the individual outcomes
    are folded into a single summary. One item failing does not stop the
    remaining items from being processed.

    Args:
        shot_ids: List of shot IDs to permanently delete
        db: Database session
        current_user: User performing the deletion

    Returns:
        BulkPermanentDeleteResult with operation details
    """
    # NOTE(review): permanent_delete_shot only flushes and rolls the session
    # back on error, so a failure on a later ID may also discard earlier,
    # not-yet-committed deletions that are reported here as successful.
    # Confirm how the caller commits.
    summary = BulkPermanentDeleteResult()
    summary.total_items = len(shot_ids)

    for shot_id in shot_ids:
        try:
            outcome = self.permanent_delete_shot(shot_id, db, current_user)
            if not outcome.success:
                summary.failed_deletions += 1
                summary.errors.append({
                    'id': shot_id,
                    'error': '; '.join(outcome.errors) if outcome.errors else 'Unknown error'
                })
                continue

            summary.successful_deletions += 1
            summary.deleted_items.append({
                'id': shot_id,
                'name': outcome.name,
                'type': 'shot'
            })
            summary.files_deleted += outcome.deleted_files
            summary.database_records_deleted += outcome.database_records_deleted
        except Exception as e:
            summary.failed_deletions += 1
            summary.errors.append({
                'id': shot_id,
                'error': str(e)
            })

    return summary
def bulk_permanent_delete_assets(self, asset_ids: List[int], db: Session, current_user: User) -> BulkPermanentDeleteResult:
    """
    Permanently delete a batch of assets, one ID at a time.

    Every ID is passed to ``permanent_delete_asset``. A deletion that reports
    failure, or that raises, is recorded in the result and does not stop the
    remaining IDs from being processed.

    NOTE(review): ``permanent_delete_asset`` flushes on success and calls
    ``db.rollback()`` on failure, and all deletions share this session. A
    later failure may therefore undo earlier, only-flushed deletions that
    are still counted as successful here - confirm where the transaction is
    committed.

    Args:
        asset_ids: IDs of the assets to permanently delete
        db: Database session handed to each single-asset deletion
        current_user: User performing the deletion

    Returns:
        BulkPermanentDeleteResult with success/failure counters, the deleted
        items, summed file and database-record counts, and per-ID errors
    """
    summary = BulkPermanentDeleteResult()
    summary.total_items = len(asset_ids)

    for asset_id in asset_ids:
        try:
            outcome = self.permanent_delete_asset(asset_id, db, current_user)

            if not outcome.success:
                # The single-asset delete reported failure: keep its messages.
                summary.failed_deletions += 1
                message = '; '.join(outcome.errors) if outcome.errors else 'Unknown error'
                summary.errors.append({'id': asset_id, 'error': message})
                continue

            summary.successful_deletions += 1
            summary.deleted_items.append({
                'id': asset_id,
                'name': outcome.name,
                'type': 'asset'
            })
            # Roll the per-asset counters up into the bulk totals.
            summary.files_deleted += outcome.deleted_files
            summary.database_records_deleted += outcome.database_records_deleted
        except Exception as exc:
            # Anything unexpected counts as a failure for this ID only.
            summary.failed_deletions += 1
            summary.errors.append({'id': asset_id, 'error': str(exc)})

    return summary
def _log_shot_permanent_deletion(self, shot: Shot, current_user: User, result: PermanentDeleteResult, db: Session) -> None:
    """
    Log the shot permanent deletion activity.

    Builds an ``Activity`` row describing the deletion (who, which shot, and
    the per-type deletion counts taken from ``result``), adds it to the
    session and flushes.

    Logging is best-effort: a database error does not propagate, but it is
    recorded in ``result.warnings`` so the caller can see that the audit
    entry is missing.

    NOTE(review): ``operation_duration`` is copied from ``result`` as-is;
    presumably the caller only sets it after this call, in which case the
    logged value is the default - verify against ``permanent_delete_shot``.

    Args:
        shot: The shot being permanently deleted
        current_user: User performing the deletion
        result: Deletion result whose counters are copied into the metadata
        db: Database session used to store the activity
    """
    try:
        activity = Activity(
            type=ActivityType.SHOT_DELETED,  # Reuse existing type for permanent deletion
            user_id=current_user.id,
            project_id=shot.project_id,
            description=f"Shot '{shot.name}' was permanently deleted by {current_user.first_name} {current_user.last_name}",
            activity_metadata={
                'action': 'permanent_delete',
                'shot_name': shot.name,
                'episode_name': shot.episode.name if shot.episode else None,
                'project_id': shot.project_id,
                'project_name': shot.project.name if shot.project else None,
                'deleted_counts': {
                    'tasks': result.deleted_tasks,
                    'submissions': result.deleted_submissions,
                    'attachments': result.deleted_attachments,
                    'notes': result.deleted_notes,
                    'reviews': result.deleted_reviews,
                    'files': result.deleted_files,
                    'database_records': result.database_records_deleted
                },
                'operation_duration': result.operation_duration
            }
        )
        db.add(activity)
        db.flush()
    except SQLAlchemyError as exc:
        # Don't fail deletion if logging fails, but don't hide it either.
        # Only the plain result object is touched here: ORM instances may be
        # unusable after a failed flush.
        # NOTE(review): a failed flush may leave the session needing a
        # rollback - consider wrapping the log write in a SAVEPOINT.
        result.warnings.append(f"Failed to log permanent deletion activity: {str(exc)}")
def _log_asset_permanent_deletion(self, asset: Asset, current_user: User, result: PermanentDeleteResult, db: Session) -> None:
    """
    Log the asset permanent deletion activity.

    Builds an ``Activity`` row describing the deletion (who, which asset,
    and the per-type deletion counts taken from ``result``), adds it to the
    session and flushes. The metadata carries ``project_id`` so it has the
    same shape as the shot deletion log.

    Logging is best-effort: a database error does not propagate, but it is
    recorded in ``result.warnings`` so the caller can see that the audit
    entry is missing.

    NOTE(review): ``permanent_delete_asset`` calls this before it sets
    ``result.operation_duration``, so the logged duration is whatever the
    result held at call time - confirm whether that is intended.

    Args:
        asset: The asset being permanently deleted
        current_user: User performing the deletion
        result: Deletion result whose counters are copied into the metadata
        db: Database session used to store the activity
    """
    try:
        activity = Activity(
            type=ActivityType.ASSET_DELETED,  # Reuse existing type for permanent deletion
            user_id=current_user.id,
            project_id=asset.project_id,
            description=f"Asset '{asset.name}' was permanently deleted by {current_user.first_name} {current_user.last_name}",
            activity_metadata={
                'action': 'permanent_delete',
                'asset_name': asset.name,
                'asset_category': asset.category.value if asset.category else None,
                'project_id': asset.project_id,
                'project_name': asset.project.name if asset.project else None,
                'deleted_counts': {
                    'tasks': result.deleted_tasks,
                    'submissions': result.deleted_submissions,
                    'attachments': result.deleted_attachments,
                    'notes': result.deleted_notes,
                    'reviews': result.deleted_reviews,
                    'files': result.deleted_files,
                    'database_records': result.database_records_deleted
                },
                'operation_duration': result.operation_duration
            }
        )
        db.add(activity)
        db.flush()
    except SQLAlchemyError as exc:
        # Don't fail deletion if logging fails, but don't hide it either.
        # Only the plain result object is touched here: ORM instances may be
        # unusable after a failed flush.
        # NOTE(review): a failed flush may leave the session needing a
        # rollback - consider wrapping the log write in a SAVEPOINT.
        result.warnings.append(f"Failed to log permanent deletion activity: {str(exc)}")
class BulkRecoveryError:
    """
    One failure recorded while recovering items in bulk.

    Attributes:
        item_id: ID of the item that could not be recovered
        item_type: Kind of item, e.g. "shot" or "asset"
        error: Human-readable description of what went wrong
    """

    def __init__(self) -> None:
        # Neutral defaults; the bulk recovery code fills these in per failure.
        self.item_id: int = 0
        self.item_type: str = ""
        self.error: str = ""
class BulkRecoveryResult:
    """
    Outcome of a bulk recovery operation.

    Attributes:
        total_items: Number of IDs that were requested for recovery
        successful_recoveries: How many of them were recovered
        failed_recoveries: How many of them failed
        results: Per-item recovery results
        errors: One BulkRecoveryError per failed item
    """

    def __init__(self) -> None:
        # Counters start at zero and are incremented as items are processed.
        self.total_items: int = 0
        self.successful_recoveries: int = 0
        self.failed_recoveries: int = 0

        # Fresh lists per instance so results are never shared between runs.
        self.results: List[RecoveryResult] = []
        self.errors: List[BulkRecoveryError] = []
class RecoveryStats:
    """
    Aggregate numbers describing soft-deleted items.

    Attributes:
        deleted_shots_count: Number of deleted shots
        deleted_assets_count: Number of deleted assets
        total_deleted_tasks: Number of deleted tasks
        total_deleted_files: Number of deleted files (submissions plus attachments)
        oldest_deletion_date: Earliest deletion timestamp as a string, or
            None when nothing has been deleted
    """

    def __init__(self) -> None:
        # All counters default to zero until the statistics are computed.
        self.deleted_shots_count: int = 0
        self.deleted_assets_count: int = 0
        self.total_deleted_tasks: int = 0
        self.total_deleted_files: int = 0

        # Stays None when there is no deleted shot or asset to report.
        self.oldest_deletion_date: Optional[str] = None
# Bulk recovery helpers: defined at module level (each takes ``self`` first) and attached to RecoveryService as methods below.
def bulk_recover_shots(self, shot_ids: List[int], db: Session, current_user: User) -> BulkRecoveryResult:
    """
    Recover a batch of shots, one ID at a time.

    Every ID is passed to ``recover_shot``. Each returned recovery result is
    kept in ``results``; a recovery that reports failure, or that raises, is
    additionally recorded as a BulkRecoveryError and the loop continues.

    Args:
        shot_ids: IDs of the shots to recover
        db: Database session handed to each single-shot recovery
        current_user: User performing the recovery

    Returns:
        BulkRecoveryResult with counters, per-item results and errors
    """
    summary = BulkRecoveryResult()
    summary.total_items = len(shot_ids)

    for shot_id in shot_ids:
        try:
            outcome = self.recover_shot(shot_id, db, current_user)
            summary.results.append(outcome)

            if outcome.success:
                summary.successful_recoveries += 1
                continue

            # The single-shot recovery reported failure: keep its messages.
            summary.failed_recoveries += 1
            failure = BulkRecoveryError()
            failure.item_id = shot_id
            failure.item_type = "shot"
            failure.error = "; ".join(outcome.errors) if outcome.errors else "Unknown error"
            summary.errors.append(failure)
        except Exception as exc:
            # Anything unexpected counts as a failure for this ID only.
            summary.failed_recoveries += 1
            failure = BulkRecoveryError()
            failure.item_id = shot_id
            failure.item_type = "shot"
            failure.error = str(exc)
            summary.errors.append(failure)

    return summary
def bulk_recover_assets(self, asset_ids: List[int], db: Session, current_user: User) -> BulkRecoveryResult:
    """
    Recover a batch of assets, one ID at a time.

    Every ID is passed to ``recover_asset``. Each returned recovery result is
    kept in ``results``; a recovery that reports failure, or that raises, is
    additionally recorded as a BulkRecoveryError and the loop continues.

    Args:
        asset_ids: IDs of the assets to recover
        db: Database session handed to each single-asset recovery
        current_user: User performing the recovery

    Returns:
        BulkRecoveryResult with counters, per-item results and errors
    """
    summary = BulkRecoveryResult()
    summary.total_items = len(asset_ids)

    for asset_id in asset_ids:
        try:
            outcome = self.recover_asset(asset_id, db, current_user)
            summary.results.append(outcome)

            if outcome.success:
                summary.successful_recoveries += 1
                continue

            # The single-asset recovery reported failure: keep its messages.
            summary.failed_recoveries += 1
            failure = BulkRecoveryError()
            failure.item_id = asset_id
            failure.item_type = "asset"
            failure.error = "; ".join(outcome.errors) if outcome.errors else "Unknown error"
            summary.errors.append(failure)
        except Exception as exc:
            # Anything unexpected counts as a failure for this ID only.
            summary.failed_recoveries += 1
            failure = BulkRecoveryError()
            failure.item_id = asset_id
            failure.item_type = "asset"
            failure.error = str(exc)
            summary.errors.append(failure)

    return summary
def get_recovery_stats(self, project_id: Optional[int], db: Session) -> RecoveryStats:
    """
    Get recovery statistics for soft-deleted items.

    Counts deleted shots, assets and tasks, counts deleted files
    (submissions plus attachments) and determines the oldest deletion date
    among deleted shots and assets. When ``project_id`` is given, the shot,
    asset and task counts and the oldest deletion date are restricted to
    that project.

    Args:
        project_id: Optional project ID to filter by; a falsy value means
            no project filter
        db: Database session

    Returns:
        RecoveryStats with statistics. If a database error occurs, the error
        is logged and the values gathered up to that point are returned; the
        remaining fields keep their defaults.
    """
    import logging

    stats = RecoveryStats()
    try:
        # Count deleted shots
        shot_query = db.query(Shot).filter(Shot.deleted_at.isnot(None))
        if project_id:
            shot_query = shot_query.join(Shot.episode).filter(
                Shot.episode.has(project_id=project_id)
            )
        stats.deleted_shots_count = shot_query.count()

        # Count deleted assets
        asset_query = db.query(Asset).filter(Asset.deleted_at.isnot(None))
        if project_id:
            asset_query = asset_query.filter(Asset.project_id == project_id)
        stats.deleted_assets_count = asset_query.count()

        # Count total deleted tasks
        task_query = db.query(Task).filter(Task.deleted_at.isnot(None))
        if project_id:
            # Filter tasks by project through shot/asset relationships
            from sqlalchemy import or_
            project_shot_ids = db.query(Shot.id).join(Shot.episode).filter(
                Shot.episode.has(project_id=project_id)
            )
            project_asset_ids = db.query(Asset.id).filter(Asset.project_id == project_id)
            task_query = task_query.filter(
                or_(
                    Task.shot_id.in_(project_shot_ids),
                    Task.asset_id.in_(project_asset_ids)
                )
            )
        stats.total_deleted_tasks = task_query.count()

        # Count total deleted files (submissions + attachments)
        # NOTE(review): these two counts are not filtered by project, so
        # with a project_id this figure still covers every project.
        submission_count = db.query(Submission).filter(Submission.deleted_at.isnot(None)).count()
        attachment_count = db.query(TaskAttachment).filter(TaskAttachment.deleted_at.isnot(None)).count()
        stats.total_deleted_files = submission_count + attachment_count

        # Get oldest deletion date. Reuse the already-filtered queries so
        # the date honours the project filter like the counts above.
        oldest_shot = shot_query.with_entities(func.min(Shot.deleted_at)).scalar()
        oldest_asset = asset_query.with_entities(func.min(Asset.deleted_at)).scalar()
        candidates = [stamp for stamp in (oldest_shot, oldest_asset) if stamp]
        if candidates:
            stats.oldest_deletion_date = min(candidates).isoformat()
    except SQLAlchemyError:
        # Best-effort: keep whatever was gathered, but leave a trace of the failure.
        logging.getLogger(__name__).exception("Failed to compute recovery statistics")

    return stats
# Attach the module-level functions defined above to the RecoveryService
# class so they can be called as regular instance methods (each of them takes
# ``self`` as its first parameter).
RecoveryService.bulk_recover_shots = bulk_recover_shots
RecoveryService.bulk_recover_assets = bulk_recover_assets
RecoveryService.get_recovery_stats = get_recovery_stats