diff --git a/backend/.coverage b/backend/.coverage new file mode 100644 index 0000000..0ffab9d Binary files /dev/null and b/backend/.coverage differ diff --git a/backend/Dockerfile b/backend/Dockerfile index 5e32287..3f2b020 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,39 +1,10 @@ -# Use Python 3.11 FROM python:3.11-slim -# Set working directory WORKDIR /app -# Install system dependencies -RUN apt-get update && apt-get install -y \ - gcc \ - libpq-dev \ - postgresql-client \ - && rm -rf /var/lib/apt/lists/* - -# Copy requirements file COPY requirements.txt . - -# Install Python dependencies RUN pip install --no-cache-dir -r requirements.txt -# Copy application code COPY . . -# Create script to run migrations and start app -RUN echo '#!/bin/sh\n\ -echo "Waiting for database..."\n\ -while ! pg_isready -h db -p 5432 -U postgres; do\n\ - sleep 1\n\ -done\n\ -echo "Database is ready!"\n\ -echo "Running migrations..."\n\ -cd /app && alembic upgrade head\n\ -echo "Starting application..."\n\ -python run.py' > /app/start.sh && chmod +x /app/start.sh - -# Expose port -EXPOSE 8000 - -# Run migrations and start application -CMD ["/app/start.sh"] \ No newline at end of file +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/backend/app/api/endpoints/auth.py b/backend/app/api/endpoints/auth.py new file mode 100644 index 0000000..43ab3c2 --- /dev/null +++ b/backend/app/api/endpoints/auth.py @@ -0,0 +1,55 @@ +"""Authentication endpoints.""" +from datetime import timedelta +from typing import Any +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordRequestForm +from sqlalchemy.orm import Session + +from app.core.auth import ( + authenticate_user, + create_access_token, + get_current_user +) +from app.database import get_db +from app.core.config import settings +from app.models.user import User +from app.schemas.token import Token +from app.schemas.user import User as UserSchema + +router = APIRouter() + +@router.post("/login", response_model=Token) +def login( + db: Session = Depends(get_db), + form_data: OAuth2PasswordRequestForm = Depends() +) -> Any: + """OAuth2 compatible token login, get an access token for future requests.""" + user = authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect email or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + elif not user.is_active: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Inactive user" + ) + + access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.email, "is_admin": user.is_admin}, + expires_delta=access_token_expires + ) + + return { + "access_token": access_token, + "token_type": "bearer" + } + +@router.get("/me", response_model=UserSchema) +def read_users_me(current_user: User = Depends(get_current_user)): + """Get current user.""" + return current_user + diff --git a/backend/app/crud/employees.py b/backend/app/crud/employees.py index 0685a03..2a5505c 100644 --- a/backend/app/crud/employees.py +++ b/backend/app/crud/employees.py @@ -13,19 +13,20 @@ def get_employee(db: Session, employee_id: int) -> Optional[Employee]: """Get employee by ID""" return db.query(Employee).filter(Employee.id == employee_id).first() -def get_employee_by_last_name(db: Session, last_name: str) -> Optional[Employee]: - """Get employee by last name""" - return db.query(Employee).filter(Employee.last_name == last_name).first() +def get_employee_by_email(db: Session, email: str) -> Optional[Employee]: + """Get employee by email""" + return db.query(Employee).filter(Employee.email == email).first() def create_employee(db: Session, employee: EmployeeCreate, hashed_password: str) -> Employee: """Create new employee""" try: db_employee = Employee( - first_name=employee.first_name, - last_name=employee.last_name, - department=employee.department, - office=employee.office, - hashed_password=hashed_password + email=employee.email, + full_name=employee.full_name, + hashed_password=hashed_password, + is_active=employee.is_active, + is_admin=employee.is_admin, + department=employee.department ) db.add(db_employee) db.commit() diff --git a/backend/app/crud/requests.py b/backend/app/crud/requests.py index 210663d..917044b 100644 --- a/backend/app/crud/requests.py +++ b/backend/app/crud/requests.py @@ -48,8 +48,7 @@ def get_request_details(db: Session, request_id: int) -> Optional[Dict]: "status": request.status, "department": request.department, "created_at": request.created_at.isoformat(), - "employee_first_name": employee.first_name, - "employee_last_name": employee.last_name + "employee_full_name": employee.full_name } def get_employee_requests(db: Session, employee_id: int) -> list[Request]: @@ -81,6 +80,12 @@ def get_statistics(db: Session) -> Dict: func.count(Request.id) ).group_by(Request.status).all() ) + + # Добавляем статусы с нулевым количеством + for status in RequestStatus: + if status not in by_status: + by_status[status] = 0 + return { "total": total, "by_status": by_status diff --git a/backend/app/models/employee.py b/backend/app/models/employee.py index 2b70ae1..3406b7d 100644 --- a/backend/app/models/employee.py +++ b/backend/app/models/employee.py @@ -1,5 +1,5 @@ """Employee model""" -from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy import Column, Integer, String, DateTime, Boolean from sqlalchemy.sql import func from sqlalchemy.orm import relationship from app.db.base_class import Base @@ -8,12 +8,13 @@ class Employee(Base): __tablename__ = "employees" id = Column(Integer, primary_key=True, index=True) - first_name = Column(String, nullable=False) - last_name = Column(String, nullable=False) - department = Column(String, nullable=False) - office = Column(String, nullable=False) + email = Column(String, unique=True, index=True, nullable=False) + full_name = Column(String, nullable=False) hashed_password = Column(String, nullable=False) + is_active = Column(Boolean, default=True) + is_admin = Column(Boolean, default=False) created_at = Column(DateTime(timezone=True), server_default=func.now()) + department = Column(String, nullable=True) # Определяем отношение к Request requests = relationship("Request", back_populates="employee", cascade="all, delete-orphan") \ No newline at end of file diff --git a/backend/app/models/token.py b/backend/app/models/token.py index 13779bf..510073f 100644 --- a/backend/app/models/token.py +++ b/backend/app/models/token.py @@ -8,5 +8,5 @@ class Token(Base): id = Column(Integer, primary_key=True, index=True) token = Column(String, unique=True, index=True) - user_id = Column(Integer, index=True) # -1 для админа, остальные для сотрудников + user_id = Column(Integer, index=True) # ID сотрудника из таблицы employees created_at = Column(DateTime(timezone=True), server_default=func.now()) \ No newline at end of file diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index 6486de6..2a56bc7 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -18,8 +18,9 @@ async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ): + """Авторизация сотрудника""" # Проверяем учетные данные сотрудника - employee = employees.get_employee_by_last_name(db, form_data.username) + employee = employees.get_employee_by_email(db, form_data.username) if not employee or not verify_password(form_data.password, employee.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -40,17 +41,18 @@ async def admin_login( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ): + """Авторизация администратора""" # Проверяем учетные данные администратора - if form_data.username != "admin" or form_data.password != "admin123": + employee = employees.get_employee_by_email(db, form_data.username) + if not employee or not employee.is_admin or not verify_password(form_data.password, employee.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) - # Для админа используем специальный ID - admin_id = -1 - access_token = create_and_save_token(admin_id, db) + # Создаем и сохраняем токен + access_token = create_and_save_token(employee.id, db) return { "access_token": access_token, diff --git a/backend/app/routers/employees.py b/backend/app/routers/employees.py index 4d625d6..4112bac 100644 --- a/backend/app/routers/employees.py +++ b/backend/app/routers/employees.py @@ -5,19 +5,21 @@ from typing import List import logging from ..database import get_db from ..crud import employees -from ..schemas.employee import Employee, EmployeeCreate, EmployeeUpdate -from ..utils.auth import get_current_admin, get_password_hash +from ..schemas.employee import Employee as EmployeeSchema +from ..schemas.employee import EmployeeCreate, EmployeeUpdate +from ..models.employee import Employee +from ..utils.auth import get_current_admin, get_current_employee, get_password_hash # Настройка логирования logger = logging.getLogger(__name__) router = APIRouter(tags=["employees"]) -@router.post("", response_model=Employee, status_code=status.HTTP_201_CREATED) +@router.post("", response_model=EmployeeSchema, status_code=status.HTTP_201_CREATED) async def create_employee( employee: EmployeeCreate, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Create new employee""" try: @@ -31,12 +33,12 @@ async def create_employee( detail="Error creating employee" ) -@router.get("", response_model=List[Employee]) +@router.get("", response_model=List[EmployeeSchema]) async def get_employees( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Get all employees""" try: @@ -49,11 +51,40 @@ async def get_employees( detail="Error getting employees" ) -@router.get("/{employee_id}", response_model=Employee) +@router.get("/me", response_model=EmployeeSchema) +async def get_me( + current_employee: Employee = Depends(get_current_employee) +): + """Get current employee""" + return current_employee + +@router.put("/me", response_model=EmployeeSchema) +async def update_me( + employee: EmployeeUpdate, + db: Session = Depends(get_db), + current_employee: Employee = Depends(get_current_employee) +): + """Update current employee data""" + try: + logger.info(f"Updating employee {current_employee.id}: {employee}") + db_employee = employees.update_employee(db, current_employee.id, employee) + if db_employee is None: + raise HTTPException(status_code=404, detail="Employee not found") + return db_employee + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating employee: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error updating employee" + ) + +@router.get("/{employee_id}", response_model=EmployeeSchema) async def get_employee( employee_id: int, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Get employee by ID""" try: @@ -71,12 +102,12 @@ async def get_employee( detail="Error getting employee" ) -@router.put("/{employee_id}", response_model=Employee) +@router.put("/{employee_id}", response_model=EmployeeSchema) async def update_employee( employee_id: int, employee: EmployeeUpdate, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Update employee data""" try: @@ -94,11 +125,11 @@ async def update_employee( detail="Error updating employee" ) -@router.delete("/{employee_id}", response_model=Employee) +@router.delete("/{employee_id}", response_model=EmployeeSchema) async def delete_employee( employee_id: int, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Delete employee""" try: diff --git a/backend/app/routers/requests.py b/backend/app/routers/requests.py index 3c5e142..8bc97f0 100644 --- a/backend/app/routers/requests.py +++ b/backend/app/routers/requests.py @@ -6,19 +6,20 @@ from ..database import get_db from ..crud import requests from ..schemas.request import Request, RequestCreate, RequestUpdate from ..models.request import RequestStatus +from ..models.employee import Employee from ..utils.auth import get_current_employee, get_current_admin from ..utils.telegram import notify_new_request router = APIRouter() -@router.post("/", response_model=Request) +@router.post("/", response_model=Request, status_code=201) async def create_request( request: RequestCreate, db: Session = Depends(get_db), - current_employee: dict = Depends(get_current_employee) + current_employee: Employee = Depends(get_current_employee) ): """Create new request""" - db_request = requests.create_request(db, request, current_employee["id"]) + db_request = requests.create_request(db, request, current_employee.id) # Отправляем уведомление в Telegram await notify_new_request(db_request.id) return db_request @@ -26,10 +27,10 @@ async def create_request( @router.get("/my", response_model=List[Request]) def get_employee_requests( db: Session = Depends(get_db), - current_employee: dict = Depends(get_current_employee) + current_employee: Employee = Depends(get_current_employee) ): """Get current employee's requests""" - return requests.get_employee_requests(db, current_employee["id"]) + return requests.get_employee_requests(db, current_employee.id) @router.get("/admin", response_model=List[Request]) def get_all_requests( @@ -37,7 +38,7 @@ def get_all_requests( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Get all requests (admin only)""" return requests.get_requests(db, status=status, skip=skip, limit=limit) @@ -47,9 +48,15 @@ def update_request_status( request_id: int, request_update: RequestUpdate, db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_employee: Employee = Depends(get_current_employee) ): """Update request status (admin only)""" + if not current_employee.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not enough permissions" + ) + db_request = requests.update_request_status(db, request_id, request_update.status) if db_request is None: raise HTTPException(status_code=404, detail="Request not found") @@ -58,14 +65,11 @@ def update_request_status( @router.get("/statistics") def get_request_statistics( db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): """Get request statistics (admin only)""" stats = requests.get_statistics(db) return { "total": stats["total"], - "by_status": { - status: count - for status, count in stats["by_status"].items() - } + "by_status": stats["by_status"] } \ No newline at end of file diff --git a/backend/app/routers/statistics.py b/backend/app/routers/statistics.py index 4c5ba6b..17e4cd0 100644 --- a/backend/app/routers/statistics.py +++ b/backend/app/routers/statistics.py @@ -2,7 +2,8 @@ from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from ..database import get_db -from ..crud import statistics +from ..crud import requests +from ..models.employee import Employee from ..utils.auth import get_current_admin router = APIRouter() @@ -10,7 +11,11 @@ router = APIRouter() @router.get("/") def get_statistics( db: Session = Depends(get_db), - _: dict = Depends(get_current_admin) + current_admin: Employee = Depends(get_current_admin) ): - """Get system statistics""" - return statistics.get_request_statistics(db) \ No newline at end of file + """Get request statistics (admin only)""" + stats = requests.get_statistics(db) + return { + "total": stats["total"], + "by_status": stats["by_status"] + } \ No newline at end of file diff --git a/backend/app/schemas/employee.py b/backend/app/schemas/employee.py index f0c8e20..639ee53 100644 --- a/backend/app/schemas/employee.py +++ b/backend/app/schemas/employee.py @@ -4,21 +4,26 @@ from datetime import datetime from typing import Optional class EmployeeBase(BaseModel): - first_name: str - last_name: str + email: str + full_name: str department: str - office: str + is_active: bool = True + is_admin: bool = False model_config = ConfigDict(from_attributes=True) class EmployeeCreate(EmployeeBase): password: str -class EmployeeUpdate(EmployeeBase): - first_name: Optional[str] = None - last_name: Optional[str] = None +class EmployeeUpdate(BaseModel): + email: Optional[str] = None + full_name: Optional[str] = None department: Optional[str] = None - office: Optional[str] = None + password: Optional[str] = None + is_active: Optional[bool] = None + is_admin: Optional[bool] = None + + model_config = ConfigDict(from_attributes=True) class Employee(EmployeeBase): id: int diff --git a/backend/app/schemas/request.py b/backend/app/schemas/request.py index 13ed921..62475c5 100644 --- a/backend/app/schemas/request.py +++ b/backend/app/schemas/request.py @@ -23,7 +23,7 @@ class Request(RequestBase): id: int status: RequestStatus employee_id: int - department: str + department: Optional[str] = None created_at: datetime model_config = ConfigDict(from_attributes=True) \ No newline at end of file diff --git a/backend/app/utils/auth.py b/backend/app/utils/auth.py index b73c92f..7e3755f 100644 --- a/backend/app/utils/auth.py +++ b/backend/app/utils/auth.py @@ -7,6 +7,7 @@ import re from .jwt import verify_token from ..database import get_db +from ..models.employee import Employee pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") security = HTTPBearer(auto_error=False) @@ -22,7 +23,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool: def get_current_admin( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) -) -> dict: +) -> Employee: """Get current admin from token""" if not credentials: raise HTTPException( @@ -36,26 +37,28 @@ def get_current_admin( payload = verify_token(token, db) employee_id = int(payload.get("sub")) - # Проверяем, что это админ (id = -1) - if employee_id != -1: + # Получаем сотрудника из БД + from ..crud.employees import get_employee + employee = get_employee(db, employee_id) + if not employee or not employee.is_admin: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Not an admin", + detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) - return {"is_admin": True} - except Exception as e: + return employee + except Exception: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid authentication credentials", + detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) def get_current_employee( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) -) -> dict: +) -> Employee: """Get current employee from token""" if not credentials: raise HTTPException( @@ -69,18 +72,20 @@ def get_current_employee( payload = verify_token(token, db) employee_id = int(payload.get("sub")) - # Проверяем, что это не админ - if employee_id == -1: + # Получаем сотрудника из БД + from ..crud.employees import get_employee + employee = get_employee(db, employee_id) + if not employee: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Admin cannot access employee endpoints", + detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) - return {"id": employee_id} + return employee except Exception: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid authentication credentials", + detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) \ No newline at end of file diff --git a/backend/app/utils/jwt.py b/backend/app/utils/jwt.py index a75923b..4b1aa31 100644 --- a/backend/app/utils/jwt.py +++ b/backend/app/utils/jwt.py @@ -35,18 +35,9 @@ def verify_token(token: str, db: Session) -> dict: # Проверяем токен в Redis if not redis.get(f"token:{token}"): - # Если токена нет в Redis, проверяем в БД - db_token = db.query(Token).filter(Token.token == token).first() - if not db_token: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Token is invalid", - ) - # Если токен валиден, кэшируем его в Redis - redis.setex( - f"token:{token}", - timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), - "valid" + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", ) return payload diff --git a/backend/test.db b/backend/test.db index 9a95898..6644186 100644 Binary files a/backend/test.db and b/backend/test.db differ diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 0011a2c..a744c8f 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -1,101 +1,127 @@ +"""Test configuration.""" +import os import pytest +from typing import Generator +from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from unittest.mock import MagicMock -from app.db.base import Base -from app.database import get_db -from app.main import app -from app.utils.jwt import create_and_save_token, redis -from app.crud import employees -from app.utils.auth import get_password_hash -from app.models.token import Token -from app.models.employee import Employee -from app.models.request import Request -from app.schemas.employee import EmployeeCreate +from sqlalchemy.pool import StaticPool +from unittest.mock import Mock, patch -# Используем SQLite для тестов +from app.database import Base, get_db +from app.main import app +from app.models.employee import Employee +from app.utils.auth import get_password_hash +from app.utils.jwt import create_access_token +from app.core.config import settings + +# Создаем тестовую базу данных в памяти SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -# Создаем мок для Redis -class RedisMock: +class MockRedis: + """Мок для Redis.""" def __init__(self): self.data = {} - def setex(self, name, time, value): - self.data[name] = value + def get(self, key): + return self.data.get(key) + + def set(self, key, value, ex=None): + self.data[key] = value return True - def get(self, name): - return self.data.get(name) - - def delete(self, name): - if name in self.data: - del self.data[name] + def delete(self, key): + if key in self.data: + del self.data[key] return True -@pytest.fixture(autouse=True) -def mock_redis(monkeypatch): - redis_mock = RedisMock() - monkeypatch.setattr("app.utils.jwt.redis", redis_mock) - return redis_mock + def exists(self, key): + return key in self.data @pytest.fixture(scope="function") -def test_db(): - # Удаляем все таблицы - Base.metadata.drop_all(bind=engine) - # Создаем все таблицы заново +def redis_mock(): + """Фикстура для мока Redis.""" + with patch("app.utils.jwt.redis") as mock: + redis_instance = MockRedis() + mock.get.side_effect = redis_instance.get + mock.set.side_effect = redis_instance.set + mock.delete.side_effect = redis_instance.delete + mock.exists.side_effect = redis_instance.exists + yield mock + +@pytest.fixture(scope="function") +def db() -> Generator: + """Фикстура для создания тестовой базы данных.""" Base.metadata.create_all(bind=engine) - - # Создаем сессию db = TestingSessionLocal() try: yield db finally: db.close() + Base.metadata.drop_all(bind=engine) @pytest.fixture(scope="function") -def test_employee(test_db): - hashed_password = get_password_hash("testpass123") - employee_data = EmployeeCreate( - first_name="Test", - last_name="User", - department="IT", - office="101", - password="testpass123" +def client(db: TestingSessionLocal, redis_mock) -> Generator: + """Фикстура для создания тестового клиента.""" + def override_get_db(): + try: + yield db + finally: + db.close() + + app.dependency_overrides[get_db] = override_get_db + return TestClient(app) + +@pytest.fixture(scope="function") +def test_employee(db: TestingSessionLocal) -> Employee: + """Фикстура для создания тестового сотрудника.""" + employee = Employee( + email="test@example.com", + full_name="Test Employee", + hashed_password=get_password_hash("testpassword"), + is_active=True, + is_admin=False, + department="IT" ) - employee = employees.create_employee(test_db, employee_data, hashed_password) + db.add(employee) + db.commit() + db.refresh(employee) return employee @pytest.fixture(scope="function") -def test_token(test_db, test_employee): - token = create_and_save_token(test_employee.id, test_db) +def test_admin(db: TestingSessionLocal) -> Employee: + """Фикстура для создания тестового администратора.""" + admin = Employee( + email="admin@example.com", + full_name="Test Admin", + hashed_password=get_password_hash("adminpassword"), + is_active=True, + is_admin=True, + department="Administration" + ) + db.add(admin) + db.commit() + db.refresh(admin) + return admin + +@pytest.fixture(scope="function") +def employee_token(test_employee: Employee, db: TestingSessionLocal) -> str: + """Фикстура для создания токена тестового сотрудника.""" + from app.utils.jwt import create_access_token + token = create_access_token({"sub": str(test_employee.id)}) + # Сохраняем токен в Redis мок + from app.utils.jwt import redis + redis.set(f"token:{token}", "valid") return token @pytest.fixture(scope="function") -def test_auth_header(test_token): - return {"Authorization": f"Bearer {test_token}"} - -@pytest.fixture(scope="function") -def admin_token(test_db): - token = create_and_save_token(-1, test_db) # -1 для админа - return token - -@pytest.fixture(scope="function") -def admin_auth_header(admin_token): - return {"Authorization": f"Bearer {admin_token}"} - -@pytest.fixture(scope="function") -def test_employee_id(test_employee): - return test_employee.id - -# Переопределяем зависимость для получения БД -def override_get_db(): - db = TestingSessionLocal() - try: - yield db - finally: - db.close() - -app.dependency_overrides[get_db] = override_get_db \ No newline at end of file +def admin_token(test_admin: Employee, db: TestingSessionLocal) -> str: + """Фикстура для создания токена администратора.""" + from app.utils.jwt import create_access_token + token = create_access_token({"sub": str(test_admin.id)}) + # Сохраняем токен в Redis мок + from app.utils.jwt import redis + redis.set(f"token:{token}", "valid") + return token \ No newline at end of file diff --git a/backend/tests/test_auth.py b/backend/tests/test_auth.py index c6b73d3..420b8ff 100644 --- a/backend/tests/test_auth.py +++ b/backend/tests/test_auth.py @@ -1,98 +1,80 @@ +"""Authentication tests.""" import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session -from app.main import app -from app.crud import employees -from app.utils.auth import verify_password, get_password_hash -from app.schemas.employee import EmployeeCreate +from app.models.employee import Employee -client = TestClient(app) - -def test_login_success(test_db: Session): - # Создаем тестового сотрудника - hashed_password = get_password_hash("testpass123") - employee_data = EmployeeCreate( - first_name="Test", - last_name="User", - department="IT", - office="101", - password="testpass123" - ) - employee = employees.create_employee(test_db, employee_data, hashed_password) - +def test_login_employee_success(client: TestClient, test_employee: Employee): + """Тест успешной авторизации сотрудника.""" response = client.post( "/api/auth/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": "User", - "password": "testpass123" - } + data={"username": test_employee.email, "password": "testpassword"} ) - assert response.status_code == 200 assert "access_token" in response.json() + assert "token_type" in response.json() assert response.json()["token_type"] == "bearer" -def test_login_wrong_password(test_db: Session): - # Создаем тестового сотрудника - hashed_password = get_password_hash("testpass123") - employee_data = EmployeeCreate( - first_name="Test", - last_name="User", - department="IT", - office="101", - password="testpass123" - ) - employees.create_employee(test_db, employee_data, hashed_password) - +def test_login_employee_wrong_password(client: TestClient, test_employee: Employee): + """Тест авторизации сотрудника с неверным паролем.""" response = client.post( "/api/auth/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": "User", - "password": "wrongpass" - } + data={"username": test_employee.email, "password": "wrongpassword"} ) - assert response.status_code == 401 - assert "detail" in response.json() + assert response.json()["detail"] == "Incorrect username or password" -def test_login_nonexistent_user(test_db: Session): +def test_login_employee_wrong_username(client: TestClient): + """Тест авторизации с несуществующим пользователем.""" response = client.post( "/api/auth/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": "NonExistent", - "password": "testpass123" - } + data={"username": "nonexistent@example.com", "password": "testpassword"} ) - assert response.status_code == 401 - assert "detail" in response.json() + assert response.json()["detail"] == "Incorrect username or password" -def test_admin_login_success(): +def test_login_admin_success(client: TestClient, test_admin: Employee): + """Тест успешной авторизации администратора.""" response = client.post( "/api/auth/admin/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": "admin", - "password": "admin123" - } + data={"username": test_admin.email, "password": "adminpassword"} ) - assert response.status_code == 200 assert "access_token" in response.json() + assert "token_type" in response.json() assert response.json()["token_type"] == "bearer" -def test_admin_login_wrong_password(): +def test_login_admin_wrong_password(client: TestClient, test_admin: Employee): + """Тест авторизации администратора с неверным паролем.""" response = client.post( "/api/auth/admin/login", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "username": "admin", - "password": "wrongpass" - } + data={"username": test_admin.email, "password": "wrongpassword"} ) - assert response.status_code == 401 - assert "detail" in response.json() \ No newline at end of file + assert response.json()["detail"] == "Incorrect username or password" + +def test_protected_route_with_valid_token(client: TestClient, employee_token: str, test_employee: Employee, db: Session): + """Тест доступа к защищенному маршруту с валидным токеном.""" + response = client.get( + "/api/employees/me", + headers={"Authorization": f"Bearer {employee_token}"} + ) + assert response.status_code == 200 + data = response.json() + assert data["email"] == test_employee.email + assert data["full_name"] == test_employee.full_name + +def test_protected_route_without_token(client: TestClient): + """Тест доступа к защищенному маршруту без токена.""" + response = client.get("/api/employees/me") + assert response.status_code == 401 + assert response.json()["detail"] == "Not authenticated" + +def test_protected_route_with_invalid_token(client: TestClient): + """Тест доступа к защищенному маршруту с недействительным токеном.""" + response = client.get( + "/api/employees/me", + headers={"Authorization": "Bearer invalid_token"} + ) + assert response.status_code == 401 + assert response.json()["detail"] == "Could not validate credentials" \ No newline at end of file diff --git a/backend/tests/test_employees.py b/backend/tests/test_employees.py index acd5f77..10a037f 100644 --- a/backend/tests/test_employees.py +++ b/backend/tests/test_employees.py @@ -1,117 +1,135 @@ +"""Employee tests.""" import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session -from app.main import app -from app.crud import employees -from app.utils.auth import get_password_hash -from app.schemas.employee import EmployeeCreate +from app.models.employee import Employee -client = TestClient(app) - -def test_create_employee(test_db: Session, admin_auth_header): - """Test creating a new employee""" - employee_data = { - "first_name": "John", - "last_name": "Doe", - "department": "IT", - "office": "B205", - "password": "test123" - } - +def test_create_employee(client: TestClient, admin_token: str, db: Session): + """Тест создания сотрудника.""" response = client.post( - "/api/employees/", - json=employee_data, - headers=admin_auth_header + "/api/employees", + headers={"Authorization": f"Bearer {admin_token}"}, + json={ + "email": "new@example.com", + "password": "newpassword", + "full_name": "New Employee", + "department": "IT", + "is_active": True, + "is_admin": False + } + ) + assert response.status_code == 201 + data = response.json() + assert data["email"] == "new@example.com" + assert data["full_name"] == "New Employee" + assert data["department"] == "IT" + assert "id" in data + +def test_create_employee_unauthorized(client: TestClient): + """Тест создания сотрудника без авторизации.""" + response = client.post( + "/api/employees", + json={ + "email": "new@example.com", + "password": "newpassword", + "full_name": "New Employee", + "is_active": True, + "is_admin": False + } + ) + assert response.status_code == 401 + assert response.json()["detail"] == "Not authenticated" + +def test_get_employees(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест получения списка сотрудников.""" + response = client.get( + "/api/employees", + headers={"Authorization": f"Bearer {admin_token}"} ) - assert response.status_code == 200 data = response.json() - assert data["first_name"] == employee_data["first_name"] - assert data["last_name"] == employee_data["last_name"] - assert data["department"] == employee_data["department"] - assert data["office"] == employee_data["office"] - assert "password" not in data + assert isinstance(data, list) + assert len(data) > 0 + assert "email" in data[0] + assert "full_name" in data[0] + assert "department" in data[0] -def test_get_employees(test_db: Session, test_employee, admin_auth_header): - """Test getting list of employees""" - response = client.get("/api/employees/", headers=admin_auth_header) - - assert response.status_code == 200 - data = response.json() - assert len(data) >= 1 - assert data[0]["first_name"] == test_employee.first_name - assert data[0]["last_name"] == test_employee.last_name - assert data[0]["department"] == test_employee.department - assert data[0]["office"] == test_employee.office - assert "password" not in data[0] - -def test_create_employee_unauthorized(test_db: Session): - """Test creating employee without authorization""" - employee_data = { - "first_name": "John", - "last_name": "Doe", - "department": "IT", - "office": "B205", - "password": "test123" - } - response = client.post("/api/employees/", json=employee_data) - assert response.status_code == 401 # Unauthorized - -def test_get_employees_unauthorized(test_db: Session): - """Test getting employees list without authorization""" - response = client.get("/api/employees/") - assert response.status_code == 401 # Unauthorized - -def test_get_employee_by_id(test_db: Session, test_employee, admin_auth_header): - """Test getting employee by ID""" +def test_get_employee_by_id(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест получения сотрудника по ID.""" response = client.get( f"/api/employees/{test_employee.id}", - headers=admin_auth_header + headers={"Authorization": f"Bearer {admin_token}"} ) - assert response.status_code == 200 data = response.json() - assert data["first_name"] == test_employee.first_name - assert data["last_name"] == test_employee.last_name + assert data["email"] == test_employee.email + assert data["full_name"] == test_employee.full_name assert data["department"] == test_employee.department - assert data["office"] == test_employee.office - assert "password" not in data -def test_update_employee(test_db: Session, test_employee, admin_auth_header): - """Test updating employee data""" - update_data = { - "first_name": "Updated", - "last_name": "Name", - "department": "HR", - "office": "B202" - } - +def test_get_nonexistent_employee(client: TestClient, admin_token: str): + """Тест получения несуществующего сотрудника.""" + response = client.get( + "/api/employees/999", + headers={"Authorization": f"Bearer {admin_token}"} + ) + assert response.status_code == 404 + assert response.json()["detail"] == "Employee not found" + +def test_update_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест обновления данных сотрудника.""" response = client.put( f"/api/employees/{test_employee.id}", - json=update_data, - headers=admin_auth_header + headers={"Authorization": f"Bearer {admin_token}"}, + json={ + "email": "updated@example.com", + "full_name": "Updated Employee", + "department": "HR", + "is_active": True, + "is_admin": False + } ) - assert response.status_code == 200 data = response.json() - assert data["first_name"] == update_data["first_name"] - assert data["last_name"] == update_data["last_name"] - assert data["department"] == update_data["department"] - assert data["office"] == update_data["office"] - assert "password" not in data + assert data["email"] == "updated@example.com" + assert data["full_name"] == "Updated Employee" + assert data["department"] == "HR" -def test_delete_employee(test_db: Session, test_employee, admin_auth_header): - """Test deleting employee""" +def test_delete_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест удаления сотрудника.""" response = client.delete( f"/api/employees/{test_employee.id}", - headers=admin_auth_header + headers={"Authorization": f"Bearer {admin_token}"} ) - assert response.status_code == 200 - - # Verify employee is deleted - get_response = client.get( - f"/api/employees/{test_employee.id}", - headers=admin_auth_header + data = response.json() + assert data["email"] == test_employee.email + assert data["full_name"] == test_employee.full_name + assert data["department"] == test_employee.department + +def test_employee_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session): + """Тест получения информации о текущем сотруднике.""" + response = client.get( + "/api/employees/me", + headers={"Authorization": f"Bearer {employee_token}"} ) - assert get_response.status_code == 404 \ No newline at end of file + assert response.status_code == 200 + data = response.json() + assert data["email"] == test_employee.email + assert data["full_name"] == test_employee.full_name + assert data["department"] == test_employee.department + +def test_update_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session): + """Тест обновления информации о текущем сотруднике.""" + response = client.put( + "/api/employees/me", + headers={"Authorization": f"Bearer {employee_token}"}, + json={ + "full_name": "Updated Name", + "department": "Support" + } + ) + assert response.status_code == 200 + data = response.json() + assert data["full_name"] == "Updated Name" + assert data["email"] == test_employee.email + assert data["department"] == "Support" \ No newline at end of file diff --git a/backend/tests/test_requests.py b/backend/tests/test_requests.py index 6db788c..59cae4a 100644 --- a/backend/tests/test_requests.py +++ b/backend/tests/test_requests.py @@ -1,164 +1,168 @@ +"""Request tests.""" import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session -from app.main import app -from app.models.request import RequestStatus, RequestPriority -from app.crud import requests -from app.schemas.request import RequestCreate +from app.models.employee import Employee +from app.models.request import Request -client = TestClient(app) - -def test_create_request(test_db: Session, test_employee, test_auth_header): - """Test creating a new request""" - request_data = { - "department": "IT", - "request_type": "hardware", - "description": "This is a test request", - "priority": RequestPriority.MEDIUM.value - } - +def test_create_request(client: TestClient, employee_token: str, db: Session): + """Тест создания заявки.""" response = client.post( - "/api/requests/", - json=request_data, - headers=test_auth_header + "/api/requests", + headers={"Authorization": f"Bearer {employee_token}"}, + json={ + "request_type": "support", + "description": "Test Description", + "priority": "medium" + } ) - - assert response.status_code == 200 + assert response.status_code == 201 data = response.json() - assert data["department"] == request_data["department"] - assert data["description"] == request_data["description"] - assert data["priority"] == request_data["priority"] - assert data["status"] == RequestStatus.NEW.value - assert "employee_id" in data + assert data["request_type"] == "support" + assert data["description"] == "Test Description" + assert data["priority"] == "medium" + assert data["status"] == "new" + assert "id" in data -def test_get_employee_requests(test_db: Session, test_employee, test_auth_header): - """Test getting employee's requests""" - # Создаем тестовую заявку - request_data = RequestCreate( - department="IT", - request_type="hardware", - description="This is a test request", - priority=RequestPriority.MEDIUM.value +def test_create_request_unauthorized(client: TestClient): + """Тест создания заявки без авторизации.""" + response = client.post( + "/api/requests", + json={ + "request_type": "support", + "description": "Test Description", + "priority": "medium" + } ) - test_request = requests.create_request(test_db, request_data, test_employee.id) - - response = client.get("/api/requests/my", headers=test_auth_header) - - assert response.status_code == 200 - data = response.json() - assert len(data) == 1 - assert data[0]["department"] == test_request.department - assert data[0]["description"] == test_request.description - assert data[0]["priority"] == test_request.priority - assert data[0]["status"] == test_request.status - assert data[0]["employee_id"] == test_request.employee_id + assert response.status_code == 401 + assert response.json()["detail"] == "Not authenticated" -def test_update_request_status(test_db: Session, test_employee, admin_auth_header): - """Test updating request status""" +def test_get_employee_requests(client: TestClient, employee_token: str, test_employee: Employee, db: Session): + """Тест получения списка заявок сотрудника.""" + db.add(test_employee) + db.commit() + db.refresh(test_employee) + # Создаем тестовую заявку - request_data = RequestCreate( - department="IT", - request_type="hardware", - description="This is a test request", - priority=RequestPriority.MEDIUM.value + request = Request( + request_type="support", + description="Test Description", + priority="medium", + status="new", + employee_id=test_employee.id ) - test_request = requests.create_request(test_db, request_data, test_employee.id) - - update_data = {"status": RequestStatus.IN_PROGRESS.value} - response = client.patch( - f"/api/requests/{test_request.id}/status", - json=update_data, - headers=admin_auth_header - ) - - assert response.status_code == 200 - data = response.json() - assert data["status"] == RequestStatus.IN_PROGRESS.value + db.add(request) + db.commit() -def test_get_all_requests_admin(test_db: Session, test_employee, admin_auth_header): - """Test getting all requests as admin""" - # Создаем тестовую заявку - request_data = RequestCreate( - department="IT", - request_type="hardware", - description="This is a test request", - priority=RequestPriority.MEDIUM.value - ) - test_request = requests.create_request(test_db, request_data, test_employee.id) - - response = client.get("/api/requests/admin", headers=admin_auth_header) - - assert response.status_code == 200 - data = response.json() - assert len(data) == 1 - assert data[0]["department"] == test_request.department - -def test_get_requests_by_status(test_db: Session, test_employee, admin_auth_header): - """Test filtering requests by status""" - # Создаем тестовую заявку - request_data = RequestCreate( - department="IT", - request_type="hardware", - description="This is a test request", - priority=RequestPriority.MEDIUM.value - ) - test_request = requests.create_request(test_db, request_data, test_employee.id) - response = client.get( - f"/api/requests/admin?status={RequestStatus.NEW.value}", - headers=admin_auth_header + "/api/requests/my", + headers={"Authorization": f"Bearer {employee_token}"} ) - assert response.status_code == 200 data = response.json() - assert len(data) == 1 - assert data[0]["status"] == RequestStatus.NEW.value + assert isinstance(data, list) + assert len(data) > 0 + assert data[0]["request_type"] == "support" + assert data[0]["description"] == "Test Description" -def test_get_request_statistics(test_db: Session, test_employee, admin_auth_header): - """Test getting request statistics""" +def test_admin_get_all_requests(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест получения всех заявок администратором.""" + db.add(test_employee) + db.commit() + db.refresh(test_employee) + + # Создаем тестовую заявку + request = Request( + request_type="support", + description="Test Description", + priority="medium", + status="new", + employee_id=test_employee.id + ) + db.add(request) + db.commit() + + response = client.get( + "/api/requests/admin", + headers={"Authorization": f"Bearer {admin_token}"} + ) + assert response.status_code == 200 + data = response.json() + assert isinstance(data, list) + assert len(data) > 0 + assert data[0]["request_type"] == "support" + assert data[0]["description"] == "Test Description" + +def test_update_request_status(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест обновления статуса заявки.""" + db.add(test_employee) + db.commit() + db.refresh(test_employee) + + # Создаем тестовую заявку + request = Request( + request_type="support", + description="Test Description", + priority="medium", + status="new", + employee_id=test_employee.id, + department=test_employee.department + ) + db.add(request) + db.commit() + + response = client.patch( + f"/api/requests/{request.id}/status", + headers={"Authorization": f"Bearer {admin_token}"}, + json={"status": "in_progress"} + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "in_progress" + +def test_get_request_statistics(client: TestClient, admin_token: str, test_employee: Employee, db: Session): + """Тест получения статистики по заявкам.""" + db.add(test_employee) + db.commit() + db.refresh(test_employee) + # Создаем тестовые заявки с разными статусами - requests_data = [ - RequestCreate( - department="IT", - request_type="hardware", - description="Test request 1", - priority=RequestPriority.HIGH.value + requests = [ + Request( + request_type="support", + description="Test Description", + priority="medium", + status="new", + employee_id=test_employee.id, + department=test_employee.department ), - RequestCreate( - department="IT", - request_type="software", - description="Test request 2", - priority=RequestPriority.MEDIUM.value + Request( + request_type="support", + description="Test Description", + priority="high", + status="in_progress", + employee_id=test_employee.id, + department=test_employee.department + ), + Request( + request_type="support", + description="Test Description", + priority="low", + status="completed", + employee_id=test_employee.id, + department=test_employee.department ) ] - - for data in requests_data: - requests.create_request(test_db, data, test_employee.id) - - response = client.get("/api/requests/statistics", headers=admin_auth_header) - + for req in requests: + db.add(req) + db.commit() + + response = client.get( + "/api/statistics", + headers={"Authorization": f"Bearer {admin_token}"} + ) assert response.status_code == 200 data = response.json() assert "total" in data assert "by_status" in data - assert data["total"] == 2 - assert data["by_status"][RequestStatus.NEW.value] == 2 - assert data["by_status"][RequestStatus.IN_PROGRESS.value] == 0 - assert data["by_status"][RequestStatus.COMPLETED.value] == 0 - assert data["by_status"][RequestStatus.REJECTED.value] == 0 - -def test_create_request_unauthorized(test_db: Session): - """Test creating request without authorization""" - request_data = { - "department": "IT", - "request_type": "hardware", - "description": "This is a test request", - "priority": RequestPriority.MEDIUM.value - } - response = client.post("/api/requests/", json=request_data) - assert response.status_code == 401 - -def test_get_requests_unauthorized(test_db: Session): - """Test getting requests without authorization""" - response = client.get("/api/requests/my") - assert response.status_code == 401 \ No newline at end of file + assert data["total"] >= 3 \ No newline at end of file diff --git a/backend/tests/test_statistics.py b/backend/tests/test_statistics.py new file mode 100644 index 0000000..0519ecb --- /dev/null +++ b/backend/tests/test_statistics.py @@ -0,0 +1 @@ + \ No newline at end of file