1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

добавление редисаъ4

This commit is contained in:
MoonTestUse1
2025-01-03 16:53:39 +06:00
parent 1749cff039
commit 9f5cb4d4c2
3 changed files with 132 additions and 77 deletions

View File

@@ -0,0 +1,21 @@
"""merge heads
Revision ID: merge_heads
Revises: create_tokens_table
Create Date: 2024-01-03 10:50:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'merge_heads'
down_revision = 'create_tokens_table'
branch_labels = None
depends_on = None
def upgrade() -> None:
pass
def downgrade() -> None:
pass

View File

@@ -1,24 +1,31 @@
"""Employee CRUD operations""" """Employee CRUD operations"""
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from ..models.employee import Employee from ..models.employee import Employee
from ..schemas.employee import EmployeeCreate, EmployeeUpdate
from ..utils.loggers import auth_logger from ..utils.loggers import auth_logger
def get_employees(db: Session): def get_employees(db: Session, skip: int = 0, limit: int = 100):
"""Get all employees""" """Get all employees"""
return db.query(Employee).all() return db.query(Employee).offset(skip).limit(limit).all()
def get_employee(db: Session, employee_id: int): def get_employee(db: Session, employee_id: int):
"""Get employee by ID""" """Get employee by ID"""
return db.query(Employee).filter(Employee.id == employee_id).first() return db.query(Employee).filter(Employee.id == employee_id).first()
def get_employee_by_lastname(db: Session, last_name: str): def get_employee_by_last_name(db: Session, last_name: str):
"""Get employee by last name""" """Get employee by last name"""
return db.query(Employee).filter(Employee.last_name == last_name).first() return db.query(Employee).filter(Employee.last_name == last_name).first()
def create_employee(db: Session, employee_data: dict): def create_employee(db: Session, employee: EmployeeCreate, hashed_password: str):
"""Create new employee""" """Create new employee"""
try: try:
db_employee = Employee(**employee_data) db_employee = Employee(
first_name=employee.first_name,
last_name=employee.last_name,
department=employee.department,
office=employee.office,
hashed_password=hashed_password
)
db.add(db_employee) db.add(db_employee)
db.commit() db.commit()
db.refresh(db_employee) db.refresh(db_employee)
@@ -26,4 +33,24 @@ def create_employee(db: Session, employee_data: dict):
except Exception as e: except Exception as e:
db.rollback() db.rollback()
auth_logger.error(f"Error creating employee: {e}") auth_logger.error(f"Error creating employee: {e}")
raise raise
def update_employee(db: Session, employee_id: int, employee: EmployeeUpdate):
db_employee = get_employee(db, employee_id)
if not db_employee:
return None
update_data = employee.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_employee, field, value)
db.commit()
db.refresh(db_employee)
return db_employee
def delete_employee(db: Session, employee_id: int):
db_employee = get_employee(db, employee_id)
if db_employee:
db.delete(db_employee)
db.commit()
return db_employee

View File

@@ -1,91 +1,98 @@
"""Employees router""" """Employees router"""
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from typing import List from typing import List
from ..database import get_db from ..database import get_db
from ..models.employee import Employee from ..crud import employees
from ..schemas.employee import EmployeeCreate, EmployeeResponse, EmployeeUpdate from ..schemas.employee import Employee, EmployeeCreate, EmployeeUpdate
from ..utils.auth import get_current_admin from ..utils.auth import get_current_admin
from passlib.context import CryptContext from ..utils.auth import get_password_hash
router = APIRouter() router = APIRouter()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@router.get("", response_model=List[EmployeeResponse]) @router.post("", response_model=Employee)
@router.get("/", response_model=List[EmployeeResponse]) def create_employee(
def get_employees(db: Session = Depends(get_db), _: dict = Depends(get_current_admin)): employee: EmployeeCreate,
"""Get all employees"""
employees = db.query(Employee).all()
return employees
@router.get("/{employee_id}", response_model=EmployeeResponse)
def get_employee(employee_id: int, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)):
"""Get employee by ID"""
employee = db.query(Employee).filter(Employee.id == employee_id).first()
if not employee:
raise HTTPException(status_code=404, detail="Сотрудник не найден")
return employee
@router.post("", response_model=EmployeeResponse)
@router.post("/", response_model=EmployeeResponse)
def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)):
"""Create new employee"""
# Хешируем пароль
hashed_password = pwd_context.hash(employee.password)
# Создаем нового сотрудника
db_employee = Employee(
first_name=employee.first_name,
last_name=employee.last_name,
department=employee.department,
office=employee.office,
password=hashed_password
)
# Сохраняем в базу данных
db.add(db_employee)
db.commit()
db.refresh(db_employee)
return db_employee
@router.put("/{employee_id}", response_model=EmployeeResponse)
def update_employee(
employee_id: int,
employee_update: EmployeeUpdate,
db: Session = Depends(get_db), db: Session = Depends(get_db),
_: dict = Depends(get_current_admin) _: dict = Depends(get_current_admin)
): ):
"""Update employee data""" """
db_employee = db.query(Employee).filter(Employee.id == employee_id).first() Создание нового сотрудника (только для админа)
if not db_employee: """
raise HTTPException(status_code=404, detail="Сотрудник не найден") # Хэшируем пароль
hashed_password = get_password_hash(employee.password)
# Обновляем данные # Создаем сотрудника
update_data = employee_update.model_dump(exclude_unset=True) db_employee = employees.create_employee(
db=db,
# Если передан пароль, хешируем его employee=employee,
if 'password' in update_data: hashed_password=hashed_password
update_data['password'] = pwd_context.hash(update_data['password']) )
for field, value in update_data.items():
setattr(db_employee, field, value)
db.commit()
db.refresh(db_employee)
return db_employee return db_employee
@router.get("", response_model=List[Employee])
def get_employees(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получение списка всех сотрудников (только для админа)
"""
employees_list = employees.get_employees(db, skip=skip, limit=limit)
return employees_list
@router.get("/{employee_id}", response_model=Employee)
def get_employee(
employee_id: int,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получение информации о сотруднике по ID (только для админа)
"""
db_employee = employees.get_employee(db, employee_id=employee_id)
if db_employee is None:
raise HTTPException(status_code=404, detail="Employee not found")
return db_employee
@router.put("/{employee_id}", response_model=Employee)
def update_employee(
employee_id: int,
employee: EmployeeUpdate,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Обновление информации о сотруднике (только для админа)
"""
db_employee = employees.get_employee(db, employee_id=employee_id)
if db_employee is None:
raise HTTPException(status_code=404, detail="Employee not found")
# Если указан новый пароль, хэшируем его
if employee.password:
employee.password = get_password_hash(employee.password)
updated_employee = employees.update_employee(
db=db,
employee_id=employee_id,
employee=employee
)
return updated_employee
@router.delete("/{employee_id}") @router.delete("/{employee_id}")
def delete_employee( def delete_employee(
employee_id: int, employee_id: int,
db: Session = Depends(get_db), db: Session = Depends(get_db),
_: dict = Depends(get_current_admin) _: dict = Depends(get_current_admin)
): ):
"""Delete employee""" """
db_employee = db.query(Employee).filter(Employee.id == employee_id).first() Удаление сотрудника (только для админа)
if not db_employee: """
raise HTTPException(status_code=404, detail="Сотрудник не найден") db_employee = employees.get_employee(db, employee_id=employee_id)
if db_employee is None:
db.delete(db_employee) raise HTTPException(status_code=404, detail="Employee not found")
db.commit() employees.delete_employee(db=db, employee_id=employee_id)
return {"message": "Сотрудник успешно удален"} return {"message": "Employee deleted successfully"}