LinkDesk/backend/test_index_performance.py (Python, 179 lines, 7.1 KiB)
#!/usr/bin/env python3
"""
Test the performance of the newly created indexes with realistic queries.
"""
import sqlite3
import time
from contextlib import contextmanager
@contextmanager
def timer():
    """Context manager that prints the elapsed time of its body in milliseconds.

    Uses time.perf_counter() — a monotonic, high-resolution clock — instead of
    time.time(), which can jump backwards/forwards when the system clock is
    adjusted and would then report nonsense durations. The print happens in a
    ``finally`` so a timing line is emitted even if the timed body raises.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        end = time.perf_counter()
        print(f" Execution time: {(end - start) * 1000:.2f}ms")
def _timed_query(cursor, header, sql):
    """Print *header*, execute *sql* under the timer, and return all rows."""
    print(header)
    with timer():
        cursor.execute(sql)
        rows = cursor.fetchall()
    return rows


def test_optimized_queries():
    """Test the performance of optimized queries that will be used in the application.

    Opens ``database.db``, runs the six hot-path queries the application
    issues (shot/asset listings with task aggregation, a single-shot detail
    lookup, project-wide task filtering, dashboard aggregation, and the task
    browser), and prints a timing line for each. SQLite errors are reported
    rather than raised so one failing query does not abort the script; the
    connection is always closed.
    """
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    try:
        print("Testing optimized query performance...")
        print("=" * 50)

        # Test 1: mirrors the optimized shot router — one row per shot with its
        # task statuses folded in via GROUP_CONCAT, avoiding per-shot queries.
        results = _timed_query(cursor, "\n1. Shot list with task status aggregation:", """
            SELECT
                s.id, s.name, s.status,
                GROUP_CONCAT(t.task_type || ':' || t.status) as task_statuses,
                COUNT(t.id) as task_count
            FROM shots s
            LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL
            WHERE s.deleted_at IS NULL
            GROUP BY s.id, s.name, s.status
            LIMIT 10
        """)
        print(f" Retrieved {len(results)} shots with task data")

        # Test 2: same aggregation shape as test 1, but for assets.
        results = _timed_query(cursor, "\n2. Asset list with task status aggregation:", """
            SELECT
                a.id, a.name, a.status,
                GROUP_CONCAT(t.task_type || ':' || t.status) as task_statuses,
                COUNT(t.id) as task_count
            FROM assets a
            LEFT JOIN tasks t ON a.id = t.asset_id AND t.deleted_at IS NULL
            WHERE a.deleted_at IS NULL
            GROUP BY a.id, a.name, a.status
            LIMIT 10
        """)
        print(f" Retrieved {len(results)} assets with task data")

        # Test 3: detail view — one shot joined to each of its live tasks.
        results = _timed_query(cursor, "\n3. Single shot with detailed task information:", """
            SELECT
                s.id, s.name, s.status,
                t.id as task_id, t.task_type, t.status as task_status,
                t.assigned_user_id, t.updated_at
            FROM shots s
            LEFT JOIN tasks t ON s.id = t.shot_id AND t.deleted_at IS NULL
            WHERE s.id = 1 AND s.deleted_at IS NULL
        """)
        print(f" Retrieved shot with {len(results)} task details")

        # Test 4: project-wide filter on (project_id, status) — the shape the
        # composite task indexes are meant to accelerate.
        results = _timed_query(cursor, "\n4. Project-wide task status filtering:", """
            SELECT t.id, t.task_type, t.status, t.shot_id, t.asset_id
            FROM tasks t
            WHERE t.project_id = 1
            AND t.status IN ('in_progress', 'submitted', 'approved')
            AND t.deleted_at IS NULL
            LIMIT 50
        """)
        print(f" Retrieved {len(results)} filtered tasks")

        # Test 5: dashboard statistics — counts per (status, task_type).
        results = _timed_query(cursor, "\n5. Complex aggregation query (dashboard statistics):", """
            SELECT
                t.status,
                t.task_type,
                COUNT(*) as count,
                COUNT(DISTINCT t.shot_id) as shots_affected,
                COUNT(DISTINCT t.asset_id) as assets_affected
            FROM tasks t
            WHERE t.project_id = 1 AND t.deleted_at IS NULL
            GROUP BY t.status, t.task_type
            ORDER BY t.status, t.task_type
        """)
        print(f" Generated {len(results)} status/type combinations")

        # Test 6: task browser — tasks joined to whichever entity (shot or
        # asset) owns them, newest first.
        results = _timed_query(cursor, "\n6. Task browser query (combining shots and assets):", """
            SELECT
                t.id, t.task_type, t.status, t.assigned_user_id, t.updated_at,
                CASE
                    WHEN t.shot_id IS NOT NULL THEN 'shot'
                    WHEN t.asset_id IS NOT NULL THEN 'asset'
                    ELSE 'other'
                END as entity_type,
                COALESCE(s.name, a.name) as entity_name
            FROM tasks t
            LEFT JOIN shots s ON t.shot_id = s.id AND s.deleted_at IS NULL
            LEFT JOIN assets a ON t.asset_id = a.id AND a.deleted_at IS NULL
            WHERE t.project_id = 1 AND t.deleted_at IS NULL
            ORDER BY t.updated_at DESC
            LIMIT 20
        """)
        print(f" Retrieved {len(results)} tasks for browser")

        print("\n" + "=" * 50)
        print("✅ All performance tests completed successfully!")
        print("The indexes are working correctly and should provide")
        print("significant performance improvements for table queries.")
    except sqlite3.Error as e:
        print(f"❌ Error during performance testing: {e}")
    finally:
        conn.close()
def analyze_index_usage():
    """Analyze which indexes are being used by the optimizer.

    Runs EXPLAIN QUERY PLAN against a set of representative task lookups and
    reports the index the SQLite planner chose for each plan step, or the raw
    plan detail when no index is used (e.g. a full table scan).
    """
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    try:
        print("\n" + "=" * 50)
        print("INDEX USAGE ANALYSIS")
        print("=" * 50)
        queries_to_analyze = [
            ("Shot task lookup", "SELECT * FROM tasks WHERE shot_id = 1 AND deleted_at IS NULL"),
            ("Asset task lookup", "SELECT * FROM tasks WHERE asset_id = 1 AND deleted_at IS NULL"),
            ("Status filtering", "SELECT * FROM tasks WHERE status = 'in_progress' AND task_type = 'animation' AND deleted_at IS NULL"),
            ("Shot status combo", "SELECT * FROM tasks WHERE shot_id = 1 AND status = 'in_progress' AND deleted_at IS NULL"),
            ("Asset status combo", "SELECT * FROM tasks WHERE asset_id = 1 AND status = 'completed' AND deleted_at IS NULL"),
        ]
        for query_name, query in queries_to_analyze:
            print(f"\n{query_name}:")
            cursor.execute(f"EXPLAIN QUERY PLAN {query}")
            for row in cursor.fetchall():
                # The human-readable plan text is the last column of each row
                # (more robust than a hard-coded row[3] across SQLite versions).
                detail = row[-1]
                # Fix: the original matched only "USING INDEX", which misses
                # covering-index plans ("... USING COVERING INDEX idx ...").
                for marker in ("USING COVERING INDEX ", "USING INDEX "):
                    if marker in detail:
                        index_name = detail.split(marker, 1)[1].split(" ")[0]
                        print(f" ✓ Using index: {index_name}")
                        break
                else:
                    print(detail)
    except sqlite3.Error as e:
        print(f"❌ Error analyzing index usage: {e}")
    finally:
        conn.close()
if __name__ == "__main__":
    # Time the hot-path queries first, then report which indexes the
    # query planner actually picked for the representative lookups.
    test_optimized_queries()
    analyze_index_usage()