# Source: LinkDesk/backend/test_bulk_actions.py (Python; listed as 157 lines, 4.7 KiB)

"""
Test script for bulk action endpoints.
Tests bulk status update and bulk assignment functionality.
"""
import requests
import json
# Root URL of the backend under test; every endpoint path used in this
# script is appended to it.
BASE_URL = "http://localhost:8000"
def login(email, password, timeout=10):
    """Log in and return the access token.

    Sends ``email`` and ``password`` as a JSON body to
    ``{BASE_URL}/auth/login``.

    Args:
        email: Account email placed in the request body.
        password: Account password placed in the request body.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``access_token`` field of the JSON response when the server
        answers 200. ``None`` when the request cannot be completed or the
        server answers with any other status; the reason is printed.
    """
    try:
        response = requests.post(
            f"{BASE_URL}/auth/login",
            json={"email": email, "password": password},
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # An unreachable or hanging backend is reported as a failed login
        # (None) so the caller can print its own hint instead of the script
        # dying with a traceback.
        print(f"Login failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"Login failed: {response.status_code}")
        print(response.text)
        return None

    return response.json()["access_token"]
def get_tasks(token, project_id=None, timeout=10):
    """Fetch the task list from ``{BASE_URL}/tasks/``.

    Args:
        token: Access token sent as a ``Bearer`` Authorization header.
        project_id: When truthy, sent as the ``project_id`` query parameter;
            otherwise no query parameter is sent.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body when the server answers 200. An empty list
        when the request cannot be completed or the server answers with any
        other status; the reason is printed.
    """
    headers = {"Authorization": f"Bearer {token}"}
    params = {}
    if project_id:
        params["project_id"] = project_id

    try:
        response = requests.get(
            f"{BASE_URL}/tasks/",
            headers=headers,
            params=params,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # Treat a network failure like any other failed fetch: report it and
        # hand back an empty list rather than raising.
        print(f"Get tasks failed: {exc}")
        return []

    if response.status_code != 200:
        print(f"Get tasks failed: {response.status_code}")
        print(response.text)
        return []

    return response.json()
def bulk_update_status(token, task_ids, status, timeout=10):
    """Set the same status on several tasks with one request.

    Sends a PUT to ``{BASE_URL}/tasks/bulk/status`` with ``task_ids`` and
    ``status`` in the JSON body, and prints the response.

    Args:
        token: Access token sent as a ``Bearer`` Authorization header.
        task_ids: Task IDs placed in the ``task_ids`` field.
        status: Value placed in the ``status`` field.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response, whatever the status code.
        An empty dict when the request cannot be completed or the body is
        not valid JSON, so callers using ``result.get(..., 0)`` see a
        failure instead of an exception.
    """
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "task_ids": task_ids,
        "status": status,
    }

    try:
        response = requests.put(
            f"{BASE_URL}/tasks/bulk/status",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        print(f"\nBulk Status Update request failed: {exc}")
        return {}

    print(f"\nBulk Status Update Response ({response.status_code}):")
    try:
        # Decode once; the same object is printed and returned.
        body = response.json()
    except ValueError:
        # Error pages and empty bodies are not JSON; show the raw text.
        print(response.text)
        return {}

    print(json.dumps(body, indent=2))
    return body
def bulk_assign_tasks(token, task_ids, user_id, timeout=10):
    """Assign several tasks to one user with one request.

    Sends a PUT to ``{BASE_URL}/tasks/bulk/assign`` with ``task_ids`` and
    ``assigned_user_id`` in the JSON body, and prints the response.

    Args:
        token: Access token sent as a ``Bearer`` Authorization header.
        task_ids: Task IDs placed in the ``task_ids`` field.
        user_id: Value placed in the ``assigned_user_id`` field.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response, whatever the status code.
        An empty dict when the request cannot be completed or the body is
        not valid JSON, so callers using ``result.get(..., 0)`` see a
        failure instead of an exception.
    """
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "task_ids": task_ids,
        "assigned_user_id": user_id,
    }

    try:
        response = requests.put(
            f"{BASE_URL}/tasks/bulk/assign",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        print(f"\nBulk Assignment request failed: {exc}")
        return {}

    print(f"\nBulk Assignment Response ({response.status_code}):")
    try:
        # Decode once; the same object is printed and returned.
        body = response.json()
    except ValueError:
        # Error pages and empty bodies are not JSON; show the raw text.
        print(response.text)
        return {}

    print(json.dumps(body, indent=2))
    return body
def main():
    """Run the bulk-action checks in order and print a result for each.

    Steps: log in, fetch the tasks of project 1, bulk-update the status of
    the first two tasks, bulk-assign them to user 1, send only invalid task
    IDs, then send a mix of one valid and one invalid ID. Stops early when
    login fails or no tasks are returned. Nothing is returned; all results
    go to stdout.
    """
    banner = "=" * 60
    print(banner)
    print("Testing Bulk Action Endpoints")
    print(banner)

    # Login as coordinator (should have permission for bulk actions)
    print("\n1. Logging in as coordinator...")
    token = login("admin@vfx.com", "admin123")
    if not token:
        print("Failed to login. Make sure the backend is running and user exists.")
        return
    print("✓ Login successful")

    # Get some tasks
    print("\n2. Fetching tasks...")
    tasks = get_tasks(token, project_id=1)
    if not tasks:
        print("No tasks found. Create some tasks first.")
        return
    print(f"✓ Found {len(tasks)} tasks")

    # Display first few tasks
    print("\nFirst 3 tasks:")
    for task in tasks[:3]:
        print(f" - Task {task['id']}: {task['name']} (Status: {task['status']})")

    # Steps 3 and 4 both need two tasks and act on the same pair.
    if len(tasks) >= 2:
        task_ids = [tasks[0]['id'], tasks[1]['id']]

        # Test bulk status update
        print(f"\n3. Testing bulk status update for tasks {task_ids}...")
        result = bulk_update_status(token, task_ids, "in_progress")
        if result.get('success_count', 0) > 0:
            print(f"✓ Successfully updated {result['success_count']} tasks")
        else:
            print("✗ Failed to update tasks")

        # Test bulk assignment: try to assign to user ID 1 (admin)
        print(f"\n4. Testing bulk assignment for tasks {task_ids}...")
        result = bulk_assign_tasks(token, task_ids, 1)
        if result.get('success_count', 0) > 0:
            print(f"✓ Successfully assigned {result['success_count']} tasks")
        else:
            print("✗ Failed to assign tasks")

    # Test error handling - invalid task IDs
    print("\n5. Testing error handling with invalid task IDs...")
    result = bulk_update_status(token, [99999, 99998], "in_progress")
    if result.get('failed_count', 0) > 0:
        print(f"✓ Error handling works - {result['failed_count']} tasks failed as expected")
    else:
        # Report the negative outcome too, so a failed check is visible.
        print("✗ Error handling check failed - invalid task IDs were not reported as failed")

    # Test atomicity - mix of valid and invalid IDs.
    # `tasks` is known to be non-empty here (checked after fetching).
    mixed_ids = [tasks[0]['id'], 99999]
    print(f"\n6. Testing atomicity with mixed valid/invalid IDs {mixed_ids}...")
    result = bulk_update_status(token, mixed_ids, "approved")
    if result.get('success_count', 0) == 0 and result.get('failed_count', 0) > 0:
        print("✓ Atomicity works - no partial updates when errors occur")
    else:
        print("✗ Atomicity may not be working correctly")

    print("\n" + banner)
    print("Bulk Action Tests Complete")
    print(banner)
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()