Files
noteflow/tests/integration/test_task_repository.py
2026-01-23 07:45:20 +00:00

250 lines
8.3 KiB
Python

"""Integration tests for SqlAlchemyTaskRepository."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Final
from uuid import uuid4
import pytest
from noteflow.domain.entities.task import Task, TaskListFilters, TaskStatus
from noteflow.infrastructure.persistence.repositories.task_repo import SqlAlchemyTaskRepository
if TYPE_CHECKING:
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
EXPECTED_SINGLE_RESULT: Final[int] = 1
EXPECTED_ZERO_RESULTS: Final[int] = 0
EXPECTED_THREE_TASKS: Final[int] = 3
EXPECTED_TWO_TASKS: Final[int] = 2
TASK_PRIORITY_LOW: Final[int] = 1
TASK_PRIORITY_HIGH: Final[int] = 2
ACTION_ITEM_ID_FIRST: Final[int] = 1
PAGE_LIMIT_SMALL: Final[int] = 2
@pytest.mark.integration
class TestTaskRepositoryCreate:
"""Integration tests for task creation."""
async def test_create_and_get_task(
self,
session: AsyncSession,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test creating and retrieving a task."""
task = Task(
id=uuid4(),
workspace_id=task_workspace,
text="Test task creation",
status=TaskStatus.OPEN,
priority=TASK_PRIORITY_LOW,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
await task_repo.create(task)
await session.commit()
retrieved = await task_repo.get(task.id)
assert retrieved is not None, "task should exist after create"
assert retrieved.id == task.id, "ID should match"
assert retrieved.text == "Test task creation", "text should match"
assert retrieved.status == TaskStatus.OPEN, "expected OPEN status"
assert retrieved.workspace_id == task_workspace, "workspace_id should match"
@pytest.mark.integration
class TestTaskRepositoryGet:
"""Integration tests for task retrieval."""
async def test_get_persisted_task(
self,
task_repo: SqlAlchemyTaskRepository,
persisted_task: Task,
) -> None:
"""Test retrieving a persisted task."""
retrieved = await task_repo.get(persisted_task.id)
assert retrieved is not None, "persisted task should be retrievable"
assert retrieved.text == persisted_task.text, "text should match"
@pytest.mark.integration
class TestTaskRepositoryMissing:
"""Integration tests for missing tasks."""
@pytest.mark.parametrize(
("method_name", "expected"),
[
pytest.param("get", None, id="get_missing"),
pytest.param("delete", False, id="delete_missing"),
],
)
async def test_task_missing_returns_expected(
self,
task_repo: SqlAlchemyTaskRepository,
method_name: str,
expected: object,
) -> None:
"""Missing task operations should return expected values."""
non_existent_id = uuid4()
method = getattr(task_repo, method_name)
result = await method(non_existent_id)
assert result is expected, f"expected {expected!r} for missing task, got {result!r}"
@pytest.mark.integration
class TestTaskRepositoryUpdate:
"""Integration tests for task updates."""
async def test_update_task_status(
self,
session: AsyncSession,
task_repo: SqlAlchemyTaskRepository,
persisted_task: Task,
) -> None:
"""Test updating task status to DONE."""
updated_task = Task(
id=persisted_task.id,
workspace_id=persisted_task.workspace_id,
text=persisted_task.text,
status=TaskStatus.DONE,
priority=persisted_task.priority,
completed_at=datetime.now(UTC),
created_at=persisted_task.created_at,
updated_at=datetime.now(UTC),
)
await task_repo.update(updated_task)
await session.commit()
retrieved = await task_repo.get(persisted_task.id)
assert retrieved is not None, "updated task should exist"
assert retrieved.status == TaskStatus.DONE, "expected DONE status"
assert retrieved.completed_at is not None, "completed_at should be set"
async def test_update_task_not_found_raises(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test updating non-existent task raises ValueError."""
non_existent_task = Task(
id=uuid4(),
workspace_id=task_workspace,
text="Non-existent",
status=TaskStatus.OPEN,
priority=TASK_PRIORITY_LOW,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
with pytest.raises(ValueError, match=r"Task .* not found"):
await task_repo.update(non_existent_task)
@pytest.mark.integration
class TestTaskRepositoryDelete:
"""Integration tests for task deletion."""
async def test_delete_task(
self,
session: AsyncSession,
task_repo: SqlAlchemyTaskRepository,
persisted_task: Task,
) -> None:
"""Test deleting an existing task."""
result = await task_repo.delete(persisted_task.id)
await session.commit()
assert result is True, "delete should return True for existing task"
retrieved = await task_repo.get(persisted_task.id)
assert retrieved is None, "task should not exist after deletion"
@pytest.mark.integration
class TestTaskRepositoryList:
"""Integration tests for task listing and filtering."""
@pytest.mark.usefixtures("tasks_with_statuses")
async def test_list_by_workspace_returns_all_tasks(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test listing all tasks in a workspace."""
filters = TaskListFilters()
tasks, total = await task_repo.list_by_workspace(task_workspace, filters)
assert total == EXPECTED_THREE_TASKS, "expected three total tasks"
assert len(tasks) == EXPECTED_THREE_TASKS, "expected three tasks returned"
@pytest.mark.usefixtures("tasks_with_statuses")
async def test_list_by_workspace_filter_by_status(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test filtering tasks by status."""
filters = TaskListFilters(statuses=[TaskStatus.OPEN])
tasks, total = await task_repo.list_by_workspace(task_workspace, filters)
assert total == EXPECTED_SINGLE_RESULT, "expected one OPEN task"
assert len(tasks) == EXPECTED_SINGLE_RESULT, "expected one task returned"
assert tasks[EXPECTED_ZERO_RESULTS].status == TaskStatus.OPEN, (
"returned task should be OPEN"
)
@pytest.mark.usefixtures("tasks_with_statuses")
async def test_list_by_workspace_filter_multiple_statuses(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test filtering tasks by multiple statuses."""
filters = TaskListFilters(statuses=[TaskStatus.OPEN, TaskStatus.DONE])
tasks, total = await task_repo.list_by_workspace(task_workspace, filters)
assert total == EXPECTED_TWO_TASKS, "expected two tasks (OPEN + DONE)"
assert len(tasks) == EXPECTED_TWO_TASKS, "expected two tasks returned"
@pytest.mark.usefixtures("tasks_with_statuses")
async def test_list_by_workspace_with_pagination(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test pagination with limit and offset."""
filters = TaskListFilters(limit=PAGE_LIMIT_SMALL, offset=EXPECTED_ZERO_RESULTS)
tasks, total = await task_repo.list_by_workspace(task_workspace, filters)
assert total == EXPECTED_THREE_TASKS, "total should reflect all tasks"
assert len(tasks) == PAGE_LIMIT_SMALL, "expected page size tasks returned"
async def test_list_by_workspace_empty_result(
self,
task_repo: SqlAlchemyTaskRepository,
task_workspace: UUID,
) -> None:
"""Test listing from empty workspace returns empty list."""
filters = TaskListFilters()
tasks, total = await task_repo.list_by_workspace(task_workspace, filters)
assert total == EXPECTED_ZERO_RESULTS, "expected zero tasks"
assert len(tasks) == EXPECTED_ZERO_RESULTS, "expected empty list"