1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Тесты для бекенда

This commit is contained in:
MoonTestUse1
2025-01-04 03:22:23 +06:00
parent 0d543ed4f6
commit 2bde43c076
28 changed files with 740 additions and 990 deletions

View File

@@ -1,20 +1,36 @@
from pydantic_settings import BaseSettings
from typing import Optional
"""Settings configuration"""
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql://postgres:postgres123@db:5432/support_db"
REDIS_URL: str = "redis://redis:6379/0"
"""Application settings"""
PROJECT_NAME: str = "Support Service"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api"
# JWT settings
JWT_SECRET_KEY: str = "your-secret-key" # в продакшене использовать сложный ключ
JWT_ALGORITHM: str = "HS256"
# Database
DATABASE_URL: str = "postgresql://postgres:postgres123@db:5432/support_db"
# JWT
SECRET_KEY: str = "your-secret-key"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Telegram settings
TELEGRAM_BOT_TOKEN: Optional[str] = None
TELEGRAM_CHAT_ID: Optional[str] = None
# Redis
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
# Admin
ADMIN_USERNAME: str = "admin"
ADMIN_PASSWORD: str = "admin123"
class Config:
env_file = ".env"
# Telegram
TELEGRAM_BOT_TOKEN: str = "your-bot-token"
TELEGRAM_CHAT_ID: str = "your-chat-id"
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True
)
settings = Settings()

View File

@@ -1,150 +1,72 @@
"""Request CRUD operations"""
"""Requests CRUD operations"""
from sqlalchemy.orm import Session
from sqlalchemy import func, text
from sqlalchemy import func
from typing import Optional, Dict
from ..models.request import Request, RequestStatus
from ..schemas.request import RequestCreate, RequestUpdate
from ..utils.loggers import request_logger
from typing import List, Optional
from enum import Enum
from ..schemas.request import RequestCreate
def create_request(db: Session, request: RequestCreate, employee_id: int):
def create_request(db: Session, request: RequestCreate, employee_id: int) -> Request:
"""Create new request"""
try:
db_request = Request(
title=request.title,
description=request.description,
priority=request.priority.value,
status=request.status.value,
employee_id=employee_id
)
db.add(db_request)
db.commit()
db.refresh(db_request)
request_logger.info(
"Request created",
extra={"request_id": db_request.id}
)
return db_request
except Exception as e:
db.rollback()
request_logger.error(f"Error creating request: {e}", exc_info=True)
raise
def get_request_details(db: Session, request_id: int):
"""Get detailed request information including employee details"""
request = (
db.query(Request)
.join(Request.employee)
.filter(Request.id == request_id)
.first()
db_request = Request(
department=request.department,
request_type=request.request_type,
description=request.description,
priority=request.priority,
status=RequestStatus.NEW,
employee_id=employee_id
)
if not request:
return None
return {
"id": request.id,
"employee_id": request.employee_id,
"employee_last_name": request.employee.last_name,
"employee_first_name": request.employee.first_name,
"title": request.title,
"description": request.description,
"priority": request.priority,
"status": request.status,
"created_at": request.created_at.isoformat()
}
def get_requests(db: Session, skip: int = 0, limit: int = 100) -> List[Request]:
"""Get all requests with pagination"""
return db.query(Request).offset(skip).limit(limit).all()
db.add(db_request)
db.commit()
db.refresh(db_request)
return db_request
def get_request(db: Session, request_id: int) -> Optional[Request]:
"""Get request by ID"""
return db.query(Request).filter(Request.id == request_id).first()
def get_employee_requests(db: Session, employee_id: int, skip: int = 0, limit: int = 100) -> List[Request]:
"""Get requests by employee ID"""
return db.query(Request).filter(Request.employee_id == employee_id).offset(skip).limit(limit).all()
def get_employee_requests(db: Session, employee_id: int) -> list[Request]:
"""Get employee's requests"""
return db.query(Request).filter(Request.employee_id == employee_id).all()
def update_request(db: Session, request_id: int, request: RequestUpdate):
"""Update request"""
db_request = get_request(db, request_id)
if not db_request:
return None
update_data = request.model_dump(exclude_unset=True)
for field, value in update_data.items():
if isinstance(value, Enum):
value = value.value
setattr(db_request, field, value)
db.commit()
db.refresh(db_request)
return db_request
def get_requests(db: Session, status: Optional[RequestStatus] = None, skip: int = 0, limit: int = 100) -> list[Request]:
"""Get all requests with optional status filter"""
query = db.query(Request)
if status:
query = query.filter(Request.status == status)
return query.offset(skip).limit(limit).all()
def delete_request(db: Session, request_id: int):
"""Delete request"""
def update_request_status(db: Session, request_id: int, status: RequestStatus) -> Optional[Request]:
"""Update request status"""
db_request = get_request(db, request_id)
if db_request:
db.delete(db_request)
db_request.status = status
db.commit()
db.refresh(db_request)
return db_request
def get_statistics(db: Session):
"""Get requests statistics"""
# Прямой SQL запрос для проверки таблицы
sql_check = db.execute(text("SELECT * FROM requests")).fetchall()
request_logger.info(f"Direct SQL check - all requests: {sql_check}")
def get_statistics(db: Session) -> Dict:
"""Get request statistics"""
total = db.query(func.count(Request.id)).scalar()
# Проверяем структуру таблицы
table_info = db.execute(text("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'requests'
""")).fetchall()
request_logger.info(f"Table structure: {table_info}")
# Получаем количество заявок по статусам
status_counts = db.query(
Request.status,
func.count(Request.id)
).group_by(Request.status).all()
# Проверяем все заявки через ORM
all_requests = db.query(Request).all()
request_logger.info(f"ORM check - Found {len(all_requests)} requests")
for req in all_requests:
request_logger.info(
f"Request #{req.id}: "
f"title='{req.title}', "
f"status='{req.status}', "
f"priority='{req.priority}', "
f"employee_id={req.employee_id}"
)
# Подсчитываем статусы
status_counts = {
RequestStatus.NEW.value: 0,
RequestStatus.IN_PROGRESS.value: 0,
RequestStatus.COMPLETED.value: 0,
RequestStatus.REJECTED.value: 0
# Инициализируем словарь всеми возможными статусами
by_status = {
RequestStatus.NEW: 0,
RequestStatus.IN_PROGRESS: 0,
RequestStatus.COMPLETED: 0,
RequestStatus.REJECTED: 0
}
# Прямой подсчет через SQL
for status in RequestStatus:
count = db.execute(
text(f"SELECT COUNT(*) FROM requests WHERE status = :status"),
{"status": status.value}
).scalar()
status_counts[status.value] = count
request_logger.info(f"SQL count for status {status.value}: {count}")
# Обновляем значения из базы
for status, count in status_counts:
by_status[status] = count
result = {
"total": len(all_requests),
"new": status_counts[RequestStatus.NEW.value],
"in_progress": status_counts[RequestStatus.IN_PROGRESS.value],
"completed": status_counts[RequestStatus.COMPLETED.value],
"rejected": status_counts[RequestStatus.REJECTED.value]
}
request_logger.info(f"Status counts: {status_counts}")
request_logger.info(f"Final statistics: {result}")
return result
return {
"total": total or 0,
"by_status": by_status
}

View File

@@ -1,53 +1,40 @@
"""Statistics CRUD operations"""
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from sqlalchemy import func
from datetime import datetime, timedelta
from ..models.request import Request, RequestStatus
def get_statistics(db: Session, period: str = "week"):
"""Get statistics for the given period"""
try:
# Calculate date range
end_date = datetime.now()
if period == "week":
start_date = end_date - timedelta(days=7)
elif period == "month":
start_date = end_date - timedelta(days=30)
else:
start_date = end_date - timedelta(days=7) # default to week
def get_request_statistics(db: Session):
"""Get request statistics"""
# Общее количество заявок
total_requests = db.query(func.count(Request.id)).scalar()
# Get total requests
total_requests = db.query(func.count(Request.id)).scalar() or 0
# Количество заявок по статусам
status_counts = {
RequestStatus.NEW: 0,
RequestStatus.IN_PROGRESS: 0,
RequestStatus.COMPLETED: 0,
RequestStatus.REJECTED: 0
}
# Get requests by status
requests_by_status = (
db.query(Request.status, func.count(Request.id))
.group_by(Request.status)
.all()
)
# Получаем количество заявок для каждого статуса
status_query = db.query(
Request.status,
func.count(Request.id)
).group_by(Request.status).all()
# Convert to dictionary
status_counts = {
status.name: 0 for status in RequestStatus
}
for status, count in requests_by_status:
status_counts[status.name] = count
for status, count in status_query:
if status in status_counts:
status_counts[status] = count
# Get recent requests
recent_requests = (
db.query(Request)
.filter(Request.created_at >= start_date)
.order_by(Request.created_at.desc())
.limit(5)
.all()
)
# Статистика за последние 7 дней
week_ago = datetime.now() - timedelta(days=7)
recent_requests = db.query(func.count(Request.id)).filter(
Request.created_at >= week_ago
).scalar()
return {
"total_requests": total_requests,
"status_counts": status_counts,
"recent_requests": recent_requests
}
except Exception as e:
print(f"Error getting statistics: {e}")
raise
return {
"total_requests": total_requests or 0,
"by_status": status_counts,
"recent_requests": recent_requests or 0
}

View File

@@ -0,0 +1,31 @@
"""Token CRUD operations"""
from sqlalchemy.orm import Session
from typing import Optional
from ..models.token import Token
def create_token(db: Session, token: str, user_id: int) -> Token:
"""Create new token"""
db_token = Token(token=token, user_id=user_id)
db.add(db_token)
db.commit()
db.refresh(db_token)
return db_token
def get_token(db: Session, token: str) -> Optional[Token]:
"""Get token by value"""
return db.query(Token).filter(Token.token == token).first()
def delete_token(db: Session, token: str) -> bool:
"""Delete token"""
db_token = get_token(db, token)
if db_token:
db.delete(db_token)
db.commit()
return True
return False
def delete_user_tokens(db: Session, user_id: int) -> bool:
"""Delete all tokens for a user"""
db.query(Token).filter(Token.user_id == user_id).delete()
db.commit()
return True

View File

@@ -1,17 +1,20 @@
"""Database configuration"""
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.orm import sessionmaker
from .core.config import settings
from .db.base import Base
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres123@db:5432/support_db")
# Для создания таблиц импортируем модели
from .models.employee import Employee # noqa
from .models.request import Request # noqa
from .models.token import Token # noqa
SQLALCHEMY_DATABASE_URL = settings.DATABASE_URL
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Get database session"""
db = SessionLocal()
try:
yield db

4
backend/app/db/base.py Normal file
View File

@@ -0,0 +1,4 @@
"""Base class for SQLAlchemy models"""
from sqlalchemy.orm import declarative_base
Base = declarative_base()

View File

@@ -1,25 +1,21 @@
"""Employee model"""
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from ..database import Base
from ..db.base import Base
class Employee(Base):
__tablename__ = "employees"
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True, index=True)
first_name = Column(String, nullable=False)
last_name = Column(String, nullable=False)
department = Column(String, nullable=False)
office = Column(String, nullable=False)
hashed_password = Column(String, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
first_name = Column(String, index=True)
last_name = Column(String, index=True)
department = Column(String)
office = Column(String)
hashed_password = Column(String)
# Определяем отношение к Request
requests = relationship(
"Request",
back_populates="employee",
lazy="dynamic",
cascade="all, delete-orphan"
)

View File

@@ -1,17 +1,17 @@
"""Request model"""
from enum import Enum
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from enum import Enum as PyEnum
from ..database import Base
from ..db.base import Base
class RequestStatus(str, PyEnum):
class RequestStatus(str, Enum):
NEW = "new"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
REJECTED = "rejected"
class RequestPriority(str, PyEnum):
class RequestPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@@ -20,13 +20,13 @@ class Request(Base):
__tablename__ = "requests"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
description = Column(String, nullable=False)
status = Column(String, nullable=False, default=RequestStatus.NEW)
priority = Column(String, nullable=False)
department = Column(String, index=True)
request_type = Column(String, index=True)
description = Column(String)
priority = Column(String)
status = Column(String, default=RequestStatus.NEW)
employee_id = Column(Integer, ForeignKey("employees.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Определяем отношение к Employee
employee = relationship("Employee", back_populates="requests")

View File

@@ -1,12 +1,12 @@
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
"""Token model"""
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from ..database import Base
from ..db.base import Base
class Token(Base):
__tablename__ = "tokens"
id = Column(Integer, primary_key=True, index=True)
access_token = Column(String, unique=True, index=True)
employee_id = Column(Integer, ForeignKey("employees.id", ondelete="CASCADE"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
expires_at = Column(DateTime(timezone=True))
token = Column(String, unique=True, index=True)
user_id = Column(Integer, index=True) # -1 для админа, остальные для сотрудников
created_at = Column(DateTime(timezone=True), server_default=func.now())

View File

@@ -1,4 +1,4 @@
"""Admin routes"""
"""Admin router"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
@@ -6,35 +6,24 @@ from ..database import get_db
from ..crud import requests, statistics
from ..schemas.request import Request
from ..utils.auth import get_current_admin
from ..utils.loggers import request_logger
router = APIRouter()
@router.get("/statistics")
async def get_statistics(period: str = "week", db: Session = Depends(get_db)):
"""Get request statistics"""
try:
return statistics.get_statistics(db, period)
except Exception as e:
request_logger.error(f"Error getting statistics: {e}")
raise HTTPException(status_code=500, detail="Ошибка при получении статистики")
@router.get("/requests", response_model=List[Request])
async def get_all_requests(
skip: int = 0,
limit: int = 100,
def get_statistics(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получить список всех заявок (только для админа)
"""
try:
requests_list = requests.get_requests(db, skip=skip, limit=limit)
return requests_list
except Exception as e:
print(f"Error getting requests: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
"""Get system statistics"""
return statistics.get_request_statistics(db)
@router.get("/requests", response_model=List[Request])
def get_all_requests(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Get all requests"""
return requests.get_requests(db)
@router.get("/requests/{request_id}", response_model=Request)
async def get_request_by_id(

View File

@@ -1,47 +1,33 @@
"""Employees router"""
"""Employee router"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from ..database import get_db
from ..crud import employees
from ..schemas.employee import Employee, EmployeeCreate, EmployeeUpdate
from ..utils.auth import get_current_admin
from ..utils.auth import get_password_hash
from ..utils.auth import get_current_admin, get_password_hash
router = APIRouter()
@router.post("", response_model=Employee)
@router.post("/", response_model=Employee)
def create_employee(
employee: EmployeeCreate,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Создание нового сотрудника (только для админа)
"""
# Хэшируем пароль
"""Create new employee"""
hashed_password = get_password_hash(employee.password)
# Создаем сотрудника
db_employee = employees.create_employee(
db=db,
employee=employee,
hashed_password=hashed_password
)
return db_employee
return employees.create_employee(db, employee, hashed_password)
@router.get("", response_model=List[Employee])
@router.get("/", response_model=List[Employee])
def get_employees(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получение списка всех сотрудников (только для админа)
"""
employees_list = employees.get_employees(db, skip=skip, limit=limit)
return employees_list
"""Get all employees"""
return employees.get_employees(db, skip=skip, limit=limit)
@router.get("/{employee_id}", response_model=Employee)
def get_employee(
@@ -49,10 +35,8 @@ def get_employee(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получение информации о сотруднике по ID (только для админа)
"""
db_employee = employees.get_employee(db, employee_id=employee_id)
"""Get employee by ID"""
db_employee = employees.get_employee(db, employee_id)
if db_employee is None:
raise HTTPException(status_code=404, detail="Employee not found")
return db_employee
@@ -64,35 +48,20 @@ def update_employee(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Обновление информации о сотруднике (только для админа)
"""
db_employee = employees.get_employee(db, employee_id=employee_id)
"""Update employee data"""
db_employee = employees.update_employee(db, employee_id, employee)
if db_employee is None:
raise HTTPException(status_code=404, detail="Employee not found")
# Если указан новый пароль, хэшируем его
if employee.password:
employee.password = get_password_hash(employee.password)
updated_employee = employees.update_employee(
db=db,
employee_id=employee_id,
employee=employee
)
return updated_employee
return db_employee
@router.delete("/{employee_id}")
@router.delete("/{employee_id}", response_model=Employee)
def delete_employee(
employee_id: int,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Удаление сотрудника (только для админа)
"""
db_employee = employees.get_employee(db, employee_id=employee_id)
"""Delete employee"""
db_employee = employees.delete_employee(db, employee_id)
if db_employee is None:
raise HTTPException(status_code=404, detail="Employee not found")
employees.delete_employee(db=db, employee_id=employee_id)
return {"message": "Employee deleted successfully"}
return db_employee

View File

@@ -1,107 +1,67 @@
"""Requests router"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List
from typing import List, Optional
from ..database import get_db
from ..crud import requests
from ..schemas.request import Request, RequestCreate, RequestUpdate, RequestStatistics
from ..schemas.request import Request, RequestCreate, RequestUpdate
from ..models.request import RequestStatus
from ..utils.auth import get_current_employee, get_current_admin
from ..utils.telegram import notify_new_request
from ..utils.loggers import request_logger
router = APIRouter()
@router.post("", response_model=Request)
@router.post("/", response_model=Request)
async def create_request(
def create_request(
request: RequestCreate,
db: Session = Depends(get_db),
current_employee: dict = Depends(get_current_employee)
):
"""
Создание новой заявки
"""
# Логируем входящие данные
request_logger.info(
"Creating new request",
extra={
"request_data": request.model_dump(),
"employee_id": current_employee["id"]
}
)
# Проверяем, что все поля заполнены правильно
request_logger.info(f"Request title: {request.title}")
request_logger.info(f"Request description: {request.description}")
request_logger.info(f"Request priority: {request.priority} (type: {type(request.priority)})")
request_logger.info(f"Request status: {request.status} (type: {type(request.status)})")
db_request = requests.create_request(db=db, request=request, employee_id=current_employee["id"])
# Логируем созданную заявку
request_logger.info(
"Request created successfully",
extra={
"request_id": db_request.id,
"status": db_request.status,
"priority": db_request.priority
}
)
await notify_new_request(db_request.id)
return db_request
"""Create new request"""
return requests.create_request(db, request, current_employee["id"])
@router.get("", response_model=List[Request])
@router.get("/", response_model=List[Request])
@router.get("/my", response_model=List[Request])
def get_employee_requests(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_employee: dict = Depends(get_current_employee)
):
"""
Получение списка заявок текущего сотрудника
"""
return requests.get_employee_requests(db, employee_id=current_employee["id"], skip=skip, limit=limit)
"""Get current employee's requests"""
return requests.get_employee_requests(db, current_employee["id"])
@router.get("/statistics", response_model=RequestStatistics)
def get_request_statistics(
@router.get("/admin", response_model=List[Request])
def get_all_requests(
status: Optional[RequestStatus] = Query(None),
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Получение статистики по заявкам (только для админа)
"""
return requests.get_statistics(db)
"""Get all requests (admin only)"""
return requests.get_requests(db, status=status, skip=skip, limit=limit)
@router.put("/{request_id}", response_model=Request)
def update_request(
@router.patch("/{request_id}/status", response_model=Request)
def update_request_status(
request_id: int,
request_update: RequestUpdate,
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Обновление статуса заявки (только для админа)
"""
db_request = requests.get_request(db, request_id=request_id)
"""Update request status (admin only)"""
db_request = requests.update_request_status(db, request_id, request_update.status)
if db_request is None:
raise HTTPException(status_code=404, detail="Request not found")
return requests.update_request(db=db, request_id=request_id, request=request_update)
return db_request
@router.delete("/{request_id}")
def delete_request(
request_id: int,
@router.get("/statistics")
def get_request_statistics(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""
Удаление заявки (только для админа)
"""
db_request = requests.get_request(db, request_id=request_id)
if db_request is None:
raise HTTPException(status_code=404, detail="Request not found")
requests.delete_request(db=db, request_id=request_id)
return {"message": "Request deleted successfully"}
"""Get request statistics (admin only)"""
stats = requests.get_statistics(db)
return {
"total": stats["total"],
"by_status": {
status: count
for status, count in stats["by_status"].items()
}
}

View File

@@ -2,35 +2,15 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.request import Request, RequestStatus
from ..utils.loggers import request_logger
from ..crud import statistics
from ..utils.auth import get_current_admin
router = APIRouter()
@router.get("/")
def get_statistics(db: Session = Depends(get_db)):
"""Get request statistics"""
# Получаем общее количество заявок
total = db.query(Request).count()
request_logger.info(f"Total requests: {total}")
# Получаем количество заявок по статусам
new_requests = db.query(Request).filter(Request.status == RequestStatus.NEW.value).count()
in_progress = db.query(Request).filter(Request.status == RequestStatus.IN_PROGRESS.value).count()
completed = db.query(Request).filter(Request.status == RequestStatus.COMPLETED.value).count()
rejected = db.query(Request).filter(Request.status == RequestStatus.REJECTED.value).count()
request_logger.info(f"Status counts - new: {new_requests}, in_progress: {in_progress}, completed: {completed}, rejected: {rejected}")
result = {
"total_requests": total,
"by_status": {
"new": new_requests,
"in_progress": in_progress,
"completed": completed,
"rejected": rejected
}
}
request_logger.info(f"Returning statistics: {result}")
return result
def get_statistics(
db: Session = Depends(get_db),
_: dict = Depends(get_current_admin)
):
"""Get system statistics"""
return statistics.get_request_statistics(db)

View File

@@ -1,6 +1,5 @@
"""Employee schemas"""
from pydantic import BaseModel, ConfigDict
from typing import Optional
class EmployeeBase(BaseModel):
first_name: str
@@ -8,14 +7,15 @@ class EmployeeBase(BaseModel):
department: str
office: str
model_config = ConfigDict(from_attributes=True)
class EmployeeCreate(EmployeeBase):
password: str
class EmployeeUpdate(EmployeeBase):
password: Optional[str] = None
pass
class Employee(EmployeeBase):
model_config = ConfigDict(from_attributes=True)
id: int
hashed_password: str
model_config = ConfigDict(from_attributes=True)

View File

@@ -1,47 +1,29 @@
"""Request schemas"""
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, ConfigDict
from typing import Optional
class RequestStatus(str, Enum):
NEW = "new"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
REJECTED = "rejected"
class RequestPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
from datetime import datetime
from ..models.request import RequestStatus, RequestPriority
class RequestBase(BaseModel):
title: str
department: str
request_type: str
description: str
priority: RequestPriority
status: RequestStatus = RequestStatus.NEW
model_config = ConfigDict(from_attributes=True)
class RequestCreate(RequestBase):
pass
class RequestUpdate(BaseModel):
status: RequestStatus
model_config = ConfigDict(from_attributes=True)
class Request(RequestBase):
id: int
status: RequestStatus
employee_id: int
created_at: datetime
updated_at: Optional[datetime] = None
model_config = ConfigDict(from_attributes=True)
class RequestStatistics(BaseModel):
total: int
new: int
in_progress: int
completed: int
rejected: int
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,13 @@
"""Token schemas"""
from pydantic import BaseModel, ConfigDict
class Token(BaseModel):
access_token: str
token_type: str
model_config = ConfigDict(from_attributes=True)
class TokenData(BaseModel):
user_id: int | None = None
model_config = ConfigDict(from_attributes=True)

View File

@@ -1,22 +1,35 @@
import os
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from fastapi.testclient import TestClient
from unittest.mock import Mock, patch
from ..database import Base, get_db
from ..main import app
from ..utils.jwt import create_and_save_token
from ..crud import employees
from ..utils.auth import get_password_hash
# Получаем URL базы данных из переменной окружения или используем значение по умолчанию
SQLALCHEMY_DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://postgres:postgres@localhost:5432/support_test"
)
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Используем SQLite для тестов
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Мок для Redis
class MockRedis:
def __init__(self):
self.data = {}
def setex(self, name, time, value):
self.data[name] = value
return True
def get(self, name):
return self.data.get(name)
@pytest.fixture(autouse=True)
def mock_redis():
with patch("app.utils.jwt.redis", MockRedis()):
yield
@pytest.fixture(scope="function")
def test_db():
# Создаем таблицы
@@ -33,14 +46,14 @@ def test_db():
@pytest.fixture(scope="function")
def test_employee(test_db):
hashed_password = get_password_hash("testpass123")
employee_data = {
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
"office": "101"
}
employee = employees.create_employee(test_db, employee_data)
employee = employees.create_employee(test_db, employee_data, hashed_password=hashed_password)
return employee
@pytest.fixture(scope="function")

View File

@@ -16,9 +16,9 @@ def test_login_success(test_db: Session):
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
}
"office": "101"
},
hashed_password=hashed_password
)
response = client.post(
@@ -34,10 +34,23 @@ def test_login_success(test_db: Session):
assert response.json()["token_type"] == "bearer"
def test_login_wrong_password(test_db: Session):
# Создаем тестового сотрудника с известным паролем
hashed_password = get_password_hash("testpass123")
employee = employees.create_employee(
test_db,
{
"first_name": "Test",
"last_name": "WrongPass",
"department": "IT",
"office": "101"
},
hashed_password=hashed_password
)
response = client.post(
"/api/auth/login",
data={
"username": "User",
"username": "WrongPass",
"password": "wrongpass"
}
)

View File

@@ -3,7 +3,7 @@ from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from ..main import app
from ..crud import employees
from ..utils.auth import verify_password
from ..utils.auth import verify_password, get_password_hash
client = TestClient(app)
@@ -49,15 +49,16 @@ def test_create_employee_unauthorized():
def test_get_employees(test_db: Session, admin_token: str):
# Создаем несколько тестовых сотрудников
for i in range(3):
hashed_password = get_password_hash("testpass123")
employees.create_employee(
test_db,
{
"first_name": f"Test{i}",
"last_name": f"User{i}",
"department": "IT",
"office": f"10{i}",
"password": "testpass123"
}
"office": f"10{i}"
},
hashed_password=hashed_password
)
response = client.get(
@@ -71,15 +72,16 @@ def test_get_employees(test_db: Session, admin_token: str):
def test_get_employee_by_id(test_db: Session, admin_token: str):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee = employees.create_employee(
test_db,
{
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
}
"office": "101"
},
hashed_password=hashed_password
)
response = client.get(
@@ -95,15 +97,16 @@ def test_get_employee_by_id(test_db: Session, admin_token: str):
def test_update_employee(test_db: Session, admin_token: str):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee = employees.create_employee(
test_db,
{
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
}
"office": "101"
},
hashed_password=hashed_password
)
update_data = {
@@ -124,15 +127,16 @@ def test_update_employee(test_db: Session, admin_token: str):
def test_delete_employee(test_db: Session, admin_token: str):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee = employees.create_employee(
test_db,
{
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
}
"office": "101"
},
hashed_password=hashed_password
)
response = client.delete(

View File

@@ -2,17 +2,18 @@ import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from ..main import app
from ..models.request import RequestStatus, RequestPriority
from ..crud import requests, employees
from ..models.request import RequestStatus
from ..utils.auth import get_password_hash
client = TestClient(app)
def test_create_request(test_db: Session, test_token: str):
request_data = {
"title": "Test Request",
"description": "Test Description",
"priority": "low",
"status": "new"
"department": "IT",
"request_type": "hardware",
"priority": RequestPriority.LOW.value,
"description": "Test Description"
}
response = client.post(
@@ -23,16 +24,17 @@ def test_create_request(test_db: Session, test_token: str):
assert response.status_code == 200
data = response.json()
assert data["title"] == request_data["title"]
assert data["department"] == request_data["department"]
assert data["description"] == request_data["description"]
assert data["priority"] == request_data["priority"]
assert data["priority"] == RequestPriority.LOW.value
assert data["status"] == RequestStatus.NEW.value
def test_create_request_unauthorized():
request_data = {
"title": "Test Request",
"description": "Test Description",
"priority": "low"
"department": "IT",
"request_type": "hardware",
"priority": RequestPriority.LOW.value,
"description": "Test Description"
}
response = client.post(
@@ -48,10 +50,10 @@ def test_get_employee_requests(test_db: Session, test_token: str, test_employee_
requests.create_request(
test_db,
{
"title": f"Test Request {i}",
"description": f"Test Description {i}",
"priority": "low",
"status": "new"
"department": "IT",
"request_type": f"hardware_{i}",
"priority": RequestPriority.LOW.value,
"description": f"Test Description {i}"
},
test_employee_id
)
@@ -67,32 +69,34 @@ def test_get_employee_requests(test_db: Session, test_token: str, test_employee_
assert all(req["employee_id"] == test_employee_id for req in data)
def test_update_request_status(test_db: Session, admin_token: str):
# Создаем тестовую заявку
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee = employees.create_employee(
test_db,
{
"first_name": "Test",
"last_name": "User",
"department": "IT",
"office": "101",
"password": "testpass123"
}
"office": "101"
},
hashed_password=hashed_password
)
# Создаем тестовую заявку
request = requests.create_request(
test_db,
{
"title": "Test Request",
"description": "Test Description",
"priority": "low",
"status": "new"
"department": "IT",
"request_type": "hardware",
"priority": RequestPriority.LOW.value,
"description": "Test Description"
},
employee.id
)
response = client.put(
f"/api/requests/{request.id}",
json={"status": "in_progress"},
json={"status": RequestStatus.IN_PROGRESS.value},
headers={"Authorization": f"Bearer {admin_token}"}
)
@@ -107,8 +111,9 @@ def test_get_request_statistics(test_db: Session, admin_token: str):
assert response.status_code == 200
data = response.json()
assert "total" in data
assert "new" in data
assert "in_progress" in data
assert "completed" in data
assert "rejected" in data
assert "total_requests" in data
assert "by_status" in data
assert RequestStatus.NEW.value in data["by_status"]
assert RequestStatus.IN_PROGRESS.value in data["by_status"]
assert RequestStatus.COMPLETED.value in data["by_status"]
assert RequestStatus.REJECTED.value in data["by_status"]

View File

@@ -1,3 +1,4 @@
"""JWT utilities"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
@@ -9,20 +10,24 @@ from ..core.config import settings
from ..models.token import Token
from ..crud.employees import get_employee
redis = Redis.from_url(settings.REDIS_URL, decode_responses=True)
redis = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
decode_responses=True
)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(token: str, db: Session) -> dict:
try:
payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
employee_id: int = payload.get("sub")
if employee_id is None:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@@ -31,11 +36,11 @@ def verify_token(token: str, db: Session) -> dict:
# Проверяем токен в Redis
if not redis.get(f"token:{token}"):
# Если токена нет в Redis, проверяем в БД
db_token = db.query(Token).filter(Token.access_token == token).first()
if not db_token or db_token.expires_at < datetime.utcnow():
db_token = db.query(Token).filter(Token.token == token).first()
if not db_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired or is invalid",
detail="Token is invalid",
)
# Если токен валиден, кэшируем его в Redis
redis.setex(
@@ -51,16 +56,14 @@ def verify_token(token: str, db: Session) -> dict:
detail="Could not validate credentials",
)
def create_and_save_token(employee_id: int, db: Session) -> str:
def create_and_save_token(user_id: int, db: Session) -> str:
# Создаем JWT токен
access_token = create_access_token({"sub": str(employee_id)})
expires_at = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token({"sub": str(user_id)})
# Сохраняем в БД
db_token = Token(
access_token=access_token,
employee_id=employee_id,
expires_at=expires_at
token=access_token,
user_id=user_id
)
db.add(db_token)
db.commit()
@@ -77,6 +80,8 @@ def create_and_save_token(employee_id: int, db: Session) -> str:
def get_current_employee(token: str, db: Session):
payload = verify_token(token, db)
employee_id = int(payload.get("sub"))
if employee_id == -1: # Для админа
return {"is_admin": True}
employee = get_employee(db, employee_id)
if employee is None:
raise HTTPException(

View File

@@ -1,8 +1,18 @@
from pydantic import BaseModel
"""Schemas for the application"""
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from typing import Optional
from models import RequestStatus
class Token(BaseModel):
access_token: str
token_type: str
model_config = ConfigDict(from_attributes=True)
class TokenData(BaseModel):
user_id: int | None = None
model_config = ConfigDict(from_attributes=True)
class EmployeeBase(BaseModel):
last_name: str
@@ -10,35 +20,28 @@ class EmployeeBase(BaseModel):
department: str
office: str
model_config = ConfigDict(from_attributes=True)
class EmployeeCreate(EmployeeBase):
password: str
class Employee(EmployeeBase):
id: int
created_at: datetime
class Config:
from_attributes = True
class RequestBase(BaseModel):
department: str
request_type: str
priority: str
description: str
model_config = ConfigDict(from_attributes=True)
class RequestCreate(RequestBase):
employee_id: int
class Request(RequestBase):
id: int
status: RequestStatus
created_at: datetime
employee_id: int
class Config:
from_attributes = True

14
backend/setup.py Normal file
View File

@@ -0,0 +1,14 @@
from setuptools import setup, find_packages
setup(
name="support-app",
version="0.1",
packages=find_packages(),
install_requires=[
"fastapi",
"sqlalchemy",
"pytest",
"pytest-asyncio",
"pytest-cov"
],
)

Binary file not shown.

View File

@@ -1,34 +1,101 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from fastapi.testclient import TestClient
from app.database import Base, get_db
from unittest.mock import MagicMock
from app.db.base import Base
from app.database import get_db
from app.main import app
from app.utils.jwt import create_and_save_token, redis
from app.crud import employees
from app.utils.auth import get_password_hash
from app.models.token import Token
from app.models.employee import Employee
from app.models.request import Request
from app.schemas.employee import EmployeeCreate
SQLALCHEMY_DATABASE_URL = "postgresql://postgres:postgres123@postgres:5432/support_db_test"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Используем SQLite для тестов
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture
def db_session():
Base.metadata.drop_all(bind=engine) # Сначала удаляем все таблицы
Base.metadata.create_all(bind=engine) # Создаем таблицы заново
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
Base.metadata.drop_all(bind=engine)
# Создаем мок для Redis
class RedisMock:
def __init__(self):
self.data = {}
@pytest.fixture
def client(db_session):
def override_get_db():
try:
yield db_session
finally:
db_session.close()
app.dependency_overrides[get_db] = override_get_db
yield TestClient(app)
del app.dependency_overrides[get_db]
def setex(self, name, time, value):
self.data[name] = value
return True
def get(self, name):
return self.data.get(name)
def delete(self, name):
if name in self.data:
del self.data[name]
return True
@pytest.fixture(autouse=True)
def mock_redis(monkeypatch):
redis_mock = RedisMock()
monkeypatch.setattr("app.utils.jwt.redis", redis_mock)
return redis_mock
@pytest.fixture(scope="function")
def test_db():
# Удаляем все таблицы
Base.metadata.drop_all(bind=engine)
# Создаем все таблицы заново
Base.metadata.create_all(bind=engine)
# Создаем сессию
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
@pytest.fixture(scope="function")
def test_employee(test_db):
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123"
)
employee = employees.create_employee(test_db, employee_data, hashed_password)
return employee
@pytest.fixture(scope="function")
def test_token(test_db, test_employee):
token = create_and_save_token(test_employee.id, test_db)
return token
@pytest.fixture(scope="function")
def test_auth_header(test_token):
return {"Authorization": f"Bearer {test_token}"}
@pytest.fixture(scope="function")
def admin_token(test_db):
token = create_and_save_token(-1, test_db) # -1 для админа
return token
@pytest.fixture(scope="function")
def admin_auth_header(admin_token):
return {"Authorization": f"Bearer {admin_token}"}
@pytest.fixture(scope="function")
def test_employee_id(test_employee):
return test_employee.id
# Переопределяем зависимость для получения БД
def override_get_db():
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db

View File

@@ -1,70 +1,98 @@
import pytest
from app.models.employee import Employee
from app.utils.auth import get_password_hash
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.crud import employees
from app.utils.auth import verify_password, get_password_hash
from app.schemas.employee import EmployeeCreate
def test_admin_login(client):
"""Test admin login endpoint"""
response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
client = TestClient(app)
def test_login_success(test_db: Session):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123"
)
employee = employees.create_employee(test_db, employee_data, hashed_password)
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "User",
"password": "testpass123"
}
)
assert response.status_code == 200
assert "access_token" in response.json()
assert response.json()["token_type"] == "bearer"
def test_admin_login_invalid_credentials(client):
"""Test admin login with invalid credentials"""
response = client.post("/api/auth/admin", json={
"username": "wrong",
"password": "wrong"
})
assert response.status_code == 401
assert response.json()["detail"] == "Invalid credentials"
def test_employee_login(client, db_session):
"""Test employee login endpoint"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
def test_login_wrong_password(test_db: Session):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
office="101",
password="testpass123"
)
db_session.add(employee)
db_session.commit()
employees.create_employee(test_db, employee_data, hashed_password)
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "User",
"password": "wrongpass"
}
)
assert response.status_code == 401
assert "detail" in response.json()
# Try to login
response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
def test_login_nonexistent_user(test_db: Session):
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "NonExistent",
"password": "testpass123"
}
)
assert response.status_code == 401
assert "detail" in response.json()
def test_admin_login_success():
response = client.post(
"/api/auth/admin/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "admin",
"password": "admin123"
}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == "Test"
assert data["last_name"] == "User"
assert data["department"] == "IT"
assert data["office"] == "A101"
assert "access_token" in data
assert "access_token" in response.json()
assert response.json()["token_type"] == "bearer"
def test_employee_login_invalid_credentials(client, db_session):
"""Test employee login with invalid credentials"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
def test_admin_login_wrong_password():
response = client.post(
"/api/auth/admin/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "admin",
"password": "wrongpass"
}
)
db_session.add(employee)
db_session.commit()
# Try to login with wrong password
response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "wrong"
})
assert response.status_code == 401
assert response.json()["detail"] == "Неверный пароль"
assert "detail" in response.json()

View File

@@ -1,18 +1,15 @@
import pytest
from app.models.employee import Employee
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.crud import employees
from app.utils.auth import get_password_hash
from app.schemas.employee import EmployeeCreate
def test_create_employee(client):
client = TestClient(app)
def test_create_employee(test_db: Session, admin_auth_header):
"""Test creating a new employee"""
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Create employee
employee_data = {
"first_name": "John",
"last_name": "Doe",
@@ -20,8 +17,12 @@ def test_create_employee(client):
"office": "B205",
"password": "test123"
}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.post("/api/employees/", json=employee_data, headers=headers)
response = client.post(
"/api/employees/",
json=employee_data,
headers=admin_auth_header
)
assert response.status_code == 200
data = response.json()
@@ -31,48 +32,20 @@ def test_create_employee(client):
assert data["office"] == employee_data["office"]
assert "password" not in data
def test_get_employees(client, db_session):
def test_get_employees(test_db: Session, test_employee, admin_auth_header):
"""Test getting list of employees"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
# Сохраняем значения для проверки
expected_first_name = employee.first_name
expected_last_name = employee.last_name
expected_department = employee.department
expected_office = employee.office
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get employees list
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/employees/", headers=headers)
response = client.get("/api/employees/", headers=admin_auth_header)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["first_name"] == expected_first_name
assert data[0]["last_name"] == expected_last_name
assert data[0]["department"] == expected_department
assert data[0]["office"] == expected_office
assert len(data) >= 1
assert data[0]["first_name"] == test_employee.first_name
assert data[0]["last_name"] == test_employee.last_name
assert data[0]["department"] == test_employee.department
assert data[0]["office"] == test_employee.office
assert "password" not in data[0]
def test_create_employee_unauthorized(client):
def test_create_employee_unauthorized(test_db: Session):
"""Test creating employee without authorization"""
employee_data = {
"first_name": "John",
@@ -82,82 +55,42 @@ def test_create_employee_unauthorized(client):
"password": "test123"
}
response = client.post("/api/employees/", json=employee_data)
assert response.status_code == 401
assert response.status_code == 401 # Unauthorized
def test_get_employees_unauthorized(client):
def test_get_employees_unauthorized(test_db: Session):
"""Test getting employees list without authorization"""
response = client.get("/api/employees/")
assert response.status_code == 401
assert response.status_code == 401 # Unauthorized
def test_get_employee_by_id(client, db_session):
def test_get_employee_by_id(test_db: Session, test_employee, admin_auth_header):
"""Test getting employee by ID"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
response = client.get(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get employee
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get(f"/api/employees/{employee_id}", headers=headers)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == "Test"
assert data["last_name"] == "User"
assert data["department"] == "IT"
assert data["office"] == "A101"
assert data["first_name"] == test_employee.first_name
assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
assert "password" not in data
def test_update_employee(client, db_session):
def test_update_employee(test_db: Session, test_employee, admin_auth_header):
"""Test updating employee data"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Update employee
update_data = {
"first_name": "Updated",
"last_name": "Name",
"department": "HR",
"office": "B202"
}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.put(f"/api/employees/{employee_id}", json=update_data, headers=headers)
response = client.put(
f"/api/employees/{test_employee.id}",
json=update_data,
headers=admin_auth_header
)
assert response.status_code == 200
data = response.json()
@@ -167,36 +100,18 @@ def test_update_employee(client, db_session):
assert data["office"] == update_data["office"]
assert "password" not in data
def test_delete_employee(client, db_session):
def test_delete_employee(test_db: Session, test_employee, admin_auth_header):
"""Test deleting employee"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
response = client.delete(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Delete employee
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.delete(f"/api/employees/{employee_id}", headers=headers)
assert response.status_code == 200
# Verify employee is deleted
get_response = client.get(f"/api/employees/{employee_id}", headers=headers)
get_response = client.get(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
)
assert get_response.status_code == 404

View File

@@ -1,333 +1,164 @@
import pytest
from app.models.request import Request, RequestStatus, RequestPriority
from app.models.employee import Employee
from app.utils.auth import get_password_hash
from datetime import datetime, timedelta
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.models.request import RequestStatus, RequestPriority
from app.crud import requests
from app.schemas.request import RequestCreate
def test_create_request(client, db_session):
client = TestClient(app)
def test_create_request(test_db: Session, test_employee, test_auth_header):
"""Test creating a new request"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Login as employee
login_response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
assert login_response.status_code == 200
token = login_response.json()["access_token"]
# Create request
request_data = {
"title": "Test Request",
"department": "IT",
"request_type": "hardware",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
}
headers = {"Authorization": f"Bearer {token}"}
response = client.post("/api/requests/", json=request_data, headers=headers)
response = client.post(
"/api/requests/",
json=request_data,
headers=test_auth_header
)
assert response.status_code == 200
data = response.json()
assert data["title"] == request_data["title"]
assert data["department"] == request_data["department"]
assert data["description"] == request_data["description"]
assert data["priority"] == request_data["priority"]
assert data["status"] == RequestStatus.NEW.value
assert data["employee_id"] == employee_id
assert "employee_id" in data
def test_get_employee_requests(client, db_session):
def test_get_employee_requests(test_db: Session, test_employee, test_auth_header):
"""Test getting employee's requests"""
# Create test employee
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
# Create test request and save its data
request = Request(
title="Test Request",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.NEW.value,
employee_id=employee_id
priority=RequestPriority.MEDIUM.value
)
db_session.add(request)
db_session.commit()
test_request = requests.create_request(test_db, request_data, test_employee.id)
# Сохраняем данные для сравнения
expected_data = {
"title": request.title,
"description": request.description,
"priority": request.priority,
"status": request.status,
"employee_id": request.employee_id
}
# Login as employee
login_response = client.post("/api/auth/login", json={
"last_name": "User",
"password": "test123"
})
assert login_response.status_code == 200
token = login_response.json()["access_token"]
# Get requests
headers = {"Authorization": f"Bearer {token}"}
response = client.get("/api/requests/my", headers=headers)
response = client.get("/api/requests/my", headers=test_auth_header)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["title"] == expected_data["title"]
assert data[0]["description"] == expected_data["description"]
assert data[0]["priority"] == expected_data["priority"]
assert data[0]["status"] == expected_data["status"]
assert data[0]["employee_id"] == expected_data["employee_id"]
assert data[0]["department"] == test_request.department
assert data[0]["description"] == test_request.description
assert data[0]["priority"] == test_request.priority
assert data[0]["status"] == test_request.status
assert data[0]["employee_id"] == test_request.employee_id
def test_update_request_status(client, db_session):
def test_update_request_status(test_db: Session, test_employee, admin_auth_header):
"""Test updating request status"""
# Create test employee and request
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
employee_id = employee.id
request = Request(
title="Test Request",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.NEW.value,
employee_id=employee_id
priority=RequestPriority.MEDIUM.value
)
db_session.add(request)
db_session.commit()
request_id = request.id
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Update request status
test_request = requests.create_request(test_db, request_data, test_employee.id)
update_data = {"status": RequestStatus.IN_PROGRESS.value}
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.patch(f"/api/requests/{request_id}/status", json=update_data, headers=headers)
response = client.patch(
f"/api/requests/{test_request.id}/status",
json=update_data,
headers=admin_auth_header
)
assert response.status_code == 200
data = response.json()
assert data["status"] == RequestStatus.IN_PROGRESS.value
def test_get_all_requests_admin(client, db_session):
def test_get_all_requests_admin(test_db: Session, test_employee, admin_auth_header):
"""Test getting all requests as admin"""
# Create test employees and requests
hashed_password = get_password_hash("test123")
employee1 = Employee(
first_name="Test1",
last_name="User1",
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
office="A101",
password=hashed_password
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
)
employee2 = Employee(
first_name="Test2",
last_name="User2",
department="HR",
office="B202",
password=hashed_password
)
db_session.add_all([employee1, employee2])
db_session.commit()
request1 = Request(
title="Test Request 1",
description="This is test request 1",
priority=RequestPriority.HIGH.value,
status=RequestStatus.NEW.value,
employee_id=employee1.id
)
request2 = Request(
title="Test Request 2",
description="This is test request 2",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.IN_PROGRESS.value,
employee_id=employee2.id
)
db_session.add_all([request1, request2])
db_session.commit()
test_request = requests.create_request(test_db, request_data, test_employee.id)
# Сохраняем данные для сравнения
expected_titles = {request1.title, request2.title}
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get all requests
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/requests/admin", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 2
received_titles = {r["title"] for r in data}
assert received_titles == expected_titles
def test_get_requests_by_status(client, db_session):
"""Test filtering requests by status"""
# Create test employee and requests
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
department="IT",
office="A101",
password=hashed_password
)
db_session.add(employee)
db_session.commit()
request1 = Request(
title="New Request",
description="This is a new request",
priority=RequestPriority.HIGH.value,
status=RequestStatus.NEW.value,
employee_id=employee.id
)
request2 = Request(
title="In Progress Request",
description="This is an in progress request",
priority=RequestPriority.MEDIUM.value,
status=RequestStatus.IN_PROGRESS.value,
employee_id=employee.id
)
db_session.add_all([request1, request2])
db_session.commit()
response = client.get("/api/requests/admin", headers=admin_auth_header)
# Сохраняем данные для сравнения
expected_data = {
"title": request1.title,
"status": request1.status
}
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get requests filtered by status
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get(f"/api/requests/admin?status={RequestStatus.NEW.value}", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["title"] == expected_data["title"]
assert data[0]["status"] == expected_data["status"]
assert data[0]["department"] == test_request.department
def test_get_request_statistics(client, db_session):
"""Test getting request statistics"""
# Create test employee and requests
hashed_password = get_password_hash("test123")
employee = Employee(
first_name="Test",
last_name="User",
def test_get_requests_by_status(test_db: Session, test_employee, admin_auth_header):
"""Test filtering requests by status"""
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
office="A101",
password=hashed_password
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
)
db_session.add(employee)
db_session.commit()
# Create requests with different statuses
requests_data = [
{"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value},
{"status": RequestStatus.IN_PROGRESS.value, "priority": RequestPriority.MEDIUM.value},
{"status": RequestStatus.COMPLETED.value, "priority": RequestPriority.LOW.value},
{"status": RequestStatus.NEW.value, "priority": RequestPriority.HIGH.value}
]
for i, data in enumerate(requests_data):
request = Request(
title=f"Request {i+1}",
description=f"This is request {i+1}",
priority=data["priority"],
status=data["status"],
employee_id=employee.id
)
db_session.add(request)
db_session.commit()
# Login as admin
admin_response = client.post("/api/auth/admin", json={
"username": "admin",
"password": "admin123"
})
assert admin_response.status_code == 200
admin_token = admin_response.json()["access_token"]
# Get statistics
headers = {"Authorization": f"Bearer {admin_token}"}
response = client.get("/api/requests/statistics", headers=headers)
test_request = requests.create_request(test_db, request_data, test_employee.id)
response = client.get(
f"/api/requests/admin?status={RequestStatus.NEW.value}",
headers=admin_auth_header
)
assert response.status_code == 200
data = response.json()
# Проверяем статистику
assert "total_requests" in data
assert data["total_requests"] == 4
assert "by_status" in data
assert data["by_status"]["new"] == 2
assert data["by_status"]["in_progress"] == 1
assert data["by_status"]["completed"] == 1
assert "by_priority" in data
assert data["by_priority"]["high"] == 2
assert data["by_priority"]["medium"] == 1
assert data["by_priority"]["low"] == 1
assert len(data) == 1
assert data[0]["status"] == RequestStatus.NEW.value
def test_create_request_unauthorized(client):
def test_get_request_statistics(test_db: Session, test_employee, admin_auth_header):
"""Test getting request statistics"""
# Создаем тестовые заявки с разными статусами
requests_data = [
RequestCreate(
department="IT",
request_type="hardware",
description="Test request 1",
priority=RequestPriority.HIGH.value
),
RequestCreate(
department="IT",
request_type="software",
description="Test request 2",
priority=RequestPriority.MEDIUM.value
)
]
for data in requests_data:
requests.create_request(test_db, data, test_employee.id)
response = client.get("/api/requests/statistics", headers=admin_auth_header)
assert response.status_code == 200
data = response.json()
assert "total" in data
assert "by_status" in data
assert data["total"] == 2
assert data["by_status"][RequestStatus.NEW.value] == 2
assert data["by_status"][RequestStatus.IN_PROGRESS.value] == 0
assert data["by_status"][RequestStatus.COMPLETED.value] == 0
assert data["by_status"][RequestStatus.REJECTED.value] == 0
def test_create_request_unauthorized(test_db: Session):
"""Test creating request without authorization"""
request_data = {
"title": "Test Request",
"department": "IT",
"request_type": "hardware",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
}
response = client.post("/api/requests/", json=request_data)
assert response.status_code == 401
def test_get_requests_unauthorized(client):
def test_get_requests_unauthorized(test_db: Session):
"""Test getting requests without authorization"""
response = client.get("/api/requests/my")
assert response.status_code == 401