1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Fix: Admin autorization

This commit is contained in:
MoonTestUse1
2025-01-07 08:10:40 +06:00
parent eaafb9cf8f
commit a40bd9af73
4 changed files with 92 additions and 74 deletions

View File

@@ -1,13 +1,9 @@
"""Employee CRUD operations"""
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
from typing import List, Optional
from ..models.employee import Employee
from ..schemas.employee import EmployeeCreate, EmployeeUpdate
from ..utils.loggers import auth_logger
def get_employees(db: Session, skip: int = 0, limit: int = 100) -> List[Employee]:
"""Get all employees"""
return db.query(Employee).offset(skip).limit(limit).all()
from ..models.employee import Employee
from ..utils.security import get_password_hash
def get_employee(db: Session, employee_id: int) -> Optional[Employee]:
"""Get employee by ID"""
@@ -20,35 +16,49 @@ def get_employee_by_credentials(db: Session, first_name: str, last_name: str) ->
Employee.last_name == last_name
).first()
def create_employee(db: Session, employee: EmployeeCreate, hashed_password: str) -> Employee:
def get_employee_by_login(db: Session, login: str) -> Optional[Employee]:
"""Get employee by login"""
return db.query(Employee).filter(Employee.login == login).first()
def get_employees(db: Session, skip: int = 0, limit: int = 100):
"""Get list of employees"""
return db.query(Employee).offset(skip).limit(limit).all()
def create_employee(db: Session, employee_data: Dict[str, Any]) -> Employee:
"""Create new employee"""
# Хешируем пароль
hashed_password = get_password_hash(employee_data["password"])
# Создаем сотрудника
db_employee = Employee(
first_name=employee.first_name,
last_name=employee.last_name,
department=employee.department,
office=employee.office,
login=employee_data.get("login"),
first_name=employee_data["first_name"],
last_name=employee_data["last_name"],
department=employee_data["department"],
office=employee_data["office"],
hashed_password=hashed_password,
is_admin=employee.is_admin
is_admin=employee_data.get("is_admin", False)
)
db.add(db_employee)
db.commit()
db.refresh(db_employee)
return db_employee
def update_employee(db: Session, employee_id: int, employee: EmployeeUpdate) -> Optional[Employee]:
"""Update employee data"""
db_employee = get_employee(db, employee_id)
if db_employee:
for key, value in employee.dict(exclude_unset=True).items():
setattr(db_employee, key, value)
db.commit()
db.refresh(db_employee)
return db_employee
def update_employee(db: Session, employee: Employee, employee_data: Dict[str, Any]) -> Employee:
"""Update employee"""
# Если есть пароль в данных, хешируем его
if "password" in employee_data:
employee_data["hashed_password"] = get_password_hash(employee_data.pop("password"))
# Обновляем поля
for field, value in employee_data.items():
setattr(employee, field, value)
db.commit()
db.refresh(employee)
return employee
def delete_employee(db: Session, employee_id: int) -> Optional[Employee]:
def delete_employee(db: Session, employee: Employee) -> None:
"""Delete employee"""
db_employee = get_employee(db, employee_id)
if db_employee:
db.delete(db_employee)
db.commit()
return db_employee
db.delete(employee)
db.commit()

View File

@@ -9,6 +9,7 @@ class Employee(Base):
__tablename__ = "employees"
id = Column(Integer, primary_key=True, index=True)
login = Column(String, unique=True, nullable=False, index=True)
first_name = Column(String, nullable=False)
last_name = Column(String, nullable=False)
department = Column(String, nullable=False)

View File

@@ -1,20 +1,57 @@
"""Authentication router"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import Optional
from ..database import get_db
from ..crud import employees
from ..schemas.auth import Token, LoginCredentials
from ..utils.auth import verify_password
from ..schemas.token import Token
from ..utils.security import verify_password
from ..utils.jwt import create_and_save_token
from ..dependencies import get_db
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
ADMIN_LOGIN = "admin"
ADMIN_PASSWORD = "admin123"
@router.post("/admin/login", response_model=Token)
async def admin_login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Авторизация администратора"""
# Проверяем фиксированные учетные данные администратора
if form_data.username != ADMIN_LOGIN or form_data.password != ADMIN_PASSWORD:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Получаем или создаем админа в базе
admin = employees.get_employee_by_login(db, ADMIN_LOGIN)
if not admin:
# Если админа нет в базе, создаем его
admin = employees.create_employee(db, {
"login": ADMIN_LOGIN,
"first_name": "Admin",
"last_name": "User",
"department": "IT",
"office": "Main",
"password": ADMIN_PASSWORD,
"is_admin": True
})
# Создаем и сохраняем токен
access_token = create_and_save_token(admin.id, db)
return {
"access_token": access_token,
"token_type": "bearer"
}
@router.post("/login", response_model=Token)
async def login_for_access_token(
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
@@ -24,8 +61,8 @@ async def login_for_access_token(
first_name, last_name = form_data.username.split()
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Username should be in format: 'First Last'",
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username must be in format: 'First Last'",
headers={"WWW-Authenticate": "Bearer"},
)
@@ -46,36 +83,3 @@ async def login_for_access_token(
"token_type": "bearer"
}
@router.post("/admin/login", response_model=Token)
async def admin_login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Авторизация администратора"""
# Разделяем username на имя и фамилию
try:
first_name, last_name = form_data.username.split()
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Username should be in format: 'First Last'",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем учетные данные администратора
employee = employees.get_employee_by_credentials(db, first_name, last_name)
if not employee or not employee.is_admin or not verify_password(form_data.password, employee.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Создаем и сохраняем токен
access_token = create_and_save_token(employee.id, db)
return {
"access_token": access_token,
"token_type": "bearer"
}

View File

@@ -1,19 +1,21 @@
"""Employee schemas"""
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
class EmployeeBase(BaseModel):
"""Base employee schema"""
login: str
first_name: str
last_name: str
department: str
office: str
is_active: bool = True
is_admin: bool = False
class EmployeeCreate(EmployeeBase):
"""Employee creation schema"""
password: str
hashed_password: Optional[str] = None
class EmployeeUpdate(BaseModel):
"""Employee update schema"""
@@ -21,12 +23,13 @@ class EmployeeUpdate(BaseModel):
last_name: Optional[str] = None
department: Optional[str] = None
office: Optional[str] = None
password: Optional[str] = None
is_active: Optional[bool] = None
is_admin: Optional[bool] = None
class Employee(EmployeeBase):
"""Employee schema"""
id: int
is_active: bool
created_at: datetime
class Config:
"""Pydantic config"""