LinkDesk/backend/migrate_file_paths_to_relat...

381 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Database migration script to convert absolute file paths to relative paths.
This script addresses the file path storage issue where absolute paths stored
in the database become invalid when deploying to different environments,
particularly Linux. The solution converts all absolute paths to relative paths
that are resolved dynamically at runtime.
Requirements addressed: 1.1, 1.2, 1.3, 1.4, 1.5
"""
import logging
import os
import sys
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import List, Tuple, Optional
# Add the backend directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
from database import engine
from models.task import Submission, TaskAttachment
from models.project import Project
# Configure logging: every record is written both to a log file in the
# current working directory and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler('migration_file_paths.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
class FilePathMigrator:
    """Handles migration of absolute file paths to relative paths.

    Three columns are rewritten: ``Submission.file_path``,
    ``TaskAttachment.file_path`` and ``Project.thumbnail_path``. A value is
    only replaced when it can be converted to a path relative to the backend
    directory AND a file exists at that relative location; everything else is
    left untouched and recorded in ``errors`` / ``stats``.
    """

    def __init__(self):
        """Create the session factory and reset error/statistics state."""
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        # Relative paths are expressed relative to the directory of this script.
        self.backend_dir = Path(__file__).parent.resolve()
        self.errors = []
        self.stats = {
            'submissions_processed': 0,
            'submissions_converted': 0,
            'attachments_processed': 0,
            'attachments_converted': 0,
            'projects_processed': 0,
            'projects_converted': 0,
            'errors': 0
        }

    def is_absolute_path(self, path: str) -> bool:
        """Check if a path is absolute under Windows OR POSIX rules.

        The database may contain paths written on a different operating
        system than the one running the migration, so the check must not
        depend on the host platform: ``C:\\data\\f.png`` has to be recognised
        on Linux and ``/srv/f.png`` on Windows.

        Args:
            path: The stored path; ``None`` or an empty string is accepted.

        Returns:
            True if the path is absolute in either flavour, False otherwise.
        """
        if not path:
            return False
        return PureWindowsPath(path).is_absolute() or PurePosixPath(path).is_absolute()

    def convert_to_relative_path(self, absolute_path: str) -> Optional[str]:
        """
        Convert absolute path to relative path.

        Paths located inside the backend directory are made relative to it.
        Otherwise, if the path contains an ``uploads`` component, everything
        from the first such component onwards is returned.

        Args:
            absolute_path: The absolute file path to convert

        Returns:
            Relative path string (forward slashes) or None if conversion fails
        """
        try:
            native_path = Path(absolute_path)
            windows_path = PureWindowsPath(absolute_path)
            if not native_path.is_absolute() and windows_path.is_absolute():
                # A Windows path on a non-Windows host: resolve() would glue
                # it onto the current directory as one bogus file name, so
                # only the purely lexical "uploads" extraction is possible.
                logger.warning(f"Path is not native to this platform: {absolute_path}")
                path_parts = windows_path.parts
            else:
                abs_path = native_path.resolve()
                try:
                    relative_path = abs_path.relative_to(self.backend_dir)
                    return str(relative_path).replace('\\', '/')  # Use forward slashes for consistency
                except ValueError:
                    # Path is not within backend directory
                    logger.warning(f"Path is outside backend directory: {absolute_path}")
                path_parts = abs_path.parts

            # Try to extract just the uploads part if it exists
            if 'uploads' in path_parts:
                uploads_index = path_parts.index('uploads')
                relative_path = '/'.join(path_parts[uploads_index:])
                logger.info(f"Extracted uploads path: {relative_path}")
                return relative_path
            return None
        except Exception as e:
            logger.error(f"Failed to convert path {absolute_path}: {e}")
            return None

    def validate_file_exists(self, relative_path: str) -> bool:
        """
        Validate that the file exists at the relative path.

        Args:
            relative_path: The relative path to validate

        Returns:
            True if something exists at ``backend_dir / relative_path``,
            False otherwise (including for an empty path or on any error)
        """
        # An empty path would resolve to the backend directory itself, which
        # always exists and would be reported as a found file.
        if not relative_path:
            return False
        try:
            full_path = self.backend_dir / relative_path
            return full_path.exists()
        except Exception:
            return False

    def _record_error(self, message: str, log) -> None:
        """Log ``message`` with ``log`` and add it to the error list/counter."""
        log(message)
        self.errors.append(message)
        self.stats['errors'] += 1

    def _migrate_table(
        self,
        model,
        attr: str,
        *,
        stats_key: str,
        start_table: str,
        fail_table: str,
        summary_label: str,
        record_label: str,
        path_label: str,
        missing_label: str,
        skip_null: bool = False,
    ) -> None:
        """Convert one path column of one table; shared by the migrate_* methods.

        Args:
            model: Mapped class whose rows are migrated.
            attr: Name of the path attribute on ``model``.
            stats_key: Prefix of the ``*_processed`` / ``*_converted`` stats keys.
            start_table: Table name used in the "Starting migration" message.
            fail_table: Table name used in the "Failed to migrate" message.
            summary_label: Plural label used in the completion message.
            record_label: Singular label used in per-row messages.
            path_label: Wording for the column ("path" / "thumbnail path").
            missing_label: Subject of the "not found" message.
            skip_null: When True, rows whose column is NULL are not loaded.

        Raises:
            Exception: Whatever the query or commit raised, after the
                session has been rolled back.
        """
        processed_key = f'{stats_key}_processed'
        converted_key = f'{stats_key}_converted'
        logger.info(f"Starting migration of {start_table} table...")
        db = self.SessionLocal()
        try:
            query = db.query(model)
            if skip_null:
                query = query.filter(getattr(model, attr).isnot(None))
            records = query.all()
            self.stats[processed_key] = len(records)
            for record in records:
                # A failure on one row must not abort the rest of the table.
                try:
                    old_path = getattr(record, attr)
                    if not self.is_absolute_path(old_path):
                        logger.debug(f"{record_label} {record.id} already has relative {path_label}: {old_path}")
                        continue
                    relative_path = self.convert_to_relative_path(old_path)
                    if not relative_path:
                        self._record_error(
                            f"{record_label} {record.id}: Failed to convert {path_label} {old_path}",
                            logger.error,
                        )
                        continue
                    # Only rewrite the row when the file is really there.
                    if not self.validate_file_exists(relative_path):
                        self._record_error(
                            f"{record_label} {record.id}: {missing_label} not found at relative path {relative_path}",
                            logger.warning,
                        )
                        continue
                    setattr(record, attr, relative_path)
                    self.stats[converted_key] += 1
                    logger.info(f"{record_label} {record.id}: {old_path} -> {relative_path}")
                except Exception as e:
                    self._record_error(
                        f"{record_label} {record.id}: Exception during migration: {e}",
                        logger.error,
                    )
            db.commit()
            logger.info(f"{summary_label} migration completed. Converted: {self.stats[converted_key]}/{self.stats[processed_key]}")
        except Exception as e:
            db.rollback()
            logger.error(f"Failed to migrate {fail_table} table: {e}")
            raise
        finally:
            db.close()

    def migrate_submissions_table(self) -> None:
        """Migrate file paths in submissions table."""
        self._migrate_table(
            Submission,
            'file_path',
            stats_key='submissions',
            start_table='submissions',
            fail_table='submissions',
            summary_label='Submissions',
            record_label='Submission',
            path_label='path',
            missing_label='File',
        )

    def migrate_attachments_table(self) -> None:
        """Migrate file paths in task_attachments table."""
        self._migrate_table(
            TaskAttachment,
            'file_path',
            stats_key='attachments',
            start_table='task_attachments',
            fail_table='attachments',
            summary_label='Attachments',
            record_label='Attachment',
            path_label='path',
            missing_label='File',
        )

    def migrate_projects_table(self) -> None:
        """Migrate thumbnail paths in projects table (rows without a thumbnail are skipped)."""
        self._migrate_table(
            Project,
            'thumbnail_path',
            stats_key='projects',
            start_table='projects',
            fail_table='projects',
            summary_label='Projects',
            record_label='Project',
            path_label='thumbnail path',
            missing_label='Thumbnail',
            skip_null=True,
        )

    def _count_absolute_paths(self, db, column) -> int:
        """Count non-NULL values of ``column`` that are still absolute paths."""
        rows = db.query(column).filter(column.isnot(None))
        return sum(1 for (value,) in rows if self.is_absolute_path(value))

    def validate_migration_results(self) -> bool:
        """Validate that migration was successful.

        The remaining values are checked with ``is_absolute_path`` so that
        validation uses the same definition of "absolute" as the migration
        itself (any drive letter, UNC paths, POSIX roots) instead of a fixed
        set of SQL prefixes.

        Returns:
            True if no absolute path remains in any of the three columns,
            False if some remain or the check itself failed.
        """
        logger.info("Validating migration results...")
        db = self.SessionLocal()
        try:
            # Check for any remaining absolute paths
            remaining_absolute_submissions = self._count_absolute_paths(db, Submission.file_path)
            remaining_absolute_attachments = self._count_absolute_paths(db, TaskAttachment.file_path)
            remaining_absolute_projects = self._count_absolute_paths(db, Project.thumbnail_path)
            total_remaining = (
                remaining_absolute_submissions
                + remaining_absolute_attachments
                + remaining_absolute_projects
            )
            if total_remaining > 0:
                logger.warning(f"Migration incomplete: {total_remaining} absolute paths remain")
                logger.warning(f" - Submissions: {remaining_absolute_submissions}")
                logger.warning(f" - Attachments: {remaining_absolute_attachments}")
                logger.warning(f" - Projects: {remaining_absolute_projects}")
                return False
            logger.info("Migration validation successful: No absolute paths remain")
            return True
        except Exception as e:
            logger.error(f"Failed to validate migration: {e}")
            return False
        finally:
            db.close()

    def print_migration_summary(self) -> None:
        """Log the collected statistics followed by every recorded error."""
        logger.info("=== MIGRATION SUMMARY ===")
        summary_rows = (
            ("Submissions processed", 'submissions_processed'),
            ("Submissions converted", 'submissions_converted'),
            ("Attachments processed", 'attachments_processed'),
            ("Attachments converted", 'attachments_converted'),
            ("Projects processed", 'projects_processed'),
            ("Projects converted", 'projects_converted'),
            ("Total errors", 'errors'),
        )
        for label, key in summary_rows:
            logger.info(f"{label}: {self.stats[key]}")
        if self.errors:
            logger.info("=== ERRORS ===")
            for error in self.errors:
                logger.info(f" - {error}")

    def run_migration(self) -> bool:
        """Run the complete migration process.

        Returns:
            True only if every table was migrated, validation found no
            remaining absolute paths and no error was recorded; False
            otherwise (including when a table migration raised).
        """
        logger.info("Starting file path migration to relative paths...")
        logger.info(f"Backend directory: {self.backend_dir}")
        try:
            # Migrate each table
            self.migrate_submissions_table()
            self.migrate_attachments_table()
            self.migrate_projects_table()
            # Validate results
            validation_success = self.validate_migration_results()
            # Print summary
            self.print_migration_summary()
            if validation_success and self.stats['errors'] == 0:
                logger.info("Migration completed successfully!")
                return True
            logger.warning("Migration completed with warnings or errors. Please review the log.")
            return False
        except Exception as e:
            # logger.exception keeps the same message but adds the traceback.
            logger.exception(f"Migration failed: {e}")
            return False
def main():
    """Interactive entry point: confirm with the user, then run the migration.

    Returns without doing anything when the user declines. Otherwise the
    process exits with status 0 on success and 1 when the migration
    reported errors.
    """
    intro_lines = (
        "File Path Migration Script",
        "=" * 50,
        "This script will convert absolute file paths to relative paths in the database.",
        "A backup of the database is recommended before running this migration.",
        "",
    )
    for line in intro_lines:
        print(line)

    # Anything other than an explicit yes aborts.
    answer = input("Do you want to proceed with the migration? (y/N): ").strip().lower()
    if answer != 'y' and answer != 'yes':
        print("Migration cancelled.")
        return

    # Run the migration and translate its outcome into an exit status.
    if FilePathMigrator().run_migration():
        print("\nMigration completed successfully!")
        exit_status = 0
    else:
        print("\nMigration completed with errors. Please check the log file.")
        exit_status = 1
    sys.exit(exit_status)


if __name__ == "__main__":
    main()