709 lines
25 KiB
Python
709 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
Comprehensive test verifying that the optimized shot and asset routers fully support custom task statuses.

This test covers all requirements from task 4:
- Ensure optimized queries include both default and custom task statuses
- Test with projects that have custom task statuses defined
- Verify aggregated data includes all status types
- Test edge cases such as empty custom statuses, NULL custom statuses, and malformed JSON
"""
|
|
|
|
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
import json
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from database import Base
|
|
from models.user import User, UserRole
|
|
from models.project import Project, ProjectType, ProjectStatus, ProjectMember
|
|
from models.task import Task
|
|
from models.asset import Asset, AssetCategory, AssetStatus
|
|
from models.episode import Episode, EpisodeStatus
|
|
from models.shot import Shot, ShotStatus
|
|
|
|
# Throwaway SQLite database file backing this test run.
TEST_DB = "test_comprehensive_custom_status.db"

# Remove any file left over from an earlier run before the engine is created.
try:
    if os.path.exists(TEST_DB):
        os.remove(TEST_DB)
except PermissionError:
    # Best effort only: if the file cannot be removed we carry on with it.
    pass

# Engine bound to the file above, a session factory for the tests, and the
# schema for every model registered on Base.
engine = create_engine(f"sqlite:///{TEST_DB}")
SessionLocal = sessionmaker(bind=engine)
Base.metadata.create_all(engine)
|
|
|
|
def _add_status_tasks(db, project_id, name_prefix, task_statuses, **links):
    """Queue one Task per (task_type, status) pair on the session.

    ``links`` carries the foreign keys tying each task to its parent:
    ``episode_id``/``shot_id`` for shot tasks, ``asset_id`` for asset tasks.
    Nothing is flushed or committed here.
    """
    for task_type, status in task_statuses:
        db.add(
            Task(
                project_id=project_id,
                **links,
                name=f"{name_prefix}_{task_type}",
                task_type=task_type,
                status=status,
            )
        )


def _add_shot_with_tasks(db, project, episode, shot_name, task_statuses):
    """Create an in-progress shot under ``episode`` plus its tasks; return the shot."""
    shot = Shot(
        project_id=project.id,
        episode_id=episode.id,
        name=shot_name,
        frame_start=1001,
        frame_end=1100,
        status=ShotStatus.IN_PROGRESS,
    )
    db.add(shot)
    db.flush()  # flush so shot.id is available for the tasks below
    _add_status_tasks(
        db,
        project.id,
        shot.name,
        task_statuses,
        episode_id=episode.id,
        shot_id=shot.id,
    )
    return shot


def _add_asset_with_tasks(db, project, asset_name, category, task_statuses):
    """Create an in-progress asset in ``project`` plus its tasks; return the asset."""
    asset = Asset(
        project_id=project.id,
        name=asset_name,
        category=category,
        status=AssetStatus.IN_PROGRESS,
    )
    db.add(asset)
    db.flush()  # flush so asset.id is available for the tasks below
    _add_status_tasks(db, project.id, asset.name, task_statuses, asset_id=asset.id)
    return asset


def setup_comprehensive_test_data():
    """Set up comprehensive test data covering all edge cases.

    Creates one coordinator user and four projects, each with one episode,
    one shot and a handful of tasks:

    1. a project with four custom statuses (tasks mix custom and default ids),
    2. a project whose custom statuses are an empty JSON list,
    3. a project whose custom statuses are NULL,
    4. a project whose custom statuses column holds malformed JSON.

    Projects 1 and 2 additionally get one asset with tasks.

    Returns:
        A ``(user, projects, episodes)`` tuple of the created ORM objects.
        NOTE(review): the session is closed before this function returns, so
        these instances are presumably detached; confirm before reading
        attributes from them.

    Raises:
        Exception: whatever the database layer raised; the transaction is
            rolled back and the error is re-raised after being printed.
    """
    print("Setting up comprehensive test data...")

    db = SessionLocal()
    try:
        # Create test user
        user = User(
            email="coordinator@test.com",
            password_hash="test",
            first_name="Test",
            last_name="Coordinator",
            role=UserRole.COORDINATOR,
            is_approved=True,
        )
        db.add(user)
        db.commit()

        # Test Case 1: Project with comprehensive custom statuses
        comprehensive_custom = [
            {
                "id": "custom_todo",
                "name": "To Do",
                "color": "#FF0000",
                "order": 0,
                "is_default": True,
            },
            {
                "id": "custom_in_review",
                "name": "In Review",
                "color": "#FFA500",
                "order": 1,
                "is_default": False,
            },
            {
                "id": "custom_approved_final",
                "name": "Final Approved",
                "color": "#00FF00",
                "order": 2,
                "is_default": False,
            },
            {
                "id": "custom_on_hold",
                "name": "On Hold",
                "color": "#808080",
                "order": 3,
                "is_default": False,
            },
        ]
        project1 = Project(
            name="Comprehensive Custom Status Project",
            code_name="CCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps(comprehensive_custom),
        )
        db.add(project1)

        # Test Case 2: Project with empty custom statuses (should use defaults)
        project2 = Project(
            name="Empty Custom Status Project",
            code_name="ECSP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=json.dumps([]),
        )
        db.add(project2)

        # Test Case 3: Project with NULL custom statuses
        project3 = Project(
            name="Null Custom Status Project",
            code_name="NCSP",
            client_name="Test Client",
            project_type=ProjectType.CINEMA,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses=None,
        )
        db.add(project3)

        # Test Case 4: Project with malformed JSON (should fallback gracefully)
        project4 = Project(
            name="Malformed JSON Project",
            code_name="MJP",
            client_name="Test Client",
            project_type=ProjectType.TV,
            status=ProjectStatus.IN_PROGRESS,
            custom_task_statuses='{"invalid": json}',  # Malformed JSON
        )
        db.add(project4)

        db.commit()

        projects = [project1, project2, project3, project4]

        # Add user as member of all projects
        for project in projects:
            db.add(ProjectMember(user_id=user.id, project_id=project.id))
        db.commit()

        # Create one episode per project, numbered from 1
        episodes = []
        for number, project in enumerate(projects, 1):
            episode = Episode(
                project_id=project.id,
                name=f"Episode {number}",
                episode_number=number,
                status=EpisodeStatus.IN_PROGRESS,
            )
            db.add(episode)
            episodes.append(episode)
        db.commit()

        # Project 1: every custom status plus one default status mixed in
        _add_shot_with_tasks(
            db,
            project1,
            episodes[0],
            "comprehensive_shot_001",
            [
                ("layout", "custom_todo"),
                ("animation", "custom_in_review"),
                ("lighting", "custom_approved_final"),
                ("compositing", "custom_on_hold"),
                ("simulation", "in_progress"),  # Default status mixed in
            ],
        )
        _add_asset_with_tasks(
            db,
            project1,
            "comprehensive_character",
            AssetCategory.CHARACTERS,
            [
                ("modeling", "custom_todo"),
                ("surfacing", "custom_approved_final"),
                ("rigging", "approved"),  # Default status
            ],
        )

        # Project 2: only default statuses (empty custom statuses)
        _add_shot_with_tasks(
            db,
            project2,
            episodes[1],
            "empty_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "in_progress"),
                ("lighting", "submitted"),
                ("compositing", "approved"),
            ],
        )
        _add_asset_with_tasks(
            db,
            project2,
            "empty_custom_prop",
            AssetCategory.PROPS,
            [
                ("modeling", "not_started"),
                ("surfacing", "retake"),
            ],
        )

        # Project 3: NULL custom statuses (should use defaults)
        _add_shot_with_tasks(
            db,
            project3,
            episodes[2],
            "null_custom_shot_001",
            [
                ("layout", "not_started"),
                ("animation", "submitted"),
            ],
        )

        # Project 4: malformed JSON (custom status parsing should fail
        # gracefully, so only default statuses are used)
        _add_shot_with_tasks(
            db,
            project4,
            episodes[3],
            "malformed_shot_001",
            [
                ("layout", "approved"),
                ("animation", "retake"),
            ],
        )

        db.commit()
        print("✅ Comprehensive test data setup complete")
        return user, projects, episodes

    except Exception as e:
        db.rollback()
        print(f"❌ Failed to setup comprehensive test data: {e}")
        raise
    finally:
        db.close()
|
|
|
|
|
|
def _custom_status_ids_for(project):
    """Return the custom status ids declared on ``project``.

    The project's ``custom_task_statuses`` value is parsed as JSON. An empty
    list is returned when the value is falsy, cannot be parsed, or does not
    decode to a list; non-dict entries inside the list are ignored.
    """
    raw = project.custom_task_statuses
    if not raw:
        return []
    try:
        declared = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []  # Malformed JSON, should fallback to defaults
    if not isinstance(declared, list):
        return []
    return [entry.get('id') for entry in declared if isinstance(entry, dict)]


def _check_item_statuses(label, item, project_name, custom_ids):
    """Print a shot/asset summary and assert each task status is known.

    A status is accepted when it is one of the five default statuses or one
    of ``custom_ids``. ``label`` is the prefix printed before the item name.
    """
    # Kept as a tuple (not a set) so membership uses ``==`` exactly as before.
    default_statuses = ('not_started', 'in_progress', 'submitted', 'approved', 'retake')

    print(f"{label}: {item.name}")
    print(f" Task Status: {item.task_status}")
    print(f" Task Details: {len(item.task_details)} tasks")

    for _task_type, status in item.task_status.items():
        is_default = status in default_statuses
        is_custom = status in custom_ids
        assert is_default or is_custom, f"Invalid status '{status}' for project {project_name}"
        print(f" ✅ Status '{status}' is valid ({'default' if is_default else 'custom'})")


def test_comprehensive_custom_status_support():
    """Test comprehensive custom status support in optimized queries.

    For every project in the database, lists its shots (for the project's
    first episode) and its assets through the router functions and checks
    that each reported task status is either a default status or a custom
    status declared on that project. For shots it also checks that every
    task detail agrees with the ``task_status`` entry for its task type.

    Returns:
        True when all checks pass, False otherwise (the failure and its
        traceback are printed rather than raised).
    """
    print("\n=== Test 1: Comprehensive Custom Status Support ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()
        projects = db.query(Project).all()

        from routers.shots import list_shots
        from routers.assets import list_assets
        import asyncio

        # Defined once and parameterised, so no closure captures loop variables.
        async def fetch_shots(project_id, episode_id):
            return await list_shots(
                episode_id=episode_id,
                project_id=project_id,
                task_status_filter=None,
                sort_by=None,
                sort_direction="asc",
                skip=0,
                limit=100,
                db=db,
                current_user=user,
            )

        async def fetch_assets(project_id):
            return await list_assets(
                project_id=project_id,
                category=None,
                task_status_filter=None,
                sort_by=None,
                sort_direction="asc",
                skip=0,
                limit=100,
                db=db,
                current_user=user,
            )

        for project in projects:
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()
            assert episode is not None, f"No episode found for project {project.name}"

            print(f"\n--- Testing Project: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            # Parse the project's custom statuses once instead of per status.
            custom_ids = _custom_status_ids_for(project)

            # Test shots
            for shot in asyncio.run(fetch_shots(project.id, episode.id)):
                _check_item_statuses("Shot", shot, project.name, custom_ids)

                # Verify task_details match task_status
                for task_detail in shot.task_details:
                    expected_status = shot.task_status.get(task_detail.task_type)
                    assert task_detail.status == expected_status, f"Task detail status mismatch: {task_detail.status} != {expected_status}"

            # Test assets
            for asset in asyncio.run(fetch_assets(project.id)):
                _check_item_statuses("Asset", asset, project.name, custom_ids)

        print("✅ Test 1 PASSED: Comprehensive custom status support works correctly")
        return True

    except Exception as e:
        print(f"❌ Test 1 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
|
|
|
|
|
|
def _collect_aggregated_statuses(items, label, expected_custom):
    """Collect the statuses reported by ``items`` into custom/default sets.

    For each item the ``task_status`` mapping is read first, then the
    ``task_details`` list. A status found in ``expected_custom`` goes to the
    first set, one of the five default statuses goes to the second, and
    anything else is ignored. Each item's name is printed with ``label``.

    Returns:
        A ``(found_custom, found_default)`` tuple of sets.
    """
    default_statuses = ('not_started', 'in_progress', 'submitted', 'approved', 'retake')
    found_custom = set()
    found_default = set()

    for item in items:
        print(f"{label}: {item.name}")

        observed = [status for _task_type, status in item.task_status.items()]
        observed.extend(detail.status for detail in item.task_details)

        for status in observed:
            if status in expected_custom:
                found_custom.add(status)
            elif status in default_statuses:
                found_default.add(status)

    return found_custom, found_default


def test_custom_status_aggregation_completeness():
    """Test that aggregated data includes ALL status types (default + custom).

    Uses the "Comprehensive Custom Status Project": its shots must report all
    four custom statuses plus the ``in_progress`` default, and its assets
    must report at least one custom and at least one default status.

    Returns:
        True when all checks pass, False otherwise (the failure and its
        traceback are printed rather than raised).
    """
    print("\n=== Test 2: Custom Status Aggregation Completeness ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()

        # Get the comprehensive project (project 1)
        project = db.query(Project).filter(Project.name == "Comprehensive Custom Status Project").first()
        assert project is not None, "Comprehensive Custom Status Project not found"
        episode = db.query(Episode).filter(Episode.project_id == project.id).first()
        assert episode is not None, f"No episode found for project {project.name}"

        from routers.shots import list_shots
        from routers.assets import list_assets
        import asyncio

        async def get_shots():
            return await list_shots(
                episode_id=episode.id,
                project_id=project.id,
                task_status_filter=None,
                sort_by=None,
                sort_direction="asc",
                skip=0,
                limit=100,
                db=db,
                current_user=user,
            )

        async def get_assets():
            return await list_assets(
                project_id=project.id,
                category=None,
                task_status_filter=None,
                sort_by=None,
                sort_direction="asc",
                skip=0,
                limit=100,
                db=db,
                current_user=user,
            )

        # Test shots aggregation completeness
        shots = asyncio.run(get_shots())

        print(f"Testing aggregation completeness for project: {project.name}")

        expected_custom_statuses = {'custom_todo', 'custom_in_review', 'custom_approved_final', 'custom_on_hold'}
        expected_default_statuses = {'in_progress'}  # We know we have this one

        found_custom_statuses, found_default_statuses = _collect_aggregated_statuses(
            shots, "Shot", expected_custom_statuses
        )

        print(f"Found custom statuses: {found_custom_statuses}")
        print(f"Found default statuses: {found_default_statuses}")

        # Verify we found all expected custom statuses
        missing_custom = expected_custom_statuses - found_custom_statuses
        assert not missing_custom, f"Missing custom statuses in aggregation: {missing_custom}"

        # Verify we found expected default statuses
        missing_default = expected_default_statuses - found_default_statuses
        assert not missing_default, f"Missing default statuses in aggregation: {missing_default}"

        # Test assets aggregation completeness
        assets = asyncio.run(get_assets())

        asset_found_custom, asset_found_default = _collect_aggregated_statuses(
            assets, "Asset", expected_custom_statuses
        )

        print(f"Asset found custom statuses: {asset_found_custom}")
        print(f"Asset found default statuses: {asset_found_default}")

        # Verify assets have both custom and default statuses
        assert asset_found_custom, "No custom statuses found in asset aggregation"
        assert asset_found_default, "No default statuses found in asset aggregation"

        print("✅ Test 2 PASSED: Custom status aggregation completeness verified")
        return True

    except Exception as e:
        print(f"❌ Test 2 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
|
|
|
|
|
|
def test_edge_case_handling():
    """Test edge cases like NULL, empty, and malformed custom statuses.

    Lists the shots of the three edge-case projects and asserts that every
    status in ``task_status`` and in ``task_details`` is one of the five
    default statuses.

    Returns:
        True when all checks pass, False otherwise (the failure and its
        traceback are printed rather than raised).
    """
    print("\n=== Test 3: Edge Case Handling ===")

    db = SessionLocal()
    try:
        user = db.query(User).first()

        from routers.shots import list_shots
        import asyncio

        default_statuses = ['not_started', 'in_progress', 'submitted', 'approved', 'retake']

        # (project name, human-readable description of the edge case)
        scenarios = (
            ("Empty Custom Status Project", "empty custom statuses"),
            ("Null Custom Status Project", "NULL custom statuses"),
            ("Malformed JSON Project", "malformed JSON custom statuses"),
        )

        async def load_shots(episode_id, project_id):
            return await list_shots(
                episode_id=episode_id,
                project_id=project_id,
                task_status_filter=None,
                sort_by=None,
                sort_direction="asc",
                skip=0,
                limit=100,
                db=db,
                current_user=user,
            )

        for wanted_name, description in scenarios:
            project = db.query(Project).filter(Project.name == wanted_name).first()
            episode = db.query(Episode).filter(Episode.project_id == project.id).first()

            print(f"\n--- Testing {description}: {project.name} ---")
            print(f"Custom statuses: {project.custom_task_statuses}")

            for shot in asyncio.run(load_shots(episode.id, project.id)):
                print(f"Shot: {shot.name}")
                print(f" Task Status: {shot.task_status}")

                # No usable custom statuses exist for these projects, so only
                # default statuses are acceptable.
                for _task_type, status in shot.task_status.items():
                    assert status in default_statuses, f"Expected default status, got '{status}' for edge case project {project.name}"
                    print(f" ✅ Status '{status}' is valid default status")

                # The per-task details must obey the same rule.
                for detail in shot.task_details:
                    assert detail.status in default_statuses, f"Expected default status in task details, got '{detail.status}'"

        print("✅ Test 3 PASSED: Edge case handling works correctly")
        return True

    except Exception as e:
        print(f"❌ Test 3 FAILED: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        db.close()
|
|
|
|
|
|
def main():
    """Run all comprehensive custom status tests.

    Seeds the test database, runs the three test functions, prints a
    summary, and finally removes the test database file.

    Returns:
        True when every test passed; False when any test failed or the
        setup itself raised.
    """
    print("🚀 Starting Comprehensive Custom Status Verification Tests")
    print("=" * 70)

    try:
        # Setup test data
        setup_comprehensive_test_data()

        # Run tests
        tests = [
            test_comprehensive_custom_status_support,
            test_custom_status_aggregation_completeness,
            test_edge_case_handling,
        ]

        passed = 0
        failed = 0

        for test in tests:
            try:
                succeeded = test()
            except Exception as e:
                print(f"❌ Test failed with exception: {e}")
                succeeded = False

            if succeeded:
                passed += 1
            else:
                failed += 1

        print("\n" + "=" * 70)
        print(f"📊 Test Results: {passed} passed, {failed} failed")

        if failed:
            print("❌ Some tests failed. Custom status support needs fixes.")
            return False

        print("🎉 All comprehensive tests passed!")
        print("✅ Custom status support is fully implemented and working correctly.")
        print("✅ Optimized queries include both default and custom task statuses.")
        print("✅ Projects with custom task statuses are properly supported.")
        print("✅ Aggregated data includes all status types.")
        return True

    except Exception as e:
        print(f"❌ Test setup failed: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        # Cleanup. Close the engine's pooled connections first: a connection
        # that is still open can keep the SQLite file locked (notably on
        # Windows), which would make the removal below fail.
        engine.dispose()
        try:
            if os.path.exists(TEST_DB):
                os.remove(TEST_DB)
        except PermissionError as e:
            # Still best effort, but say so instead of failing silently.
            print(f"⚠️ Could not remove test database {TEST_DB}: {e}")
|
|
|
|
|
|
# Script entry point: exit status 0 when every test passed, 1 otherwise.
if __name__ == "__main__":
    success = main()
    if success:
        sys.exit(0)
    sys.exit(1)