# Source: LinkDesk/backend/routers/notifications.py (Python, 179 lines, 5.5 KiB)

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import desc, func
from typing import List, Optional
from datetime import datetime
from database import get_db
from models.user import User
from models.notification import Notification, UserNotificationPreference, NotificationType
from schemas.notification import (
NotificationResponse,
NotificationMarkRead,
NotificationPreferencesResponse,
NotificationPreferencesUpdate,
NotificationStats
)
from utils.auth import get_current_user
# Router for the notification endpoints; every route declared on it is
# served under the "/notifications" prefix and tagged "notifications".
router = APIRouter(prefix="/notifications", tags=["notifications"])
@router.get("", response_model=List[NotificationResponse])
def get_notifications(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
    unread_only: bool = Query(False),
    type_filter: Optional[NotificationType] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the current user's notifications, newest first.

    Args:
        skip: Number of matching rows to skip (must be >= 0).
        limit: Maximum number of rows to return (1 to 100, default 50).
        unread_only: When true, restrict to notifications whose ``read``
            flag is false.
        type_filter: When given, restrict to notifications of this type.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user supplied by ``get_current_user``.

    Returns:
        The matching ``Notification`` rows for this page.
    """
    query = db.query(Notification).filter(Notification.user_id == current_user.id)

    if unread_only:
        # `== False` is deliberate: it builds a SQL comparison expression.
        query = query.filter(Notification.read == False)  # noqa: E712

    # Compare against None explicitly so that a falsy enum member (if the
    # enum ever has one) is still applied as a filter.
    if type_filter is not None:
        query = query.filter(Notification.type == type_filter)

    # created_at alone is not unique; without a tie-breaker, rows sharing a
    # timestamp may be returned in a different order on each request, so
    # offset/limit paging could repeat or skip them. The id makes the
    # ordering total.
    ordered = query.order_by(desc(Notification.created_at), desc(Notification.id))
    return ordered.offset(skip).limit(limit).all()
@router.get("/stats", response_model=NotificationStats)
def get_notification_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Summarise the current user's notifications.

    Returns:
        A ``NotificationStats`` with:
        - ``total``: count of all of the user's notifications,
        - ``unread``: count of those whose ``read`` flag is false,
        - ``by_type``: per-type counts of the *unread* notifications only,
          keyed by ``str()`` of the type.
    """
    # Filter expressions shared by the three queries below.
    belongs_to_user = Notification.user_id == current_user.id
    is_unread = Notification.read == False  # noqa: E712 - SQL expression

    total = db.query(Notification).filter(belongs_to_user).count()
    unread = db.query(Notification).filter(belongs_to_user, is_unread).count()

    # One (type, count) row per notification type among the unread ones.
    unread_per_type = (
        db.query(Notification.type, func.count(Notification.id).label('count'))
        .filter(belongs_to_user, is_unread)
        .group_by(Notification.type)
        .all()
    )

    # NOTE(review): if Notification.type yields Enum members, str() gives
    # "NotificationType.X" rather than the member's value - confirm that
    # this is the key format clients expect.
    by_type = {}
    for kind, amount in unread_per_type:
        by_type[str(kind)] = amount

    return NotificationStats(total=total, unread=unread, by_type=by_type)
@router.post("/mark-read")
def mark_notifications_read(
    data: NotificationMarkRead,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark the given notifications as read for the current user.

    Only notifications that belong to the current user are considered; ids
    owned by other users are silently ignored.

    Args:
        data: Request body carrying ``notification_ids``.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user supplied by ``get_current_user``.

    Returns:
        A dict with a ``message`` reporting how many of the requested
        notifications were found for this user (already-read ones included).

    Raises:
        HTTPException: 404 when none of the ids match a notification owned
            by the current user.
    """
    notifications = db.query(Notification).filter(
        Notification.id.in_(data.notification_ids),
        Notification.user_id == current_user.id,
    ).all()

    if not notifications:
        raise HTTPException(status_code=404, detail="No notifications found")

    # Take the timestamp once so every row changed by this request shares
    # the same read_at value.
    marked_at = datetime.utcnow()
    for notification in notifications:
        # Leave already-read notifications untouched so their original
        # read_at is preserved (mark-all-read also only touches unread rows).
        if notification.read:
            continue
        notification.read = True
        notification.read_at = marked_at

    db.commit()
    return {"message": f"Marked {len(notifications)} notifications as read"}
@router.post("/mark-all-read")
def mark_all_notifications_read(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Flag every unread notification of the current user as read.

    Issues a single bulk UPDATE rather than loading the rows.

    Returns:
        A dict with a ``message`` containing the row count reported by the
        update.
    """
    unread_for_user = db.query(Notification).filter(
        Notification.user_id == current_user.id,
        Notification.read == False,  # noqa: E712 - SQL expression
    )
    new_values = {
        "read": True,
        "read_at": datetime.utcnow(),
    }

    count = unread_for_user.update(new_values)
    db.commit()

    return {"message": f"Marked {count} notifications as read"}
@router.delete("/{notification_id}")
def delete_notification(
    notification_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove one of the current user's notifications.

    Args:
        notification_id: Primary key of the notification to delete.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user supplied by ``get_current_user``.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when no notification with that id is owned by
            the current user.
    """
    # Matching on user_id as well means another user's notification is
    # reported as "not found" rather than deleted.
    target = (
        db.query(Notification)
        .filter(
            Notification.id == notification_id,
            Notification.user_id == current_user.id,
        )
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Notification not found")

    db.delete(target)
    db.commit()
    return {"message": "Notification deleted"}
@router.get("/preferences", response_model=NotificationPreferencesResponse)
def get_notification_preferences(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch the current user's notification preferences.

    If the user has no preferences row yet, one is created, committed and
    returned, so this GET can write to the database on first use.

    Returns:
        The user's ``UserNotificationPreference`` row.
    """
    stored = (
        db.query(UserNotificationPreference)
        .filter(UserNotificationPreference.user_id == current_user.id)
        .first()
    )
    if stored is not None:
        return stored

    # Nothing stored yet: persist a row built from user_id alone (remaining
    # columns presumably fall back to the model's defaults - confirm there).
    created = UserNotificationPreference(user_id=current_user.id)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
@router.put("/preferences", response_model=NotificationPreferencesResponse)
def update_notification_preferences(
    data: NotificationPreferencesUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update the current user's notification preferences.

    A preferences row is created for the user if none exists yet. Only the
    fields actually present in the request body are written; fields the
    client omitted keep their stored value.

    Args:
        data: Preference values to apply.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user supplied by ``get_current_user``.

    Returns:
        The refreshed ``UserNotificationPreference`` row.
    """
    preferences = db.query(UserNotificationPreference).filter(
        UserNotificationPreference.user_id == current_user.id
    ).first()

    if not preferences:
        preferences = UserNotificationPreference(user_id=current_user.id)
        db.add(preferences)

    # exclude_unset drops fields the client did not send, so a partial body
    # cannot overwrite stored preferences with schema defaults (or None).
    # When every field is supplied this is identical to a full dump.
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(preferences, field, value)

    preferences.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(preferences)
    return preferences