#!/usr/bin/env python3
"""
Database migration script to convert absolute file paths to relative paths.

This script addresses the file path storage issue where absolute paths stored
in the database become invalid when deploying to different environments,
particularly Linux. The solution converts all absolute paths to relative paths
that are resolved dynamically at runtime.

Requirements addressed: 1.1, 1.2, 1.3, 1.4, 1.5
"""

import logging
import os
import sys
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import List, Tuple, Optional

# Add the backend directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
from database import engine
from models.task import Submission, TaskAttachment
from models.project import Project

# Configure logging: every message goes both to a log file in the current
# working directory and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # The migration logs arbitrary file paths; pin the encoding so paths
        # with non-ASCII characters are written correctly regardless of the
        # platform's default locale encoding.
        logging.FileHandler('migration_file_paths.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


class FilePathMigrator:
    """Handles migration of absolute file paths to relative paths.

    Rewrites ``Submission.file_path``, ``TaskAttachment.file_path`` and
    ``Project.thumbnail_path`` so that they are stored relative to the
    directory containing this script (``backend_dir``).

    Attributes:
        SessionLocal: Session factory bound to the project's ``engine``.
        backend_dir: Resolved directory of this script; relative paths are
            interpreted against it.
        errors: Human-readable messages for every row that could not be
            migrated.
        stats: Per-table ``*_processed`` / ``*_converted`` counters plus a
            global ``errors`` counter.
    """

    def __init__(self):
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        self.backend_dir = Path(__file__).parent.resolve()
        self.errors = []
        self.stats = {
            'submissions_processed': 0,
            'submissions_converted': 0,
            'attachments_processed': 0,
            'attachments_converted': 0,
            'projects_processed': 0,
            'projects_converted': 0,
            'errors': 0,
        }

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------

    def is_absolute_path(self, path: str) -> bool:
        """Check if a path is absolute under Windows *or* POSIX rules.

        The stored paths may have been written on a different operating
        system than the one running the migration, so the check must not
        depend on the host's path flavour (a plain ``Path(...)`` on Linux
        does not recognise ``C:\\...`` as absolute).

        Args:
            path: Stored path value; may be empty or ``None``.

        Returns:
            True if the path is absolute in either flavour, False otherwise
            (including for empty/``None`` input).
        """
        if not path:
            return False
        return (
            PureWindowsPath(path).is_absolute()
            or PurePosixPath(path).is_absolute()
        )

    @staticmethod
    def _is_foreign_windows_path(path: str) -> bool:
        """Return True for a Windows-absolute path the host cannot resolve."""
        # Windows-absolute but not absolute for the native flavour means we
        # are running on a POSIX host; resolve() would treat the whole string
        # as one file name relative to the current directory.
        return PureWindowsPath(path).is_absolute() and not Path(path).is_absolute()

    @staticmethod
    def _uploads_suffix(parts) -> Optional[str]:
        """Return the path from the first ``uploads`` component on, or None."""
        if 'uploads' not in parts:
            return None
        start = parts.index('uploads')
        return '/'.join(parts[start:])

    def convert_to_relative_path(self, absolute_path: str) -> Optional[str]:
        """
        Convert absolute path to relative path.

        The path is made relative to ``backend_dir`` when it lies inside it.
        Otherwise (or when it is a Windows path being processed on a
        non-Windows host) the portion starting at the first ``uploads``
        component is used, if there is one.

        Args:
            absolute_path: The absolute file path to convert

        Returns:
            Relative path string (forward slashes) or None if conversion fails
        """
        try:
            if self._is_foreign_windows_path(absolute_path):
                logger.warning(
                    f"Windows-style path cannot be resolved on this platform: {absolute_path}"
                )
                parts = PureWindowsPath(absolute_path).parts
            else:
                resolved = Path(absolute_path).resolve()
                try:
                    # Use forward slashes for consistency across platforms
                    return resolved.relative_to(self.backend_dir).as_posix()
                except ValueError:
                    # Path is not within backend directory
                    logger.warning(f"Path is outside backend directory: {absolute_path}")
                    parts = resolved.parts

            # Try to extract just the uploads part if it exists
            relative_path = self._uploads_suffix(parts)
            if relative_path is not None:
                logger.info(f"Extracted uploads path: {relative_path}")
            return relative_path

        except Exception as e:
            logger.error(f"Failed to convert path {absolute_path}: {e}")
            return None

    def validate_file_exists(self, relative_path: str) -> bool:
        """
        Validate that the file exists at the relative path.

        Args:
            relative_path: The relative path to validate (joined onto
                ``backend_dir``)

        Returns:
            True if file exists, False otherwise (including on any error)
        """
        try:
            return (self.backend_dir / relative_path).exists()
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Table migration
    # ------------------------------------------------------------------

    def _record_error(self, message: str, *, warning: bool = False) -> None:
        """Log *message*, remember it in ``errors`` and bump the error count."""
        if warning:
            logger.warning(message)
        else:
            logger.error(message)
        self.errors.append(message)
        self.stats['errors'] += 1

    def _migrate_table(self, model, column: str, *, entity: str, table: str,
                       db_table: str, missing_noun: str = "File",
                       path_noun: str = "path", skip_null: bool = False) -> None:
        """Convert one path column of one model; shared by all migrate_* methods.

        Args:
            model: ORM class whose rows are migrated.
            column: Name of the attribute on ``model`` holding the path.
            entity: Row label used in per-row log messages.
            table: Stats-key prefix and label used in summary/failure messages.
            db_table: Table name used in the "Starting migration" message.
            missing_noun: Noun used in the "not found" message.
            path_noun: Noun used in the "already relative"/"failed" messages.
            skip_null: When True, rows whose column is NULL are not queried.

        Raises:
            Exception: Re-raises any error from querying or committing, after
                rolling the session back.
        """
        logger.info(f"Starting migration of {db_table} table...")
        processed_key = f"{table}_processed"
        converted_key = f"{table}_converted"

        db = self.SessionLocal()
        try:
            query = db.query(model)
            if skip_null:
                query = query.filter(getattr(model, column).isnot(None))
            rows = query.all()
            self.stats[processed_key] = len(rows)

            for row in rows:
                # A failure on one row is recorded and must not abort the rest.
                try:
                    current = getattr(row, column)
                    if not self.is_absolute_path(current):
                        logger.debug(f"{entity} {row.id} already has relative {path_noun}: {current}")
                        continue

                    relative_path = self.convert_to_relative_path(current)
                    if not relative_path:
                        self._record_error(
                            f"{entity} {row.id}: Failed to convert {path_noun} {current}"
                        )
                        continue

                    # Only rewrite the row when the file is really there.
                    if not self.validate_file_exists(relative_path):
                        self._record_error(
                            f"{entity} {row.id}: {missing_noun} not found at relative path {relative_path}",
                            warning=True,
                        )
                        continue

                    setattr(row, column, relative_path)
                    self.stats[converted_key] += 1
                    logger.info(f"{entity} {row.id}: {current} -> {relative_path}")

                except Exception as e:
                    self._record_error(f"{entity} {row.id}: Exception during migration: {e}")

            db.commit()
            logger.info(
                f"{table.capitalize()} migration completed. "
                f"Converted: {self.stats[converted_key]}/{self.stats[processed_key]}"
            )

        except Exception as e:
            db.rollback()
            logger.error(f"Failed to migrate {table} table: {e}")
            raise
        finally:
            db.close()

    def migrate_submissions_table(self) -> None:
        """Migrate file paths in submissions table."""
        self._migrate_table(
            Submission, 'file_path',
            entity='Submission', table='submissions', db_table='submissions',
        )

    def migrate_attachments_table(self) -> None:
        """Migrate file paths in task_attachments table."""
        self._migrate_table(
            TaskAttachment, 'file_path',
            entity='Attachment', table='attachments', db_table='task_attachments',
        )

    def migrate_projects_table(self) -> None:
        """Migrate thumbnail paths in projects table."""
        self._migrate_table(
            Project, 'thumbnail_path',
            entity='Project', table='projects', db_table='projects',
            missing_noun='Thumbnail', path_noun='thumbnail path', skip_null=True,
        )

    # ------------------------------------------------------------------
    # Validation and reporting
    # ------------------------------------------------------------------

    def _count_absolute_paths(self, db, column) -> int:
        """Count non-NULL values of *column* that are still absolute paths."""
        # Checked in Python with is_absolute_path so that validation agrees
        # with the migration itself for every drive letter and UNC path,
        # instead of matching a fixed set of SQL LIKE prefixes.
        rows = db.query(column).filter(column.isnot(None)).all()
        return sum(1 for (value,) in rows if self.is_absolute_path(value))

    def validate_migration_results(self) -> bool:
        """Validate that migration was successful.

        Returns:
            True when no absolute path remains in any of the three migrated
            columns; False when some remain or the check itself fails.
        """
        logger.info("Validating migration results...")

        db = self.SessionLocal()
        try:
            # Check for any remaining absolute paths
            remaining_absolute_submissions = self._count_absolute_paths(db, Submission.file_path)
            remaining_absolute_attachments = self._count_absolute_paths(db, TaskAttachment.file_path)
            remaining_absolute_projects = self._count_absolute_paths(db, Project.thumbnail_path)

            total_remaining = (
                remaining_absolute_submissions
                + remaining_absolute_attachments
                + remaining_absolute_projects
            )

            if total_remaining > 0:
                logger.warning(f"Migration incomplete: {total_remaining} absolute paths remain")
                logger.warning(f" - Submissions: {remaining_absolute_submissions}")
                logger.warning(f" - Attachments: {remaining_absolute_attachments}")
                logger.warning(f" - Projects: {remaining_absolute_projects}")
                return False

            logger.info("Migration validation successful: No absolute paths remain")
            return True

        except Exception as e:
            logger.error(f"Failed to validate migration: {e}")
            return False
        finally:
            db.close()

    def print_migration_summary(self) -> None:
        """Print a summary of the migration results."""
        logger.info("=== MIGRATION SUMMARY ===")
        for label, prefix in (
            ("Submissions", "submissions"),
            ("Attachments", "attachments"),
            ("Projects", "projects"),
        ):
            logger.info(f"{label} processed: {self.stats[prefix + '_processed']}")
            logger.info(f"{label} converted: {self.stats[prefix + '_converted']}")
        logger.info(f"Total errors: {self.stats['errors']}")

        if self.errors:
            logger.info("=== ERRORS ===")
            for error in self.errors:
                logger.info(f" - {error}")

    def run_migration(self) -> bool:
        """Run the complete migration process.

        Returns:
            True only when every table migrated without recorded errors and
            validation found no remaining absolute paths; False otherwise,
            including when a table migration raised.
        """
        logger.info("Starting file path migration to relative paths...")
        logger.info(f"Backend directory: {self.backend_dir}")

        try:
            # Migrate each table
            self.migrate_submissions_table()
            self.migrate_attachments_table()
            self.migrate_projects_table()

            # Validate results
            validation_success = self.validate_migration_results()

            # Print summary
            self.print_migration_summary()

            if validation_success and self.stats['errors'] == 0:
                logger.info("Migration completed successfully!")
                return True

            logger.warning("Migration completed with warnings or errors. Please review the log.")
            return False

        except Exception as e:
            # logger.exception keeps the traceback in the log file.
            logger.exception(f"Migration failed: {e}")
            return False


def main():
    """Main function to run the migration.

    Prints a short banner, asks for confirmation on stdin and, when the
    answer is ``y``/``yes``, runs ``FilePathMigrator.run_migration()``.
    Exits the process with status 0 on success and 1 on failure; returns
    normally (without exiting) when the migration is cancelled.
    """
    print("File Path Migration Script")
    print("=" * 50)
    print("This script will convert absolute file paths to relative paths in the database.")
    print("A backup of the database is recommended before running this migration.")
    print()

    # Ask for confirmation
    try:
        response = input("Do you want to proceed with the migration? (y/N): ").strip().lower()
    except EOFError:
        # No answer available (non-interactive or closed stdin): fall back to
        # the advertised default "N" instead of crashing with a traceback.
        print()
        response = ''

    if response not in ('y', 'yes'):
        print("Migration cancelled.")
        return

    # Run migration
    migrator = FilePathMigrator()
    success = migrator.run_migration()

    if success:
        print("\nMigration completed successfully!")
        sys.exit(0)
    else:
        print("\nMigration completed with errors. Please check the log file.")
        sys.exit(1)


if __name__ == "__main__":
    main()