1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Fix tests

This commit is contained in:
MoonTestUse1
2025-01-06 23:40:39 +06:00
parent 161361609d
commit fec52c777b
17 changed files with 249 additions and 368 deletions

View File

@@ -1,16 +1,13 @@
"""Settings configuration""" """Application configuration"""
import os from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings
class Settings(BaseSettings): class Settings(BaseSettings):
"""Application settings""" """Application settings"""
PROJECT_NAME: str = "Support Service" PROJECT_NAME: str = "Employee Request System"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api"
# Database # Database
TESTING: bool = os.getenv("TESTING", "False") == "True" DATABASE_URL: str = "postgresql://postgres:postgres@localhost:5432/employee_requests"
DATABASE_URL: str = "sqlite:///./test.db" if TESTING else "postgresql://postgres:postgres123@postgres:5432/support_db"
# JWT # JWT
SECRET_KEY: str = "your-secret-key" SECRET_KEY: str = "your-secret-key"
@@ -18,21 +15,16 @@ class Settings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Redis # Redis
REDIS_HOST: str = "redis" REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379 REDIS_PORT: int = 6379
# Admin class Config:
ADMIN_USERNAME: str = "admin" """Pydantic config"""
ADMIN_PASSWORD: str = "admin123" case_sensitive = True
# Telegram @lru_cache()
TELEGRAM_BOT_TOKEN: str = "your-bot-token" def get_settings() -> Settings:
TELEGRAM_CHAT_ID: str = "your-chat-id" """Get cached settings"""
return Settings()
model_config = SettingsConfigDict( settings = get_settings()
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True
)
settings = Settings()

View File

@@ -1,36 +1,24 @@
"""Test settings configuration""" """Test configuration"""
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings
class TestSettings(BaseSettings): class TestSettings(BaseSettings):
"""Test application settings""" """Test settings"""
PROJECT_NAME: str = "Support Service Test" PROJECT_NAME: str = "Employee Request System Test"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api"
# Database # Database
DATABASE_URL: str = "sqlite:///:memory:" DATABASE_URL: str = "sqlite:///:memory:"
# JWT # JWT
SECRET_KEY: str = "test-secret-key" SECRET_KEY: str = "test_secret_key"
ALGORITHM: str = "HS256" ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Redis # Redis
REDIS_HOST: str = "redis" REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379 REDIS_PORT: int = 6379
# Admin class Config:
ADMIN_USERNAME: str = "admin" """Pydantic config"""
ADMIN_PASSWORD: str = "admin123" case_sensitive = True
# Telegram
TELEGRAM_BOT_TOKEN: str = "test-bot-token"
TELEGRAM_CHAT_ID: str = "test-chat-id"
model_config = SettingsConfigDict(
env_file=".env.test",
env_file_encoding="utf-8",
case_sensitive=True
)
test_settings = TestSettings() test_settings = TestSettings()

View File

@@ -8,13 +8,7 @@ from . import employees
def create_request(db: Session, request: RequestCreate, employee_id: int) -> Request: def create_request(db: Session, request: RequestCreate, employee_id: int) -> Request:
"""Create new request""" """Create new request"""
# Получаем данные сотрудника
employee = employees.get_employee(db, employee_id)
if not employee:
raise ValueError("Employee not found")
db_request = Request( db_request = Request(
department=employee.department, # Берем отдел из данных сотрудника
request_type=request.request_type, request_type=request.request_type,
description=request.description, description=request.description,
priority=request.priority, priority=request.priority,
@@ -30,27 +24,6 @@ def get_request(db: Session, request_id: int) -> Optional[Request]:
"""Get request by ID""" """Get request by ID"""
return db.query(Request).filter(Request.id == request_id).first() return db.query(Request).filter(Request.id == request_id).first()
def get_request_details(db: Session, request_id: int) -> Optional[Dict]:
"""Get detailed request information including employee data"""
request = get_request(db, request_id)
if not request:
return None
employee = employees.get_employee(db, request.employee_id)
if not employee:
return None
return {
"id": request.id,
"request_type": request.request_type,
"description": request.description,
"priority": request.priority,
"status": request.status,
"department": request.department,
"created_at": request.created_at.isoformat(),
"employee_full_name": employee.full_name
}
def get_employee_requests(db: Session, employee_id: int) -> list[Request]: def get_employee_requests(db: Session, employee_id: int) -> list[Request]:
"""Get employee's requests""" """Get employee's requests"""
return db.query(Request).filter(Request.employee_id == employee_id).all() return db.query(Request).filter(Request.employee_id == employee_id).all()
@@ -80,12 +53,6 @@ def get_statistics(db: Session) -> Dict:
func.count(Request.id) func.count(Request.id)
).group_by(Request.status).all() ).group_by(Request.status).all()
) )
# Добавляем статусы с нулевым количеством
for status in RequestStatus:
if status not in by_status:
by_status[status] = 0
return { return {
"total": total, "total": total,
"by_status": by_status "by_status": by_status

View File

@@ -1,39 +1,27 @@
"""Database configuration""" """Database module"""
import os
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from .core.config import settings from .core.config import settings
# Для создания таблиц импортируем модели # Определяем, используем ли тестовую базу данных
from .models.employee import Employee # noqa TESTING = os.getenv("TESTING", "False") == "True"
from .models.request import Request # noqa DATABASE_URL = "sqlite:///:memory:" if TESTING else settings.DATABASE_URL
from .models.token import Token # noqa
def get_database_url(): # Создаем базовый класс для моделей
"""Получение URL базы данных в зависимости от окружения.""" Base = declarative_base()
try:
from .core.test_config import test_settings
return test_settings.DATABASE_URL
except ImportError:
return settings.DATABASE_URL
# Используем правильный URL для базы данных # Создаем движок базы данных
SQLALCHEMY_DATABASE_URL = get_database_url() connect_args = {"check_same_thread": False} if TESTING else {}
engine = create_engine(DATABASE_URL, connect_args=connect_args)
# Создаем движок с нужными параметрами
connect_args = {}
if SQLALCHEMY_DATABASE_URL.startswith("sqlite"):
connect_args["check_same_thread"] = False
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args=connect_args
)
# Создаем фабрику сессий # Создаем фабрику сессий
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db(): def get_db():
"""Получение сессии базы данных.""" """Get database session"""
db = SessionLocal() db = SessionLocal()
try: try:
yield db yield db

View File

@@ -1,7 +1,10 @@
"""Models initialization""" """Models initialization"""
from .base import Base from ..database import Base, engine
from .employee import Employee from .employee import Employee
from .request import Request from .request import Request
from .token import Token from .token import Token
# Создаем все таблицы
Base.metadata.create_all(bind=engine)
__all__ = ['Base', 'Employee', 'Request', 'Token'] __all__ = ['Base', 'Employee', 'Request', 'Token']

View File

@@ -1,10 +1,12 @@
"""Employee model""" """Employee model"""
from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from .base import Base from datetime import datetime
from ..database import Base
class Employee(Base): class Employee(Base):
"""Employee model"""
__tablename__ = "employees" __tablename__ = "employees"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
@@ -13,7 +15,8 @@ class Employee(Base):
department = Column(String, nullable=False) department = Column(String, nullable=False)
office = Column(String, nullable=False) office = Column(String, nullable=False)
hashed_password = Column(String, nullable=False) hashed_password = Column(String, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now()) is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
# Определяем отношение к Request requests = relationship("Request", back_populates="employee")
requests = relationship("Request", back_populates="employee", cascade="all, delete-orphan")

View File

@@ -1,31 +1,41 @@
"""Request model""" """Request model"""
from enum import Enum from enum import Enum
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime from sqlalchemy import Column, Integer, String, Enum as SQLEnum, ForeignKey, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from .base import Base from datetime import datetime
from ..database import Base
class RequestStatus(str, Enum): class RequestStatus(str, Enum):
"""Request status enum"""
NEW = "new" NEW = "new"
IN_PROGRESS = "in_progress" IN_PROGRESS = "in_progress"
COMPLETED = "completed" COMPLETED = "completed"
REJECTED = "rejected" REJECTED = "rejected"
class RequestPriority(str, Enum): class RequestPriority(str, Enum):
"""Request priority enum"""
LOW = "low" LOW = "low"
MEDIUM = "medium" MEDIUM = "medium"
HIGH = "high" HIGH = "high"
class RequestType(str, Enum):
"""Request type enum"""
VACATION = "vacation"
SICK_LEAVE = "sick_leave"
EQUIPMENT = "equipment"
OTHER = "other"
class Request(Base): class Request(Base):
"""Request model"""
__tablename__ = "requests" __tablename__ = "requests"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
request_type = Column(String, index=True) request_type = Column(SQLEnum(RequestType), nullable=False)
description = Column(String) description = Column(String, nullable=False)
priority = Column(String) priority = Column(SQLEnum(RequestPriority), nullable=False, default=RequestPriority.MEDIUM)
status = Column(String, default=RequestStatus.NEW) status = Column(SQLEnum(RequestStatus), nullable=False, default=RequestStatus.NEW)
employee_id = Column(Integer, ForeignKey("employees.id")) created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
created_at = Column(DateTime(timezone=True), server_default=func.now()) employee_id = Column(Integer, ForeignKey("employees.id"), nullable=False)
# Определяем отношение к Employee
employee = relationship("Employee", back_populates="requests") employee = relationship("Employee", back_populates="requests")

View File

@@ -1,12 +1,14 @@
"""Token model""" """Token model"""
from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func from datetime import datetime
from .base import Base
from ..database import Base
class Token(Base): class Token(Base):
"""Token model"""
__tablename__ = "tokens" __tablename__ = "tokens"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
token = Column(String, unique=True, index=True) token = Column(String, unique=True, index=True)
user_id = Column(Integer, index=True) # -1 для админа, остальные для сотрудников employee_id = Column(Integer)
created_at = Column(DateTime(timezone=True), server_default=func.now()) created_at = Column(DateTime, default=datetime.utcnow)

View File

@@ -42,16 +42,17 @@ async def admin_login(
db: Session = Depends(get_db) db: Session = Depends(get_db)
): ):
"""Авторизация администратора""" """Авторизация администратора"""
if form_data.username != "admin" or form_data.password != "admin123": # Проверяем учетные данные администратора
employee = employees.get_employee_by_last_name(db, form_data.username)
if not employee or not employee.is_admin or not verify_password(form_data.password, employee.hashed_password):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password", detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
# Для админа используем специальный ID # Создаем и сохраняем токен
admin_id = -1 access_token = create_and_save_token(employee.id, db)
access_token = create_and_save_token(admin_id, db)
return { return {
"access_token": access_token, "access_token": access_token,

View File

@@ -8,6 +8,7 @@ class EmployeeBase(BaseModel):
last_name: str last_name: str
department: str department: str
office: str office: str
is_admin: bool = False
model_config = ConfigDict(from_attributes=True) model_config = ConfigDict(from_attributes=True)
@@ -19,6 +20,7 @@ class EmployeeUpdate(BaseModel):
last_name: Optional[str] = None last_name: Optional[str] = None
department: Optional[str] = None department: Optional[str] = None
office: Optional[str] = None office: Optional[str] = None
is_admin: Optional[bool] = None
model_config = ConfigDict(from_attributes=True) model_config = ConfigDict(from_attributes=True)

View File

@@ -1,29 +1,30 @@
"""Request schemas""" """Request schemas"""
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel
from typing import Optional
from datetime import datetime from datetime import datetime
from ..models.request import RequestStatus, RequestPriority from typing import Optional
from ..models.request import RequestStatus, RequestPriority, RequestType
class RequestBase(BaseModel): class RequestBase(BaseModel):
request_type: str """Base request schema"""
request_type: RequestType
description: str description: str
priority: RequestPriority priority: RequestPriority = RequestPriority.MEDIUM
model_config = ConfigDict(from_attributes=True)
class RequestCreate(RequestBase): class RequestCreate(RequestBase):
"""Request create schema"""
pass pass
class RequestUpdate(BaseModel):
status: RequestStatus
model_config = ConfigDict(from_attributes=True)
class Request(RequestBase): class Request(RequestBase):
"""Request schema"""
id: int id: int
status: RequestStatus status: RequestStatus
employee_id: int
department: Optional[str] = None
created_at: datetime created_at: datetime
employee_id: int
model_config = ConfigDict(from_attributes=True) class Config:
"""Pydantic config"""
from_attributes = True
class RequestUpdate(BaseModel):
"""Request update schema"""
status: RequestStatus

View File

@@ -7,7 +7,7 @@ import re
from .jwt import verify_token from .jwt import verify_token
from ..database import get_db from ..database import get_db
from ..models.employee import Employee from ..crud import employees
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer(auto_error=False) security = HTTPBearer(auto_error=False)
@@ -23,7 +23,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
def get_current_admin( def get_current_admin(
credentials: HTTPAuthorizationCredentials = Depends(security), credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db) db: Session = Depends(get_db)
) -> Employee: ) -> dict:
"""Get current admin from token""" """Get current admin from token"""
if not credentials: if not credentials:
raise HTTPException( raise HTTPException(
@@ -37,28 +37,27 @@ def get_current_admin(
payload = verify_token(token, db) payload = verify_token(token, db)
employee_id = int(payload.get("sub")) employee_id = int(payload.get("sub"))
# Получаем сотрудника из БД # Проверяем, что это админ
from ..crud.employees import get_employee employee = employees.get_employee(db, employee_id)
employee = get_employee(db, employee_id)
if not employee or not employee.is_admin: if not employee or not employee.is_admin:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials", detail="Not enough permissions",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
return employee return employee
except Exception: except Exception as e:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials", detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
def get_current_employee( def get_current_employee(
credentials: HTTPAuthorizationCredentials = Depends(security), credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db) db: Session = Depends(get_db)
) -> Employee: ) -> dict:
"""Get current employee from token""" """Get current employee from token"""
if not credentials: if not credentials:
raise HTTPException( raise HTTPException(
@@ -72,13 +71,12 @@ def get_current_employee(
payload = verify_token(token, db) payload = verify_token(token, db)
employee_id = int(payload.get("sub")) employee_id = int(payload.get("sub"))
# Получаем сотрудника из БД # Проверяем существование сотрудника
from ..crud.employees import get_employee employee = employees.get_employee(db, employee_id)
employee = get_employee(db, employee_id)
if not employee: if not employee:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials", detail="Employee not found",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
@@ -86,6 +84,6 @@ def get_current_employee(
except Exception: except Exception:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials", detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )

View File

@@ -1,22 +1,13 @@
"""JWT utilities""" """JWT utilities"""
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt from jose import JWTError, jwt
from fastapi import HTTPException, status
from redis import Redis
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from ..core.config import settings from ..core.config import settings
from ..models.token import Token from ..models.token import Token
from ..crud.employees import get_employee
redis = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
decode_responses=True
)
def create_access_token(data: dict) -> str: def create_access_token(data: dict) -> str:
"""Create access token"""
to_encode = data.copy() to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire}) to_encode.update({"exp": expire})
@@ -24,59 +15,26 @@ def create_access_token(data: dict) -> str:
return encoded_jwt return encoded_jwt
def verify_token(token: str, db: Session) -> dict: def verify_token(token: str, db: Session) -> dict:
"""Verify token"""
try: try:
# Проверяем, что токен действителен
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
# Проверяем токен в Redis
if not redis.get(f"token:{token}"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
return payload return payload
except JWTError: except JWTError:
raise HTTPException( return None
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
def create_and_save_token(user_id: int, db: Session) -> str: def create_and_save_token(employee_id: int, db: Session) -> str:
# Создаем JWT токен """Create and save token"""
access_token = create_access_token({"sub": str(user_id)}) # Создаем токен
access_token = create_access_token({"sub": str(employee_id)})
# Сохраняем в БД # Сохраняем токен в базу
db_token = Token( db_token = Token(
token=access_token, token=access_token,
user_id=user_id employee_id=employee_id
) )
db.add(db_token) db.add(db_token)
db.commit() db.commit()
db.refresh(db_token)
# Кэшируем в Redis
redis.setex(
f"token:{access_token}",
timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
"valid"
)
return access_token return access_token
def get_current_employee(token: str, db: Session):
payload = verify_token(token, db)
employee_id = int(payload.get("sub"))
if employee_id == -1: # Для админа
return {"is_admin": True}
employee = get_employee(db, employee_id)
if employee is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Employee not found",
)
return employee

View File

@@ -1,129 +1,110 @@
"""Test configuration.""" """Test fixtures"""
import os
import pytest import pytest
from typing import Generator
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool from sqlalchemy.pool import StaticPool
from unittest.mock import patch import fakeredis.aioredis
from typing import Generator
from app.core.test_config import test_settings # Устанавливаем флаг тестирования
from app.database import get_db os.environ["TESTING"] = "True"
from app.models.base import Base
from app.main import app
from app.database import Base, get_db
from app.models.employee import Employee from app.models.employee import Employee
from app.utils.auth import get_password_hash from app.utils.auth import get_password_hash
# Создаем тестовую базу данных в памяти # Создаем тестовую базу данных в памяти
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine( engine = create_engine(
test_settings.DATABASE_URL, SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}, connect_args={"check_same_thread": False},
poolclass=StaticPool poolclass=StaticPool,
) )
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class MockRedis: # Создаем тестовую базу данных
"""Мок для Redis.""" Base.metadata.create_all(bind=engine)
def __init__(self):
self.data = {}
def get(self, key): @pytest.fixture
return self.data.get(key)
def set(self, key, value, ex=None):
self.data[key] = value
return True
def delete(self, key):
if key in self.data:
del self.data[key]
return True
def exists(self, key):
return key in self.data
@pytest.fixture(scope="function")
def redis_mock():
"""Фикстура для мока Redis."""
with patch("app.utils.jwt.redis") as mock:
redis_instance = MockRedis()
mock.get.side_effect = redis_instance.get
mock.set.side_effect = redis_instance.set
mock.delete.side_effect = redis_instance.delete
mock.exists.side_effect = redis_instance.exists
yield mock
@pytest.fixture(scope="function")
def db() -> Generator: def db() -> Generator:
"""Фикстура для создания тестовой базы данных.""" """Фикстура для получения тестовой сессии БД."""
Base.metadata.create_all(bind=engine) connection = engine.connect()
db = TestingSessionLocal() transaction = connection.begin()
try: session = TestingSessionLocal(bind=connection)
yield db
finally:
db.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function") yield session
def client(db: TestingSessionLocal, redis_mock) -> Generator:
"""Фикстура для создания тестового клиента.""" session.close()
transaction.rollback()
connection.close()
@pytest.fixture
def client(db) -> TestClient:
"""Фикстура для получения тестового клиента."""
def override_get_db(): def override_get_db():
try: try:
yield db yield db
finally: finally:
db.close() pass
# Импортируем app здесь, чтобы использовать тестовые настройки
from app.main import get_application
app = get_application(test_settings)
app.dependency_overrides[get_db] = override_get_db app.dependency_overrides[get_db] = override_get_db
return TestClient(app) yield TestClient(app)
app.dependency_overrides.clear()
@pytest.fixture(scope="function") @pytest.fixture
def test_employee(db: TestingSessionLocal) -> Employee: def test_employee(db) -> Employee:
"""Фикстура для создания тестового сотрудника.""" """Фикстура для создания тестового сотрудника."""
employee = Employee( employee = Employee(
first_name="Test", first_name="Test",
last_name="User", last_name="Employee",
department="IT", department="Test Department",
office="101", office="Test Office",
hashed_password=get_password_hash("testpassword") hashed_password=get_password_hash("testpassword"),
is_admin=False
) )
db.add(employee) db.add(employee)
db.commit() db.commit()
db.refresh(employee) db.refresh(employee)
return employee return employee
@pytest.fixture(scope="function") @pytest.fixture
def test_admin(db: TestingSessionLocal) -> Employee: def test_admin(db) -> Employee:
"""Фикстура для создания тестового администратора.""" """Фикстура для создания тестового администратора."""
admin = Employee( admin = Employee(
first_name="Admin", first_name="Admin",
last_name="User", last_name="User",
department="Administration", department="Admin Department",
office="100", office="Admin Office",
hashed_password=get_password_hash("adminpassword") hashed_password=get_password_hash("adminpassword"),
is_admin=True
) )
db.add(admin) db.add(admin)
db.commit() db.commit()
db.refresh(admin) db.refresh(admin)
return admin return admin
@pytest.fixture(scope="function") @pytest.fixture
def employee_token(test_employee: Employee, db: TestingSessionLocal) -> str: def employee_token(client: TestClient, test_employee: Employee) -> str:
"""Фикстура для создания токена тестового сотрудника.""" """Фикстура для получения токена сотрудника."""
from app.utils.jwt import create_access_token response = client.post(
token = create_access_token({"sub": str(test_employee.id)}) "/api/auth/login",
# Сохраняем токен в Redis мок data={"username": test_employee.last_name, "password": "testpassword"}
from app.utils.jwt import redis )
redis.set(f"token:{token}", "valid") return response.json()["access_token"]
return token
@pytest.fixture(scope="function") @pytest.fixture
def admin_token(test_admin: Employee, db: TestingSessionLocal) -> str: def admin_token(client: TestClient, test_admin: Employee) -> str:
"""Фикстура для создания токена администратора.""" """Фикстура для получения токена администратора."""
from app.utils.jwt import create_access_token response = client.post(
token = create_access_token({"sub": str(test_admin.id)}) "/api/auth/admin/login",
# Сохраняем токен в Redis мок data={"username": test_admin.last_name, "password": "adminpassword"}
from app.utils.jwt import redis )
redis.set(f"token:{token}", "valid") return response.json()["access_token"]
return token
@pytest.fixture
def redis_mock():
"""Фикстура для мока Redis."""
return fakeredis.aioredis.FakeRedis()

View File

@@ -8,7 +8,7 @@ def test_login_employee_success(client: TestClient, test_employee: Employee):
"""Тест успешной авторизации сотрудника.""" """Тест успешной авторизации сотрудника."""
response = client.post( response = client.post(
"/api/auth/login", "/api/auth/login",
data={"username": test_employee.email, "password": "testpassword"} data={"username": test_employee.last_name, "password": "testpassword"}
) )
assert response.status_code == 200 assert response.status_code == 200
assert "access_token" in response.json() assert "access_token" in response.json()
@@ -19,7 +19,7 @@ def test_login_employee_wrong_password(client: TestClient, test_employee: Employ
"""Тест авторизации сотрудника с неверным паролем.""" """Тест авторизации сотрудника с неверным паролем."""
response = client.post( response = client.post(
"/api/auth/login", "/api/auth/login",
data={"username": test_employee.email, "password": "wrongpassword"} data={"username": test_employee.last_name, "password": "wrongpassword"}
) )
assert response.status_code == 401 assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password" assert response.json()["detail"] == "Incorrect username or password"
@@ -28,7 +28,7 @@ def test_login_employee_wrong_username(client: TestClient):
"""Тест авторизации с несуществующим пользователем.""" """Тест авторизации с несуществующим пользователем."""
response = client.post( response = client.post(
"/api/auth/login", "/api/auth/login",
data={"username": "nonexistent@example.com", "password": "testpassword"} data={"username": "nonexistent", "password": "testpassword"}
) )
assert response.status_code == 401 assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password" assert response.json()["detail"] == "Incorrect username or password"
@@ -37,7 +37,7 @@ def test_login_admin_success(client: TestClient, test_admin: Employee):
"""Тест успешной авторизации администратора.""" """Тест успешной авторизации администратора."""
response = client.post( response = client.post(
"/api/auth/admin/login", "/api/auth/admin/login",
data={"username": test_admin.email, "password": "adminpassword"} data={"username": test_admin.last_name, "password": "adminpassword"}
) )
assert response.status_code == 200 assert response.status_code == 200
assert "access_token" in response.json() assert "access_token" in response.json()
@@ -48,7 +48,7 @@ def test_login_admin_wrong_password(client: TestClient, test_admin: Employee):
"""Тест авторизации администратора с неверным паролем.""" """Тест авторизации администратора с неверным паролем."""
response = client.post( response = client.post(
"/api/auth/admin/login", "/api/auth/admin/login",
data={"username": test_admin.email, "password": "wrongpassword"} data={"username": test_admin.last_name, "password": "wrongpassword"}
) )
assert response.status_code == 401 assert response.status_code == 401
assert response.json()["detail"] == "Incorrect username or password" assert response.json()["detail"] == "Incorrect username or password"
@@ -61,8 +61,8 @@ def test_protected_route_with_valid_token(client: TestClient, employee_token: st
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["email"] == test_employee.email assert data["first_name"] == test_employee.first_name
assert data["full_name"] == test_employee.full_name assert data["last_name"] == test_employee.last_name
def test_protected_route_without_token(client: TestClient): def test_protected_route_without_token(client: TestClient):
"""Тест доступа к защищенному маршруту без токена.""" """Тест доступа к защищенному маршруту без токена."""
@@ -77,4 +77,4 @@ def test_protected_route_with_invalid_token(client: TestClient):
headers={"Authorization": "Bearer invalid_token"} headers={"Authorization": "Bearer invalid_token"}
) )
assert response.status_code == 401 assert response.status_code == 401
assert response.json()["detail"] == "Could not validate credentials" assert response.json()["detail"] == "Invalid authentication credentials"

View File

@@ -10,19 +10,19 @@ def test_create_employee(client: TestClient, admin_token: str, db: Session):
"/api/employees", "/api/employees",
headers={"Authorization": f"Bearer {admin_token}"}, headers={"Authorization": f"Bearer {admin_token}"},
json={ json={
"email": "new@example.com", "first_name": "New",
"password": "newpassword", "last_name": "Employee",
"full_name": "New Employee",
"department": "IT", "department": "IT",
"is_active": True, "office": "102",
"is_admin": False "password": "newpassword"
} }
) )
assert response.status_code == 201 assert response.status_code == 201
data = response.json() data = response.json()
assert data["email"] == "new@example.com" assert data["first_name"] == "New"
assert data["full_name"] == "New Employee" assert data["last_name"] == "Employee"
assert data["department"] == "IT" assert data["department"] == "IT"
assert data["office"] == "102"
assert "id" in data assert "id" in data
def test_create_employee_unauthorized(client: TestClient): def test_create_employee_unauthorized(client: TestClient):
@@ -30,11 +30,11 @@ def test_create_employee_unauthorized(client: TestClient):
response = client.post( response = client.post(
"/api/employees", "/api/employees",
json={ json={
"email": "new@example.com", "first_name": "New",
"password": "newpassword", "last_name": "Employee",
"full_name": "New Employee", "department": "IT",
"is_active": True, "office": "102",
"is_admin": False "password": "newpassword"
} }
) )
assert response.status_code == 401 assert response.status_code == 401
@@ -50,9 +50,10 @@ def test_get_employees(client: TestClient, admin_token: str, test_employee: Empl
data = response.json() data = response.json()
assert isinstance(data, list) assert isinstance(data, list)
assert len(data) > 0 assert len(data) > 0
assert "email" in data[0] assert "first_name" in data[0]
assert "full_name" in data[0] assert "last_name" in data[0]
assert "department" in data[0] assert "department" in data[0]
assert "office" in data[0]
def test_get_employee_by_id(client: TestClient, admin_token: str, test_employee: Employee, db: Session): def test_get_employee_by_id(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения сотрудника по ID.""" """Тест получения сотрудника по ID."""
@@ -62,9 +63,10 @@ def test_get_employee_by_id(client: TestClient, admin_token: str, test_employee:
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["email"] == test_employee.email assert data["first_name"] == test_employee.first_name
assert data["full_name"] == test_employee.full_name assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
def test_get_nonexistent_employee(client: TestClient, admin_token: str): def test_get_nonexistent_employee(client: TestClient, admin_token: str):
"""Тест получения несуществующего сотрудника.""" """Тест получения несуществующего сотрудника."""
@@ -81,18 +83,18 @@ def test_update_employee(client: TestClient, admin_token: str, test_employee: Em
f"/api/employees/{test_employee.id}", f"/api/employees/{test_employee.id}",
headers={"Authorization": f"Bearer {admin_token}"}, headers={"Authorization": f"Bearer {admin_token}"},
json={ json={
"email": "updated@example.com", "first_name": "Updated",
"full_name": "Updated Employee", "last_name": "Name",
"department": "HR", "department": "HR",
"is_active": True, "office": "103"
"is_admin": False
} }
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["email"] == "updated@example.com" assert data["first_name"] == "Updated"
assert data["full_name"] == "Updated Employee" assert data["last_name"] == "Name"
assert data["department"] == "HR" assert data["department"] == "HR"
assert data["office"] == "103"
def test_delete_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session): def test_delete_employee(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест удаления сотрудника.""" """Тест удаления сотрудника."""
@@ -102,9 +104,10 @@ def test_delete_employee(client: TestClient, admin_token: str, test_employee: Em
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["email"] == test_employee.email assert data["first_name"] == test_employee.first_name
assert data["full_name"] == test_employee.full_name assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
def test_employee_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session): def test_employee_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест получения информации о текущем сотруднике.""" """Тест получения информации о текущем сотруднике."""
@@ -114,9 +117,10 @@ def test_employee_me(client: TestClient, employee_token: str, test_employee: Emp
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["email"] == test_employee.email assert data["first_name"] == test_employee.first_name
assert data["full_name"] == test_employee.full_name assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
def test_update_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session): def test_update_me(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест обновления информации о текущем сотруднике.""" """Тест обновления информации о текущем сотруднике."""
@@ -124,12 +128,15 @@ def test_update_me(client: TestClient, employee_token: str, test_employee: Emplo
"/api/employees/me", "/api/employees/me",
headers={"Authorization": f"Bearer {employee_token}"}, headers={"Authorization": f"Bearer {employee_token}"},
json={ json={
"full_name": "Updated Name", "first_name": "Updated",
"department": "Support" "last_name": "Name",
"department": "Support",
"office": "104"
} }
) )
assert response.status_code == 200 assert response.status_code == 200
data = response.json() data = response.json()
assert data["full_name"] == "Updated Name" assert data["first_name"] == "Updated"
assert data["email"] == test_employee.email assert data["last_name"] == "Name"
assert data["department"] == "Support" assert data["department"] == "Support"
assert data["office"] == "104"

View File

@@ -11,14 +11,14 @@ def test_create_request(client: TestClient, employee_token: str, db: Session):
"/api/requests", "/api/requests",
headers={"Authorization": f"Bearer {employee_token}"}, headers={"Authorization": f"Bearer {employee_token}"},
json={ json={
"request_type": "support", "request_type": "equipment",
"description": "Test Description", "description": "Test Description",
"priority": "medium" "priority": "medium"
} }
) )
assert response.status_code == 201 assert response.status_code == 201
data = response.json() data = response.json()
assert data["request_type"] == "support" assert data["request_type"] == "equipment"
assert data["description"] == "Test Description" assert data["description"] == "Test Description"
assert data["priority"] == "medium" assert data["priority"] == "medium"
assert data["status"] == "new" assert data["status"] == "new"
@@ -29,7 +29,7 @@ def test_create_request_unauthorized(client: TestClient):
response = client.post( response = client.post(
"/api/requests", "/api/requests",
json={ json={
"request_type": "support", "request_type": "equipment",
"description": "Test Description", "description": "Test Description",
"priority": "medium" "priority": "medium"
} }
@@ -39,13 +39,9 @@ def test_create_request_unauthorized(client: TestClient):
def test_get_employee_requests(client: TestClient, employee_token: str, test_employee: Employee, db: Session): def test_get_employee_requests(client: TestClient, employee_token: str, test_employee: Employee, db: Session):
"""Тест получения списка заявок сотрудника.""" """Тест получения списка заявок сотрудника."""
db.add(test_employee)
db.commit()
db.refresh(test_employee)
# Создаем тестовую заявку # Создаем тестовую заявку
request = Request( request = Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="medium", priority="medium",
status="new", status="new",
@@ -62,18 +58,14 @@ def test_get_employee_requests(client: TestClient, employee_token: str, test_emp
data = response.json() data = response.json()
assert isinstance(data, list) assert isinstance(data, list)
assert len(data) > 0 assert len(data) > 0
assert data[0]["request_type"] == "support" assert data[0]["request_type"] == "equipment"
assert data[0]["description"] == "Test Description" assert data[0]["description"] == "Test Description"
def test_admin_get_all_requests(client: TestClient, admin_token: str, test_employee: Employee, db: Session): def test_admin_get_all_requests(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения всех заявок администратором.""" """Тест получения всех заявок администратором."""
db.add(test_employee)
db.commit()
db.refresh(test_employee)
# Создаем тестовую заявку # Создаем тестовую заявку
request = Request( request = Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="medium", priority="medium",
status="new", status="new",
@@ -90,23 +82,18 @@ def test_admin_get_all_requests(client: TestClient, admin_token: str, test_emplo
data = response.json() data = response.json()
assert isinstance(data, list) assert isinstance(data, list)
assert len(data) > 0 assert len(data) > 0
assert data[0]["request_type"] == "support" assert data[0]["request_type"] == "equipment"
assert data[0]["description"] == "Test Description" assert data[0]["description"] == "Test Description"
def test_update_request_status(client: TestClient, admin_token: str, test_employee: Employee, db: Session): def test_update_request_status(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест обновления статуса заявки.""" """Тест обновления статуса заявки."""
db.add(test_employee)
db.commit()
db.refresh(test_employee)
# Создаем тестовую заявку # Создаем тестовую заявку
request = Request( request = Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="medium", priority="medium",
status="new", status="new",
employee_id=test_employee.id, employee_id=test_employee.id
department=test_employee.department
) )
db.add(request) db.add(request)
db.commit() db.commit()
@@ -122,35 +109,28 @@ def test_update_request_status(client: TestClient, admin_token: str, test_employ
def test_get_request_statistics(client: TestClient, admin_token: str, test_employee: Employee, db: Session): def test_get_request_statistics(client: TestClient, admin_token: str, test_employee: Employee, db: Session):
"""Тест получения статистики по заявкам.""" """Тест получения статистики по заявкам."""
db.add(test_employee)
db.commit()
db.refresh(test_employee)
# Создаем тестовые заявки с разными статусами # Создаем тестовые заявки с разными статусами
requests = [ requests = [
Request( Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="medium", priority="medium",
status="new", status="new",
employee_id=test_employee.id, employee_id=test_employee.id
department=test_employee.department
), ),
Request( Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="high", priority="high",
status="in_progress", status="in_progress",
employee_id=test_employee.id, employee_id=test_employee.id
department=test_employee.department
), ),
Request( Request(
request_type="support", request_type="equipment",
description="Test Description", description="Test Description",
priority="low", priority="low",
status="completed", status="completed",
employee_id=test_employee.id, employee_id=test_employee.id
department=test_employee.department
) )
] ]
for req in requests: for req in requests: