From a40bd9af73055ab5029fa8b952c82f842a86af47 Mon Sep 17 00:00:00 2001 From: MoonTestUse1 Date: Tue, 7 Jan 2025 08:10:40 +0600 Subject: [PATCH] Fix: Admin autorization --- backend/app/crud/employees.py | 66 ++++++++++++++----------- backend/app/models/employee.py | 1 + backend/app/routers/auth.py | 88 +++++++++++++++++---------------- backend/app/schemas/employee.py | 11 +++-- 4 files changed, 92 insertions(+), 74 deletions(-) diff --git a/backend/app/crud/employees.py b/backend/app/crud/employees.py index c87e356..d17ca09 100644 --- a/backend/app/crud/employees.py +++ b/backend/app/crud/employees.py @@ -1,13 +1,9 @@ """Employee CRUD operations""" +from typing import Optional, Dict, Any from sqlalchemy.orm import Session -from typing import List, Optional -from ..models.employee import Employee -from ..schemas.employee import EmployeeCreate, EmployeeUpdate -from ..utils.loggers import auth_logger -def get_employees(db: Session, skip: int = 0, limit: int = 100) -> List[Employee]: - """Get all employees""" - return db.query(Employee).offset(skip).limit(limit).all() +from ..models.employee import Employee +from ..utils.security import get_password_hash def get_employee(db: Session, employee_id: int) -> Optional[Employee]: """Get employee by ID""" @@ -20,35 +16,49 @@ def get_employee_by_credentials(db: Session, first_name: str, last_name: str) -> Employee.last_name == last_name ).first() -def create_employee(db: Session, employee: EmployeeCreate, hashed_password: str) -> Employee: +def get_employee_by_login(db: Session, login: str) -> Optional[Employee]: + """Get employee by login""" + return db.query(Employee).filter(Employee.login == login).first() + +def get_employees(db: Session, skip: int = 0, limit: int = 100): + """Get list of employees""" + return db.query(Employee).offset(skip).limit(limit).all() + +def create_employee(db: Session, employee_data: Dict[str, Any]) -> Employee: """Create new employee""" + # Хешируем пароль + hashed_password = get_password_hash(employee_data["password"]) + + # Создаем сотрудника db_employee = Employee( - first_name=employee.first_name, - last_name=employee.last_name, - department=employee.department, - office=employee.office, + login=employee_data.get("login"), + first_name=employee_data["first_name"], + last_name=employee_data["last_name"], + department=employee_data["department"], + office=employee_data["office"], hashed_password=hashed_password, - is_admin=employee.is_admin + is_admin=employee_data.get("is_admin", False) ) db.add(db_employee) db.commit() db.refresh(db_employee) return db_employee -def update_employee(db: Session, employee_id: int, employee: EmployeeUpdate) -> Optional[Employee]: - """Update employee data""" - db_employee = get_employee(db, employee_id) - if db_employee: - for key, value in employee.dict(exclude_unset=True).items(): - setattr(db_employee, key, value) - db.commit() - db.refresh(db_employee) - return db_employee +def update_employee(db: Session, employee: Employee, employee_data: Dict[str, Any]) -> Employee: + """Update employee""" + # Если есть пароль в данных, хешируем его + if "password" in employee_data: + employee_data["hashed_password"] = get_password_hash(employee_data.pop("password")) + + # Обновляем поля + for field, value in employee_data.items(): + setattr(employee, field, value) + + db.commit() + db.refresh(employee) + return employee -def delete_employee(db: Session, employee_id: int) -> Optional[Employee]: +def delete_employee(db: Session, employee: Employee) -> None: """Delete employee""" - db_employee = get_employee(db, employee_id) - if db_employee: - db.delete(db_employee) - db.commit() - return db_employee \ No newline at end of file + db.delete(employee) + db.commit() \ No newline at end of file diff --git a/backend/app/models/employee.py b/backend/app/models/employee.py index c5dfc57..3e9a07c 100644 --- a/backend/app/models/employee.py +++ b/backend/app/models/employee.py @@ -9,6 +9,7 @@ class Employee(Base): __tablename__ = "employees" id = Column(Integer, primary_key=True, index=True) + login = Column(String, unique=True, nullable=False, index=True) first_name = Column(String, nullable=False) last_name = Column(String, nullable=False) department = Column(String, nullable=False) diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index b522a9e..6a92ae0 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -1,20 +1,57 @@ """Authentication router""" from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session -from typing import Optional -from ..database import get_db from ..crud import employees -from ..schemas.auth import Token, LoginCredentials -from ..utils.auth import verify_password +from ..schemas.token import Token +from ..utils.security import verify_password from ..utils.jwt import create_and_save_token +from ..dependencies import get_db router = APIRouter() -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") + +ADMIN_LOGIN = "admin" +ADMIN_PASSWORD = "admin123" + +@router.post("/admin/login", response_model=Token) +async def admin_login( + form_data: OAuth2PasswordRequestForm = Depends(), + db: Session = Depends(get_db) +): + """Авторизация администратора""" + # Проверяем фиксированные учетные данные администратора + if form_data.username != ADMIN_LOGIN or form_data.password != ADMIN_PASSWORD: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + + # Получаем или создаем админа в базе + admin = employees.get_employee_by_login(db, ADMIN_LOGIN) + if not admin: + # Если админа нет в базе, создаем его + admin = employees.create_employee(db, { + "login": ADMIN_LOGIN, + "first_name": "Admin", + "last_name": "User", + "department": "IT", + "office": "Main", + "password": ADMIN_PASSWORD, + "is_admin": True + }) + + # Создаем и сохраняем токен + access_token = create_and_save_token(admin.id, db) + + return { + "access_token": access_token, + "token_type": "bearer" + } @router.post("/login", response_model=Token) -async def login_for_access_token( +async def login( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ): @@ -24,8 +61,8 @@ async def login_for_access_token( first_name, last_name = form_data.username.split() except ValueError: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Username should be in format: 'First Last'", + status_code=status.HTTP_400_BAD_REQUEST, + detail="Username must be in format: 'First Last'", headers={"WWW-Authenticate": "Bearer"}, ) @@ -46,36 +83,3 @@ async def login_for_access_token( "token_type": "bearer" } -@router.post("/admin/login", response_model=Token) -async def admin_login( - form_data: OAuth2PasswordRequestForm = Depends(), - db: Session = Depends(get_db) -): - """Авторизация администратора""" - # Разделяем username на имя и фамилию - try: - first_name, last_name = form_data.username.split() - except ValueError: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Username should be in format: 'First Last'", - headers={"WWW-Authenticate": "Bearer"}, - ) - - # Проверяем учетные данные администратора - employee = employees.get_employee_by_credentials(db, first_name, last_name) - if not employee or not employee.is_admin or not verify_password(form_data.password, employee.hashed_password): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - - # Создаем и сохраняем токен - access_token = create_and_save_token(employee.id, db) - - return { - "access_token": access_token, - "token_type": "bearer" - } - diff --git a/backend/app/schemas/employee.py b/backend/app/schemas/employee.py index a55d5e1..b6af1ca 100644 --- a/backend/app/schemas/employee.py +++ b/backend/app/schemas/employee.py @@ -1,19 +1,21 @@ """Employee schemas""" -from pydantic import BaseModel -from datetime import datetime from typing import Optional +from pydantic import BaseModel, EmailStr class EmployeeBase(BaseModel): """Base employee schema""" + login: str first_name: str last_name: str department: str office: str + is_active: bool = True is_admin: bool = False class EmployeeCreate(EmployeeBase): """Employee creation schema""" password: str + hashed_password: Optional[str] = None class EmployeeUpdate(BaseModel): """Employee update schema""" @@ -21,12 +23,13 @@ class EmployeeUpdate(BaseModel): last_name: Optional[str] = None department: Optional[str] = None office: Optional[str] = None + password: Optional[str] = None + is_active: Optional[bool] = None + is_admin: Optional[bool] = None class Employee(EmployeeBase): """Employee schema""" id: int - is_active: bool - created_at: datetime class Config: """Pydantic config"""