255 lines
8.9 KiB
Python
255 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
Create database indexes to optimize task status queries for shots and assets.

This script implements the database schema optimization from the
shot-asset-task-status-optimization spec.
"""
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# (index name, indexed columns) for every index this script maintains.
# Each one is created as a partial index limited to rows where
# "deleted_at IS NULL", i.e. tasks that have not been soft-deleted.
_TASK_STATUS_INDEXES = (
    # Task lookups by shot / by asset.
    ("idx_tasks_shot_id_active", "shot_id"),
    ("idx_tasks_asset_id_active", "asset_id"),
    # Status filtering.
    ("idx_tasks_status_type_active", "status, task_type"),
    # Composite shot/asset + status + type queries (most common pattern).
    ("idx_tasks_shot_status_type_active", "shot_id, status, task_type"),
    ("idx_tasks_asset_status_type_active", "asset_id, status, task_type"),
    # Queries that need task details (id, type, status, assignee, updated_at).
    (
        "idx_tasks_details_shot",
        "shot_id, id, task_type, status, assigned_user_id, updated_at",
    ),
    (
        "idx_tasks_details_asset",
        "asset_id, id, task_type, status, assigned_user_id, updated_at",
    ),
    # Project-wide task queries with status filtering.
    ("idx_tasks_project_status_active", "project_id, status, task_type"),
)


def create_task_status_indexes(db_path="database.db"):
    """Create optimized indexes for task status queries.

    Every index listed in ``_TASK_STATUS_INDEXES`` is created on the ``tasks``
    table with ``CREATE INDEX IF NOT EXISTS``, so running the function again
    on the same database is harmless.  After committing, the function reads
    ``sqlite_master`` and reports each expected index by its exact name.

    Args:
        db_path: Path of the SQLite database file.  Defaults to
            ``database.db`` in the current working directory.

    Returns:
        True when all indexes were created and found during verification,
        False when the database file is missing, an ``sqlite3.Error`` was
        raised, or an expected index is absent afterwards.
    """
    db_path = Path(db_path)
    if not db_path.exists():
        # Checked up front because sqlite3.connect() would otherwise create
        # an empty database file at this path.
        print(f"Error: {db_path} not found. Please run from the backend directory.")
        return False

    # Connect to the very same path that was just checked.
    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()
        print("Creating optimized indexes for task status queries...")

        for index_name, columns in _TASK_STATUS_INDEXES:
            print(f"Creating {index_name}...")
            # Names and columns come from the constant table above, never
            # from user input, so interpolating them into the DDL is safe.
            cursor.execute(
                f"CREATE INDEX IF NOT EXISTS {index_name} "
                f"ON tasks({columns}) "
                "WHERE deleted_at IS NULL"
            )

        conn.commit()
        print("✅ All indexes created successfully!")

        # Verify by exact name.  A LIKE 'idx_tasks_%_active' pattern would
        # skip the two "details" indexes, whose names do not end in _active.
        print("\nVerifying created indexes...")
        cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='index' AND tbl_name='tasks'"
        )
        present = {row[0] for row in cursor.fetchall()}
        expected = sorted(name for name, _ in _TASK_STATUS_INDEXES)

        for index_name in expected:
            if index_name in present:
                print(f" ✓ {index_name}")

        missing = [name for name in expected if name not in present]
        if missing:
            print(f"❌ Error creating indexes: missing {', '.join(missing)}")
            return False

        return True

    except sqlite3.Error as e:
        print(f"❌ Error creating indexes: {e}")
        conn.rollback()
        return False

    finally:
        conn.close()
|
|
|
|
def test_index_performance(db_path="database.db"):
    """Print the SQLite query plans for a set of sample task status queries.

    Each sample query is run through ``EXPLAIN QUERY PLAN`` and every row of
    the resulting plan is printed.  No data is read or modified.  Any
    ``sqlite3.Error`` is reported on stdout and stops the remaining tests; it
    is not re-raised.

    Args:
        db_path: Path of the SQLite database file.  Defaults to
            ``database.db`` in the current working directory.

    Returns:
        None.
    """
    db_path = Path(db_path)
    if not db_path.exists():
        # sqlite3.connect() would silently create an empty database file
        # here; report the missing file instead.
        print(f"Error: {db_path} not found. Please run from the backend directory.")
        return

    # (heading, query) pairs, printed and explained in this order.
    sample_queries = (
        (
            "Test 1: Shot task status aggregation",
            "EXPLAIN QUERY PLAN SELECT shot_id, task_type, status FROM tasks "
            "WHERE shot_id = 1 AND deleted_at IS NULL",
        ),
        (
            "Test 2: Asset task status aggregation",
            "EXPLAIN QUERY PLAN SELECT asset_id, task_type, status FROM tasks "
            "WHERE asset_id = 1 AND deleted_at IS NULL",
        ),
        (
            "Test 3: Project-wide status filtering",
            "EXPLAIN QUERY PLAN SELECT * FROM tasks "
            "WHERE project_id = 1 AND status = 'in_progress' "
            "AND deleted_at IS NULL",
        ),
        (
            "Test 4: Shots with task status join",
            """
            EXPLAIN QUERY PLAN
            SELECT s.id, s.name, t.task_type, t.status
            FROM shots s
            LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL
            WHERE s.deleted_at IS NULL
            LIMIT 10
            """,
        ),
    )

    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()

        print("\n" + "=" * 50)
        print("TESTING INDEX PERFORMANCE")
        print("=" * 50)

        for heading, query in sample_queries:
            print(f"\n{heading}")
            cursor.execute(query)
            for row in cursor.fetchall():
                print(f" {row}")

        print("\n✅ Index performance tests completed!")

    except sqlite3.Error as e:
        print(f"❌ Error testing performance: {e}")

    finally:
        conn.close()
|
|
|
|
def get_sample_data_stats(db_path="database.db"):
    """Print and return statistics about the current task data.

    Only rows with ``deleted_at IS NULL`` are counted.  The per-type and
    per-status breakdowns are printed but not included in the return value.

    Args:
        db_path: Path of the SQLite database file.  Defaults to
            ``database.db`` in the current working directory.

    Returns:
        A dict with the integer counts ``total_tasks``, ``shots_with_tasks``
        and ``assets_with_tasks``, or None when the database file is missing
        or an ``sqlite3.Error`` was raised.
    """
    db_path = Path(db_path)
    if not db_path.exists():
        # sqlite3.connect() creates the file when it does not exist.  This
        # function runs before create_task_status_indexes() in the script,
        # so connecting blindly would leave an empty database behind and
        # defeat that function's own "not found" check.
        print(f"Error: {db_path} not found. Please run from the backend directory.")
        return None

    # (heading, query) pairs for the grouped breakdowns that are only printed.
    breakdowns = (
        (
            "\nTasks by type:",
            "SELECT task_type, COUNT(*) FROM tasks "
            "WHERE deleted_at IS NULL GROUP BY task_type",
        ),
        (
            "\nTasks by status:",
            "SELECT status, COUNT(*) FROM tasks "
            "WHERE deleted_at IS NULL GROUP BY status",
        ),
    )

    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()

        print("\n" + "=" * 50)
        print("SAMPLE DATA STATISTICS")
        print("=" * 50)

        # Total number of active tasks.
        cursor.execute("SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL")
        total_tasks = cursor.fetchone()[0]
        print(f"Total active tasks: {total_tasks}")

        for heading, query in breakdowns:
            cursor.execute(query)
            rows = cursor.fetchall()
            print(heading)
            for label, count in rows:
                print(f" {label}: {count}")

        # Active shots that have at least one active task.
        cursor.execute(
            """
            SELECT COUNT(DISTINCT s.id)
            FROM shots s
            INNER JOIN tasks t ON s.id = t.shot_id
            WHERE s.deleted_at IS NULL AND t.deleted_at IS NULL
            """
        )
        shots_with_tasks = cursor.fetchone()[0]
        print(f"\nShots with tasks: {shots_with_tasks}")

        # Active assets that have at least one active task.
        cursor.execute(
            """
            SELECT COUNT(DISTINCT a.id)
            FROM assets a
            INNER JOIN tasks t ON a.id = t.asset_id
            WHERE a.deleted_at IS NULL AND t.deleted_at IS NULL
            """
        )
        assets_with_tasks = cursor.fetchone()[0]
        print(f"Assets with tasks: {assets_with_tasks}")

        return {
            'total_tasks': total_tasks,
            'shots_with_tasks': shots_with_tasks,
            'assets_with_tasks': assets_with_tasks,
        }

    except sqlite3.Error as e:
        print(f"❌ Error getting statistics: {e}")
        return None

    finally:
        conn.close()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: report data statistics, create the indexes, then
    # print the query plans.  Exits with status 1 when index creation fails.
    banner = "=" * 60

    print("Shot-Asset Task Status Optimization: Database Index Creation")
    print(banner)

    # Snapshot of the current data before the schema is touched.
    baseline_stats = get_sample_data_stats()

    # Stop early with a non-zero exit status when the indexes cannot be built.
    if not create_task_status_indexes():
        print("\n❌ Index creation failed!")
        sys.exit(1)

    # Indexes exist: show how the planner uses them.
    test_index_performance()

    print("\n" + banner)
    print("INDEX CREATION COMPLETED SUCCESSFULLY!")
    print(banner)

    print("\nNext steps:")
    for step in (
        "1. Run backend optimization tests",
        "2. Implement optimized query patterns in routers",
        "3. Test with larger datasets",
    ):
        print(step)