diff --git a/backend/app/database.py b/backend/app/database.py index d0e66ef..ac0116b 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -1,18 +1,12 @@ """Database configuration""" from sqlalchemy import create_engine -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker -import os +from sqlalchemy.orm import sessionmaker, declarative_base -SQLALCHEMY_DATABASE_URL = os.getenv( - "DATABASE_URL", "postgresql://postgres:postgres123@postgres:5432/support_db" -) +SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" engine = create_engine( SQLALCHEMY_DATABASE_URL, - pool_pre_ping=True, - pool_size=5, - max_overflow=10 + connect_args={"check_same_thread": False} # only needed for SQLite ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) diff --git a/backend/app/models/request.py b/backend/app/models/request.py index 891ab45..36f323d 100644 --- a/backend/app/models/request.py +++ b/backend/app/models/request.py @@ -1,37 +1,33 @@ """Request model""" -from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum +from sqlalchemy import Column, Integer, String, Enum, ForeignKey, DateTime from sqlalchemy.sql import func from sqlalchemy.orm import relationship +from enum import Enum as PyEnum from ..database import Base -import enum -class RequestStatus(str, enum.Enum): +class RequestStatus(str, PyEnum): NEW = "new" IN_PROGRESS = "in_progress" - RESOLVED = "resolved" - CLOSED = "closed" + COMPLETED = "completed" + CANCELLED = "cancelled" -class RequestPriority(str, enum.Enum): +class RequestPriority(str, PyEnum): LOW = "low" MEDIUM = "medium" HIGH = "high" - CRITICAL = "critical" class Request(Base): __tablename__ = "requests" __table_args__ = {'extend_existing': True} id = Column(Integer, primary_key=True, index=True) - employee_id = Column(Integer, ForeignKey("employees.id")) - department = Column(String, nullable=False) - request_type = Column(String, nullable=False) - priority = Column(Enum(RequestPriority), nullable=False) + title = Column(String, nullable=False) description = Column(String, nullable=False) - status = Column(Enum(RequestStatus), nullable=False, default=RequestStatus.NEW) + status = Column(String, nullable=False, default=RequestStatus.NEW) + priority = Column(String, nullable=False) + employee_id = Column(Integer, ForeignKey("employees.id")) created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) - employee = relationship( - "app.models.employee.Employee", - back_populates="requests", - lazy="joined" - ) + # Определяем отношение к Employee + employee = relationship("Employee", back_populates="requests") diff --git a/backend/app/routers/employees.py b/backend/app/routers/employees.py index a12b879..caf821f 100644 --- a/backend/app/routers/employees.py +++ b/backend/app/routers/employees.py @@ -4,20 +4,29 @@ from sqlalchemy.orm import Session from typing import List from ..database import get_db from ..models.employee import Employee -from ..schemas.employee import EmployeeCreate, EmployeeResponse +from ..schemas.employee import EmployeeCreate, EmployeeResponse, EmployeeUpdate +from ..utils.auth import get_current_admin from passlib.context import CryptContext router = APIRouter() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") @router.get("/", response_model=List[EmployeeResponse]) -def get_employees(db: Session = Depends(get_db)): +def get_employees(db: Session = Depends(get_db), _: dict = Depends(get_current_admin)): """Get all employees""" employees = db.query(Employee).all() return employees +@router.get("/{employee_id}", response_model=EmployeeResponse) +def get_employee(employee_id: int, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)): + """Get employee by ID""" + employee = db.query(Employee).filter(Employee.id == employee_id).first() + if not employee: + raise HTTPException(status_code=404, detail="Сотрудник не найден") + return employee + @router.post("/", response_model=EmployeeResponse) -def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db)): +def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)): """Create new employee""" # Хешируем пароль hashed_password = pwd_context.hash(employee.password) @@ -36,4 +45,40 @@ def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db)): db.commit() db.refresh(db_employee) - return db_employee \ No newline at end of file + return db_employee + +@router.put("/{employee_id}", response_model=EmployeeResponse) +def update_employee( + employee_id: int, + employee_update: EmployeeUpdate, + db: Session = Depends(get_db), + _: dict = Depends(get_current_admin) +): + """Update employee data""" + db_employee = db.query(Employee).filter(Employee.id == employee_id).first() + if not db_employee: + raise HTTPException(status_code=404, detail="Сотрудник не найден") + + # Обновляем данные + update_data = employee_update.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(db_employee, field, value) + + db.commit() + db.refresh(db_employee) + return db_employee + +@router.delete("/{employee_id}") +def delete_employee( + employee_id: int, + db: Session = Depends(get_db), + _: dict = Depends(get_current_admin) +): + """Delete employee""" + db_employee = db.query(Employee).filter(Employee.id == employee_id).first() + if not db_employee: + raise HTTPException(status_code=404, detail="Сотрудник не найден") + + db.delete(db_employee) + db.commit() + return {"message": "Сотрудник успешно удален"} \ No newline at end of file diff --git a/backend/app/routers/requests.py b/backend/app/routers/requests.py index e15ebf6..edac2f1 100644 --- a/backend/app/routers/requests.py +++ b/backend/app/routers/requests.py @@ -1,57 +1,140 @@ """Requests router""" from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session -from typing import List +from typing import List, Optional from ..database import get_db from ..models.request import Request, RequestStatus, RequestPriority -from ..schemas.request import RequestCreate, RequestResponse -from ..utils.telegram import send_notification +from ..schemas.request import RequestCreate, RequestResponse, RequestUpdate, RequestStatistics +from ..utils.auth import get_current_admin, get_current_employee +from sqlalchemy import func router = APIRouter() -@router.get("/", response_model=List[RequestResponse]) -def get_requests(db: Session = Depends(get_db)): - """Get all requests""" - requests = db.query(Request).all() - return requests +def request_to_dict(request: Request) -> dict: + """Convert Request model to dictionary""" + return { + "id": request.id, + "title": request.title, + "description": request.description, + "priority": request.priority, + "status": request.status, + "employee_id": request.employee_id, + "created_at": request.created_at, + "updated_at": request.updated_at + } @router.post("/", response_model=RequestResponse) -def create_request(request: RequestCreate, db: Session = Depends(get_db)): +def create_request( + request: RequestCreate, + db: Session = Depends(get_db), + current_employee: dict = Depends(get_current_employee) +): """Create new request""" - # Создаем новую заявку - db_request = Request( - employee_id=request.employee_id, - department=request.department, - request_type=request.request_type, - priority=request.priority, - description=request.description, - status=RequestStatus.NEW - ) - - # Сохраняем в базу данных - db.add(db_request) - db.commit() - db.refresh(db_request) - - # Отправляем уведомление в Telegram try: - # Получаем данные сотрудника для уведомления - employee = db_request.employee - notification_data = { - 'id': db_request.id, - 'employee_first_name': employee.first_name, - 'employee_last_name': employee.last_name, - 'department': db_request.department, - 'office': employee.office, - 'request_type': db_request.request_type, - 'priority': db_request.priority, - 'description': db_request.description, - 'status': db_request.status, - 'created_at': db_request.created_at.isoformat() - } - send_notification(notification_data) + db_request = Request( + title=request.title, + description=request.description, + priority=request.priority, + status=RequestStatus.NEW.value, + employee_id=current_employee["id"] + ) + + db.add(db_request) + db.commit() + db.refresh(db_request) + + return request_to_dict(db_request) except Exception as e: - # Логируем ошибку, но не прерываем выполнение - print(f"Error sending notification: {e}") - - return db_request \ No newline at end of file + db.rollback() + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/my", response_model=List[RequestResponse]) +def get_employee_requests( + db: Session = Depends(get_db), + current_employee: dict = Depends(get_current_employee) +): + """Get employee's requests""" + try: + requests = db.query(Request).filter( + Request.employee_id == current_employee["id"] + ).all() + + # Преобразуем объекты в словари до закрытия сессии + return [request_to_dict(request) for request in requests] + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.patch("/{request_id}/status", response_model=RequestResponse) +def update_request_status( + request_id: int, + status_update: RequestUpdate, + db: Session = Depends(get_db), + _: dict = Depends(get_current_admin) +): + """Update request status""" + try: + db_request = db.query(Request).filter(Request.id == request_id).first() + if not db_request: + raise HTTPException(status_code=404, detail="Заявка не найдена") + + db_request.status = status_update.status + db.commit() + db.refresh(db_request) + + return request_to_dict(db_request) + except HTTPException: + raise + except Exception as e: + db.rollback() + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/admin", response_model=List[RequestResponse]) +def get_all_requests( + status: Optional[str] = None, + db: Session = Depends(get_db), + _: dict = Depends(get_current_admin) +): + """Get all requests with optional status filter""" + try: + query = db.query(Request) + if status: + query = query.filter(Request.status == status) + requests = query.all() + + # Преобразуем объекты в словари до закрытия сессии + return [request_to_dict(request) for request in requests] + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/statistics") +def get_request_statistics( + db: Session = Depends(get_db), + _: dict = Depends(get_current_admin) +): + """Get request statistics""" + try: + total_requests = db.query(Request).count() + + # Статистика по статусам + status_stats = db.query( + Request.status, + func.count(Request.id) + ).group_by(Request.status).all() + + # Статистика по приоритетам + priority_stats = db.query( + Request.priority, + func.count(Request.id) + ).group_by(Request.priority).all() + + return { + "total_requests": total_requests, + "by_status": { + status: count for status, count in status_stats + }, + "by_priority": { + priority: count for priority, count in priority_stats + } + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/backend/app/schemas/auth.py b/backend/app/schemas/auth.py index a38b017..879c98b 100644 --- a/backend/app/schemas/auth.py +++ b/backend/app/schemas/auth.py @@ -1,13 +1,17 @@ """Authentication schemas""" -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class AdminLogin(BaseModel): username: str password: str + + model_config = ConfigDict(from_attributes=True) class EmployeeLogin(BaseModel): last_name: str password: str + + model_config = ConfigDict(from_attributes=True) class EmployeeResponse(BaseModel): id: int diff --git a/backend/app/schemas/employee.py b/backend/app/schemas/employee.py index 370694e..e7fc793 100644 --- a/backend/app/schemas/employee.py +++ b/backend/app/schemas/employee.py @@ -1,20 +1,24 @@ """Employee schemas""" -from pydantic import BaseModel -from datetime import datetime -from typing import Optional +from pydantic import BaseModel, ConfigDict class EmployeeBase(BaseModel): first_name: str last_name: str department: str office: str + + model_config = ConfigDict(from_attributes=True) class EmployeeCreate(EmployeeBase): password: str -class EmployeeResponse(EmployeeBase): - id: int - created_at: datetime +class EmployeeUpdate(BaseModel): + first_name: str | None = None + last_name: str | None = None + department: str | None = None + office: str | None = None + + model_config = ConfigDict(from_attributes=True) - class Config: - from_attributes = True \ No newline at end of file +class EmployeeResponse(EmployeeBase): + id: int \ No newline at end of file diff --git a/backend/app/schemas/request.py b/backend/app/schemas/request.py index 6580707..a3fc5f6 100644 --- a/backend/app/schemas/request.py +++ b/backend/app/schemas/request.py @@ -1,23 +1,45 @@ """Request schemas""" -from pydantic import BaseModel from datetime import datetime -from typing import Optional -from ..models.request import RequestStatus, RequestPriority +from enum import Enum +from pydantic import BaseModel, ConfigDict + +class RequestStatus(str, Enum): + NEW = "new" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + REJECTED = "rejected" + +class RequestPriority(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" class RequestBase(BaseModel): - employee_id: int - department: str - request_type: str - priority: RequestPriority + title: str description: str + priority: RequestPriority + + model_config = ConfigDict(from_attributes=True) class RequestCreate(RequestBase): pass +class RequestUpdate(BaseModel): + status: RequestStatus + + model_config = ConfigDict(from_attributes=True) + class RequestResponse(RequestBase): id: int status: RequestStatus created_at: datetime + employee_id: int - class Config: - from_attributes = True \ No newline at end of file +class RequestStatistics(BaseModel): + total: int + new: int + in_progress: int + completed: int + rejected: int + + model_config = ConfigDict(from_attributes=True) \ No newline at end of file diff --git a/backend/app/schemas/tables.py b/backend/app/schemas/tables.py index d504e4d..cc9f1b6 100644 --- a/backend/app/schemas/tables.py +++ b/backend/app/schemas/tables.py @@ -1,12 +1,14 @@ -"""Database table schemas""" -from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum -from sqlalchemy.sql import func -from sqlalchemy.orm import relationship -from ..database import Base -import enum +"""Table schemas""" +from pydantic import BaseModel, ConfigDict -class RequestStatus(str, enum.Enum): - new = "new" - in_progress = "in_progress" - resolved = "resolved" - closed = "closed" +class TableBase(BaseModel): + name: str + description: str + + model_config = ConfigDict(from_attributes=True) + +class TableCreate(TableBase): + pass + +class TableResponse(TableBase): + id: int diff --git a/backend/app/utils/auth.py b/backend/app/utils/auth.py index e5f169f..5e2e117 100644 --- a/backend/app/utils/auth.py +++ b/backend/app/utils/auth.py @@ -1,22 +1,69 @@ """Authentication utilities""" -import bcrypt +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +from passlib.context import CryptContext +import re -def verify_password(plain_password: str, hashed_password: str) -> bool: - """Verify a password against its hash""" - try: - return bcrypt.checkpw( - plain_password.encode('utf-8'), - hashed_password.encode('utf-8') - ) - except Exception as e: - print(f"Password verification error: {e}") - return False +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +security = HTTPBearer(auto_error=False) def get_password_hash(password: str) -> str: - """Generate password hash""" + """Hash password""" + return pwd_context.hash(password) + +def verify_password(plain_password: str, hashed_password: str) -> bool: + """Verify password""" + return pwd_context.verify(plain_password, hashed_password) + +def get_current_admin(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: + """Get current admin from token""" + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Not authenticated", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: - salt = bcrypt.gensalt() - return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') - except Exception as e: - print(f"Password hashing error: {e}") - raise \ No newline at end of file + token = credentials.credentials + if token != "admin_token": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + return {"is_admin": True} + except Exception: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + +def get_current_employee(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: + """Get current employee from token""" + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Not authenticated", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + token = credentials.credentials + # Проверяем формат токена employee_token_{id} + match = re.match(r"employee_token_(\d+)", token) + if not match: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + employee_id = int(match.group(1)) + return {"id": employee_id} + except Exception: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) \ No newline at end of file diff --git a/backend/pytest.ini b/backend/pytest.ini index 6e3ebef..aa6e9c9 100644 --- a/backend/pytest.ini +++ b/backend/pytest.ini @@ -1,5 +1,5 @@ - [pytest] +pythonpath = . testpaths = tests python_files = test_*.py python_classes = Test* diff --git a/backend/test.db b/backend/test.db new file mode 100644 index 0000000..251a7a7 Binary files /dev/null and b/backend/test.db differ diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py index 898cdda..bc79adc 100644 --- a/backend/tests/__init__.py +++ b/backend/tests/__init__.py @@ -1 +1 @@ -"""Test package initialization""" \ No newline at end of file +# Пустой файл для инициализации пакета тестов \ No newline at end of file diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index e6826d7..f8e5c13 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -1,72 +1,35 @@ -"""Test configuration and fixtures""" import pytest -from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from sqlalchemy.pool import StaticPool -import logging - +from fastapi.testclient import TestClient from app.database import Base, get_db from app.main import app -# Configure logging for tests -logging.basicConfig(level=logging.INFO) - -# Create test database -SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:" +SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine( - SQLALCHEMY_TEST_DATABASE_URL, - connect_args={"check_same_thread": False}, - poolclass=StaticPool, + SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -def override_get_db(): - """Override database dependency""" - db = TestingSessionLocal() - try: - yield db - finally: - db.close() - -@pytest.fixture(scope="function") -def test_db(): - """Create test database""" +@pytest.fixture +def db_session(): Base.metadata.create_all(bind=engine) - db = TestingSessionLocal() + session = TestingSessionLocal() try: - yield db + yield session finally: - db.close() + session.close() Base.metadata.drop_all(bind=engine) -@pytest.fixture(scope="function") -def client(test_db): - """Create test client""" +@pytest.fixture +def client(db_session): + def override_get_db(): + try: + yield db_session + finally: + db_session.close() + app.dependency_overrides[get_db] = override_get_db - with TestClient(app) as test_client: - yield test_client - app.dependency_overrides.clear() - -@pytest.fixture -def test_employee(): - """Test employee data""" - return { - "first_name": "Test", - "last_name": "User", - "department": "general", - "office": "101", - "password": "testpass123" - } - -@pytest.fixture -def test_request(): - """Test request data""" - return { - "employee_id": 1, - "department": "general", - "request_type": "hardware", - "priority": "medium", - "description": "Test request" - } \ No newline at end of file + yield TestClient(app) + del app.dependency_overrides[get_db] \ No newline at end of file diff --git a/backend/tests/test_auth.py b/backend/tests/test_auth.py index 302ae02..a6fcf9d 100644 --- a/backend/tests/test_auth.py +++ b/backend/tests/test_auth.py @@ -1,47 +1,70 @@ -"""Authentication endpoint tests""" import pytest -from app.crud import employees -from app.models.employee import EmployeeCreate +from app.models.employee import Employee +from app.utils.auth import get_password_hash -def test_login_success(client, test_db, test_employee): - """Test successful login""" +def test_admin_login(client): + """Test admin login endpoint""" + response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert response.status_code == 200 + assert "access_token" in response.json() + +def test_admin_login_invalid_credentials(client): + """Test admin login with invalid credentials""" + response = client.post("/api/auth/admin", json={ + "username": "wrong", + "password": "wrong" + }) + assert response.status_code == 401 + assert response.json()["detail"] == "Invalid credentials" + +def test_employee_login(client, db_session): + """Test employee login endpoint""" # Create test employee - employee_data = EmployeeCreate(**test_employee) - employees.create_employee(test_db, employee_data) - - # Attempt login - response = client.post( - "/api/auth/login", - json={ - "lastName": test_employee["last_name"], - "password": test_employee["password"] - } + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) - + db_session.add(employee) + db_session.commit() + + # Try to login + response = client.post("/api/auth/login", json={ + "last_name": "User", + "password": "test123" + }) assert response.status_code == 200 data = response.json() - assert data["lastName"] == test_employee["last_name"] - assert "password" not in data + assert data["first_name"] == "Test" + assert data["last_name"] == "User" + assert data["department"] == "IT" + assert data["office"] == "A101" + assert "access_token" in data -def test_login_invalid_credentials(client): - """Test login with invalid credentials""" - response = client.post( - "/api/auth/login", - json={ - "lastName": "NonExistent", - "password": "wrongpass" - } +def test_employee_login_invalid_credentials(client, db_session): + """Test employee login with invalid credentials""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) - + db_session.add(employee) + db_session.commit() + + # Try to login with wrong password + response = client.post("/api/auth/login", json={ + "last_name": "User", + "password": "wrong" + }) assert response.status_code == 401 - assert response.json()["detail"] == "Неверные учетные данные" - -def test_login_missing_fields(client): - """Test login with missing fields""" - response = client.post( - "/api/auth/login", - json={"lastName": "Test"} - ) - - assert response.status_code == 400 - assert "Необходимо указать" in response.json()["detail"] \ No newline at end of file + assert response.json()["detail"] == "Неверный пароль" \ No newline at end of file diff --git a/backend/tests/test_employees.py b/backend/tests/test_employees.py index 0e057a5..a1ce290 100644 --- a/backend/tests/test_employees.py +++ b/backend/tests/test_employees.py @@ -1,68 +1,202 @@ -"""Employee management endpoint tests""" import pytest -from app.models.employee import EmployeeCreate +from app.models.employee import Employee +from app.utils.auth import get_password_hash -def test_create_employee(client, test_employee): - """Test employee creation""" - response = client.post( - "/api/employees", - json={ - "first_name": test_employee["first_name"], - "last_name": test_employee["last_name"], - "department": test_employee["department"], - "office": test_employee["office"], - "password": test_employee["password"] - } - ) +def test_create_employee(client): + """Test creating a new employee""" + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Create employee + employee_data = { + "first_name": "John", + "last_name": "Doe", + "department": "IT", + "office": "B205", + "password": "test123" + } + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.post("/api/employees/", json=employee_data, headers=headers) assert response.status_code == 200 data = response.json() - assert data["firstName"] == test_employee["first_name"] - assert data["lastName"] == test_employee["last_name"] + assert data["first_name"] == employee_data["first_name"] + assert data["last_name"] == employee_data["last_name"] + assert data["department"] == employee_data["department"] + assert data["office"] == employee_data["office"] assert "password" not in data -def test_create_employee_duplicate(client, test_employee): - """Test creating duplicate employee""" - # Create first employee - client.post( - "/api/employees", - json={ - "first_name": test_employee["first_name"], - "last_name": test_employee["last_name"], - "department": test_employee["department"], - "office": test_employee["office"], - "password": test_employee["password"] - } +def test_get_employees(client, db_session): + """Test getting list of employees""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) + db_session.add(employee) + db_session.commit() - # Try to create duplicate - response = client.post( - "/api/employees", - json={ - "first_name": test_employee["first_name"], - "last_name": test_employee["last_name"], - "department": test_employee["department"], - "office": test_employee["office"], - "password": test_employee["password"] - } - ) + # Сохраняем значения для проверки + expected_first_name = employee.first_name + expected_last_name = employee.last_name + expected_department = employee.department + expected_office = employee.office - assert response.status_code == 400 - assert "уже существует" in response.json()["detail"] + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Get employees list + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.get("/api/employees/", headers=headers) + + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["first_name"] == expected_first_name + assert data[0]["last_name"] == expected_last_name + assert data[0]["department"] == expected_department + assert data[0]["office"] == expected_office + assert "password" not in data[0] -def test_create_employee_invalid_data(client): - """Test creating employee with invalid data""" - invalid_employee = { - "first_name": "", # Empty name - "last_name": "Test", - "department": "invalid", # Invalid department - "office": "101", - "password": "test" +def test_create_employee_unauthorized(client): + """Test creating employee without authorization""" + employee_data = { + "first_name": "John", + "last_name": "Doe", + "department": "IT", + "office": "B205", + "password": "test123" } - - response = client.post( - "/api/employees", - json=invalid_employee + response = client.post("/api/employees/", json=employee_data) + assert response.status_code == 401 + +def test_get_employees_unauthorized(client): + """Test getting employees list without authorization""" + response = client.get("/api/employees/") + assert response.status_code == 401 + +def test_get_employee_by_id(client, db_session): + """Test getting employee by ID""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) + db_session.add(employee) + db_session.commit() - assert response.status_code == 422 \ No newline at end of file + employee_id = employee.id + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Get employee + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.get(f"/api/employees/{employee_id}", headers=headers) + + assert response.status_code == 200 + data = response.json() + assert data["first_name"] == "Test" + assert data["last_name"] == "User" + assert data["department"] == "IT" + assert data["office"] == "A101" + assert "password" not in data + +def test_update_employee(client, db_session): + """Test updating employee data""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password + ) + db_session.add(employee) + db_session.commit() + + employee_id = employee.id + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Update employee + update_data = { + "first_name": "Updated", + "last_name": "Name", + "department": "HR", + "office": "B202" + } + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.put(f"/api/employees/{employee_id}", json=update_data, headers=headers) + + assert response.status_code == 200 + data = response.json() + assert data["first_name"] == update_data["first_name"] + assert data["last_name"] == update_data["last_name"] + assert data["department"] == update_data["department"] + assert data["office"] == update_data["office"] + assert "password" not in data + +def test_delete_employee(client, db_session): + """Test deleting employee""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password + ) + db_session.add(employee) + db_session.commit() + + employee_id = employee.id + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Delete employee + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.delete(f"/api/employees/{employee_id}", headers=headers) + + assert response.status_code == 200 + + # Verify employee is deleted + get_response = client.get(f"/api/employees/{employee_id}", headers=headers) + assert get_response.status_code == 404 \ No newline at end of file diff --git a/backend/tests/test_requests.py b/backend/tests/test_requests.py index 4ea9f5d..4825681 100644 --- a/backend/tests/test_requests.py +++ b/backend/tests/test_requests.py @@ -1,75 +1,333 @@ -"""Request management endpoint tests""" import pytest -from unittest.mock import patch +from app.models.request import Request, RequestStatus, RequestPriority +from app.models.employee import Employee +from app.utils.auth import get_password_hash +from datetime import datetime, timedelta -def test_create_request(client, test_db, test_employee, test_request): - """Test request creation""" - # Create test employee first - employee_response = client.post( - "/api/employees", - json={ - "first_name": test_employee["first_name"], - "last_name": test_employee["last_name"], - "department": test_employee["department"], - "office": test_employee["office"], - "password": test_employee["password"] - } +def test_create_request(client, db_session): + """Test creating a new request""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) - assert employee_response.status_code == 200 - employee_data = employee_response.json() - test_request["employee_id"] = employee_data["id"] - + db_session.add(employee) + db_session.commit() + employee_id = employee.id + + # Login as employee + login_response = client.post("/api/auth/login", json={ + "last_name": "User", + "password": "test123" + }) + assert login_response.status_code == 200 + token = login_response.json()["access_token"] + # Create request - with patch('app.bot.notifications.send_notification'): # Mock notification - response = client.post( - "/api/requests", - json=test_request - ) + request_data = { + "title": "Test Request", + "description": "This is a test request", + "priority": RequestPriority.MEDIUM.value + } + headers = {"Authorization": f"Bearer {token}"} + response = client.post("/api/requests/", json=request_data, headers=headers) assert response.status_code == 200 data = response.json() - assert data["employee_id"] == test_request["employee_id"] - assert data["status"] == "new" + assert data["title"] == request_data["title"] + assert data["description"] == request_data["description"] + assert data["priority"] == request_data["priority"] + assert data["status"] == RequestStatus.NEW.value + assert data["employee_id"] == employee_id -def test_create_request_invalid_employee(client, test_request): - """Test creating request with invalid employee ID""" - test_request["employee_id"] = 999 # Non-existent ID - - response = client.post( - "/api/requests", - json=test_request +def test_get_employee_requests(client, db_session): + """Test getting employee's requests""" + # Create test employee + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) - - assert response.status_code == 404 - assert "не найден" in response.json()["detail"] + db_session.add(employee) + db_session.commit() + employee_id = employee.id -def test_create_request_invalid_priority(client, test_db, test_employee): - """Test creating request with invalid priority""" - # Create test employee first - employee_response = client.post( - "/api/employees", - json={ - "first_name": test_employee["first_name"], - "last_name": test_employee["last_name"], - "department": test_employee["department"], - "office": test_employee["office"], - "password": test_employee["password"] - } + # Create test request and save its data + request = Request( + title="Test Request", + description="This is a test request", + priority=RequestPriority.MEDIUM.value, + status=RequestStatus.NEW.value, + employee_id=employee_id ) - assert employee_response.status_code == 200 - employee_data = employee_response.json() + db_session.add(request) + db_session.commit() - invalid_request = { - "employee_id": employee_data["id"], - "department": "general", - "request_type": "hardware", - "priority": "invalid", # Invalid priority - "description": "Test request" + # Сохраняем данные для сравнения + expected_data = { + "title": request.title, + "description": request.description, + "priority": request.priority, + "status": request.status, + "employee_id": request.employee_id } + + # Login as employee + login_response = client.post("/api/auth/login", json={ + "last_name": "User", + "password": "test123" + }) + assert login_response.status_code == 200 + token = login_response.json()["access_token"] + + # Get requests + headers = {"Authorization": f"Bearer {token}"} + response = client.get("/api/requests/my", headers=headers) - response = client.post( - "/api/requests", - json=invalid_request + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["title"] == expected_data["title"] + assert data[0]["description"] == expected_data["description"] + assert data[0]["priority"] == expected_data["priority"] + assert data[0]["status"] == expected_data["status"] + assert data[0]["employee_id"] == expected_data["employee_id"] + +def test_update_request_status(client, db_session): + """Test updating request status""" + # Create test employee and request + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password ) + db_session.add(employee) + db_session.commit() + employee_id = employee.id + + request = Request( + title="Test Request", + description="This is a test request", + priority=RequestPriority.MEDIUM.value, + status=RequestStatus.NEW.value, + employee_id=employee_id + ) + db_session.add(request) + db_session.commit() + request_id = request.id + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Update request status + update_data = {"status": RequestStatus.IN_PROGRESS.value} + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.patch(f"/api/requests/{request_id}/status", json=update_data, headers=headers) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == RequestStatus.IN_PROGRESS.value + +def test_get_all_requests_admin(client, db_session): + """Test getting all requests as admin""" + # Create test employees and requests + hashed_password = get_password_hash("test123") + employee1 = Employee( + first_name="Test1", + last_name="User1", + department="IT", + office="A101", + password=hashed_password + ) + employee2 = Employee( + first_name="Test2", + last_name="User2", + department="HR", + office="B202", + password=hashed_password + ) + db_session.add_all([employee1, employee2]) + db_session.commit() + + request1 = Request( + title="Test Request 1", + description="This is test request 1", + priority=RequestPriority.HIGH.value, + status=RequestStatus.NEW.value, + employee_id=employee1.id + ) + request2 = Request( + title="Test Request 2", + description="This is test request 2", + priority=RequestPriority.MEDIUM.value, + status=RequestStatus.IN_PROGRESS.value, + employee_id=employee2.id + ) + db_session.add_all([request1, request2]) + db_session.commit() - assert response.status_code == 422 \ No newline at end of file + # Сохраняем данные для сравнения + expected_titles = {request1.title, request2.title} + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Get all requests + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.get("/api/requests/admin", headers=headers) + + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + received_titles = {r["title"] for r in data} + assert received_titles == expected_titles + +def test_get_requests_by_status(client, db_session): + """Test filtering requests by status""" + # Create test employee and requests + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password + ) + db_session.add(employee) + db_session.commit() + + request1 = Request( + title="New Request", + description="This is a new request", + priority=RequestPriority.HIGH.value, + status=RequestStatus.NEW.value, + employee_id=employee.id + ) + request2 = Request( + title="In Progress Request", + description="This is an in progress request", + priority=RequestPriority.MEDIUM.value, + status=RequestStatus.IN_PROGRESS.value, + employee_id=employee.id + ) + db_session.add_all([request1, request2]) + db_session.commit() + + # Сохраняем данные для сравнения + expected_data = { + "title": request1.title, + "status": request1.status + } + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Get requests filtered by status + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.get(f"/api/requests/admin?status={RequestStatus.NEW.value}", headers=headers) + + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["title"] == expected_data["title"] + assert data[0]["status"] == expected_data["status"] + +def test_get_request_statistics(client, db_session): + """Test getting request statistics""" + # Create test employee and requests + hashed_password = get_password_hash("test123") + employee = Employee( + first_name="Test", + last_name="User", + department="IT", + office="A101", + password=hashed_password + ) + db_session.add(employee) + db_session.commit() + + # Create requests with different statuses + requests_data = [ + {"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value}, + {"status": RequestStatus.IN_PROGRESS.value, "priority": RequestPriority.MEDIUM.value}, + {"status": RequestStatus.COMPLETED.value, "priority": RequestPriority.LOW.value}, + {"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value} + ] + + for i, data in enumerate(requests_data): + request = Request( + title=f"Request {i+1}", + description=f"This is request {i+1}", + priority=data["priority"], + status=data["status"], + employee_id=employee.id + ) + db_session.add(request) + db_session.commit() + + # Login as admin + admin_response = client.post("/api/auth/admin", json={ + "username": "admin", + "password": "admin123" + }) + assert admin_response.status_code == 200 + admin_token = admin_response.json()["access_token"] + + # Get statistics + headers = {"Authorization": f"Bearer {admin_token}"} + response = client.get("/api/requests/statistics", headers=headers) + + assert response.status_code == 200 + data = response.json() + + # Проверяем статистику + assert "total_requests" in data + assert data["total_requests"] == 4 + assert "by_status" in data + assert data["by_status"]["new"] == 2 + assert data["by_status"]["in_progress"] == 1 + assert data["by_status"]["completed"] == 1 + assert "by_priority" in data + assert data["by_priority"]["high"] == 2 + assert data["by_priority"]["medium"] == 1 + assert data["by_priority"]["low"] == 1 + +def test_create_request_unauthorized(client): + """Test creating request without authorization""" + request_data = { + "title": "Test Request", + "description": "This is a test request", + "priority": RequestPriority.MEDIUM.value + } + response = client.post("/api/requests/", json=request_data) + assert response.status_code == 401 + +def test_get_requests_unauthorized(client): + """Test getting requests without authorization""" + response = client.get("/api/requests/my") + assert response.status_code == 401 \ No newline at end of file