"""Authentication API endpoints.""" from typing import Optional from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel, EmailStr, Field from sqlalchemy.orm import Session import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from core.database import get_db from core.config import settings, auth_settings from models.user import User from services.auth_service import AuthService from services.email_service import EmailService from api.dependencies import get_current_user, get_current_active_user router = APIRouter(prefix="/api/auth", tags=["authentication"]) # Request/Response models class UserRegisterRequest(BaseModel): """User registration request model.""" email: EmailStr password: str = Field(..., min_length=8) confirm_password: str def validate_passwords(self) -> tuple[bool, str]: """Validate password requirements.""" if self.password != self.confirm_password: return False, "Passwords do not match" return auth_settings.validate_password_requirements(self.password) class UserLoginRequest(BaseModel): """User login request model.""" email: EmailStr password: str class TokenResponse(BaseModel): """Token response model.""" access_token: str refresh_token: str token_type: str = "bearer" expires_in: int # seconds class UserResponse(BaseModel): """User response model.""" id: str email: str is_verified: bool is_active: bool created_at: datetime last_login: Optional[datetime] class Config: from_attributes = True class MessageResponse(BaseModel): """Simple message response.""" message: str success: bool = True class RefreshTokenRequest(BaseModel): """Refresh token request model.""" refresh_token: str class PasswordResetRequest(BaseModel): """Password reset request model.""" email: EmailStr class PasswordResetConfirmRequest(BaseModel): """Password reset confirmation model.""" token: str new_password: str = Field(..., min_length=8) confirm_password: str # Endpoints @router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def register( request: UserRegisterRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db) ): """ Register a new user account. Args: request: Registration details background_tasks: Background task runner db: Database session Returns: Created user Raises: HTTPException: If registration fails """ # Validate passwords valid, message = request.validate_passwords() if not valid: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=message ) # Check if user exists existing_user = db.query(User).filter(User.email == request.email).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # Create user hashed_password = AuthService.hash_password(request.password) user = User( email=request.email, password_hash=hashed_password, is_verified=False, is_active=True ) db.add(user) db.commit() db.refresh(user) # Send verification email in background verification_token = AuthService.create_email_verification_token(str(user.id)) background_tasks.add_task( EmailService.send_verification_email, email=user.email, token=verification_token ) return UserResponse.from_orm(user) @router.post("/login", response_model=TokenResponse) async def login( request: UserLoginRequest, db: Session = Depends(get_db) ): """ Login with email and password. Args: request: Login credentials db: Database session Returns: Access and refresh tokens Raises: HTTPException: If authentication fails """ user = AuthService.authenticate_user(request.email, request.password, db) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Account is disabled" ) # Create tokens access_token = AuthService.create_access_token( data={"sub": str(user.id), "email": user.email} ) refresh_token = AuthService.create_refresh_token(str(user.id), db) return TokenResponse( access_token=access_token, refresh_token=refresh_token, expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) @router.post("/refresh", response_model=TokenResponse) async def refresh_token( request: RefreshTokenRequest, db: Session = Depends(get_db) ): """ Refresh access token using refresh token. Args: request: Refresh token db: Database session Returns: New access and refresh tokens Raises: HTTPException: If refresh token is invalid """ # Verify refresh token token_obj = AuthService.verify_refresh_token(request.refresh_token, db) if not token_obj: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" ) # Get user user = db.query(User).filter(User.id == token_obj.user_id).first() if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive" ) # Revoke old refresh token AuthService.revoke_refresh_token(request.refresh_token, db) # Create new tokens access_token = AuthService.create_access_token( data={"sub": str(user.id), "email": user.email} ) new_refresh_token = AuthService.create_refresh_token(str(user.id), db) return TokenResponse( access_token=access_token, refresh_token=new_refresh_token, expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) @router.post("/logout", response_model=MessageResponse) async def logout( refresh_token: Optional[str] = None, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Logout user and revoke tokens. Args: refresh_token: Optional refresh token to revoke current_user: Current authenticated user db: Database session Returns: Success message """ if refresh_token: AuthService.revoke_refresh_token(refresh_token, db) else: # Revoke all user tokens AuthService.revoke_all_user_tokens(str(current_user.id), db) return MessageResponse(message="Logged out successfully") @router.get("/me", response_model=UserResponse) async def get_current_user_info( current_user: User = Depends(get_current_active_user) ): """ Get current user information. Args: current_user: Current authenticated user Returns: User information """ return UserResponse.from_orm(current_user) @router.post("/verify-email", response_model=MessageResponse) async def verify_email( token: str, db: Session = Depends(get_db) ): """ Verify email address with token. Args: token: Email verification token db: Database session Returns: Success message Raises: HTTPException: If verification fails """ user_id = AuthService.verify_email_token(token) if not user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired verification token" ) # Update user user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if user.is_verified: return MessageResponse(message="Email already verified") user.is_verified = True db.commit() return MessageResponse(message="Email verified successfully") @router.post("/resend-verification", response_model=MessageResponse) async def resend_verification( background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Resend email verification link. Args: background_tasks: Background task runner current_user: Current authenticated user db: Database session Returns: Success message """ if current_user.is_verified: return MessageResponse(message="Email already verified") # Send new verification email verification_token = AuthService.create_email_verification_token(str(current_user.id)) background_tasks.add_task( EmailService.send_verification_email, email=current_user.email, token=verification_token ) return MessageResponse(message="Verification email sent") @router.post("/reset-password", response_model=MessageResponse) async def reset_password_request( request: PasswordResetRequest, background_tasks: BackgroundTasks, db: Session = Depends(get_db) ): """ Request password reset email. Args: request: Email for password reset background_tasks: Background task runner db: Database session Returns: Success message (always returns success for security) """ # Find user user = db.query(User).filter(User.email == request.email).first() if user: # Send password reset email reset_token = AuthService.create_password_reset_token(str(user.id)) background_tasks.add_task( EmailService.send_password_reset_email, email=user.email, token=reset_token ) # Always return success for security (don't reveal if email exists) return MessageResponse( message="If the email exists, a password reset link has been sent" ) @router.post("/reset-password/confirm", response_model=MessageResponse) async def reset_password_confirm( request: PasswordResetConfirmRequest, db: Session = Depends(get_db) ): """ Confirm password reset with new password. Args: request: Reset token and new password db: Database session Returns: Success message Raises: HTTPException: If reset fails """ # Validate passwords match if request.new_password != request.confirm_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Passwords do not match" ) # Validate password requirements valid, message = auth_settings.validate_password_requirements(request.new_password) if not valid: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=message ) # Verify token user_id = AuthService.verify_password_reset_token(request.token) if not user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset token" ) # Update password user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.password_hash = AuthService.hash_password(request.new_password) # Revoke all refresh tokens for security AuthService.revoke_all_user_tokens(str(user.id), db) db.commit() return MessageResponse(message="Password reset successfully")