283 lines
10 KiB
Python
283 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Validation script to verify migration results and file accessibility.
|
|
|
|
This script validates that:
|
|
1. All file paths in the database are now relative
|
|
2. All files are accessible via their relative paths
|
|
3. File serving endpoints will work correctly
|
|
|
|
Requirements addressed: 1.5, 4.3, 4.5
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Tuple
|
|
|
|
# Add the backend directory to the path
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
from database import engine
|
|
from models.task import Submission, TaskAttachment
|
|
from models.project import Project
|
|
|
|
|
|
class MigrationValidator:
|
|
"""Validates migration results and file accessibility."""
|
|
|
|
def __init__(self):
|
|
self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
self.backend_dir = Path(__file__).parent.resolve()
|
|
self.issues = []
|
|
|
|
def is_relative_path(self, path: str) -> bool:
|
|
"""Check if a path is relative."""
|
|
if not path:
|
|
return False
|
|
return not Path(path).is_absolute()
|
|
|
|
def validate_file_accessibility(self, relative_path: str) -> Tuple[bool, str]:
|
|
"""Validate that a file is accessible via its relative path."""
|
|
try:
|
|
full_path = self.backend_dir / relative_path
|
|
exists = full_path.exists()
|
|
return exists, str(full_path)
|
|
except Exception as e:
|
|
return False, f"Error: {e}"
|
|
|
|
def validate_submissions(self) -> dict:
|
|
"""Validate submissions table."""
|
|
print("Validating submissions table...")
|
|
|
|
db = self.SessionLocal()
|
|
results = {
|
|
'total': 0,
|
|
'relative_paths': 0,
|
|
'accessible_files': 0,
|
|
'issues': []
|
|
}
|
|
|
|
try:
|
|
submissions = db.query(Submission).all()
|
|
results['total'] = len(submissions)
|
|
|
|
for submission in submissions:
|
|
# Check if path is relative
|
|
if self.is_relative_path(submission.file_path):
|
|
results['relative_paths'] += 1
|
|
|
|
# Check if file is accessible
|
|
accessible, full_path = self.validate_file_accessibility(submission.file_path)
|
|
if accessible:
|
|
results['accessible_files'] += 1
|
|
else:
|
|
issue = f"Submission {submission.id}: File not accessible at {full_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
else:
|
|
issue = f"Submission {submission.id}: Still has absolute path: {submission.file_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
def validate_attachments(self) -> dict:
|
|
"""Validate task_attachments table."""
|
|
print("Validating task_attachments table...")
|
|
|
|
db = self.SessionLocal()
|
|
results = {
|
|
'total': 0,
|
|
'relative_paths': 0,
|
|
'accessible_files': 0,
|
|
'issues': []
|
|
}
|
|
|
|
try:
|
|
attachments = db.query(TaskAttachment).all()
|
|
results['total'] = len(attachments)
|
|
|
|
for attachment in attachments:
|
|
# Check if path is relative
|
|
if self.is_relative_path(attachment.file_path):
|
|
results['relative_paths'] += 1
|
|
|
|
# Check if file is accessible
|
|
accessible, full_path = self.validate_file_accessibility(attachment.file_path)
|
|
if accessible:
|
|
results['accessible_files'] += 1
|
|
else:
|
|
issue = f"Attachment {attachment.id}: File not accessible at {full_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
else:
|
|
issue = f"Attachment {attachment.id}: Still has absolute path: {attachment.file_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
def validate_projects(self) -> dict:
|
|
"""Validate projects table."""
|
|
print("Validating projects table...")
|
|
|
|
db = self.SessionLocal()
|
|
results = {
|
|
'total': 0,
|
|
'relative_paths': 0,
|
|
'accessible_files': 0,
|
|
'issues': []
|
|
}
|
|
|
|
try:
|
|
projects = db.query(Project).filter(Project.thumbnail_path.isnot(None)).all()
|
|
results['total'] = len(projects)
|
|
|
|
for project in projects:
|
|
# Check if path is relative
|
|
if self.is_relative_path(project.thumbnail_path):
|
|
results['relative_paths'] += 1
|
|
|
|
# Check if file is accessible
|
|
accessible, full_path = self.validate_file_accessibility(project.thumbnail_path)
|
|
if accessible:
|
|
results['accessible_files'] += 1
|
|
else:
|
|
issue = f"Project {project.id}: Thumbnail not accessible at {full_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
else:
|
|
issue = f"Project {project.id}: Still has absolute thumbnail path: {project.thumbnail_path}"
|
|
results['issues'].append(issue)
|
|
self.issues.append(issue)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
def validate_path_format_consistency(self) -> dict:
|
|
"""Validate that all relative paths follow consistent format."""
|
|
print("Validating path format consistency...")
|
|
|
|
db = self.SessionLocal()
|
|
results = {
|
|
'consistent_format': 0,
|
|
'inconsistent_format': 0,
|
|
'issues': []
|
|
}
|
|
|
|
try:
|
|
# Expected format: uploads/[type]/[id]/[filename]
|
|
|
|
# Check submissions
|
|
submissions = db.query(Submission).all()
|
|
for submission in submissions:
|
|
if submission.file_path.startswith('uploads/submissions/'):
|
|
results['consistent_format'] += 1
|
|
else:
|
|
issue = f"Submission {submission.id}: Inconsistent path format: {submission.file_path}"
|
|
results['issues'].append(issue)
|
|
results['inconsistent_format'] += 1
|
|
|
|
# Check attachments
|
|
attachments = db.query(TaskAttachment).all()
|
|
for attachment in attachments:
|
|
if attachment.file_path.startswith('uploads/attachments/'):
|
|
results['consistent_format'] += 1
|
|
else:
|
|
issue = f"Attachment {attachment.id}: Inconsistent path format: {attachment.file_path}"
|
|
results['issues'].append(issue)
|
|
results['inconsistent_format'] += 1
|
|
|
|
# Check projects
|
|
projects = db.query(Project).filter(Project.thumbnail_path.isnot(None)).all()
|
|
for project in projects:
|
|
if project.thumbnail_path.startswith('uploads/project_thumbnails/'):
|
|
results['consistent_format'] += 1
|
|
else:
|
|
issue = f"Project {project.id}: Inconsistent thumbnail path format: {project.thumbnail_path}"
|
|
results['issues'].append(issue)
|
|
results['inconsistent_format'] += 1
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
return results
|
|
|
|
def run_validation(self) -> bool:
|
|
"""Run complete validation and return success status."""
|
|
print("=" * 60)
|
|
print("MIGRATION VALIDATION REPORT")
|
|
print("=" * 60)
|
|
|
|
# Validate each table
|
|
submissions_results = self.validate_submissions()
|
|
attachments_results = self.validate_attachments()
|
|
projects_results = self.validate_projects()
|
|
format_results = self.validate_path_format_consistency()
|
|
|
|
# Print results
|
|
print("\nSUBMISSIONS:")
|
|
print(f" Total: {submissions_results['total']}")
|
|
print(f" Relative paths: {submissions_results['relative_paths']}")
|
|
print(f" Accessible files: {submissions_results['accessible_files']}")
|
|
print(f" Issues: {len(submissions_results['issues'])}")
|
|
|
|
print("\nATTACHMENTS:")
|
|
print(f" Total: {attachments_results['total']}")
|
|
print(f" Relative paths: {attachments_results['relative_paths']}")
|
|
print(f" Accessible files: {attachments_results['accessible_files']}")
|
|
print(f" Issues: {len(attachments_results['issues'])}")
|
|
|
|
print("\nPROJECTS:")
|
|
print(f" Total: {projects_results['total']}")
|
|
print(f" Relative paths: {projects_results['relative_paths']}")
|
|
print(f" Accessible files: {projects_results['accessible_files']}")
|
|
print(f" Issues: {len(projects_results['issues'])}")
|
|
|
|
print("\nPATH FORMAT:")
|
|
print(f" Consistent format: {format_results['consistent_format']}")
|
|
print(f" Inconsistent format: {format_results['inconsistent_format']}")
|
|
|
|
# Print issues if any
|
|
if self.issues:
|
|
print("\nISSUES FOUND:")
|
|
for issue in self.issues:
|
|
print(f" - {issue}")
|
|
|
|
# Determine overall success
|
|
total_issues = len(self.issues) + format_results['inconsistent_format']
|
|
|
|
print("\n" + "=" * 60)
|
|
if total_issues == 0:
|
|
print("✅ VALIDATION PASSED: All file paths are relative and accessible")
|
|
return True
|
|
else:
|
|
print(f"❌ VALIDATION FAILED: {total_issues} issues found")
|
|
return False
|
|
|
|
|
|
def main():
|
|
"""Main validation function."""
|
|
validator = MigrationValidator()
|
|
success = validator.run_validation()
|
|
|
|
if success:
|
|
print("\nMigration validation completed successfully!")
|
|
sys.exit(0)
|
|
else:
|
|
print("\nMigration validation found issues that need attention.")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |