250 lines
8.3 KiB
Python
250 lines
8.3 KiB
Python
"""Integration tests for SqlAlchemyTaskRepository."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime
|
|
from typing import TYPE_CHECKING, Final
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from noteflow.domain.entities.task import Task, TaskListFilters, TaskStatus
|
|
from noteflow.infrastructure.persistence.repositories.task_repo import SqlAlchemyTaskRepository
|
|
|
|
if TYPE_CHECKING:
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Result counts compared against repository list/total values in the tests below.
EXPECTED_ZERO_RESULTS: Final[int] = 0
EXPECTED_SINGLE_RESULT: Final[int] = 1
EXPECTED_TWO_TASKS: Final[int] = 2
EXPECTED_THREE_TASKS: Final[int] = 3

# Priority values for tasks constructed inside the tests.
# NOTE(review): TASK_PRIORITY_HIGH is not referenced in this module.
TASK_PRIORITY_LOW: Final[int] = 1
TASK_PRIORITY_HIGH: Final[int] = 2

# NOTE(review): not referenced in this module; kept so the module namespace is unchanged.
ACTION_ITEM_ID_FIRST: Final[int] = 1

# Page size passed as ``limit`` in the pagination test.
PAGE_LIMIT_SMALL: Final[int] = 2
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryCreate:
    """Integration tests covering task creation through the repository."""

    async def test_create_and_get_task(
        self,
        session: AsyncSession,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Create a task, commit the session, and read the task back by ID."""
        task_text = "Test task creation"
        new_task = Task(
            id=uuid4(),
            workspace_id=task_workspace,
            text=task_text,
            status=TaskStatus.OPEN,
            priority=TASK_PRIORITY_LOW,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )

        await task_repo.create(new_task)
        await session.commit()
        fetched = await task_repo.get(new_task.id)

        # The None check comes first so the attribute checks below cannot
        # fail with an AttributeError instead of a readable assertion.
        assert fetched is not None, "task should exist after create"
        assert fetched.id == new_task.id, "ID should match"
        assert fetched.text == task_text, "text should match"
        assert fetched.status == TaskStatus.OPEN, "expected OPEN status"
        assert fetched.workspace_id == task_workspace, "workspace_id should match"
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryGet:
    """Integration tests covering retrieval of a task by ID."""

    async def test_get_persisted_task(
        self,
        task_repo: SqlAlchemyTaskRepository,
        persisted_task: Task,
    ) -> None:
        """Look up the task supplied by the ``persisted_task`` fixture."""
        task_id = persisted_task.id

        found = await task_repo.get(task_id)

        assert found is not None, "persisted task should be retrievable"
        assert found.text == persisted_task.text, "text should match"
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryMissing:
    """Integration tests for repository calls that target an unknown task ID."""

    @pytest.mark.parametrize(
        ("method_name", "expected"),
        [
            pytest.param("get", None, id="get_missing"),
            pytest.param("delete", False, id="delete_missing"),
        ],
    )
    async def test_task_missing_returns_expected(
        self,
        task_repo: SqlAlchemyTaskRepository,
        method_name: str,
        expected: object,
    ) -> None:
        """Call the named repository method with a fresh UUID and check its result."""
        missing_id = uuid4()

        # Look the method up by name so one test body serves every parametrized case.
        result = await getattr(task_repo, method_name)(missing_id)

        # Identity comparison: the expected values are the singletons None and False.
        assert result is expected, f"expected {expected!r} for missing task, got {result!r}"
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryUpdate:
    """Integration tests covering task updates through the repository."""

    async def test_update_task_status(
        self,
        session: AsyncSession,
        task_repo: SqlAlchemyTaskRepository,
        persisted_task: Task,
    ) -> None:
        """Mark the persisted task DONE and confirm the change is read back."""
        original = persisted_task
        # Build a full replacement entity that keeps the original identity
        # fields and changes only status, completed_at and updated_at.
        done_task = Task(
            id=original.id,
            workspace_id=original.workspace_id,
            text=original.text,
            status=TaskStatus.DONE,
            priority=original.priority,
            completed_at=datetime.now(UTC),
            created_at=original.created_at,
            updated_at=datetime.now(UTC),
        )

        await task_repo.update(done_task)
        await session.commit()
        reloaded = await task_repo.get(original.id)

        assert reloaded is not None, "updated task should exist"
        assert reloaded.status == TaskStatus.DONE, "expected DONE status"
        assert reloaded.completed_at is not None, "completed_at should be set"

    async def test_update_task_not_found_raises(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Updating a task that was never created must raise ``ValueError``."""
        # This task gets a fresh UUID and is never passed to ``create``.
        unsaved_task = Task(
            id=uuid4(),
            workspace_id=task_workspace,
            text="Non-existent",
            status=TaskStatus.OPEN,
            priority=TASK_PRIORITY_LOW,
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )

        with pytest.raises(ValueError, match=r"Task .* not found"):
            await task_repo.update(unsaved_task)
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryDelete:
    """Integration tests covering task deletion through the repository."""

    async def test_delete_task(
        self,
        session: AsyncSession,
        task_repo: SqlAlchemyTaskRepository,
        persisted_task: Task,
    ) -> None:
        """Delete the persisted task, commit, and confirm it can no longer be read."""
        task_id = persisted_task.id

        was_deleted = await task_repo.delete(task_id)
        await session.commit()

        assert was_deleted is True, "delete should return True for existing task"

        leftover = await task_repo.get(task_id)
        assert leftover is None, "task should not exist after deletion"
|
|
|
|
|
|
@pytest.mark.integration
class TestTaskRepositoryList:
    """Integration tests for listing, filtering, and paginating workspace tasks.

    Tests decorated with ``usefixtures("tasks_with_statuses")`` depend on that
    fixture, which is defined outside this module. Judging by the assertions
    here, it is expected to seed ``task_workspace`` with three tasks, exactly
    one of them OPEN and two of them OPEN or DONE (assumption; confirm against
    the fixture).
    """

    @pytest.mark.usefixtures("tasks_with_statuses")
    async def test_list_by_workspace_returns_all_tasks(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Default filters should return every task in the workspace."""
        filters = TaskListFilters()

        tasks, total = await task_repo.list_by_workspace(task_workspace, filters)

        assert total == EXPECTED_THREE_TASKS, "expected three total tasks"
        assert len(tasks) == EXPECTED_THREE_TASKS, "expected three tasks returned"

    @pytest.mark.usefixtures("tasks_with_statuses")
    async def test_list_by_workspace_filter_by_status(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Filtering by a single status should return only tasks in that status."""
        filters = TaskListFilters(statuses=[TaskStatus.OPEN])

        tasks, total = await task_repo.list_by_workspace(task_workspace, filters)

        assert total == EXPECTED_SINGLE_RESULT, "expected one OPEN task"
        assert len(tasks) == EXPECTED_SINGLE_RESULT, "expected one task returned"
        # Plain index 0: a result-count constant must not double as a subscript.
        assert tasks[0].status == TaskStatus.OPEN, "returned task should be OPEN"

    @pytest.mark.usefixtures("tasks_with_statuses")
    async def test_list_by_workspace_filter_multiple_statuses(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Filtering by several statuses should return only tasks in those statuses."""
        requested_statuses = [TaskStatus.OPEN, TaskStatus.DONE]
        filters = TaskListFilters(statuses=requested_statuses)

        tasks, total = await task_repo.list_by_workspace(task_workspace, filters)

        assert total == EXPECTED_TWO_TASKS, "expected two tasks (OPEN + DONE)"
        assert len(tasks) == EXPECTED_TWO_TASKS, "expected two tasks returned"
        # Counts alone would pass if the repository returned two tasks of the
        # wrong status, so verify the filter was applied to each result.
        assert all(task.status in requested_statuses for task in tasks), (
            "returned tasks should only be OPEN or DONE"
        )

    @pytest.mark.usefixtures("tasks_with_statuses")
    async def test_list_by_workspace_with_pagination(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """A limited first page returns ``limit`` tasks while total counts all tasks."""
        # Offset 0 selects the first page; written literally rather than
        # borrowing the unrelated zero-results count constant.
        filters = TaskListFilters(limit=PAGE_LIMIT_SMALL, offset=0)

        tasks, total = await task_repo.list_by_workspace(task_workspace, filters)

        assert total == EXPECTED_THREE_TASKS, "total should reflect all tasks"
        assert len(tasks) == PAGE_LIMIT_SMALL, "expected page size tasks returned"

    async def test_list_by_workspace_empty_result(
        self,
        task_repo: SqlAlchemyTaskRepository,
        task_workspace: UUID,
    ) -> None:
        """Listing a workspace with no seeded tasks returns an empty page and zero total."""
        filters = TaskListFilters()

        tasks, total = await task_repo.list_by_workspace(task_workspace, filters)

        assert total == EXPECTED_ZERO_RESULTS, "expected zero tasks"
        assert len(tasks) == EXPECTED_ZERO_RESULTS, "expected empty list"
|