# LinkDesk/backend/migrate_shot_project_id.py
#
# 524 lines
# 20 KiB
# Python

#!/usr/bin/env python3
"""
Migration script to add project_id column to shots table.
This script adds the following to the shots table:
- project_id column with foreign key constraint to projects table
- Index on project_id for performance optimization
- Populates project_id for existing shots based on episode relationships
- Unique constraint for project-scoped shot names (excluding soft-deleted shots)
Requirements: 2.1, 2.2, 2.3, 2.4, 2.5
Usage:
python migrate_shot_project_id.py
"""
import sqlite3
import sys
from pathlib import Path
import logging
# Configure root logging once at import time: INFO level, "LEVEL: message" format.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
def get_database_path():
    """Return the path of the SQLite database file to operate on.

    Candidate locations are probed in priority order and the first one that
    exists on disk is returned. When none of them exists, the primary
    database file name is returned as the default.
    """
    default_name = "vfx_project_management.db"
    candidates = (
        default_name,  # Primary database
        "database.db",
        "../vfx_project_management.db",
    )
    existing = (candidate for candidate in candidates if Path(candidate).exists())
    # Fall back to the default name when no candidate is present on disk.
    return next(existing, default_name)
def check_column_exists(cursor, table_name, column_name):
    """Return True when *table_name* has a column called *column_name*.

    The table name is interpolated into the PRAGMA statement, so it must be a
    trusted identifier. A table that does not exist yields no rows and
    therefore False.
    """
    cursor.execute(f"PRAGMA table_info({table_name})")
    # Index 1 of each table_info row holds the column name.
    return any(row[1] == column_name for row in cursor.fetchall())
def check_index_exists(cursor, index_name):
    """Return True when an index named *index_name* is listed in sqlite_master."""
    lookup = "SELECT name FROM sqlite_master WHERE type='index' AND name=?"
    cursor.execute(lookup, (index_name,))
    match = cursor.fetchone()
    return match is not None
def check_constraint_exists(cursor, table_name, constraint_name):
    """Return True when *constraint_name* occurs in the table's CREATE statement.

    This is a plain substring search over the SQL text stored in
    sqlite_master. A table that is not found yields False.
    """
    cursor.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    )
    row = cursor.fetchone()
    if not row:
        return False
    return constraint_name in row[0]
def validate_data_integrity(cursor):
    """Check that every shot can be resolved to an existing project.

    Two conditions are verified and each offending row is logged:

    1. Every episode references an existing project.
    2. Every shot references an existing episode whose project exists.

    Args:
        cursor: An open sqlite3 cursor on a database that contains the
            shots, episodes and projects tables.

    Returns:
        True when both checks pass, False as soon as one of them fails.
    """
    logger.info("Validating data integrity...")

    # Check for orphaned episodes (episodes without valid project references)
    cursor.execute("""
        SELECT e.id, e.name, e.project_id
        FROM episodes e
        LEFT JOIN projects p ON e.project_id = p.id
        WHERE p.id IS NULL
    """)
    orphaned_episodes = cursor.fetchall()
    if orphaned_episodes:
        logger.error(f"Found {len(orphaned_episodes)} orphaned episodes:")
        for episode_id, episode_name, project_id in orphaned_episodes:
            logger.error(f" Episode {episode_id}: {episode_name} -> Invalid project_id: {project_id}")
        return False

    # Check for shots that cannot be traced to a project. The LEFT JOIN on
    # episodes matters: an inner join would hide shots whose episode_id is
    # NULL or points at a missing episode, and those shots would end up with
    # a NULL project_id once the migration populates the column.
    cursor.execute("""
        SELECT s.id, s.name, s.episode_id, e.name as episode_name, e.project_id
        FROM shots s
        LEFT JOIN episodes e ON s.episode_id = e.id
        LEFT JOIN projects p ON e.project_id = p.id
        WHERE p.id IS NULL
    """)
    shots_with_invalid_episodes = cursor.fetchall()
    if shots_with_invalid_episodes:
        logger.error(f"Found {len(shots_with_invalid_episodes)} shots with invalid episode references:")
        for shot_id, shot_name, episode_id, episode_name, project_id in shots_with_invalid_episodes:
            logger.error(f" Shot {shot_id}: {shot_name} -> Episode {episode_id} ({episode_name}) -> Invalid project_id: {project_id}")
        return False

    logger.info("Data integrity validation passed")
    return True
def check_name_conflicts(cursor):
    """Report active shots that share a name within the same project.

    Soft-deleted shots (deleted_at set) are ignored. Every duplicate name is
    logged as a warning, followed by the individual shots involved.

    Returns:
        True when no duplicate exists, False otherwise.
    """
    logger.info("Checking for potential project-scoped name conflicts...")
    cursor.execute("""
        SELECT p.id, p.name as project_name, s.name as shot_name, COUNT(*) as count
        FROM shots s
        JOIN episodes e ON s.episode_id = e.id
        JOIN projects p ON e.project_id = p.id
        WHERE s.deleted_at IS NULL
        GROUP BY p.id, s.name
        HAVING COUNT(*) > 1
        ORDER BY p.id, s.name
    """)
    duplicates = cursor.fetchall()

    if not duplicates:
        logger.info("No project-scoped name conflicts found")
        return True

    # Summary first: one line per duplicated (project, shot name) pair.
    logger.warning(f"Found {len(duplicates)} potential name conflicts:")
    for project_id, project_name, shot_name, occurrences in duplicates:
        logger.warning(f" Project {project_id} ({project_name}): Shot name '{shot_name}' appears {occurrences} times")

    # Then the detail: list every shot that takes part in each conflict.
    for project_id, project_name, shot_name, _ in duplicates:
        cursor.execute("""
            SELECT s.id, s.name, e.id as episode_id, e.name as episode_name
            FROM shots s
            JOIN episodes e ON s.episode_id = e.id
            WHERE e.project_id = ? AND s.name = ? AND s.deleted_at IS NULL
            ORDER BY s.id
        """, (project_id, shot_name))
        members = cursor.fetchall()
        logger.warning(f" Conflicting shots in project {project_name}:")
        for shot_id, name, episode_id, episode_name in members:
            logger.warning(f" Shot {shot_id}: {name} in Episode {episode_id} ({episode_name})")

    return False
# Column layout of the rebuilt shots table, in declaration order. The rebuild
# copies exactly these columns, so the live table must not contain any others.
_SHOTS_COLUMNS = (
    "id",
    "project_id",
    "episode_id",
    "name",
    "description",
    "frame_start",
    "frame_end",
    "status",
    "created_at",
    "updated_at",
    "deleted_at",
    "deleted_by",
)


def _table_exists(cursor, table_name):
    """Return True when a table named *table_name* is present in the schema."""
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    )
    return cursor.fetchone() is not None


def _shots_schema_is_compatible(existing_columns):
    """Return True when the live shots columns match what the rebuild copies.

    The rebuilt table has a fixed column list. An extra column in the live
    table would be dropped silently and a missing one would break the copy,
    so both cases are logged and rejected before anything is modified.
    """
    unexpected = [name for name in existing_columns if name not in _SHOTS_COLUMNS]
    missing = [
        name for name in _SHOTS_COLUMNS
        if name != "project_id" and name not in existing_columns
    ]
    if unexpected:
        logger.error(f"shots table has columns this migration would drop: {', '.join(unexpected)}")
    if missing:
        logger.error(f"shots table is missing expected columns: {', '.join(missing)}")
    return not (unexpected or missing)


def _populate_project_id(cursor):
    """Fill shots.project_id from each shot's episode; return True on success."""
    logger.info("Populating project_id for existing shots...")
    cursor.execute("""
        UPDATE shots
        SET project_id = (
            SELECT e.project_id
            FROM episodes e
            WHERE e.id = shots.episode_id
        )
    """)

    # Verify that all shots now have project_id
    cursor.execute("SELECT COUNT(*) FROM shots WHERE project_id IS NULL")
    null_project_count = cursor.fetchone()[0]
    if null_project_count > 0:
        logger.error(f"Failed to populate project_id for {null_project_count} shots")
        return False

    cursor.execute("SELECT COUNT(*) FROM shots WHERE project_id IS NOT NULL")
    updated_count = cursor.fetchone()[0]
    logger.info(f"Successfully populated project_id for {updated_count} shots")
    return True


def _rebuild_shots_table(cursor):
    """Recreate shots with project_id NOT NULL plus its foreign keys.

    SQLite cannot add a NOT NULL or foreign key constraint to an existing
    column, so the table is rebuilt: create shots_new, copy the rows, drop
    shots, rename shots_new. Returns True when the row counts match.
    """
    logger.info("Creating new shots table with proper constraints...")
    # A leftover scratch table from an interrupted run would block CREATE TABLE.
    cursor.execute("DROP TABLE IF EXISTS shots_new")
    cursor.execute("""
        CREATE TABLE shots_new (
            id INTEGER PRIMARY KEY,
            project_id INTEGER NOT NULL,
            episode_id INTEGER NOT NULL,
            name VARCHAR NOT NULL,
            description VARCHAR,
            frame_start INTEGER NOT NULL DEFAULT 1001,
            frame_end INTEGER NOT NULL DEFAULT 1001,
            status VARCHAR(11) NOT NULL DEFAULT 'not_started',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            deleted_at TIMESTAMP,
            deleted_by INTEGER,
            FOREIGN KEY (project_id) REFERENCES projects(id),
            FOREIGN KEY (episode_id) REFERENCES episodes(id),
            FOREIGN KEY (deleted_by) REFERENCES users(id)
        )
    """)

    logger.info("Copying data to new table...")
    # Explicit column lists on both sides so the copy does not depend on the
    # physical column order of either table.
    cursor.execute("""
        INSERT INTO shots_new (
            id, project_id, episode_id, name, description, frame_start, frame_end,
            status, created_at, updated_at, deleted_at, deleted_by
        )
        SELECT id, project_id, episode_id, name, description, frame_start, frame_end,
               status, created_at, updated_at, deleted_at, deleted_by
        FROM shots
    """)

    cursor.execute("SELECT COUNT(*) FROM shots")
    original_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM shots_new")
    new_count = cursor.fetchone()[0]
    if original_count != new_count:
        logger.error(f"Data copy failed: original={original_count}, new={new_count}")
        return False
    logger.info(f"Successfully copied {new_count} shots to new table")

    logger.info("Replacing old table with new table...")
    cursor.execute("DROP TABLE shots")
    cursor.execute("ALTER TABLE shots_new RENAME TO shots")
    return True


def _create_shot_indexes(cursor):
    """Create the lookup indexes and the project-scoped unique name index."""
    logger.info("Creating indexes for performance optimization...")
    # Index on project_id for filtering
    cursor.execute("CREATE INDEX idx_shots_project_id ON shots(project_id)")
    # Index on name for searching
    cursor.execute("CREATE INDEX idx_shots_name ON shots(name)")
    # Partial index for active (non-deleted) shots
    cursor.execute("CREATE INDEX idx_shots_active ON shots(id) WHERE deleted_at IS NULL")
    # Composite index for project + episode queries
    cursor.execute("CREATE INDEX idx_shots_project_episode ON shots(project_id, episode_id)")
    # Unique constraint for project-scoped shot names (excluding soft-deleted shots)
    cursor.execute("""
        CREATE UNIQUE INDEX idx_shots_project_name_unique
        ON shots(project_id, name) WHERE deleted_at IS NULL
    """)
    logger.info("Successfully created performance indexes and unique constraint")


def _verify_shots_schema(cursor):
    """Check the rebuilt shots table; return True when every check passes."""
    logger.info("Verifying migration...")

    cursor.execute("PRAGMA table_info(shots)")
    columns = {col[1]: col for col in cursor.fetchall()}
    if 'project_id' not in columns:
        logger.error("project_id column not found after migration")
        return False
    if columns['project_id'][3] != 1:  # index 3 is the NOT NULL flag
        logger.error("project_id column is not properly constrained as NOT NULL")
        return False

    # foreign_key_list rows: index 2 is the parent table, index 3 the child column.
    cursor.execute("PRAGMA foreign_key_list(shots)")
    if not any(fk[2] == 'projects' and fk[3] == 'project_id' for fk in cursor.fetchall()):
        logger.error("Foreign key constraint to projects table not found")
        return False

    # Check unique constraint (implemented as partial unique index)
    cursor.execute("PRAGMA index_list(shots)")
    index_names = [index[1] for index in cursor.fetchall()]
    if "idx_shots_project_name_unique" not in index_names:
        logger.error("Unique constraint for project-scoped names not found")
        return False

    # Enforcement is off during the rebuild, so check the new table's
    # foreign keys explicitly before the transaction is committed.
    cursor.execute("PRAGMA foreign_key_check(shots)")
    violations = cursor.fetchall()
    if violations:
        logger.error(f"Found {len(violations)} foreign key violations in shots table")
        return False

    cursor.execute("""
        SELECT s.id, s.name, s.project_id, e.project_id as episode_project_id
        FROM shots s
        JOIN episodes e ON s.episode_id = e.id
        WHERE s.project_id != e.project_id
    """)
    inconsistent_data = cursor.fetchall()
    if inconsistent_data:
        logger.error(f"Found {len(inconsistent_data)} shots with inconsistent project_id")
        return False
    return True


def _log_migration_summary(cursor):
    """Log the total shot count and the number of active shots per project."""
    cursor.execute("SELECT COUNT(*) FROM shots")
    total_shots = cursor.fetchone()[0]
    cursor.execute("""
        SELECT p.name, COUNT(s.id) as shot_count
        FROM projects p
        LEFT JOIN shots s ON p.id = s.project_id AND s.deleted_at IS NULL
        GROUP BY p.id, p.name
        ORDER BY shot_count DESC
    """)
    project_summary = cursor.fetchall()

    logger.info("Migration completed successfully!")
    logger.info(f"Total shots: {total_shots}")
    logger.info("Shots per project:")
    for project_name, shot_count in project_summary:
        logger.info(f" {project_name}: {shot_count} shots")


def _run_migration(conn, cursor):
    """Run every migration step on an open connection; return True on success.

    Nothing is committed unless all steps and the verification succeed.
    """
    for table_name in ("shots", "projects", "episodes"):
        if not _table_exists(cursor, table_name):
            logger.error(f"{table_name.capitalize()} table not found. Database may not be initialized.")
            return False

    # Validate data integrity before migration
    if not validate_data_integrity(cursor):
        logger.error("Data integrity validation failed. Please fix data issues before running migration.")
        return False

    # Check for name conflicts before adding unique constraint
    if not check_name_conflicts(cursor):
        logger.error("Name conflicts detected. Please resolve conflicts before running migration.")
        logger.error("You may need to rename conflicting shots to ensure uniqueness within each project.")
        return False

    cursor.execute("PRAGMA table_info(shots)")
    existing_columns = {col[1]: col for col in cursor.fetchall()}
    has_project_id = "project_id" in existing_columns
    if has_project_id:
        logger.warning("project_id column already exists in shots table")
        if existing_columns["project_id"][3] == 1:  # NOT NULL constraint exists
            logger.info("Migration appears to be already complete")
            return True
        logger.info("project_id column exists but migration is incomplete, continuing...")

    if not _shots_schema_is_compatible(existing_columns):
        return False

    logger.info("Starting shots table project_id migration...")
    # One explicit transaction around every change, so that a failure at any
    # step leaves the database exactly as it was.
    if not conn.in_transaction:
        cursor.execute("BEGIN")

    # Step 1: Add project_id column (nullable initially for data population)
    if not has_project_id:
        logger.info("Adding project_id column to shots table...")
        cursor.execute("ALTER TABLE shots ADD COLUMN project_id INTEGER")
    else:
        logger.info("project_id column already exists, skipping column creation...")

    # Step 2: Populate project_id for existing shots based on episode relationships
    if not _populate_project_id(cursor):
        return False

    # Step 3: Rebuild the table with the proper constraints
    if not _rebuild_shots_table(cursor):
        return False

    # Step 4: Create indexes for performance
    _create_shot_indexes(cursor)

    # Step 5: Verify before committing, so a failed check can still be undone
    if not _verify_shots_schema(cursor):
        return False

    conn.commit()
    _log_migration_summary(cursor)
    return True


def migrate_database():
    """Add project_id column to shots table and set up constraints.

    Opens the database returned by get_database_path(), validates the data,
    then adds and populates shots.project_id, rebuilds the table with NOT
    NULL and foreign key constraints, and creates the indexes. All changes
    are committed together after verification, or rolled back.

    Returns:
        True when the migration succeeded or was already complete, False on
        any validation failure or database error.
    """
    db_path = get_database_path()
    logger.info(f"Using database: {db_path}")

    # Bound before the try so the handlers below can test it even when
    # sqlite3.connect() itself raises.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Foreign key enforcement must be OFF while the table is rebuilt.
        # With it ON, DROP TABLE performs an implicit DELETE that can cascade
        # into, or be blocked by, tables referencing shots. The pragma is a
        # no-op inside a transaction, so it is issued before any change.
        cursor.execute("PRAGMA foreign_keys = OFF")

        if _run_migration(conn, cursor):
            return True
        # Discard any partial work left by a step that reported failure.
        conn.rollback()
        return False
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        if conn is not None:
            conn.rollback()
        return False
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        if conn is not None:
            conn.rollback()
        return False
    finally:
        if conn is not None:
            conn.close()
def verify_migration():
    """Verify that the migration was successful.

    Re-opens the database and checks that shots.project_id exists and is NOT
    NULL, that the foreign key to projects is declared, that all expected
    indexes exist, and that every shot's project_id equals its episode's
    project_id.

    Returns:
        True when every check passes, False on the first failed check or on
        a database error.
    """
    db_path = get_database_path()
    # Bound before the try so the finally clause cannot raise
    # UnboundLocalError when sqlite3.connect() itself fails.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        logger.info("Performing post-migration verification...")

        # Check table structure
        cursor.execute("PRAGMA table_info(shots)")
        columns = {col[1]: col for col in cursor.fetchall()}

        # Verify project_id column exists and is NOT NULL
        if 'project_id' not in columns:
            logger.error("❌ project_id column not found")
            return False
        if columns['project_id'][3] != 1:  # index 3 is the NOT NULL flag
            logger.error("❌ project_id column is not NOT NULL")
            return False
        logger.info("✅ project_id column exists and is properly constrained")

        # Check foreign key constraints (index 2: parent table, index 3: child column)
        cursor.execute("PRAGMA foreign_key_list(shots)")
        foreign_keys = cursor.fetchall()
        project_fk_found = any(
            fk[2] == 'projects' and fk[3] == 'project_id' for fk in foreign_keys
        )
        if not project_fk_found:
            logger.error("❌ Foreign key constraint to projects table not found")
            return False
        logger.info("✅ Foreign key constraint to projects table verified")

        # Check indexes
        cursor.execute("PRAGMA index_list(shots)")
        indexes = [index[1] for index in cursor.fetchall()]
        required_indexes = ['idx_shots_project_id', 'idx_shots_name', 'idx_shots_active', 'idx_shots_project_episode', 'idx_shots_project_name_unique']
        for index_name in required_indexes:
            if index_name not in indexes:
                logger.error(f"❌ Index {index_name} not found")
                return False
        logger.info("✅ All required indexes verified")

        # Check unique constraint (implemented as partial unique index)
        if "idx_shots_project_name_unique" not in indexes:
            logger.error("❌ Unique constraint for project-scoped names not found")
            return False
        logger.info("✅ Unique constraint for project-scoped names verified")

        # Verify data consistency: every shot must join to an episode that
        # carries the same project_id.
        cursor.execute("""
            SELECT COUNT(*) FROM shots s
            JOIN episodes e ON s.episode_id = e.id
            WHERE s.project_id = e.project_id
        """)
        consistent_count = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(*) FROM shots")
        total_count = cursor.fetchone()[0]
        if consistent_count != total_count:
            logger.error(f"❌ Data consistency check failed: {consistent_count}/{total_count} shots have consistent project_id")
            return False
        logger.info(f"✅ Data consistency verified: {consistent_count} shots have consistent project_id")
        return True
    except sqlite3.Error as e:
        logger.error(f"Verification error: {e}")
        return False
    finally:
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    # Script entry point: run the migration, then verify it. The exit code is
    # 0 only when both stages succeed and 1 otherwise.
    banner = "=" * 60
    print("VFX Project Management - Shot Project ID Migration")
    print(banner)

    if not migrate_database():
        print("\n❌ FAILED: Migration failed!")
        sys.exit(1)

    print("\n" + banner)
    print("VERIFYING MIGRATION")
    print(banner)

    if not verify_migration():
        print("\n❌ FAILED: Migration verification failed!")
        sys.exit(1)

    print("\n✅ SUCCESS: Migration completed successfully!")
    print("✅ All shots now have project_id with proper constraints and indexes")
    sys.exit(0)