LinkDesk/backend/create_task_status_indexes.py

255 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""
Create database indexes to optimize task status queries for shots and assets.
This script implements the database schema optimization from the shot-asset-task-status-optimization spec.
"""
import sqlite3
import sys
from pathlib import Path
def create_task_status_indexes(db_path="database.db"):
    """Create the partial indexes used by task status queries.

    Every index is defined on the ``tasks`` table, is restricted to rows
    where ``deleted_at IS NULL``, and is created with ``IF NOT EXISTS`` so
    the script can be run repeatedly.

    Args:
        db_path: Path of the SQLite database file. Defaults to
            ``database.db`` in the current working directory.

    Returns:
        True when every index was created (or already existed); False when
        the database file is missing or SQLite reported an error.
    """
    db_file = Path(db_path)
    if not db_file.exists():
        print(f"Error: {db_file} not found. Please run from the backend directory.")
        return False

    # (index name, indexed columns). Names and columns are fixed constants of
    # this script, so interpolating them into the DDL below is safe.
    index_definitions = (
        # Task lookups by shot / by asset.
        ("idx_tasks_shot_id_active", "shot_id"),
        ("idx_tasks_asset_id_active", "asset_id"),
        # Status filtering.
        ("idx_tasks_status_type_active", "status, task_type"),
        # Composite shot/asset + status + type lookups.
        ("idx_tasks_shot_status_type_active", "shot_id, status, task_type"),
        ("idx_tasks_asset_status_type_active", "asset_id, status, task_type"),
        # Queries that also read id, assignee and updated_at.
        (
            "idx_tasks_details_shot",
            "shot_id, id, task_type, status, assigned_user_id, updated_at",
        ),
        (
            "idx_tasks_details_asset",
            "asset_id, id, task_type, status, assigned_user_id, updated_at",
        ),
        # Project-wide queries with status filtering.
        ("idx_tasks_project_status_active", "project_id, status, task_type"),
    )

    conn = sqlite3.connect(str(db_file))
    cursor = conn.cursor()
    try:
        print("Creating optimized indexes for task status queries...")
        for index_name, columns in index_definitions:
            print(f"Creating {index_name}...")
            cursor.execute(
                f"CREATE INDEX IF NOT EXISTS {index_name} "
                f"ON tasks({columns}) "
                "WHERE deleted_at IS NULL"
            )
        conn.commit()
        print("✅ All indexes created successfully!")

        # Verify by exact name: a LIKE 'idx_tasks_%_active' pattern would
        # miss the two idx_tasks_details_* indexes created above.
        print("\nVerifying created indexes...")
        placeholders = ", ".join("?" * len(index_definitions))
        cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='index' "
            f"AND name IN ({placeholders}) "
            "ORDER BY name",
            [index_name for index_name, _ in index_definitions],
        )
        for (found_name,) in cursor.fetchall():
            print(f"{found_name}")
        return True
    except sqlite3.Error as e:
        print(f"❌ Error creating indexes: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()
def test_index_performance(db_path="database.db"):
    """Print SQLite's query plan for representative task status queries.

    Nothing is timed: each sample query is run through
    ``EXPLAIN QUERY PLAN`` and the resulting rows are printed so a reader
    can see which index, if any, SQLite chooses.

    Args:
        db_path: Path of the SQLite database file. Defaults to
            ``database.db`` in the current working directory.

    Returns:
        None. SQLite errors are reported on stdout rather than raised.
    """
    db_file = Path(db_path)
    # sqlite3.connect() creates an empty database file for a missing path,
    # so refuse to connect rather than leave a stray file behind.
    if not db_file.exists():
        print(f"Error: {db_file} not found. Please run from the backend directory.")
        return

    # (heading, query to explain)
    plan_checks = (
        (
            "Test 1: Shot task status aggregation",
            "SELECT shot_id, task_type, status FROM tasks "
            "WHERE shot_id = 1 AND deleted_at IS NULL",
        ),
        (
            "Test 2: Asset task status aggregation",
            "SELECT asset_id, task_type, status FROM tasks "
            "WHERE asset_id = 1 AND deleted_at IS NULL",
        ),
        (
            "Test 3: Project-wide status filtering",
            "SELECT * FROM tasks "
            "WHERE project_id = 1 AND status = 'in_progress' AND deleted_at IS NULL",
        ),
        (
            "Test 4: Shots with task status join",
            "SELECT s.id, s.name, t.task_type, t.status "
            "FROM shots s "
            "LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL "
            "WHERE s.deleted_at IS NULL "
            "LIMIT 10",
        ),
    )

    conn = sqlite3.connect(str(db_file))
    cursor = conn.cursor()
    try:
        print("\n" + "=" * 50)
        print("TESTING INDEX PERFORMANCE")
        print("=" * 50)
        for heading, query in plan_checks:
            print(f"\n{heading}")
            cursor.execute(f"EXPLAIN QUERY PLAN {query}")
            for row in cursor.fetchall():
                print(f" {row}")
        print("\n✅ Index performance tests completed!")
    except sqlite3.Error as e:
        print(f"❌ Error testing performance: {e}")
    finally:
        conn.close()
def get_sample_data_stats(db_path="database.db"):
    """Print and return counts describing the active task data.

    Only rows whose ``deleted_at`` is NULL are counted. Per-type and
    per-status breakdowns are printed but not returned.

    Args:
        db_path: Path of the SQLite database file. Defaults to
            ``database.db`` in the current working directory.

    Returns:
        A dict with the keys ``total_tasks``, ``shots_with_tasks`` and
        ``assets_with_tasks``, or None when the database file is missing or
        SQLite reported an error.
    """
    db_file = Path(db_path)
    # sqlite3.connect() creates an empty database file for a missing path;
    # that stray file would later make a "does the database exist?" check
    # pass, so bail out before connecting.
    if not db_file.exists():
        print(f"Error: {db_file} not found. Please run from the backend directory.")
        return None

    conn = sqlite3.connect(str(db_file))
    cursor = conn.cursor()
    try:
        print("\n" + "=" * 50)
        print("SAMPLE DATA STATISTICS")
        print("=" * 50)

        cursor.execute("SELECT COUNT(*) FROM tasks WHERE deleted_at IS NULL")
        total_tasks = cursor.fetchone()[0]
        print(f"Total active tasks: {total_tasks}")

        # Printed breakdowns: (heading, column to group active tasks by).
        for heading, column in (
            ("\nTasks by type:", "task_type"),
            ("\nTasks by status:", "status"),
        ):
            cursor.execute(
                f"SELECT {column}, COUNT(*) FROM tasks "
                f"WHERE deleted_at IS NULL GROUP BY {column}"
            )
            breakdown = cursor.fetchall()
            print(heading)
            for label, count in breakdown:
                print(f" {label}: {count}")

        # A shot counts only if both it and at least one of its tasks are active.
        cursor.execute("""
            SELECT COUNT(DISTINCT s.id)
            FROM shots s
            INNER JOIN tasks t ON s.id = t.shot_id
            WHERE s.deleted_at IS NULL AND t.deleted_at IS NULL
        """)
        shots_with_tasks = cursor.fetchone()[0]
        print(f"\nShots with tasks: {shots_with_tasks}")

        # Same rule for assets.
        cursor.execute("""
            SELECT COUNT(DISTINCT a.id)
            FROM assets a
            INNER JOIN tasks t ON a.id = t.asset_id
            WHERE a.deleted_at IS NULL AND t.deleted_at IS NULL
        """)
        assets_with_tasks = cursor.fetchone()[0]
        print(f"Assets with tasks: {assets_with_tasks}")

        return {
            'total_tasks': total_tasks,
            'shots_with_tasks': shots_with_tasks,
            'assets_with_tasks': assets_with_tasks,
        }
    except sqlite3.Error as e:
        print(f"❌ Error getting statistics: {e}")
        return None
    finally:
        conn.close()
if __name__ == "__main__":
    banner = "=" * 60
    print("Shot-Asset Task Status Optimization: Database Index Creation")
    print(banner)

    # Report what the database currently holds before creating anything.
    get_sample_data_stats()

    # Stop with a non-zero exit status when index creation did not succeed.
    if not create_task_status_indexes():
        print("\n❌ Index creation failed!")
        sys.exit(1)

    # Indexes exist now; show the query plans that should use them.
    test_index_performance()

    print("\n" + banner)
    print("INDEX CREATION COMPLETED SUCCESSFULLY!")
    print(banner)
    print("\nNext steps:")
    for next_step in (
        "1. Run backend optimization tests",
        "2. Implement optimized query patterns in routers",
        "3. Test with larger datasets",
    ):
        print(next_step)