# LinkDesk/backend/migrate_file_paths_production.py (584 lines, 26 KiB, Python)

#!/usr/bin/env python3
"""
Production-ready database migration script to convert absolute file paths to relative paths.
This script is designed for production deployment and includes comprehensive
validation, backup recommendations, and detailed logging.
Requirements addressed: 1.1, 1.2, 1.3, 1.4, 1.5
"""
import sys
import os
import logging
import shutil
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from datetime import datetime
# Add the backend directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
from database import engine
from models.task import Submission, TaskAttachment
from models.project import Project
# Configure logging
def setup_logging():
    """Configure logging to both a timestamped file and the console.

    Returns:
        The module-level logger used for all migration messages.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_filename = f"migration_file_paths_{timestamp}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_filename), logging.StreamHandler()],
    )
    return logging.getLogger(__name__)
class ProductionFilePathMigrator:
    """Production-ready file path migrator with comprehensive validation and error handling."""

    def __init__(self):
        self.logger = setup_logging()
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        # All relative paths are interpreted against the backend directory.
        self.backend_dir = Path(__file__).parent.resolve()
        self.errors = []    # fatal per-record failures collected for the summary
        self.warnings = []  # non-fatal issues (e.g. file missing on disk)
        # Counters reported by print_migration_summary().
        self.stats = dict.fromkeys(
            (
                'submissions_processed', 'submissions_converted', 'submissions_skipped',
                'attachments_processed', 'attachments_converted', 'attachments_skipped',
                'projects_processed', 'projects_converted', 'projects_skipped',
                'errors', 'warnings',
            ),
            0,
        )
def create_database_backup(self) -> Optional[str]:
"""Create a backup of the database before migration."""
try:
db_path = self.backend_dir / "database.db"
if not db_path.exists():
self.logger.warning("Database file not found, skipping backup")
return None
backup_filename = f"database_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
backup_path = self.backend_dir / backup_filename
shutil.copy2(db_path, backup_path)
self.logger.info(f"Database backup created: {backup_filename}")
return str(backup_path)
except Exception as e:
self.logger.error(f"Failed to create database backup: {e}")
return None
def validate_database_connection(self) -> bool:
"""Validate database connection and basic structure."""
try:
db = self.SessionLocal()
# Test basic queries
submissions_count = db.query(Submission).count()
attachments_count = db.query(TaskAttachment).count()
projects_count = db.query(Project).count()
self.logger.info(f"Database validation successful:")
self.logger.info(f" - Submissions: {submissions_count}")
self.logger.info(f" - Attachments: {attachments_count}")
self.logger.info(f" - Projects: {projects_count}")
db.close()
return True
except Exception as e:
self.logger.error(f"Database validation failed: {e}")
return False
def is_absolute_path(self, path: str) -> bool:
"""Check if a path is absolute."""
if not path:
return False
path_obj = Path(path)
return path_obj.is_absolute()
def convert_to_relative_path(self, absolute_path: str) -> Optional[str]:
"""
Convert absolute path to relative path with enhanced error handling.
Args:
absolute_path: The absolute file path to convert
Returns:
Relative path string or None if conversion fails
"""
try:
abs_path = Path(absolute_path).resolve()
# Check if the path is within the backend directory
try:
relative_path = abs_path.relative_to(self.backend_dir)
return str(relative_path).replace('\\', '/') # Use forward slashes for consistency
except ValueError:
# Path is not within backend directory - try to extract uploads part
path_parts = abs_path.parts
if 'uploads' in path_parts:
uploads_index = path_parts.index('uploads')
relative_parts = path_parts[uploads_index:]
relative_path = '/'.join(relative_parts)
self.logger.info(f"Extracted uploads path from external location: {relative_path}")
return relative_path
# Try to find backend directory in path
if 'backend' in path_parts:
backend_index = path_parts.index('backend')
if backend_index + 1 < len(path_parts):
relative_parts = path_parts[backend_index + 1:]
relative_path = '/'.join(relative_parts)
self.logger.info(f"Extracted path relative to backend: {relative_path}")
return relative_path
self.logger.warning(f"Cannot determine relative path for: {absolute_path}")
return None
except Exception as e:
self.logger.error(f"Failed to convert path {absolute_path}: {e}")
return None
def validate_file_exists(self, relative_path: str) -> Tuple[bool, str]:
"""
Validate that the file exists at the relative path.
Args:
relative_path: The relative path to validate
Returns:
Tuple of (exists, full_path)
"""
try:
full_path = self.backend_dir / relative_path
exists = full_path.exists()
return exists, str(full_path)
except Exception as e:
self.logger.error(f"Error validating file existence for {relative_path}: {e}")
return False, ""
def analyze_problematic_paths(self) -> Dict[str, List[str]]:
"""Analyze and categorize problematic paths before migration."""
self.logger.info("Analyzing potentially problematic paths...")
problematic = {
'submissions': [],
'attachments': [],
'projects': []
}
db = self.SessionLocal()
try:
# Check submissions
submissions = db.query(Submission).all()
for sub in submissions:
if self.is_absolute_path(sub.file_path):
relative = self.convert_to_relative_path(sub.file_path)
if not relative:
problematic['submissions'].append(f"ID {sub.id}: {sub.file_path}")
else:
exists, _ = self.validate_file_exists(relative)
if not exists:
problematic['submissions'].append(f"ID {sub.id}: File not found - {sub.file_path}")
# Check attachments
attachments = db.query(TaskAttachment).all()
for att in attachments:
if self.is_absolute_path(att.file_path):
relative = self.convert_to_relative_path(att.file_path)
if not relative:
problematic['attachments'].append(f"ID {att.id}: {att.file_path}")
else:
exists, _ = self.validate_file_exists(relative)
if not exists:
problematic['attachments'].append(f"ID {att.id}: File not found - {att.file_path}")
# Check projects
projects = db.query(Project).filter(Project.thumbnail_path.isnot(None)).all()
for proj in projects:
if self.is_absolute_path(proj.thumbnail_path):
relative = self.convert_to_relative_path(proj.thumbnail_path)
if not relative:
problematic['projects'].append(f"ID {proj.id}: {proj.thumbnail_path}")
else:
exists, _ = self.validate_file_exists(relative)
if not exists:
problematic['projects'].append(f"ID {proj.id}: File not found - {proj.thumbnail_path}")
finally:
db.close()
# Report findings
total_issues = sum(len(issues) for issues in problematic.values())
if total_issues > 0:
self.logger.warning(f"Found {total_issues} potentially problematic paths:")
for category, issues in problematic.items():
if issues:
self.logger.warning(f" {category.upper()}:")
for issue in issues:
self.logger.warning(f" - {issue}")
else:
self.logger.info("No problematic paths detected")
return problematic
def migrate_submissions_table(self) -> None:
"""Migrate file paths in submissions table with enhanced error handling."""
self.logger.info("Starting migration of submissions table...")
db = self.SessionLocal()
try:
submissions = db.query(Submission).all()
self.stats['submissions_processed'] = len(submissions)
for submission in submissions:
try:
if not self.is_absolute_path(submission.file_path):
self.logger.debug(f"Submission {submission.id} already has relative path: {submission.file_path}")
self.stats['submissions_skipped'] += 1
continue
relative_path = self.convert_to_relative_path(submission.file_path)
if relative_path:
# Validate file exists
exists, full_path = self.validate_file_exists(relative_path)
if exists:
old_path = submission.file_path
submission.file_path = relative_path
self.stats['submissions_converted'] += 1
self.logger.info(f"Submission {submission.id}: {old_path} -> {relative_path}")
else:
error_msg = f"Submission {submission.id}: File not found at {full_path}"
self.logger.warning(error_msg)
self.warnings.append(error_msg)
self.stats['warnings'] += 1
else:
error_msg = f"Submission {submission.id}: Failed to convert path {submission.file_path}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
except Exception as e:
error_msg = f"Submission {submission.id}: Exception during migration: {e}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
db.commit()
self.logger.info(f"Submissions migration completed. Converted: {self.stats['submissions_converted']}, Skipped: {self.stats['submissions_skipped']}")
except Exception as e:
db.rollback()
self.logger.error(f"Failed to migrate submissions table: {e}")
raise
finally:
db.close()
def migrate_attachments_table(self) -> None:
"""Migrate file paths in task_attachments table with enhanced error handling."""
self.logger.info("Starting migration of task_attachments table...")
db = self.SessionLocal()
try:
attachments = db.query(TaskAttachment).all()
self.stats['attachments_processed'] = len(attachments)
for attachment in attachments:
try:
if not self.is_absolute_path(attachment.file_path):
self.logger.debug(f"Attachment {attachment.id} already has relative path: {attachment.file_path}")
self.stats['attachments_skipped'] += 1
continue
relative_path = self.convert_to_relative_path(attachment.file_path)
if relative_path:
# Validate file exists
exists, full_path = self.validate_file_exists(relative_path)
if exists:
old_path = attachment.file_path
attachment.file_path = relative_path
self.stats['attachments_converted'] += 1
self.logger.info(f"Attachment {attachment.id}: {old_path} -> {relative_path}")
else:
error_msg = f"Attachment {attachment.id}: File not found at {full_path}"
self.logger.warning(error_msg)
self.warnings.append(error_msg)
self.stats['warnings'] += 1
else:
error_msg = f"Attachment {attachment.id}: Failed to convert path {attachment.file_path}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
except Exception as e:
error_msg = f"Attachment {attachment.id}: Exception during migration: {e}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
db.commit()
self.logger.info(f"Attachments migration completed. Converted: {self.stats['attachments_converted']}, Skipped: {self.stats['attachments_skipped']}")
except Exception as e:
db.rollback()
self.logger.error(f"Failed to migrate attachments table: {e}")
raise
finally:
db.close()
def migrate_projects_table(self) -> None:
"""Migrate thumbnail paths in projects table with enhanced error handling."""
self.logger.info("Starting migration of projects table...")
db = self.SessionLocal()
try:
projects = db.query(Project).filter(Project.thumbnail_path.isnot(None)).all()
self.stats['projects_processed'] = len(projects)
for project in projects:
try:
if not self.is_absolute_path(project.thumbnail_path):
self.logger.debug(f"Project {project.id} already has relative thumbnail path: {project.thumbnail_path}")
self.stats['projects_skipped'] += 1
continue
relative_path = self.convert_to_relative_path(project.thumbnail_path)
if relative_path:
# Validate file exists
exists, full_path = self.validate_file_exists(relative_path)
if exists:
old_path = project.thumbnail_path
project.thumbnail_path = relative_path
self.stats['projects_converted'] += 1
self.logger.info(f"Project {project.id}: {old_path} -> {relative_path}")
else:
error_msg = f"Project {project.id}: Thumbnail not found at {full_path}"
self.logger.warning(error_msg)
self.warnings.append(error_msg)
self.stats['warnings'] += 1
else:
error_msg = f"Project {project.id}: Failed to convert thumbnail path {project.thumbnail_path}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
except Exception as e:
error_msg = f"Project {project.id}: Exception during migration: {e}"
self.logger.error(error_msg)
self.errors.append(error_msg)
self.stats['errors'] += 1
db.commit()
self.logger.info(f"Projects migration completed. Converted: {self.stats['projects_converted']}, Skipped: {self.stats['projects_skipped']}")
except Exception as e:
db.rollback()
self.logger.error(f"Failed to migrate projects table: {e}")
raise
finally:
db.close()
def validate_migration_results(self) -> bool:
"""Comprehensive validation of migration results."""
self.logger.info("Validating migration results...")
db = self.SessionLocal()
try:
# Check for any remaining absolute paths using multiple patterns
absolute_patterns = ['C:%', 'D:%', 'E:%', 'F:%', '/%', '/home/%', '/usr/%', '/var/%']
remaining_absolute_submissions = 0
remaining_absolute_attachments = 0
remaining_absolute_projects = 0
for pattern in absolute_patterns:
remaining_absolute_submissions += db.query(Submission).filter(
Submission.file_path.like(pattern)
).count()
remaining_absolute_attachments += db.query(TaskAttachment).filter(
TaskAttachment.file_path.like(pattern)
).count()
remaining_absolute_projects += db.query(Project).filter(
Project.thumbnail_path.like(pattern)
).count()
total_remaining = remaining_absolute_submissions + remaining_absolute_attachments + remaining_absolute_projects
# Additional validation: check that all relative paths start with expected prefixes
valid_prefixes = ['uploads/', './uploads/', 'backend/uploads/']
invalid_submissions = db.query(Submission).filter(
~Submission.file_path.like('uploads/%')
).count()
invalid_attachments = db.query(TaskAttachment).filter(
~TaskAttachment.file_path.like('uploads/%')
).count()
invalid_projects = db.query(Project).filter(
Project.thumbnail_path.isnot(None),
~Project.thumbnail_path.like('uploads/%')
).count()
total_invalid = invalid_submissions + invalid_attachments + invalid_projects
if total_remaining > 0:
self.logger.warning(f"Migration incomplete: {total_remaining} absolute paths remain")
self.logger.warning(f" - Submissions: {remaining_absolute_submissions}")
self.logger.warning(f" - Attachments: {remaining_absolute_attachments}")
self.logger.warning(f" - Projects: {remaining_absolute_projects}")
return False
elif total_invalid > 0:
self.logger.warning(f"Migration validation found {total_invalid} paths with unexpected format")
self.logger.warning(f" - Submissions: {invalid_submissions}")
self.logger.warning(f" - Attachments: {invalid_attachments}")
self.logger.warning(f" - Projects: {invalid_projects}")
return False
else:
self.logger.info("Migration validation successful: All paths are now relative and properly formatted")
return True
except Exception as e:
self.logger.error(f"Failed to validate migration: {e}")
return False
finally:
db.close()
def print_migration_summary(self) -> None:
"""Print a comprehensive summary of the migration results."""
self.logger.info("=" * 60)
self.logger.info("MIGRATION SUMMARY")
self.logger.info("=" * 60)
self.logger.info(f"Submissions processed: {self.stats['submissions_processed']}")
self.logger.info(f"Submissions converted: {self.stats['submissions_converted']}")
self.logger.info(f"Submissions skipped: {self.stats['submissions_skipped']}")
self.logger.info(f"Attachments processed: {self.stats['attachments_processed']}")
self.logger.info(f"Attachments converted: {self.stats['attachments_converted']}")
self.logger.info(f"Attachments skipped: {self.stats['attachments_skipped']}")
self.logger.info(f"Projects processed: {self.stats['projects_processed']}")
self.logger.info(f"Projects converted: {self.stats['projects_converted']}")
self.logger.info(f"Projects skipped: {self.stats['projects_skipped']}")
self.logger.info(f"Total errors: {self.stats['errors']}")
self.logger.info(f"Total warnings: {self.stats['warnings']}")
if self.errors:
self.logger.info("=" * 60)
self.logger.info("ERRORS")
self.logger.info("=" * 60)
for error in self.errors:
self.logger.error(f" - {error}")
if self.warnings:
self.logger.info("=" * 60)
self.logger.info("WARNINGS")
self.logger.info("=" * 60)
for warning in self.warnings:
self.logger.warning(f" - {warning}")
def run_migration(self, create_backup: bool = True) -> bool:
"""Run the complete migration process with comprehensive validation."""
self.logger.info("Starting production file path migration to relative paths...")
self.logger.info(f"Backend directory: {self.backend_dir}")
try:
# Step 1: Validate database connection
if not self.validate_database_connection():
self.logger.error("Database validation failed. Aborting migration.")
return False
# Step 2: Create backup if requested
backup_path = None
if create_backup:
backup_path = self.create_database_backup()
if backup_path:
self.logger.info(f"Database backup created at: {backup_path}")
else:
self.logger.warning("Failed to create backup, but continuing with migration")
# Step 3: Analyze problematic paths
problematic_paths = self.analyze_problematic_paths()
# Step 4: Migrate each table
self.migrate_submissions_table()
self.migrate_attachments_table()
self.migrate_projects_table()
# Step 5: Validate results
validation_success = self.validate_migration_results()
# Step 6: Print summary
self.print_migration_summary()
# Step 7: Determine overall success
if validation_success and self.stats['errors'] == 0:
self.logger.info("Migration completed successfully!")
if backup_path:
self.logger.info(f"Backup is available at: {backup_path}")
return True
else:
self.logger.warning("Migration completed with warnings or errors. Please review the log.")
if backup_path:
self.logger.info(f"Database can be restored from backup: {backup_path}")
return False
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
def main():
    """Interactive entry point: confirm, optionally back up, then migrate.

    Exits with status 0 on success and 1 when the migration reports errors
    or warnings.
    """
    banner = "=" * 60
    print("Production File Path Migration Script")
    print(banner)
    print("This script will convert absolute file paths to relative paths in the database.")
    print("It includes comprehensive validation and error handling for production use.")
    print()
    print("IMPORTANT: This script will create a database backup before migration.")
    print(" Review the log file for detailed results.")
    print()
    # Require explicit confirmation before touching the database.
    answer = input("Do you want to proceed with the migration? (y/N): ").strip().lower()
    if answer not in ('y', 'yes'):
        print("Migration cancelled.")
        return
    # Backup defaults to yes; only an explicit 'n'/'no' skips it.
    backup_answer = input("Create database backup before migration? (Y/n): ").strip().lower()
    wants_backup = backup_answer not in ('n', 'no')
    success = ProductionFilePathMigrator().run_migration(create_backup=wants_backup)
    if success:
        print("\nMigration completed successfully!")
        print("Check the log file for detailed results.")
        sys.exit(0)
    else:
        print("\nMigration completed with errors or warnings.")
        print("Check the log file for detailed results.")
        print("Database backup is available if restoration is needed.")
        sys.exit(1)


if __name__ == "__main__":
    main()