1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Добавления тестов бекенда

This commit is contained in:
MoonTestUse1
2025-01-02 04:30:51 +06:00
parent 7729acaa09
commit 01677cf5df
16 changed files with 903 additions and 328 deletions

View File

@@ -1,18 +1,12 @@
"""Database configuration"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from sqlalchemy.orm import sessionmaker, declarative_base
SQLALCHEMY_DATABASE_URL = os.getenv(
"DATABASE_URL", "postgresql://postgres:postgres123@postgres:5432/support_db"
)
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_pre_ping=True,
pool_size=5,
max_overflow=10
connect_args={"check_same_thread": False} # only needed for SQLite
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

View File

@@ -1,37 +1,33 @@
"""Request model"""
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum
from sqlalchemy import Column, Integer, String, Enum, ForeignKey, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from enum import Enum as PyEnum
from ..database import Base
import enum
class RequestStatus(str, enum.Enum):
class RequestStatus(str, PyEnum):
NEW = "new"
IN_PROGRESS = "in_progress"
RESOLVED = "resolved"
CLOSED = "closed"
COMPLETED = "completed"
CANCELLED = "cancelled"
class RequestPriority(str, enum.Enum):
class RequestPriority(str, PyEnum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class Request(Base):
__tablename__ = "requests"
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True, index=True)
employee_id = Column(Integer, ForeignKey("employees.id"))
department = Column(String, nullable=False)
request_type = Column(String, nullable=False)
priority = Column(Enum(RequestPriority), nullable=False)
title = Column(String, nullable=False)
description = Column(String, nullable=False)
status = Column(Enum(RequestStatus), nullable=False, default=RequestStatus.NEW)
status = Column(String, nullable=False, default=RequestStatus.NEW)
priority = Column(String, nullable=False)
employee_id = Column(Integer, ForeignKey("employees.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
employee = relationship(
"app.models.employee.Employee",
back_populates="requests",
lazy="joined"
)
# Определяем отношение к Employee
employee = relationship("Employee", back_populates="requests")

View File

@@ -4,20 +4,29 @@ from sqlalchemy.orm import Session
from typing import List
from ..database import get_db
from ..models.employee import Employee
from ..schemas.employee import EmployeeCreate, EmployeeResponse
from ..schemas.employee import EmployeeCreate, EmployeeResponse, EmployeeUpdate
from ..utils.auth import get_current_admin
from passlib.context import CryptContext
router = APIRouter()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@router.get("/", response_model=List[EmployeeResponse])
def get_employees(db: Session = Depends(get_db)):
def get_employees(db: Session = Depends(get_db), _: dict = Depends(get_current_admin)):
"""Get all employees"""
employees = db.query(Employee).all()
return employees
@router.get("/{employee_id}", response_model=EmployeeResponse)
def get_employee(employee_id: int, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)):
"""Get employee by ID"""
employee = db.query(Employee).filter(Employee.id == employee_id).first()
if not employee:
raise HTTPException(status_code=404, detail="Сотрудник не найден")
return employee
@router.post("/", response_model=EmployeeResponse)
def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db)):
def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db), _: dict = Depends(get_current_admin)):
"""Create new employee"""
# Хешируем пароль
hashed_password = pwd_context.hash(employee.password)
@@ -36,4 +45,40 @@ def create_employee(employee: EmployeeCreate, db: Session = Depends(get_db)):
db.commit()
db.refresh(db_employee)
return db_employee
return db_employee
@router.put("/{employee_id}", response_model=EmployeeResponse)
def update_employee(
employee_id: int,
employee_update: EmployeeUpdate,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Update employee data"""
db_employee = db.query(Employee).filter(Employee.id == employee_id).first()
if not db_employee:
raise HTTPException(status_code=404, detail="Сотрудник не найден")
# Обновляем данные
update_data = employee_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_employee, field, value)
db.commit()
db.refresh(db_employee)
return db_employee
@router.delete("/{employee_id}")
def delete_employee(
employee_id: int,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Delete employee"""
db_employee = db.query(Employee).filter(Employee.id == employee_id).first()
if not db_employee:
raise HTTPException(status_code=404, detail="Сотрудник не найден")
db.delete(db_employee)
db.commit()
return {"message": "Сотрудник успешно удален"}

View File

@@ -1,57 +1,140 @@
"""Requests router"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from typing import List, Optional
from ..database import get_db
from ..models.request import Request, RequestStatus, RequestPriority
from ..schemas.request import RequestCreate, RequestResponse
from ..utils.telegram import send_notification
from ..schemas.request import RequestCreate, RequestResponse, RequestUpdate, RequestStatistics
from ..utils.auth import get_current_admin, get_current_employee
from sqlalchemy import func
router = APIRouter()
@router.get("/", response_model=List[RequestResponse])
def get_requests(db: Session = Depends(get_db)):
"""Get all requests"""
requests = db.query(Request).all()
return requests
def request_to_dict(request: Request) -> dict:
"""Convert Request model to dictionary"""
return {
"id": request.id,
"title": request.title,
"description": request.description,
"priority": request.priority,
"status": request.status,
"employee_id": request.employee_id,
"created_at": request.created_at,
"updated_at": request.updated_at
}
@router.post("/", response_model=RequestResponse)
def create_request(request: RequestCreate, db: Session = Depends(get_db)):
def create_request(
request: RequestCreate,
db: Session = Depends(get_db),
current_employee: dict = Depends(get_current_employee)
):
"""Create new request"""
# Создаем новую заявку
db_request = Request(
employee_id=request.employee_id,
department=request.department,
request_type=request.request_type,
priority=request.priority,
description=request.description,
status=RequestStatus.NEW
)
# Сохраняем в базу данных
db.add(db_request)
db.commit()
db.refresh(db_request)
# Отправляем уведомление в Telegram
try:
# Получаем данные сотрудника для уведомления
employee = db_request.employee
notification_data = {
'id': db_request.id,
'employee_first_name': employee.first_name,
'employee_last_name': employee.last_name,
'department': db_request.department,
'office': employee.office,
'request_type': db_request.request_type,
'priority': db_request.priority,
'description': db_request.description,
'status': db_request.status,
'created_at': db_request.created_at.isoformat()
}
send_notification(notification_data)
db_request = Request(
title=request.title,
description=request.description,
priority=request.priority,
status=RequestStatus.NEW.value,
employee_id=current_employee["id"]
)
db.add(db_request)
db.commit()
db.refresh(db_request)
return request_to_dict(db_request)
except Exception as e:
# Логируем ошибку, но не прерываем выполнение
print(f"Error sending notification: {e}")
return db_request
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@router.get("/my", response_model=List[RequestResponse])
def get_employee_requests(
db: Session = Depends(get_db),
current_employee: dict = Depends(get_current_employee)
):
"""Get employee's requests"""
try:
requests = db.query(Request).filter(
Request.employee_id == current_employee["id"]
).all()
# Преобразуем объекты в словари до закрытия сессии
return [request_to_dict(request) for request in requests]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.patch("/{request_id}/status", response_model=RequestResponse)
def update_request_status(
request_id: int,
status_update: RequestUpdate,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Update request status"""
try:
db_request = db.query(Request).filter(Request.id == request_id).first()
if not db_request:
raise HTTPException(status_code=404, detail="Заявка не найдена")
db_request.status = status_update.status
db.commit()
db.refresh(db_request)
return request_to_dict(db_request)
except HTTPException:
raise
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@router.get("/admin", response_model=List[RequestResponse])
def get_all_requests(
status: Optional[str] = None,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Get all requests with optional status filter"""
try:
query = db.query(Request)
if status:
query = query.filter(Request.status == status)
requests = query.all()
# Преобразуем объекты в словари до закрытия сессии
return [request_to_dict(request) for request in requests]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics")
def get_request_statistics(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Get request statistics"""
try:
total_requests = db.query(Request).count()
# Статистика по статусам
status_stats = db.query(
Request.status,
func.count(Request.id)
).group_by(Request.status).all()
# Статистика по приоритетам
priority_stats = db.query(
Request.priority,
func.count(Request.id)
).group_by(Request.priority).all()
return {
"total_requests": total_requests,
"by_status": {
status: count for status, count in status_stats
},
"by_priority": {
priority: count for priority, count in priority_stats
}
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -1,13 +1,17 @@
"""Authentication schemas"""
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
class AdminLogin(BaseModel):
username: str
password: str
model_config = ConfigDict(from_attributes=True)
class EmployeeLogin(BaseModel):
last_name: str
password: str
model_config = ConfigDict(from_attributes=True)
class EmployeeResponse(BaseModel):
id: int

View File

@@ -1,20 +1,24 @@
"""Employee schemas"""
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, ConfigDict
class EmployeeBase(BaseModel):
first_name: str
last_name: str
department: str
office: str
model_config = ConfigDict(from_attributes=True)
class EmployeeCreate(EmployeeBase):
password: str
class EmployeeResponse(EmployeeBase):
id: int
created_at: datetime
class EmployeeUpdate(BaseModel):
first_name: str | None = None
last_name: str | None = None
department: str | None = None
office: str | None = None
model_config = ConfigDict(from_attributes=True)
class Config:
from_attributes = True
class EmployeeResponse(EmployeeBase):
id: int

View File

@@ -1,23 +1,45 @@
"""Request schemas"""
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from ..models.request import RequestStatus, RequestPriority
from enum import Enum
from pydantic import BaseModel, ConfigDict
class RequestStatus(str, Enum):
NEW = "new"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
REJECTED = "rejected"
class RequestPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class RequestBase(BaseModel):
employee_id: int
department: str
request_type: str
priority: RequestPriority
title: str
description: str
priority: RequestPriority
model_config = ConfigDict(from_attributes=True)
class RequestCreate(RequestBase):
pass
class RequestUpdate(BaseModel):
status: RequestStatus
model_config = ConfigDict(from_attributes=True)
class RequestResponse(RequestBase):
id: int
status: RequestStatus
created_at: datetime
employee_id: int
class Config:
from_attributes = True
class RequestStatistics(BaseModel):
total: int
new: int
in_progress: int
completed: int
rejected: int
model_config = ConfigDict(from_attributes=True)

View File

@@ -1,12 +1,14 @@
"""Database table schemas"""
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from ..database import Base
import enum
"""Table schemas"""
from pydantic import BaseModel, ConfigDict
class RequestStatus(str, enum.Enum):
new = "new"
in_progress = "in_progress"
resolved = "resolved"
closed = "closed"
class TableBase(BaseModel):
name: str
description: str
model_config = ConfigDict(from_attributes=True)
class TableCreate(TableBase):
pass
class TableResponse(TableBase):
id: int

View File

@@ -1,22 +1,69 @@
"""Authentication utilities"""
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
import re
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash"""
try:
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
except Exception as e:
print(f"Password verification error: {e}")
return False
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False)
def get_password_hash(password: str) -> str:
"""Generate password hash"""
"""Hash password"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password"""
return pwd_context.verify(plain_password, hashed_password)
def get_current_admin(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""Get current admin from token"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
try:
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
except Exception as e:
print(f"Password hashing error: {e}")
raise
token = credentials.credentials
if token != "admin_token":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return {"is_admin": True}
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def get_current_employee(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""Get current employee from token"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = credentials.credentials
# Проверяем формат токена employee_token_{id}
match = re.match(r"employee_token_(\d+)", token)
if not match:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
employee_id = int(match.group(1))
return {"id": employee_id}
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)

View File

@@ -1,5 +1,5 @@
[pytest]
pythonpath = .
testpaths = tests
python_files = test_*.py
python_classes = Test*

BIN
backend/test.db Normal file

Binary file not shown.

View File

@@ -1 +1 @@
"""Test package initialization"""
# Пустой файл для инициализации пакета тестов

View File

@@ -1,72 +1,35 @@
"""Test configuration and fixtures"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import logging
from fastapi.testclient import TestClient
from app.database import Base, get_db
from app.main import app
# Configure logging for tests
logging.basicConfig(level=logging.INFO)
# Create test database
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
"""Override database dependency"""
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
@pytest.fixture(scope="function")
def test_db():
"""Create test database"""
@pytest.fixture
def db_session():
Base.metadata.create_all(bind=engine)
db = TestingSessionLocal()
session = TestingSessionLocal()
try:
yield db
yield session
finally:
db.close()
session.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def client(test_db):
"""Create test client"""
@pytest.fixture
def client(db_session):
def override_get_db():
try:
yield db_session
finally:
db_session.close()
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as test_client:
yield test_client
app.dependency_overrides.clear()
@pytest.fixture
def test_employee():
"""Test employee data"""
return {
"first_name": "Test",
"last_name": "User",
"department": "general",
"office": "101",
"password": "testpass123"
}
@pytest.fixture
def test_request():
"""Test request data"""
return {
"employee_id": 1,
"department": "general",
"request_type": "hardware",
"priority": "medium",
"description": "Test request"
}
yield TestClient(app)
del app.dependency_overrides[get_db]

View File

@@ -1,47 +1,70 @@
"""Authentication endpoint tests"""
import pytest
from app.crud import employees
from app.models.employee import EmployeeCreate
from app.models.employee import Employee
from app.utils.auth import get_password_hash
def test_login_success(client, test_db, test_employee):
"""Test successful login"""
def test_admin_login(client):
"""Test admin login endpoint"""
response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert response.status_code == 200
assert "access_token" in response.json()
def test_admin_login_invalid_credentials(client):
"""Test admin login with invalid credentials"""
response = client.post("/api/auth/admin", json={
"username": "wrong",
"password": "wrong"
})
assert response.status_code == 401
assert response.json()["detail"] == "Invalid credentials"
def test_employee_login(client, db_session):
"""Test employee login endpoint"""
# Create test employee
employee_data = EmployeeCreate(**test_employee)
employees.create_employee(test_db, employee_data)
# Attempt login
response = client.post(
"/api/auth/login",
json={
"lastName": test_employee["last_name"],
"password": test_employee["password"]
}
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
# Try to login
response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
assert response.status_code == 200
data = response.json()
assert data["lastName"] == test_employee["last_name"]
assert "password" not in data
assert data["first_name"] == "Test"
assert data["last_name"] == "User"
assert data["department"] == "IT"
assert data["office"] == "A101"
assert "access_token" in data
def test_login_invalid_credentials(client):
"""Test login with invalid credentials"""
response = client.post(
"/api/auth/login",
json={
"lastName": "NonExistent",
"password": "wrongpass"
}
def test_employee_login_invalid_credentials(client, db_session):
"""Test employee login with invalid credentials"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
# Try to login with wrong password
response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "wrong"
})
assert response.status_code == 401
assert response.json()["detail"] == "Неверные учетные данные"
def test_login_missing_fields(client):
"""Test login with missing fields"""
response = client.post(
"/api/auth/login",
json={"lastName": "Test"}
)
assert response.status_code == 400
assert "Необходимо указать" in response.json()["detail"]
assert response.json()["detail"] == "Неверный пароль"

View File

@@ -1,68 +1,202 @@
"""Employee management endpoint tests"""
import pytest
from app.models.employee import EmployeeCreate
from app.models.employee import Employee
from app.utils.auth import get_password_hash
def test_create_employee(client, test_employee):
"""Test employee creation"""
response = client.post(
"/api/employees",
json={
"first_name": test_employee["first_name"],
"last_name": test_employee["last_name"],
"department": test_employee["department"],
"office": test_employee["office"],
"password": test_employee["password"]
}
)
def test_create_employee(client):
"""Test creating a new employee"""
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Create employee
employee_data = {
"first_name": "John",
"last_name": "Doe",
"department": "IT",
"office": "B205",
"password": "test123"
}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.post("/api/employees/", json=employee_data, headers=headers)
assert response.status_code == 200
data = response.json()
assert data["firstName"] == test_employee["first_name"]
assert data["lastName"] == test_employee["last_name"]
assert data["first_name"] == employee_data["first_name"]
assert data["last_name"] == employee_data["last_name"]
assert data["department"] == employee_data["department"]
assert data["office"] == employee_data["office"]
assert "password" not in data
def test_create_employee_duplicate(client, test_employee):
"""Test creating duplicate employee"""
# Create first employee
client.post(
"/api/employees",
json={
"first_name": test_employee["first_name"],
"last_name": test_employee["last_name"],
"department": test_employee["department"],
"office": test_employee["office"],
"password": test_employee["password"]
}
def test_get_employees(client, db_session):
"""Test getting list of employees"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
# Try to create duplicate
response = client.post(
"/api/employees",
json={
"first_name": test_employee["first_name"],
"last_name": test_employee["last_name"],
"department": test_employee["department"],
"office": test_employee["office"],
"password": test_employee["password"]
}
)
# Сохраняем значения для проверки
expected_first_name = employee.first_name
expected_last_name = employee.last_name
expected_department = employee.department
expected_office = employee.office
assert response.status_code == 400
assert "уже существует" in response.json()["detail"]
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get employees list
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/employees/", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["first_name"] == expected_first_name
assert data[0]["last_name"] == expected_last_name
assert data[0]["department"] == expected_department
assert data[0]["office"] == expected_office
assert "password" not in data[0]
def test_create_employee_invalid_data(client):
"""Test creating employee with invalid data"""
invalid_employee = {
"first_name": "", # Empty name
"last_name": "Test",
"department": "invalid", # Invalid department
"office": "101",
"password": "test"
def test_create_employee_unauthorized(client):
"""Test creating employee without authorization"""
employee_data = {
"first_name": "John",
"last_name": "Doe",
"department": "IT",
"office": "B205",
"password": "test123"
}
response = client.post(
"/api/employees",
json=invalid_employee
response = client.post("/api/employees/", json=employee_data)
assert response.status_code == 401
def test_get_employees_unauthorized(client):
"""Test getting employees list without authorization"""
response = client.get("/api/employees/")
assert response.status_code == 401
def test_get_employee_by_id(client, db_session):
"""Test getting employee by ID"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
assert response.status_code == 422
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get employee
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get(f"/api/employees/{employee_id}", headers=headers)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == "Test"
assert data["last_name"] == "User"
assert data["department"] == "IT"
assert data["office"] == "A101"
assert "password" not in data
def test_update_employee(client, db_session):
"""Test updating employee data"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Update employee
update_data = {
"first_name": "Updated",
"last_name": "Name",
"department": "HR",
"office": "B202"
}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.put(f"/api/employees/{employee_id}", json=update_data, headers=headers)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == update_data["first_name"]
assert data["last_name"] == update_data["last_name"]
assert data["department"] == update_data["department"]
assert data["office"] == update_data["office"]
assert "password" not in data
def test_delete_employee(client, db_session):
"""Test deleting employee"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Delete employee
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.delete(f"/api/employees/{employee_id}", headers=headers)
assert response.status_code == 200
# Verify employee is deleted
get_response = client.get(f"/api/employees/{employee_id}", headers=headers)
assert get_response.status_code == 404

View File

@@ -1,75 +1,333 @@
"""Request management endpoint tests"""
import pytest
from unittest.mock import patch
from app.models.request import Request, RequestStatus, RequestPriority
from app.models.employee import Employee
from app.utils.auth import get_password_hash
from datetime import datetime, timedelta
def test_create_request(client, test_db, test_employee, test_request):
"""Test request creation"""
# Create test employee first
employee_response = client.post(
"/api/employees",
json={
"first_name": test_employee["first_name"],
"last_name": test_employee["last_name"],
"department": test_employee["department"],
"office": test_employee["office"],
"password": test_employee["password"]
}
def test_create_request(client, db_session):
"""Test creating a new request"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
assert employee_response.status_code == 200
employee_data = employee_response.json()
test_request["employee_id"] = employee_data["id"]
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as employee
login_response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
assert login_response.status_code == 200
token = login_response.json()["access_token"]
# Create request
with patch('app.bot.notifications.send_notification'): # Mock notification
response = client.post(
"/api/requests",
json=test_request
)
request_data = {
"title": "Test Request",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
}
headers = {"Authorization": f"Bearer {token}"}
response = client.post("/api/requests/", json=request_data, headers=headers)
assert response.status_code == 200
data = response.json()
assert data["employee_id"] == test_request["employee_id"]
assert data["status"] == "new"
assert data["title"] == request_data["title"]
assert data["description"] == request_data["description"]
assert data["priority"] == request_data["priority"]
assert data["status"] == RequestStatus.NEW.value
assert data["employee_id"] == employee_id
def test_create_request_invalid_employee(client, test_request):
"""Test creating request with invalid employee ID"""
test_request["employee_id"] = 999 # Non-existent ID
response = client.post(
"/api/requests",
json=test_request
def test_get_employee_requests(client, db_session):
"""Test getting employee's requests"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
assert response.status_code == 404
assert "не найден" in response.json()["detail"]
db_session.add(employee)
db_session.commit()
employee_id = employee.id
def test_create_request_invalid_priority(client, test_db, test_employee):
"""Test creating request with invalid priority"""
# Create test employee first
employee_response = client.post(
"/api/employees",
json={
"first_name": test_employee["first_name"],
"last_name": test_employee["last_name"],
"department": test_employee["department"],
"office": test_employee["office"],
"password": test_employee["password"]
}
# Create test request and save its data
request = Request(
title="Test Request",
description="This is a test request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.NEW.value,
employee_id=employee_id
)
assert employee_response.status_code == 200
employee_data = employee_response.json()
db_session.add(request)
db_session.commit()
invalid_request = {
"employee_id": employee_data["id"],
"department": "general",
"request_type": "hardware",
"priority": "invalid", # Invalid priority
"description": "Test request"
# Сохраняем данные для сравнения
expected_data = {
"title": request.title,
"description": request.description,
"priority": request.priority,
"status": request.status,
"employee_id": request.employee_id
}
# Login as employee
login_response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
assert login_response.status_code == 200
token = login_response.json()["access_token"]
# Get requests
headers = {"Authorization": f"Bearer {token}"}
response = client.get("/api/requests/my", headers=headers)
response = client.post(
"/api/requests",
json=invalid_request
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["title"] == expected_data["title"]
assert data[0]["description"] == expected_data["description"]
assert data[0]["priority"] == expected_data["priority"]
assert data[0]["status"] == expected_data["status"]
assert data[0]["employee_id"] == expected_data["employee_id"]
def test_update_request_status(client, db_session):
"""Test updating request status"""
# Create test employee and request
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
request = Request(
title="Test Request",
description="This is a test request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.NEW.value,
employee_id=employee_id
)
db_session.add(request)
db_session.commit()
request_id = request.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Update request status
update_data = {"status": RequestStatus.IN_PROGRESS.value}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.patch(f"/api/requests/{request_id}/status", json=update_data, headers=headers)
assert response.status_code == 200
data = response.json()
assert data["status"] == RequestStatus.IN_PROGRESS.value
def test_get_all_requests_admin(client, db_session):
"""Test getting all requests as admin"""
# Create test employees and requests
hashed_password = get_password_hash("test123")
employee1 = Employee(
first_name="Test1",
last_name="User1",
department="IT",
office="A101",
password=hashed_password
)
employee2 = Employee(
first_name="Test2",
last_name="User2",
department="HR",
office="B202",
password=hashed_password
)
db_session.add_all([employee1, employee2])
db_session.commit()
request1 = Request(
title="Test Request 1",
description="This is test request 1",
priority=RequestPriority.HIGH.value,
status=RequestStatus.NEW.value,
employee_id=employee1.id
)
request2 = Request(
title="Test Request 2",
description="This is test request 2",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.IN_PROGRESS.value,
employee_id=employee2.id
)
db_session.add_all([request1, request2])
db_session.commit()
assert response.status_code == 422
# Сохраняем данные для сравнения
expected_titles = {request1.title, request2.title}
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get all requests
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/requests/admin", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 2
received_titles = {r["title"] for r in data}
assert received_titles == expected_titles
def test_get_requests_by_status(client, db_session):
"""Test filtering requests by status"""
# Create test employee and requests
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
request1 = Request(
title="New Request",
description="This is a new request",
priority=RequestPriority.HIGH.value,
status=RequestStatus.NEW.value,
employee_id=employee.id
)
request2 = Request(
title="In Progress Request",
description="This is an in progress request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.IN_PROGRESS.value,
employee_id=employee.id
)
db_session.add_all([request1, request2])
db_session.commit()
# Сохраняем данные для сравнения
expected_data = {
"title": request1.title,
"status": request1.status
}
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get requests filtered by status
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get(f"/api/requests/admin?status={RequestStatus.NEW.value}", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["title"] == expected_data["title"]
assert data[0]["status"] == expected_data["status"]
def test_get_request_statistics(client, db_session):
"""Test getting request statistics"""
# Create test employee and requests
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
# Create requests with different statuses
requests_data = [
{"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value},
{"status": RequestStatus.IN_PROGRESS.value, "priority": RequestPriority.MEDIUM.value},
{"status": RequestStatus.COMPLETED.value, "priority": RequestPriority.LOW.value},
{"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value}
]
for i, data in enumerate(requests_data):
request = Request(
title=f"Request {i+1}",
description=f"This is request {i+1}",
priority=data["priority"],
status=data["status"],
employee_id=employee.id
)
db_session.add(request)
db_session.commit()
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get statistics
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/requests/statistics", headers=headers)
assert response.status_code == 200
data = response.json()
# Проверяем статистику
assert "total_requests" in data
assert data["total_requests"] == 4
assert "by_status" in data
assert data["by_status"]["new"] == 2
assert data["by_status"]["in_progress"] == 1
assert data["by_status"]["completed"] == 1
assert "by_priority" in data
assert data["by_priority"]["high"] == 2
assert data["by_priority"]["medium"] == 1
assert data["by_priority"]["low"] == 1
def test_create_request_unauthorized(client):
"""Test creating request without authorization"""
request_data = {
"title": "Test Request",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
}
response = client.post("/api/requests/", json=request_data)
assert response.status_code == 401
def test_get_requests_unauthorized(client):
"""Test getting requests without authorization"""
response = client.get("/api/requests/my")
assert response.status_code == 401