1
0
mirror of https://gitlab.com/MoonTestUse1/AdministrationItDepartmens.git synced 2025-08-14 00:25:46 +02:00

Создание чата9testt

This commit is contained in:
MoonTestUse1
2025-01-05 06:32:34 +06:00
parent 9ba671bdaa
commit 7f7838a0d3
28 changed files with 653 additions and 721 deletions

View File

@@ -1,101 +1,123 @@
"""Test configuration."""
import pytest
from typing import Generator, Dict
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from unittest.mock import MagicMock
from app.db.base import Base
from app.database import get_db
from app.database import Base, get_db
from app.main import app
from app.utils.jwt import create_and_save_token, redis
from app.crud import employees
from app.utils.auth import get_password_hash
from app.models.token import Token
from app.models.employee import Employee
from app.models.request import Request
from app.schemas.employee import EmployeeCreate
from app.core.config import settings
from app.models.user import User
from app.core.auth import get_password_hash
# Используем SQLite для тестов
# Создаем тестовую базу данных
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Создаем мок для Redis
class RedisMock:
# Создаем фиктивный Redis для тестов
class FakeRedis:
"""Fake Redis for testing."""
def __init__(self):
self.data = {}
def setex(self, name, time, value):
self.data[name] = value
return True
def get(self, key):
return self.data.get(key)
def get(self, name):
return self.data.get(name)
def set(self, key, value, ex=None):
self.data[key] = value
def delete(self, name):
if name in self.data:
del self.data[name]
return True
def delete(self, key):
if key in self.data:
del self.data[key]
@pytest.fixture(autouse=True)
def mock_redis(monkeypatch):
redis_mock = RedisMock()
monkeypatch.setattr("app.utils.jwt.redis", redis_mock)
return redis_mock
@pytest.fixture(scope="session")
def redis():
"""Redis fixture."""
return FakeRedis()
@pytest.fixture(scope="function")
def test_db():
# Удаляем все таблицы
Base.metadata.drop_all(bind=engine)
# Создаем все таблицы заново
Base.metadata.create_all(bind=engine)
# Создаем сессию
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
@pytest.fixture(scope="function")
def test_employee(test_db):
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123"
)
employee = employees.create_employee(test_db, employee_data, hashed_password)
return employee
@pytest.fixture(scope="function")
def test_token(test_db, test_employee):
token = create_and_save_token(test_employee.id, test_db)
return token
@pytest.fixture(scope="function")
def test_auth_header(test_token):
return {"Authorization": f"Bearer {test_token}"}
@pytest.fixture(scope="function")
def admin_token(test_db):
token = create_and_save_token(-1, test_db) # -1 для админа
return token
@pytest.fixture(scope="function")
def admin_auth_header(admin_token):
return {"Authorization": f"Bearer {admin_token}"}
@pytest.fixture(scope="function")
def test_employee_id(test_employee):
return test_employee.id
# Переопределяем зависимость для получения БД
def override_get_db():
db = TestingSessionLocal()
"""Override get_db for testing."""
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[get_db] = override_get_db
@pytest.fixture(scope="session", autouse=True)
def setup_database():
"""Setup database for testing."""
# Создаем все таблицы
Base.metadata.create_all(bind=engine)
yield
# Удаляем все таблицы после тестов
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def db() -> Generator:
"""Database fixture."""
connection = engine.connect()
transaction = connection.begin()
session = TestingSessionLocal(bind=connection)
yield session
session.close()
transaction.rollback()
connection.close()
@pytest.fixture(scope="function")
def client(db) -> Generator:
"""Test client fixture."""
def override_get_db():
yield db
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as c:
yield c
@pytest.fixture(scope="function")
def test_user(db) -> Dict[str, str]:
"""Test user fixture."""
user_data = {
"email": "test@example.com",
"password": "test123",
"full_name": "Test User",
"is_admin": False
}
user = User(
email=user_data["email"],
hashed_password=get_password_hash(user_data["password"]),
full_name=user_data["full_name"],
is_admin=user_data["is_admin"]
)
db.add(user)
db.commit()
db.refresh(user)
return user_data
@pytest.fixture(scope="function")
def test_admin(db) -> Dict[str, str]:
"""Test admin fixture."""
admin_data = {
"email": "admin@example.com",
"password": "admin123",
"full_name": "Admin User",
"is_admin": True
}
admin = User(
email=admin_data["email"],
hashed_password=get_password_hash(admin_data["password"]),
full_name=admin_data["full_name"],
is_admin=admin_data["is_admin"]
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin_data

View File

@@ -5,94 +5,87 @@ from app.main import app
from app.crud import employees
from app.utils.auth import verify_password, get_password_hash
from app.schemas.employee import EmployeeCreate
from app.core.auth import create_access_token
from app.core.redis import redis_client
client = TestClient(app)
def test_login_success(test_db: Session):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123"
)
employee = employees.create_employee(test_db, employee_data, hashed_password)
# Переопределяем redis_client для тестов
def pytest_configure(config):
from app.core import redis
redis.redis_client = config.getoption("--redis", default=None)
pytestmark = pytest.mark.asyncio
@pytest.mark.asyncio
async def test_login_success(client: TestClient, test_user: dict, redis):
"""Test successful login"""
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "User",
"password": "testpass123"
"username": test_user["email"],
"password": test_user["password"]
}
)
assert response.status_code == 200
assert "access_token" in response.json()
assert response.json()["token_type"] == "bearer"
def test_login_wrong_password(test_db: Session):
# Создаем тестового сотрудника
hashed_password = get_password_hash("testpass123")
employee_data = EmployeeCreate(
first_name="Test",
last_name="User",
department="IT",
office="101",
password="testpass123"
)
employees.create_employee(test_db, employee_data, hashed_password)
@pytest.mark.asyncio
async def test_login_wrong_password(client: TestClient, test_user: dict, redis):
"""Test login with wrong password"""
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "User",
"password": "wrongpass"
"username": test_user["email"],
"password": "wrongpassword"
}
)
assert response.status_code == 401
assert "detail" in response.json()
assert response.json()["detail"] == "Incorrect email or password"
def test_login_nonexistent_user(test_db: Session):
@pytest.mark.asyncio
async def test_login_wrong_email(client: TestClient, redis):
"""Test login with wrong email"""
response = client.post(
"/api/auth/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "NonExistent",
"password": "testpass123"
"username": "wrong@example.com",
"password": "test123"
}
)
assert response.status_code == 401
assert "detail" in response.json()
assert response.json()["detail"] == "Incorrect email or password"
def test_admin_login_success():
response = client.post(
"/api/auth/admin/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "admin",
"password": "admin123"
}
@pytest.mark.asyncio
async def test_get_current_user(client: TestClient, test_user: dict, redis):
"""Test getting current user info"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
response = client.get(
"/api/auth/me",
headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200
assert "access_token" in response.json()
assert response.json()["token_type"] == "bearer"
assert response.json()["email"] == test_user["email"]
assert response.json()["full_name"] == test_user["full_name"]
assert not response.json()["is_admin"]
def test_admin_login_wrong_password():
response = client.post(
"/api/auth/admin/login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"username": "admin",
"password": "wrongpass"
}
)
@pytest.mark.asyncio
async def test_get_current_user_no_token(client: TestClient, redis):
"""Test getting current user without token"""
response = client.get("/api/auth/me")
assert response.status_code == 401
assert "detail" in response.json()
assert response.json()["detail"] == "Not authenticated"
@pytest.mark.asyncio
async def test_get_current_user_invalid_token(client: TestClient, redis):
"""Test getting current user with invalid token"""
response = client.get(
"/api/auth/me",
headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401
assert response.json()["detail"] == "Could not validate credentials"

View File

@@ -1,117 +1,93 @@
"""Employee tests."""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.crud import employees
from app.utils.auth import get_password_hash
from app.schemas.employee import EmployeeCreate
from app.core.auth import create_access_token
from app.models.user import User
client = TestClient(app)
pytestmark = pytest.mark.asyncio
def test_create_employee(test_db: Session, admin_auth_header):
"""Test creating a new employee"""
@pytest.mark.asyncio
async def test_create_employee(client: TestClient, test_admin: dict, db: Session):
"""Test creating a new employee."""
access_token = create_access_token(
data={"sub": test_admin["email"], "is_admin": True}
)
employee_data = {
"first_name": "John",
"last_name": "Doe",
"department": "IT",
"office": "B205",
"password": "test123"
"email": "new@example.com",
"password": "newpass123",
"full_name": "New Employee",
"is_admin": False
}
response = client.post(
"/api/employees/",
json=employee_data,
headers=admin_auth_header
headers={"Authorization": f"Bearer {access_token}"},
json=employee_data
)
assert response.status_code == 200
data = response.json()
assert data["email"] == employee_data["email"]
assert data["full_name"] == employee_data["full_name"]
assert not data["is_admin"]
@pytest.mark.asyncio
async def test_create_employee_unauthorized(client: TestClient, test_user: dict):
"""Test creating an employee without admin rights."""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == employee_data["first_name"]
assert data["last_name"] == employee_data["last_name"]
assert data["department"] == employee_data["department"]
assert data["office"] == employee_data["office"]
assert "password" not in data
def test_get_employees(test_db: Session, test_employee, admin_auth_header):
"""Test getting list of employees"""
response = client.get("/api/employees/", headers=admin_auth_header)
assert response.status_code == 200
data = response.json()
assert len(data) >= 1
assert data[0]["first_name"] == test_employee.first_name
assert data[0]["last_name"] == test_employee.last_name
assert data[0]["department"] == test_employee.department
assert data[0]["office"] == test_employee.office
assert "password" not in data[0]
def test_create_employee_unauthorized(test_db: Session):
"""Test creating employee without authorization"""
employee_data = {
"first_name": "John",
"last_name": "Doe",
"department": "IT",
"office": "B205",
"password": "test123"
"email": "new@example.com",
"password": "newpass123",
"full_name": "New Employee",
"is_admin": False
}
response = client.post("/api/employees/", json=employee_data)
assert response.status_code == 401 # Unauthorized
response = client.post(
"/api/employees/",
headers={"Authorization": f"Bearer {access_token}"},
json=employee_data
)
assert response.status_code == 403
def test_get_employees_unauthorized(test_db: Session):
"""Test getting employees list without authorization"""
response = client.get("/api/employees/")
assert response.status_code == 401 # Unauthorized
def test_get_employee_by_id(test_db: Session, test_employee, admin_auth_header):
"""Test getting employee by ID"""
response = client.get(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
@pytest.mark.asyncio
async def test_get_employee(client: TestClient, test_admin: dict, test_user: dict, db: Session):
"""Test getting an employee by ID."""
access_token = create_access_token(
data={"sub": test_admin["email"], "is_admin": True}
)
# Получаем ID тестового пользователя
user = db.query(User).filter(User.email == test_user["email"]).first()
response = client.get(
f"/api/employees/{user.id}",
headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == test_employee.first_name
assert data["last_name"] == test_employee.last_name
assert data["department"] == test_employee.department
assert data["office"] == test_employee.office
assert "password" not in data
assert data["email"] == test_user["email"]
assert data["full_name"] == test_user["full_name"]
def test_update_employee(test_db: Session, test_employee, admin_auth_header):
"""Test updating employee data"""
@pytest.mark.asyncio
async def test_update_employee_me(client: TestClient, test_user: dict):
"""Test updating current employee info."""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
update_data = {
"first_name": "Updated",
"last_name": "Name",
"department": "HR",
"office": "B202"
"full_name": "Updated Name"
}
response = client.put(
f"/api/employees/{test_employee.id}",
json=update_data,
headers=admin_auth_header
"/api/employees/me",
headers={"Authorization": f"Bearer {access_token}"},
json=update_data
)
assert response.status_code == 200
data = response.json()
assert data["first_name"] == update_data["first_name"]
assert data["last_name"] == update_data["last_name"]
assert data["department"] == update_data["department"]
assert data["office"] == update_data["office"]
assert "password" not in data
def test_delete_employee(test_db: Session, test_employee, admin_auth_header):
"""Test deleting employee"""
response = client.delete(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
)
assert response.status_code == 200
# Verify employee is deleted
get_response = client.get(
f"/api/employees/{test_employee.id}",
headers=admin_auth_header
)
assert get_response.status_code == 404
assert data["full_name"] == update_data["full_name"]

View File

@@ -1,164 +1,116 @@
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.models.request import RequestStatus, RequestPriority
from app.crud import requests
from app.schemas.request import RequestCreate
from app.core.auth import create_access_token
client = TestClient(app)
pytestmark = pytest.mark.asyncio
def test_create_request(test_db: Session, test_employee, test_auth_header):
@pytest.mark.asyncio
async def test_create_request(client: TestClient, test_user: dict, redis):
"""Test creating a new request"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
request_data = {
"department": "IT",
"request_type": "hardware",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
"request_type": "technical",
"description": "Test request",
"priority": "medium"
}
response = client.post(
"/api/requests/",
json=request_data,
headers=test_auth_header
headers={"Authorization": f"Bearer {access_token}"},
json=request_data
)
assert response.status_code == 200
data = response.json()
assert data["department"] == request_data["department"]
assert data["request_type"] == request_data["request_type"]
assert data["description"] == request_data["description"]
assert data["priority"] == request_data["priority"]
assert data["status"] == RequestStatus.NEW.value
assert "employee_id" in data
assert data["status"] == "new"
def test_get_employee_requests(test_db: Session, test_employee, test_auth_header):
"""Test getting employee's requests"""
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
@pytest.mark.asyncio
async def test_create_request_invalid_priority(client: TestClient, test_user: dict, redis):
"""Test creating a request with invalid priority"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
test_request = requests.create_request(test_db, request_data, test_employee.id)
response = client.get("/api/requests/my", headers=test_auth_header)
request_data = {
"request_type": "technical",
"description": "Test request",
"priority": "invalid"
}
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["department"] == test_request.department
assert data[0]["description"] == test_request.description
assert data[0]["priority"] == test_request.priority
assert data[0]["status"] == test_request.status
assert data[0]["employee_id"] == test_request.employee_id
response = client.post(
"/api/requests/",
headers={"Authorization": f"Bearer {access_token}"},
json=request_data
)
assert response.status_code == 422
def test_update_request_status(test_db: Session, test_employee, admin_auth_header):
"""Test updating request status"""
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
@pytest.mark.asyncio
async def test_get_user_requests(client: TestClient, test_user: dict, redis):
"""Test getting user's requests"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
test_request = requests.create_request(test_db, request_data, test_employee.id)
update_data = {"status": RequestStatus.IN_PROGRESS.value}
response = client.patch(
f"/api/requests/{test_request.id}/status",
json=update_data,
headers=admin_auth_header
)
assert response.status_code == 200
data = response.json()
assert data["status"] == RequestStatus.IN_PROGRESS.value
def test_get_all_requests_admin(test_db: Session, test_employee, admin_auth_header):
"""Test getting all requests as admin"""
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
)
test_request = requests.create_request(test_db, request_data, test_employee.id)
response = client.get("/api/requests/admin", headers=admin_auth_header)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["department"] == test_request.department
def test_get_requests_by_status(test_db: Session, test_employee, admin_auth_header):
"""Test filtering requests by status"""
# Создаем тестовую заявку
request_data = RequestCreate(
department="IT",
request_type="hardware",
description="This is a test request",
priority=RequestPriority.MEDIUM.value
)
test_request = requests.create_request(test_db, request_data, test_employee.id)
response = client.get(
f"/api/requests/admin?status={RequestStatus.NEW.value}",
headers=admin_auth_header
"/api/requests/my",
headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_update_request_status(client: TestClient, test_admin: dict, redis):
"""Test updating request status (admin only)"""
access_token = create_access_token(
data={"sub": test_admin["email"], "is_admin": True}
)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["status"] == RequestStatus.NEW.value
def test_get_request_statistics(test_db: Session, test_employee, admin_auth_header):
"""Test getting request statistics"""
# Создаем тестовые заявки с разными статусами
requests_data = [
RequestCreate(
department="IT",
request_type="hardware",
description="Test request 1",
priority=RequestPriority.HIGH.value
),
RequestCreate(
department="IT",
request_type="software",
description="Test request 2",
priority=RequestPriority.MEDIUM.value
)
]
for data in requests_data:
requests.create_request(test_db, data, test_employee.id)
response = client.get("/api/requests/statistics", headers=admin_auth_header)
assert response.status_code == 200
data = response.json()
assert "total" in data
assert "by_status" in data
assert data["total"] == 2
assert data["by_status"][RequestStatus.NEW.value] == 2
assert data["by_status"][RequestStatus.IN_PROGRESS.value] == 0
assert data["by_status"][RequestStatus.COMPLETED.value] == 0
assert data["by_status"][RequestStatus.REJECTED.value] == 0
def test_create_request_unauthorized(test_db: Session):
"""Test creating request without authorization"""
# Сначала создаем запрос
request_data = {
"department": "IT",
"request_type": "hardware",
"description": "This is a test request",
"priority": RequestPriority.MEDIUM.value
"request_type": "technical",
"description": "Test request",
"priority": "medium"
}
response = client.post("/api/requests/", json=request_data)
assert response.status_code == 401
create_response = client.post(
"/api/requests/",
headers={"Authorization": f"Bearer {access_token}"},
json=request_data
)
request_id = create_response.json()["id"]
# Теперь обновляем статус
update_data = {
"status": "in_progress"
}
response = client.put(
f"/api/admin/requests/{request_id}/status",
headers={"Authorization": f"Bearer {access_token}"},
json=update_data
)
assert response.status_code == 200
assert response.json()["status"] == "in_progress"
def test_get_requests_unauthorized(test_db: Session):
"""Test getting requests without authorization"""
response = client.get("/api/requests/my")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_update_request_status_not_admin(client: TestClient, test_user: dict, redis):
"""Test updating request status without admin rights"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
update_data = {
"status": "in_progress"
}
response = client.put(
"/api/admin/requests/1/status",
headers={"Authorization": f"Bearer {access_token}"},
json=update_data
)
assert response.status_code == 403

View File

@@ -0,0 +1,37 @@
from fastapi.testclient import TestClient
import pytest
from app.core.auth import create_access_token
def test_get_statistics_admin(client: TestClient, test_admin: dict):
"""Test getting statistics as admin"""
access_token = create_access_token(
data={"sub": test_admin["email"], "is_admin": True}
)
response = client.get(
"/api/admin/statistics",
headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200
data = response.json()
assert "total_requests" in data
assert "total_users" in data
assert "status_stats" in data
assert "daily_stats" in data
def test_get_statistics_not_admin(client: TestClient, test_user: dict):
"""Test getting statistics without admin rights"""
access_token = create_access_token(
data={"sub": test_user["email"], "is_admin": False}
)
response = client.get(
"/api/admin/statistics",
headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 403
def test_get_statistics_no_auth(client: TestClient):
"""Test getting statistics without authentication"""
response = client.get("/api/admin/statistics")
assert response.status_code == 401