524 lines
20 KiB
Python
524 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migration script to add project_id column to shots table.
|
|
|
|
This script adds the following to the shots table:
|
|
- project_id column with foreign key constraint to projects table
|
|
- Index on project_id for performance optimization
|
|
- Populates project_id for existing shots based on episode relationships
|
|
- Unique constraint for project-scoped shot names (excluding soft-deleted shots)
|
|
|
|
Requirements: 2.1, 2.2, 2.3, 2.4, 2.5
|
|
|
|
Usage:
|
|
python migrate_shot_project_id.py
|
|
"""
|
|
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_database_path():
|
|
"""Get the database path, trying multiple possible locations."""
|
|
possible_paths = [
|
|
"vfx_project_management.db", # Primary database
|
|
"database.db",
|
|
"../vfx_project_management.db"
|
|
]
|
|
|
|
for path in possible_paths:
|
|
if Path(path).exists():
|
|
return path
|
|
|
|
# If no existing database found, use the default name
|
|
return "vfx_project_management.db"
|
|
|
|
|
|
def check_column_exists(cursor, table_name, column_name):
|
|
"""Check if a column exists in a table."""
|
|
cursor.execute(f"PRAGMA table_info({table_name})")
|
|
columns = [column[1] for column in cursor.fetchall()]
|
|
return column_name in columns
|
|
|
|
|
|
def check_index_exists(cursor, index_name):
|
|
"""Check if an index exists."""
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
|
|
return cursor.fetchone() is not None
|
|
|
|
|
|
def check_constraint_exists(cursor, table_name, constraint_name):
|
|
"""Check if a constraint exists by examining the table schema."""
|
|
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
|
|
result = cursor.fetchone()
|
|
if result:
|
|
return constraint_name in result[0]
|
|
return False
|
|
|
|
|
|
def validate_data_integrity(cursor):
|
|
"""Validate that all episodes have valid project references."""
|
|
logger.info("Validating data integrity...")
|
|
|
|
# Check for orphaned episodes (episodes without valid project references)
|
|
cursor.execute("""
|
|
SELECT e.id, e.name, e.project_id
|
|
FROM episodes e
|
|
LEFT JOIN projects p ON e.project_id = p.id
|
|
WHERE p.id IS NULL
|
|
""")
|
|
orphaned_episodes = cursor.fetchall()
|
|
|
|
if orphaned_episodes:
|
|
logger.error(f"Found {len(orphaned_episodes)} orphaned episodes:")
|
|
for episode in orphaned_episodes:
|
|
logger.error(f" Episode {episode[0]}: {episode[1]} -> Invalid project_id: {episode[2]}")
|
|
return False
|
|
|
|
# Check for shots with episodes that don't have project references
|
|
cursor.execute("""
|
|
SELECT s.id, s.name, s.episode_id, e.name as episode_name, e.project_id
|
|
FROM shots s
|
|
JOIN episodes e ON s.episode_id = e.id
|
|
LEFT JOIN projects p ON e.project_id = p.id
|
|
WHERE p.id IS NULL
|
|
""")
|
|
shots_with_invalid_episodes = cursor.fetchall()
|
|
|
|
if shots_with_invalid_episodes:
|
|
logger.error(f"Found {len(shots_with_invalid_episodes)} shots with invalid episode references:")
|
|
for shot in shots_with_invalid_episodes:
|
|
logger.error(f" Shot {shot[0]}: {shot[1]} -> Episode {shot[2]} ({shot[3]}) -> Invalid project_id: {shot[4]}")
|
|
return False
|
|
|
|
logger.info("Data integrity validation passed")
|
|
return True
|
|
|
|
|
|
def check_name_conflicts(cursor):
|
|
"""Check for potential project-scoped name conflicts."""
|
|
logger.info("Checking for potential project-scoped name conflicts...")
|
|
|
|
cursor.execute("""
|
|
SELECT p.id, p.name as project_name, s.name as shot_name, COUNT(*) as count
|
|
FROM shots s
|
|
JOIN episodes e ON s.episode_id = e.id
|
|
JOIN projects p ON e.project_id = p.id
|
|
WHERE s.deleted_at IS NULL
|
|
GROUP BY p.id, s.name
|
|
HAVING COUNT(*) > 1
|
|
ORDER BY p.id, s.name
|
|
""")
|
|
|
|
conflicts = cursor.fetchall()
|
|
|
|
if conflicts:
|
|
logger.warning(f"Found {len(conflicts)} potential name conflicts:")
|
|
for conflict in conflicts:
|
|
logger.warning(f" Project {conflict[0]} ({conflict[1]}): Shot name '{conflict[2]}' appears {conflict[3]} times")
|
|
|
|
# Show detailed conflict information
|
|
for conflict in conflicts:
|
|
project_id, project_name, shot_name, count = conflict
|
|
cursor.execute("""
|
|
SELECT s.id, s.name, e.id as episode_id, e.name as episode_name
|
|
FROM shots s
|
|
JOIN episodes e ON s.episode_id = e.id
|
|
WHERE e.project_id = ? AND s.name = ? AND s.deleted_at IS NULL
|
|
ORDER BY s.id
|
|
""", (project_id, shot_name))
|
|
|
|
conflicting_shots = cursor.fetchall()
|
|
logger.warning(f" Conflicting shots in project {project_name}:")
|
|
for shot in conflicting_shots:
|
|
logger.warning(f" Shot {shot[0]}: {shot[1]} in Episode {shot[2]} ({shot[3]})")
|
|
|
|
return False
|
|
|
|
logger.info("No project-scoped name conflicts found")
|
|
return True
|
|
|
|
|
|
def migrate_database():
|
|
"""Add project_id column to shots table and set up constraints."""
|
|
db_path = get_database_path()
|
|
logger.info(f"Using database: {db_path}")
|
|
|
|
try:
|
|
# Connect to the database
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Enable foreign key constraints
|
|
cursor.execute("PRAGMA foreign_keys = ON")
|
|
|
|
# Check if shots table exists
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='shots'")
|
|
if not cursor.fetchone():
|
|
logger.error("Shots table not found. Database may not be initialized.")
|
|
return False
|
|
|
|
# Check if projects table exists
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='projects'")
|
|
if not cursor.fetchone():
|
|
logger.error("Projects table not found. Database may not be initialized.")
|
|
return False
|
|
|
|
# Check if episodes table exists
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='episodes'")
|
|
if not cursor.fetchone():
|
|
logger.error("Episodes table not found. Database may not be initialized.")
|
|
return False
|
|
|
|
# Validate data integrity before migration
|
|
if not validate_data_integrity(cursor):
|
|
logger.error("Data integrity validation failed. Please fix data issues before running migration.")
|
|
return False
|
|
|
|
# Check for name conflicts before adding unique constraint
|
|
if not check_name_conflicts(cursor):
|
|
logger.error("Name conflicts detected. Please resolve conflicts before running migration.")
|
|
logger.error("You may need to rename conflicting shots to ensure uniqueness within each project.")
|
|
return False
|
|
|
|
# Check if project_id column already exists
|
|
if check_column_exists(cursor, "shots", "project_id"):
|
|
logger.warning("project_id column already exists in shots table")
|
|
|
|
# Check if migration is already complete
|
|
cursor.execute("PRAGMA table_info(shots)")
|
|
columns = {col[1]: col for col in cursor.fetchall()}
|
|
project_id_col = columns.get('project_id')
|
|
|
|
if project_id_col and project_id_col[3] == 1: # NOT NULL constraint exists
|
|
logger.info("Migration appears to be already complete")
|
|
return True
|
|
else:
|
|
logger.info("project_id column exists but migration is incomplete, continuing...")
|
|
# Continue with migration to complete it
|
|
|
|
logger.info("Starting shots table project_id migration...")
|
|
|
|
# Step 1: Add project_id column (nullable initially for data population)
|
|
if not check_column_exists(cursor, "shots", "project_id"):
|
|
logger.info("Adding project_id column to shots table...")
|
|
cursor.execute("""
|
|
ALTER TABLE shots
|
|
ADD COLUMN project_id INTEGER
|
|
""")
|
|
else:
|
|
logger.info("project_id column already exists, skipping column creation...")
|
|
|
|
# Step 2: Populate project_id for existing shots based on episode relationships
|
|
logger.info("Populating project_id for existing shots...")
|
|
cursor.execute("""
|
|
UPDATE shots
|
|
SET project_id = (
|
|
SELECT e.project_id
|
|
FROM episodes e
|
|
WHERE e.id = shots.episode_id
|
|
)
|
|
""")
|
|
|
|
# Verify that all shots now have project_id
|
|
cursor.execute("SELECT COUNT(*) FROM shots WHERE project_id IS NULL")
|
|
null_project_count = cursor.fetchone()[0]
|
|
|
|
if null_project_count > 0:
|
|
logger.error(f"Failed to populate project_id for {null_project_count} shots")
|
|
return False
|
|
|
|
# Get count of updated shots
|
|
cursor.execute("SELECT COUNT(*) FROM shots WHERE project_id IS NOT NULL")
|
|
updated_count = cursor.fetchone()[0]
|
|
logger.info(f"Successfully populated project_id for {updated_count} shots")
|
|
|
|
# Step 3: Create a new table with the proper constraints
|
|
logger.info("Creating new shots table with proper constraints...")
|
|
|
|
# First, get the current table schema to preserve other columns
|
|
cursor.execute("PRAGMA table_info(shots)")
|
|
columns_info = cursor.fetchall()
|
|
|
|
# Create new table with project_id as NOT NULL and foreign key constraint
|
|
cursor.execute("""
|
|
CREATE TABLE shots_new (
|
|
id INTEGER PRIMARY KEY,
|
|
project_id INTEGER NOT NULL,
|
|
episode_id INTEGER NOT NULL,
|
|
name VARCHAR NOT NULL,
|
|
description VARCHAR,
|
|
frame_start INTEGER NOT NULL DEFAULT 1001,
|
|
frame_end INTEGER NOT NULL DEFAULT 1001,
|
|
status VARCHAR(11) NOT NULL DEFAULT 'not_started',
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
deleted_at TIMESTAMP,
|
|
deleted_by INTEGER,
|
|
FOREIGN KEY (project_id) REFERENCES projects(id),
|
|
FOREIGN KEY (episode_id) REFERENCES episodes(id),
|
|
FOREIGN KEY (deleted_by) REFERENCES users(id)
|
|
)
|
|
""")
|
|
|
|
# Copy data from old table to new table
|
|
logger.info("Copying data to new table...")
|
|
cursor.execute("""
|
|
INSERT INTO shots_new
|
|
SELECT id, project_id, episode_id, name, description, frame_start, frame_end,
|
|
status, created_at, updated_at, deleted_at, deleted_by
|
|
FROM shots
|
|
""")
|
|
|
|
# Verify data copy
|
|
cursor.execute("SELECT COUNT(*) FROM shots")
|
|
original_count = cursor.fetchone()[0]
|
|
cursor.execute("SELECT COUNT(*) FROM shots_new")
|
|
new_count = cursor.fetchone()[0]
|
|
|
|
if original_count != new_count:
|
|
logger.error(f"Data copy failed: original={original_count}, new={new_count}")
|
|
return False
|
|
|
|
logger.info(f"Successfully copied {new_count} shots to new table")
|
|
|
|
# Drop old table and rename new table
|
|
logger.info("Replacing old table with new table...")
|
|
cursor.execute("DROP TABLE shots")
|
|
cursor.execute("ALTER TABLE shots_new RENAME TO shots")
|
|
|
|
# Step 4: Create indexes for performance
|
|
logger.info("Creating indexes for performance optimization...")
|
|
|
|
# Index on project_id for filtering
|
|
cursor.execute("""
|
|
CREATE INDEX idx_shots_project_id ON shots(project_id)
|
|
""")
|
|
|
|
# Index on name for searching
|
|
cursor.execute("""
|
|
CREATE INDEX idx_shots_name ON shots(name)
|
|
""")
|
|
|
|
# Partial index for active (non-deleted) shots
|
|
cursor.execute("""
|
|
CREATE INDEX idx_shots_active ON shots(id) WHERE deleted_at IS NULL
|
|
""")
|
|
|
|
# Composite index for project + episode queries
|
|
cursor.execute("""
|
|
CREATE INDEX idx_shots_project_episode ON shots(project_id, episode_id)
|
|
""")
|
|
|
|
# Unique constraint for project-scoped shot names (excluding soft-deleted shots)
|
|
cursor.execute("""
|
|
CREATE UNIQUE INDEX idx_shots_project_name_unique
|
|
ON shots(project_id, name) WHERE deleted_at IS NULL
|
|
""")
|
|
|
|
logger.info("Successfully created performance indexes and unique constraint")
|
|
|
|
# Commit all changes
|
|
conn.commit()
|
|
|
|
# Step 5: Verify the migration
|
|
logger.info("Verifying migration...")
|
|
|
|
# Check that project_id column exists and is properly constrained
|
|
cursor.execute("PRAGMA table_info(shots)")
|
|
columns = {col[1]: col for col in cursor.fetchall()}
|
|
|
|
if 'project_id' not in columns:
|
|
logger.error("project_id column not found after migration")
|
|
return False
|
|
|
|
project_id_col = columns['project_id']
|
|
if project_id_col[3] != 1: # NOT NULL constraint
|
|
logger.error("project_id column is not properly constrained as NOT NULL")
|
|
return False
|
|
|
|
# Check foreign key constraints
|
|
cursor.execute("PRAGMA foreign_key_list(shots)")
|
|
foreign_keys = cursor.fetchall()
|
|
|
|
project_fk_found = False
|
|
for fk in foreign_keys:
|
|
if fk[2] == 'projects' and fk[3] == 'project_id':
|
|
project_fk_found = True
|
|
break
|
|
|
|
if not project_fk_found:
|
|
logger.error("Foreign key constraint to projects table not found")
|
|
return False
|
|
|
|
# Check unique constraint (implemented as partial unique index)
|
|
cursor.execute("PRAGMA index_list(shots)")
|
|
indexes = [index[1] for index in cursor.fetchall()]
|
|
if "idx_shots_project_name_unique" not in indexes:
|
|
logger.error("Unique constraint for project-scoped names not found")
|
|
return False
|
|
|
|
# Verify data integrity after migration
|
|
cursor.execute("""
|
|
SELECT s.id, s.name, s.project_id, e.project_id as episode_project_id
|
|
FROM shots s
|
|
JOIN episodes e ON s.episode_id = e.id
|
|
WHERE s.project_id != e.project_id
|
|
""")
|
|
|
|
inconsistent_data = cursor.fetchall()
|
|
if inconsistent_data:
|
|
logger.error(f"Found {len(inconsistent_data)} shots with inconsistent project_id")
|
|
return False
|
|
|
|
# Show migration summary
|
|
cursor.execute("SELECT COUNT(*) FROM shots")
|
|
total_shots = cursor.fetchone()[0]
|
|
|
|
cursor.execute("""
|
|
SELECT p.name, COUNT(s.id) as shot_count
|
|
FROM projects p
|
|
LEFT JOIN shots s ON p.id = s.project_id AND s.deleted_at IS NULL
|
|
GROUP BY p.id, p.name
|
|
ORDER BY shot_count DESC
|
|
""")
|
|
|
|
project_summary = cursor.fetchall()
|
|
|
|
logger.info("Migration completed successfully!")
|
|
logger.info(f"Total shots: {total_shots}")
|
|
logger.info("Shots per project:")
|
|
for project_name, shot_count in project_summary:
|
|
logger.info(f" {project_name}: {shot_count} shots")
|
|
|
|
return True
|
|
|
|
except sqlite3.Error as e:
|
|
logger.error(f"Database error: {e}")
|
|
if conn:
|
|
conn.rollback()
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error: {e}")
|
|
if conn:
|
|
conn.rollback()
|
|
return False
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
def verify_migration():
|
|
"""Verify that the migration was successful."""
|
|
db_path = get_database_path()
|
|
|
|
try:
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
logger.info("Performing post-migration verification...")
|
|
|
|
# Check table structure
|
|
cursor.execute("PRAGMA table_info(shots)")
|
|
columns = {col[1]: col for col in cursor.fetchall()}
|
|
|
|
# Verify project_id column exists and is NOT NULL
|
|
if 'project_id' not in columns:
|
|
logger.error("❌ project_id column not found")
|
|
return False
|
|
|
|
if columns['project_id'][3] != 1: # NOT NULL constraint
|
|
logger.error("❌ project_id column is not NOT NULL")
|
|
return False
|
|
|
|
logger.info("✅ project_id column exists and is properly constrained")
|
|
|
|
# Check foreign key constraints
|
|
cursor.execute("PRAGMA foreign_key_list(shots)")
|
|
foreign_keys = cursor.fetchall()
|
|
|
|
project_fk_found = False
|
|
for fk in foreign_keys:
|
|
if fk[2] == 'projects' and fk[3] == 'project_id':
|
|
project_fk_found = True
|
|
break
|
|
|
|
if not project_fk_found:
|
|
logger.error("❌ Foreign key constraint to projects table not found")
|
|
return False
|
|
|
|
logger.info("✅ Foreign key constraint to projects table verified")
|
|
|
|
# Check indexes
|
|
cursor.execute("PRAGMA index_list(shots)")
|
|
indexes = [index[1] for index in cursor.fetchall()]
|
|
|
|
required_indexes = ['idx_shots_project_id', 'idx_shots_name', 'idx_shots_active', 'idx_shots_project_episode', 'idx_shots_project_name_unique']
|
|
for index_name in required_indexes:
|
|
if index_name not in indexes:
|
|
logger.error(f"❌ Index {index_name} not found")
|
|
return False
|
|
|
|
logger.info("✅ All required indexes verified")
|
|
|
|
# Check unique constraint (implemented as partial unique index)
|
|
if "idx_shots_project_name_unique" not in indexes:
|
|
logger.error("❌ Unique constraint for project-scoped names not found")
|
|
return False
|
|
|
|
logger.info("✅ Unique constraint for project-scoped names verified")
|
|
|
|
# Verify data consistency
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM shots s
|
|
JOIN episodes e ON s.episode_id = e.id
|
|
WHERE s.project_id = e.project_id
|
|
""")
|
|
consistent_count = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM shots")
|
|
total_count = cursor.fetchone()[0]
|
|
|
|
if consistent_count != total_count:
|
|
logger.error(f"❌ Data consistency check failed: {consistent_count}/{total_count} shots have consistent project_id")
|
|
return False
|
|
|
|
logger.info(f"✅ Data consistency verified: {consistent_count} shots have consistent project_id")
|
|
|
|
return True
|
|
|
|
except sqlite3.Error as e:
|
|
logger.error(f"Verification error: {e}")
|
|
return False
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("VFX Project Management - Shot Project ID Migration")
|
|
print("=" * 60)
|
|
|
|
success = migrate_database()
|
|
|
|
if success:
|
|
print("\n" + "=" * 60)
|
|
print("VERIFYING MIGRATION")
|
|
print("=" * 60)
|
|
|
|
if verify_migration():
|
|
print("\n✅ SUCCESS: Migration completed successfully!")
|
|
print("✅ All shots now have project_id with proper constraints and indexes")
|
|
sys.exit(0)
|
|
else:
|
|
print("\n❌ FAILED: Migration verification failed!")
|
|
sys.exit(1)
|
|
else:
|
|
print("\n❌ FAILED: Migration failed!")
|
|
sys.exit(1) |