97 lines
3.7 KiB
Python
97 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migration script to convert admin role users to admin permission users.
|
|
This script should be run once to migrate existing admin users to the new admin permission system.
|
|
"""
|
|
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
from database import DATABASE_URL, Base
|
|
from models.user import User, UserRole
|
|
import logging
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def migrate_admin_users():
|
|
"""Migrate existing admin role users to admin permission system."""
|
|
|
|
# Create engine and session
|
|
engine = create_engine(DATABASE_URL)
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
# Create tables if they don't exist
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Check if is_admin column exists, if not add it
|
|
try:
|
|
# Try to query is_admin column
|
|
db.execute(text("SELECT is_admin FROM users LIMIT 1"))
|
|
logger.info("is_admin column already exists")
|
|
except Exception:
|
|
# Add is_admin column if it doesn't exist
|
|
logger.info("Adding is_admin column to users table")
|
|
db.execute(text("ALTER TABLE users ADD COLUMN is_admin BOOLEAN DEFAULT FALSE"))
|
|
db.commit()
|
|
|
|
# Find all users with admin role and convert them
|
|
# Use raw SQL to avoid enum validation issues
|
|
result = db.execute(text("SELECT * FROM users WHERE role = 'ADMIN'"))
|
|
admin_users_data = result.fetchall()
|
|
|
|
# Also try lowercase 'admin' in case some are stored that way
|
|
result2 = db.execute(text("SELECT * FROM users WHERE role = 'admin'"))
|
|
admin_users_data2 = result2.fetchall()
|
|
|
|
all_admin_data = list(admin_users_data) + list(admin_users_data2)
|
|
|
|
if not all_admin_data:
|
|
logger.info("No admin role users found to migrate")
|
|
return
|
|
|
|
logger.info(f"Found {len(all_admin_data)} admin role users to migrate")
|
|
|
|
for user_data in all_admin_data:
|
|
user_id = user_data[0] # Assuming id is the first column
|
|
user_email = user_data[1] # Assuming email is the second column
|
|
|
|
logger.info(f"Migrating user {user_email} (ID: {user_id}) from admin role to admin permission")
|
|
|
|
# Update using raw SQL to avoid enum validation issues
|
|
# Use uppercase enum values as they're stored in the database
|
|
db.execute(text("""
|
|
UPDATE users
|
|
SET role = 'COORDINATOR', is_admin = TRUE
|
|
WHERE id = :user_id
|
|
"""), {"user_id": user_id})
|
|
|
|
logger.info(f"User {user_email} migrated: role=COORDINATOR, is_admin=True")
|
|
|
|
# Commit all changes
|
|
db.commit()
|
|
logger.info("Migration completed successfully")
|
|
|
|
# Verify migration using raw SQL
|
|
result = db.execute(text("SELECT email, role, is_admin FROM users WHERE is_admin = TRUE"))
|
|
admin_permission_users = result.fetchall()
|
|
logger.info(f"After migration: {len(admin_permission_users)} users have admin permission")
|
|
|
|
for user_data in admin_permission_users:
|
|
email, role, is_admin = user_data
|
|
logger.info(f" - {email}: role={role}, is_admin={is_admin}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Migration failed: {e}")
|
|
db.rollback()
|
|
raise
|
|
finally:
|
|
db.close()
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Starting admin user migration...")
|
|
migrate_admin_users()
|
|
logger.info("Migration completed!") |