
NoteFlow Feature Gap Analysis & Development Roadmap

Generated: 2025-12-23 | Updated: 2025-12-25
Focus: Core pipeline completion (transcription → summary → diarization → export)


Executive Summary

This document identifies features not yet developed or fully connected between the NoteFlow frontend (Tauri/React) and backend (Python gRPC). The analysis focuses on completing the core pipeline rather than legacy milestones.

Current State

| Component | Status | Coverage |
|---|---|---|
| Transcription | Working | Real-time streaming, VAD, partial/final segments |
| Summarization | Partial | AI generation works, templates not passed |
| Diarization | Partial | Engine exists, application service missing |
| Export | Partial | Markdown/HTML working, PDF missing |
| Integrations | Stub | UI exists, backend handlers missing |

Sprint Overview

| Sprint | Name | Phase | Status | Prerequisites |
|---|---|---|---|---|
| 0 | Proto & Schema Foundation | Foundation | New | — |
| 1 | AI Templates Pass-Through | Core Pipeline | Planned | Sprint 0 |
| 2 | Diarization Application Service | Core Pipeline | Planned | Sprint 0 |
| 3 | PDF Export | Core Pipeline | Planned | Sprint 0 |
| 4 | Named Entity Extraction | Intelligence | Planned | Sprint 0 |
| 5 | Calendar Sync | Integrations | Planned | Sprint 0 |
| 6 | Webhook Execution | Integrations | Planned | Sprint 0 |

Feature Gap Summary

| Priority | Feature | Owner | Complexity | Status |
|---|---|---|---|---|
| 0 | Proto & Schema Foundation | Backend | Medium | NEW - Consolidates proto/DB changes |
| 1 | AI Templates Pass-Through | Both | Low | Not connected |
| 2 | Diarization Application Service | Backend | Medium | Engine exists, service missing |
| 3 | PDF Export | Backend | Low-Medium | Not implemented |
| 4 | Named Entity Extraction | Backend | Medium | UI only, no backend |
| 5 | Calendar Sync | Backend | Medium-High | UI exists, no handlers |
| 6 | Webhook Execution | Backend | Medium | Config UI only |

Sprint 0: Proto & Schema Foundation (NEW)

Priority: 0 | Owner: Backend | Complexity: Medium
Documentation: docs/sprints/phase-0-foundation/sprint-0-proto-schema/README.md

Objective

Consolidate all proto and database schema changes required by Sprints 1-6 into a single, atomic foundation sprint. This prevents proto version conflicts and ensures all sprints start from a consistent base.

Key Components

  1. Consolidated Proto Definitions: All RPCs and messages for Sprints 1-6
  2. Alembic Migrations: named_entities, webhooks, webhook_deliveries tables
  3. Feature Flags: Toggle experimental features (ner_extraction_enabled, calendar_sync_enabled)
  4. Docker Integration: spaCy model downloads, database initialization
  5. Proto Regeneration Script: Consistent stub generation

Critical Fixes Included

  • Resolves proto version conflicts across sprints
  • Ensures database schema exists before feature sprints
  • Provides feature flags for gradual rollout
  • Documents proto changelog for sync points

Ownership Guidelines

Backend (Python gRPC) Responsibilities

| Area | Rationale |
|---|---|
| ML inference (NER, diarization, ASR) | GPU/model management, heavy computation |
| LLM summarization | API key security, prompt engineering |
| Audio processing | Real-time constraints, encryption |
| PDF generation | Library dependencies (weasyprint/reportlab) |
| Calendar OAuth | Token refresh, security |
| Webhook execution | Reliability, retry logic, error handling |

Frontend (Tauri/Rust + React) Responsibilities

| Area | Rationale |
|---|---|
| UI rendering | User interaction, responsiveness |
| Local preferences | Quick access, offline capability |
| OS integration | Platform-specific APIs (tray, hotkeys) |
| Audio device management | OS-level device enumeration |
| Real-time event handling | Low-latency UI updates |

Shared Responsibilities

  • Proto changes: Both sides must stay in sync
  • New RPC methods: Requires coordination
  • Feature flags: May require both UI toggle and backend support

Phase 1: Core Pipeline Completion

Feature 1: AI Templates Pass-Through

Priority: 1 | Owner: Both (proto change + frontend wiring) | Complexity: Low

Current State

  • Frontend: Settings.tsx has AI template UI (tone, format, verbosity) saved to local preferences
  • Backend: GenerateSummary RPC ignores templates; uses hardcoded prompt in _parsing.py

Gap Analysis

The user can configure summarization style in Settings, but these preferences are never sent to the backend. The GenerateSummaryRequest proto message lacks fields for customization.

Implementation Plan

Step 1: Proto Update

File: src/noteflow/grpc/proto/noteflow.proto

// Add new message
message SummarizationOptions {
  string tone = 1;        // professional, casual, technical, friendly
  string format = 2;      // bullet_points, narrative, structured, concise
  string verbosity = 3;   // minimal, balanced, detailed, comprehensive
}

// Modify existing message
message GenerateSummaryRequest {
  string meeting_id = 1;
  bool force_regenerate = 2;
  SummarizationOptions options = 3;  // NEW FIELD
}

Step 2: Backend - Prompt Builder

File: src/noteflow/infrastructure/summarization/_parsing.py

from noteflow.grpc.proto.noteflow_pb2 import SummarizationOptions


def build_template_prompt(options: SummarizationOptions | None) -> str:
    """Build prompt prefix based on user template preferences."""
    if not options:
        return ""

    tone_map = {
        "professional": "Use formal, business-appropriate language.",
        "casual": "Use conversational, approachable language.",
        "technical": "Use precise technical terminology.",
        "friendly": "Use warm, personable language.",
    }

    format_map = {
        "bullet_points": "Present information in bullet points.",
        "narrative": "Write in flowing paragraphs.",
        "structured": "Use headers and organized sections.",
        "concise": "Be extremely brief and to the point.",
    }

    verbosity_map = {
        "minimal": "Provide only essential information.",
        "balanced": "Include moderate detail.",
        "detailed": "Include comprehensive information.",
        "comprehensive": "Include all relevant details and context.",
    }

    parts = []
    if options.tone and options.tone in tone_map:
        parts.append(tone_map[options.tone])
    if options.format and options.format in format_map:
        parts.append(format_map[options.format])
    if options.verbosity and options.verbosity in verbosity_map:
        parts.append(verbosity_map[options.verbosity])

    return " ".join(parts)

Step 3: Backend - Service Update

File: src/noteflow/application/services/summarization_service.py

Modify summarize() method to accept and pass options to prompt builder.

Step 4: Backend - gRPC Mixin Update

File: src/noteflow/grpc/_mixins/summarization.py

Extract options from request and pass to service.
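For proto3 message fields, `HasField` is the standard way to distinguish "client never set options" from "options set to defaults". The sketch below demonstrates the pattern with a hand-rolled `FakeRequest` standing in for the generated `GenerateSummaryRequest` class, so it runs without protobuf installed:

```python
class FakeRequest:
    """Minimal stand-in for GenerateSummaryRequest (proto messages expose HasField)."""

    def __init__(self, options=None):
        self._options = options

    def HasField(self, name: str) -> bool:
        return name == "options" and self._options is not None

    @property
    def options(self):
        return self._options


def extract_options(request):
    """Return the options submessage only when the client actually set it."""
    # Unset message fields read as an empty default instance, so checking
    # HasField (rather than truthiness) is what preserves "no options" intent.
    return request.options if request.HasField("options") else None
```

The mixin would then pass the result straight into the service's `options` parameter.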

Step 5: Frontend - Rust Command Update

File: client/src-tauri/src/commands/summary.rs

#[tauri::command]
pub async fn generate_summary(
    meeting_id: String,
    force_regenerate: Option<bool>,
    tone: Option<String>,
    format: Option<String>,
    verbosity: Option<String>,
    state: State<'_, AppState>,
) -> Result<Summary, String> {
    // Build SummarizationOptions from parameters
    // Send to gRPC
}

Step 6: Frontend - TypeScript Adapter Update

File: client/src/api/tauri-adapter.ts

async generateSummary(
  meetingId: string,
  forceRegenerate?: boolean
): Promise<Summary> {
  const prefs = await preferences.get();
  const template = prefs.ai_template;

  return invoke(Commands.GENERATE_SUMMARY, {
    meetingId,
    forceRegenerate,
    tone: template?.tone,
    format: template?.format,
    verbosity: template?.verbosity,
  });
}

Files Modified

| File | Change Type |
|---|---|
| src/noteflow/grpc/proto/noteflow.proto | Add SummarizationOptions message |
| src/noteflow/grpc/_mixins/summarization.py | Pass options to service |
| src/noteflow/application/services/summarization_service.py | Accept options parameter |
| src/noteflow/infrastructure/summarization/_parsing.py | Add template prompt builder |
| client/src-tauri/src/commands/summary.rs | Accept template parameters |
| client/src/api/tauri-adapter.ts | Read preferences, pass to command |

Testing

  • Unit test for build_template_prompt() with all combinations
  • Integration test: generate summary with different templates
  • Frontend test: verify preferences are read and passed

Feature 2: Diarization Application Service

Priority: 2 | Owner: Backend | Complexity: Medium

Current State

  • Infrastructure: DiarizationEngine exists in infrastructure/diarization/ with streaming (diart) and offline (pyannote.audio) modes
  • gRPC: RPCs implemented (RefineSpeakerDiarization, GetDiarizationJobStatus, RenameSpeaker)
  • Missing: Application service layer, proper server initialization

Gap Analysis

The gRPC mixin directly calls the diarization engine without an application service layer. This violates the hexagonal architecture pattern used elsewhere and makes testing difficult.

Implementation Plan

Step 1: Create Application Service

File: src/noteflow/application/services/diarization_service.py

"""Diarization application service."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from uuid import uuid4

from noteflow.domain.entities.meeting import MeetingId

if TYPE_CHECKING:
    from noteflow.infrastructure.diarization.engine import DiarizationEngine
    from noteflow.infrastructure.persistence.unit_of_work import SQLAlchemyUnitOfWork


@dataclass
class DiarizationJobStatus:
    """Status of a diarization job."""

    job_id: str
    status: str  # queued, running, completed, failed
    segments_updated: int
    speaker_ids: list[str]
    error_message: str | None = None


class DiarizationService:
    """Orchestrates speaker diarization workflows."""

    def __init__(
        self,
        engine: DiarizationEngine,
        uow_factory: type[SQLAlchemyUnitOfWork],
    ) -> None:
        self._engine = engine
        self._uow_factory = uow_factory
        self._jobs: dict[str, DiarizationJobStatus] = {}

    async def refine_meeting(
        self,
        meeting_id: MeetingId,
        num_speakers: int | None = None,
    ) -> str:
        """Start diarization refinement job.

        Args:
            meeting_id: Meeting to process
            num_speakers: Optional hint for number of speakers

        Returns:
            Job ID for status polling
        """
        job_id = str(uuid4())
        self._jobs[job_id] = DiarizationJobStatus(
            job_id=job_id,
            status="queued",
            segments_updated=0,
            speaker_ids=[],
        )

        # Queue background task
        # Implementation delegates to engine with callbacks

        return job_id

    async def get_job_status(self, job_id: str) -> DiarizationJobStatus:
        """Get status of diarization job."""
        if job_id not in self._jobs:
            return DiarizationJobStatus(
                job_id=job_id,
                status="failed",
                segments_updated=0,
                speaker_ids=[],
                error_message="Job not found",
            )
        return self._jobs[job_id]

    async def rename_speaker(
        self,
        meeting_id: MeetingId,
        old_speaker_id: str,
        new_speaker_name: str,
    ) -> int:
        """Rename speaker across all segments.

        Returns:
            Number of segments updated
        """
        async with self._uow_factory() as uow:
            meeting = await uow.meetings.get(meeting_id)
            if not meeting:
                raise ValueError(f"Meeting {meeting_id} not found")

            count = 0
            for segment in meeting.segments:
                if segment.speaker_id == old_speaker_id:
                    segment.speaker_id = new_speaker_name
                    count += 1

            await uow.commit()
            return count

Step 2: Update Server Initialization

File: src/noteflow/grpc/server.py

# Add to serve() function
from noteflow.infrastructure.diarization.engine import DiarizationEngine
from noteflow.application.services.diarization_service import DiarizationService

# Initialize engine
diarization_engine = DiarizationEngine(
    model_path=settings.diarization_model_path,
    device=settings.device,
)

# Create service
diarization_service = DiarizationService(
    engine=diarization_engine,
    uow_factory=SQLAlchemyUnitOfWork,
)

# Pass to servicer
servicer = NoteFlowServicer(
    # ... existing params ...
    diarization_service=diarization_service,
)

Step 3: Update gRPC Mixin

File: src/noteflow/grpc/_mixins/diarization.py

Refactor to use injected service instead of direct engine calls.
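After the refactor, the mixin should hold only a service reference and delegate every RPC, which is what makes it testable with a fake. A runnable sketch with simplified signatures (the real RPC methods take a request and a gRPC context):

```python
import asyncio


class FakeDiarizationService:
    """Stand-in for DiarizationService; returns a canned job id."""

    async def refine_meeting(self, meeting_id, num_speakers=None):
        return "job-123"


class DiarizationMixin:
    """Sketch: the mixin never touches the engine, only the injected service."""

    def __init__(self, diarization_service):
        self._diarization_service = diarization_service

    async def RefineSpeakerDiarization(self, meeting_id: str) -> str:
        # Delegate; the service owns job bookkeeping and engine access.
        return await self._diarization_service.refine_meeting(meeting_id)
```

Swapping `FakeDiarizationService` for the real one is the entire test-double story, with no engine or GPU in the loop.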

Step 4: Add Protocol

File: src/noteflow/domain/diarization/ports.py

"""Diarization port interfaces."""

from typing import Protocol

from noteflow.domain.entities.meeting import MeetingId


class DiarizationPort(Protocol):
    """Port for diarization operations."""

    async def refine_meeting(
        self,
        meeting_id: MeetingId,
        num_speakers: int | None = None,
    ) -> str:
        """Start diarization job, return job_id."""
        ...

    async def rename_speaker(
        self,
        meeting_id: MeetingId,
        old_id: str,
        new_name: str,
    ) -> int:
        """Rename speaker, return segments updated."""
        ...

Files Created/Modified

| File | Change Type |
|---|---|
| src/noteflow/application/services/diarization_service.py | Create |
| src/noteflow/domain/diarization/ports.py | Create |
| src/noteflow/grpc/server.py | Modify - add initialization |
| src/noteflow/grpc/service.py | Modify - accept service |
| src/noteflow/grpc/_mixins/diarization.py | Modify - use service |
| tests/application/test_diarization_service.py | Create |

Testing

  • Unit tests for DiarizationService with mock engine
  • Integration test: full diarization flow with test audio
  • Test job status transitions: queued → running → completed/failed
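The transition test can be prototyped against a condensed model of the service's in-memory job dict before the real background task lands. Everything below is illustrative (the real service stores `DiarizationJobStatus` objects, not strings):

```python
import asyncio
from uuid import uuid4


class MiniJobTracker:
    """Condensed model of DiarizationService's job bookkeeping."""

    def __init__(self):
        self._jobs: dict[str, str] = {}

    async def start(self) -> str:
        job_id = str(uuid4())
        self._jobs[job_id] = "queued"
        return job_id

    async def run(self, job_id: str, fail: bool = False) -> None:
        self._jobs[job_id] = "running"
        self._jobs[job_id] = "failed" if fail else "completed"

    async def status(self, job_id: str) -> str:
        # Unknown ids report "failed", mirroring get_job_status above.
        return self._jobs.get(job_id, "failed")


async def exercise() -> tuple[str, str, str]:
    tracker = MiniJobTracker()
    ok = await tracker.start()
    await tracker.run(ok)
    bad = await tracker.start()
    await tracker.run(bad, fail=True)
    return (
        await tracker.status(ok),
        await tracker.status(bad),
        await tracker.status("missing"),
    )
```

The same three assertions (completed, failed, unknown-id) carry over to the real `DiarizationService` tests with a mock engine.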

Feature 3: PDF Export

Priority: 3 | Owner: Backend | Complexity: Low-Medium

Current State

  • Backend: MarkdownExporter and HtmlExporter exist in infrastructure/export/
  • Frontend: TODO comment in API interface, only markdown/HTML buttons enabled

Gap Analysis

Users expect to export transcripts as PDF for sharing and archival. The existing export infrastructure supports adding new formats.

Implementation Plan

Step 1: Add PDF Enum Value

File: src/noteflow/grpc/proto/noteflow.proto

enum ExportFormat {
  EXPORT_FORMAT_UNSPECIFIED = 0;
  EXPORT_FORMAT_MARKDOWN = 1;
  EXPORT_FORMAT_HTML = 2;
  EXPORT_FORMAT_PDF = 3;  // NEW
}

Step 2: Create PDF Exporter

File: src/noteflow/infrastructure/export/pdf_exporter.py

"""PDF transcript exporter using weasyprint."""

from __future__ import annotations

from typing import TYPE_CHECKING

from weasyprint import HTML

from noteflow.infrastructure.export._formatting import (
    format_datetime,
    format_timestamp,
)
from noteflow.infrastructure.export.protocols import TranscriptExporter

if TYPE_CHECKING:
    from noteflow.domain.entities.meeting import Meeting


class PdfExporter(TranscriptExporter):
    """Export transcripts to PDF format."""

    def export(self, meeting: Meeting) -> bytes:
        """Export meeting transcript to PDF bytes."""
        html_content = self._build_html(meeting)
        pdf_bytes = HTML(string=html_content).write_pdf()
        return pdf_bytes

    def _build_html(self, meeting: Meeting) -> str:
        """Build HTML content for PDF rendering."""
        css = """
        <style>
            body { font-family: 'Helvetica Neue', Arial, sans-serif; margin: 40px; }
            h1 { color: #333; border-bottom: 2px solid #333; padding-bottom: 10px; }
            .metadata { color: #666; margin-bottom: 20px; }
            .segment { margin: 15px 0; padding: 10px; background: #f9f9f9; }
            .speaker { font-weight: bold; color: #2563eb; }
            .timestamp { color: #888; font-size: 0.9em; }
            .text { margin-top: 5px; line-height: 1.6; }
            .summary { background: #f0f9ff; padding: 20px; margin: 20px 0; }
            .summary h2 { color: #1e40af; }
            .action-item { background: #fef3c7; padding: 10px; margin: 5px 0; }
        </style>
        """

        from html import escape  # meeting text may contain <, >, & — escape before rendering

        title = escape(meeting.title or f"Meeting {meeting.id}")
        date = format_datetime(meeting.created_at) if meeting.created_at else "Unknown"
        duration = (
            format_timestamp(meeting.duration_seconds)
            if meeting.duration_seconds
            else "Unknown"
        )

        segments_html = ""
        for segment in meeting.segments:
            speaker = escape(segment.speaker_id or "Unknown")
            timestamp = format_timestamp(segment.start_time)
            segments_html += f"""
            <div class="segment">
                <span class="speaker">{speaker}</span>
                <span class="timestamp">[{timestamp}]</span>
                <div class="text">{escape(segment.text)}</div>
            </div>
            """

        summary_html = ""
        if meeting.summary:
            summary_html = f"""
            <div class="summary">
                <h2>Summary</h2>
                <p>{escape(meeting.summary.executive_summary)}</p>
                <h3>Key Points</h3>
                <ul>
                    {"".join(f"<li>{escape(kp.text)}</li>" for kp in meeting.summary.key_points)}
                </ul>
                <h3>Action Items</h3>
                {"".join(f'<div class="action-item">{escape(ai.text)}</div>' for ai in meeting.summary.action_items)}
            </div>
            """

        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="utf-8">
            <title>{title}</title>
            {css}
        </head>
        <body>
            <h1>{title}</h1>
            <div class="metadata">
                <p>Date: {date} | Duration: {duration} | Segments: {len(meeting.segments)}</p>
            </div>
            {summary_html}
            <h2>Transcript</h2>
            {segments_html}
        </body>
        </html>
        """

Step 3: Register in Factory

File: src/noteflow/infrastructure/export/__init__.py

from noteflow.infrastructure.export.pdf_exporter import PdfExporter

EXPORTERS = {
    ExportFormat.EXPORT_FORMAT_MARKDOWN: MarkdownExporter,
    ExportFormat.EXPORT_FORMAT_HTML: HtmlExporter,
    ExportFormat.EXPORT_FORMAT_PDF: PdfExporter,
}

Step 4: Update gRPC Mixin

File: src/noteflow/grpc/_mixins/export.py

Handle PDF format - note that PDF returns bytes, not string.

Step 5: Frontend - Add PDF Option

File: client/src-tauri/src/commands/export.rs

// Update format enum handling
match format.as_str() {
    "markdown" => ExportFormat::Markdown,
    "html" => ExportFormat::Html,
    "pdf" => ExportFormat::Pdf,
    _ => return Err("Invalid format".to_string()),
}

File: client/src/pages/MeetingDetail.tsx

// Add PDF button alongside existing export buttons
<Button onClick={() => handleExport('pdf')}>
  Export PDF
</Button>

Dependencies

Add to pyproject.toml:

weasyprint = "^62.0"

Note: weasyprint requires system dependencies (cairo, pango). Document in README.

Files Created/Modified

| File | Change Type |
|---|---|
| src/noteflow/grpc/proto/noteflow.proto | Modify - add PDF enum |
| src/noteflow/infrastructure/export/pdf_exporter.py | Create |
| src/noteflow/infrastructure/export/__init__.py | Modify - register |
| src/noteflow/grpc/_mixins/export.py | Modify - handle PDF |
| client/src-tauri/src/commands/export.rs | Modify - add PDF |
| client/src/pages/MeetingDetail.tsx | Modify - add button |
| pyproject.toml | Modify - add weasyprint |

Testing

  • Unit test for PdfExporter with mock meeting
  • Verify PDF is valid and readable
  • Test with meetings containing/lacking summaries

Phase 2: Intelligence Features

Feature 4: Named Entity Extraction (NER)

Priority: 4 | Owner: Backend | Complexity: Medium

Current State

  • Frontend: EntityHighlightText.tsx renders highlighted entities, EntityManagementPanel.tsx allows manual CRUD
  • Backend: No NER service; entities are purely client-side

Gap Analysis

Users manually highlight entities in the UI, but automatic extraction would save significant effort and ensure consistency across transcripts.

Implementation Plan

Step 1: Create Domain Entity

File: src/noteflow/domain/entities/named_entity.py

"""Named entity domain entity."""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING
from uuid import UUID, uuid4

if TYPE_CHECKING:
    from noteflow.domain.entities.meeting import MeetingId


class EntityCategory(Enum):
    """Categories for named entities."""

    PERSON = "person"
    COMPANY = "company"
    PRODUCT = "product"
    TECHNICAL = "technical"
    ACRONYM = "acronym"
    LOCATION = "location"
    DATE = "date"
    OTHER = "other"


@dataclass
class NamedEntity:
    """A named entity extracted from transcript."""

    id: UUID = field(default_factory=uuid4)
    meeting_id: MeetingId | None = None
    text: str = ""
    category: EntityCategory = EntityCategory.OTHER
    segment_ids: list[int] = field(default_factory=list)
    confidence: float = 0.0
    is_pinned: bool = False  # User-confirmed entity

    @classmethod
    def create(
        cls,
        text: str,
        category: EntityCategory,
        segment_ids: list[int],
        confidence: float,
        meeting_id: MeetingId | None = None,
    ) -> NamedEntity:
        """Create a new named entity."""
        return cls(
            text=text,
            category=category,
            segment_ids=segment_ids,
            confidence=confidence,
            meeting_id=meeting_id,
        )

Step 2: Create NER Infrastructure

File: src/noteflow/infrastructure/ner/__init__.py

"""Named Entity Recognition infrastructure."""

from noteflow.infrastructure.ner.engine import NerEngine
from noteflow.infrastructure.ner.protocols import NerPort

__all__ = ["NerEngine", "NerPort"]

File: src/noteflow/infrastructure/ner/protocols.py

"""NER port interface."""

from typing import Protocol

from noteflow.domain.entities.named_entity import NamedEntity


class NerPort(Protocol):
    """Port for named entity recognition."""

    def extract(self, text: str) -> list[NamedEntity]:
        """Extract named entities from text."""
        ...

File: src/noteflow/infrastructure/ner/engine.py

"""NER engine using spaCy."""

from __future__ import annotations

import spacy
from spacy.language import Language

from noteflow.domain.entities.named_entity import EntityCategory, NamedEntity


# Map spaCy entity types to our categories
SPACY_TO_CATEGORY = {
    "PERSON": EntityCategory.PERSON,
    "ORG": EntityCategory.COMPANY,
    "PRODUCT": EntityCategory.PRODUCT,
    "GPE": EntityCategory.LOCATION,
    "LOC": EntityCategory.LOCATION,
    "DATE": EntityCategory.DATE,
    "TIME": EntityCategory.DATE,
}


class NerEngine:
    """Named entity recognition engine using spaCy."""

    def __init__(self, model_name: str = "en_core_web_sm") -> None:
        self._model_name = model_name
        self._nlp: Language | None = None

    def _ensure_loaded(self) -> Language:
        """Lazy-load the spaCy model."""
        if self._nlp is None:
            self._nlp = spacy.load(self._model_name)
        return self._nlp

    def extract(self, text: str) -> list[NamedEntity]:
        """Extract named entities from text.

        Args:
            text: Input text to analyze

        Returns:
            List of extracted named entities
        """
        nlp = self._ensure_loaded()
        doc = nlp(text)

        entities: list[NamedEntity] = []
        seen: set[str] = set()

        for ent in doc.ents:
            # Deduplicate by normalized text
            key = ent.text.lower().strip()
            if key in seen or not key:
                continue
            seen.add(key)

            category = SPACY_TO_CATEGORY.get(ent.label_, EntityCategory.OTHER)
            entities.append(
                NamedEntity.create(
                    text=ent.text,
                    category=category,
                    segment_ids=[],  # Will be filled by caller
                    confidence=0.8,  # spaCy doesn't provide per-entity confidence
                )
            )

        return entities

    def extract_from_segments(
        self,
        segments: list[tuple[int, str]],
    ) -> list[NamedEntity]:
        """Extract entities from multiple segments with segment tracking.

        Args:
            segments: List of (segment_id, text) tuples

        Returns:
            Entities with segment_ids populated
        """
        entity_segments: dict[str, list[int]] = {}
        all_entities: dict[str, NamedEntity] = {}

        for segment_id, text in segments:
            entities = self.extract(text)
            for entity in entities:
                key = entity.text.lower()
                if key not in all_entities:
                    all_entities[key] = entity
                    entity_segments[key] = []
                entity_segments[key].append(segment_id)

        # Update segment IDs
        for key, entity in all_entities.items():
            entity.segment_ids = sorted(set(entity_segments[key]))

        return list(all_entities.values())
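The deduplication and segment-merge behavior of `extract_from_segments` can be checked without loading spaCy at all by swapping in a trivial extractor. The regex "extractor" below is a deliberate stand-in for the demo, not part of the real engine:

```python
import re


def toy_extract(text: str) -> list[str]:
    """Stand-in extractor: treats capitalized words as "entities" (replaces spaCy for the demo)."""
    return re.findall(r"\b[A-Z][a-z]+\b", text)


def merge_across_segments(segments: list[tuple[int, str]]) -> dict[str, list[int]]:
    """Mirror extract_from_segments: dedupe by lowercased text, track sorted unique segment ids."""
    entity_segments: dict[str, list[int]] = {}
    for segment_id, text in segments:
        for ent in toy_extract(text):
            entity_segments.setdefault(ent.lower(), []).append(segment_id)
    return {key: sorted(set(ids)) for key, ids in entity_segments.items()}
```

This isolates the merge logic, so the unit tests for it stay fast while the spaCy-dependent tests cover only the category mapping.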

Step 3: Add Proto Definitions

File: src/noteflow/grpc/proto/noteflow.proto

// Add to service definition
rpc ExtractEntities(ExtractEntitiesRequest) returns (ExtractEntitiesResponse);

message ExtractEntitiesRequest {
  string meeting_id = 1;
  bool force_refresh = 2;  // Re-extract even if entities exist
}

message ExtractedEntity {
  string id = 1;
  string text = 2;
  string category = 3;  // person, company, product, technical, acronym, location, date, other
  repeated int32 segment_ids = 4;
  float confidence = 5;
  bool is_pinned = 6;
}

message ExtractEntitiesResponse {
  repeated ExtractedEntity entities = 1;
  int32 total_count = 2;
}

Step 4: Create gRPC Mixin

File: src/noteflow/grpc/_mixins/entities.py

"""Entity extraction gRPC mixin."""

from __future__ import annotations

from typing import TYPE_CHECKING

from noteflow.grpc.proto import noteflow_pb2

if TYPE_CHECKING:
    import grpc

    from noteflow.grpc._mixins.protocols import ServicerHost


class EntitiesMixin:
    """Mixin for entity extraction RPC methods."""

    async def ExtractEntities(
        self: ServicerHost,
        request: noteflow_pb2.ExtractEntitiesRequest,
        context: grpc.aio.ServicerContext,
    ) -> noteflow_pb2.ExtractEntitiesResponse:
        """Extract named entities from meeting transcript."""
        meeting_id = self._parse_meeting_id(request.meeting_id)

        async with self._create_repository_provider() as provider:
            meeting = await provider.meetings.get(meeting_id)
            if not meeting:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f"Meeting {meeting_id} not found")
                return noteflow_pb2.ExtractEntitiesResponse()

            # Build segment data for extraction
            segments = [(s.segment_id, s.text) for s in meeting.segments]

            # Extract entities
            entities = self._ner_engine.extract_from_segments(segments)

            # Convert to proto
            proto_entities = [
                noteflow_pb2.ExtractedEntity(
                    id=str(e.id),
                    text=e.text,
                    category=e.category.value,
                    segment_ids=e.segment_ids,
                    confidence=e.confidence,
                    is_pinned=e.is_pinned,
                )
                for e in entities
            ]

            return noteflow_pb2.ExtractEntitiesResponse(
                entities=proto_entities,
                total_count=len(proto_entities),
            )

Step 5: Frontend Integration

File: client/src-tauri/src/commands/entities.rs

#[tauri::command]
pub async fn extract_entities(
    meeting_id: String,
    force_refresh: Option<bool>,
    state: State<'_, AppState>,
) -> Result<Vec<ExtractedEntity>, String> {
    let client = state.grpc_client.lock().await;
    // ... gRPC call
}

File: client/src/api/tauri-adapter.ts

async extractEntities(meetingId: string): Promise<ExtractedEntity[]> {
  return invoke(Commands.EXTRACT_ENTITIES, { meetingId });
}

File: client/src/pages/MeetingDetail.tsx

Add "Auto-Extract Entities" button that calls the API and populates EntityManagementPanel.

Dependencies

Add to pyproject.toml:

spacy = "^3.7"

Post-install: python -m spacy download en_core_web_sm

Files Created/Modified

| File | Change Type |
|---|---|
| src/noteflow/domain/entities/named_entity.py | Create |
| src/noteflow/infrastructure/ner/__init__.py | Create |
| src/noteflow/infrastructure/ner/protocols.py | Create |
| src/noteflow/infrastructure/ner/engine.py | Create |
| src/noteflow/grpc/proto/noteflow.proto | Modify - add RPC |
| src/noteflow/grpc/_mixins/entities.py | Create |
| src/noteflow/grpc/service.py | Modify - add mixin |
| client/src-tauri/src/commands/entities.rs | Create |
| client/src/api/tauri-adapter.ts | Modify - add method |
| client/src/pages/MeetingDetail.tsx | Modify - add button |

Testing

  • Unit tests for NerEngine with sample text
  • Test entity deduplication across segments
  • Test category mapping from spaCy types
  • Integration test: full extraction flow

Phase 3: Integration Pipeline

Feature 5: Calendar Sync

Priority: 5 | Owner: Backend | Complexity: Medium-High

Current State

  • Frontend: UpcomingMeetings.tsx component renders placeholder calendar events, IntegrationConfigPanel.tsx has OAuth configuration forms
  • Backend: TriggerService has calendar signal provider for meeting detection but no sync handlers

Gap Analysis

Users can configure Google/Outlook calendar OAuth in Settings, but there's no backend to actually sync calendar events. The upcoming meetings display is currently empty/mocked.

Implementation Plan

Step 1: Create Calendar Port

File: src/noteflow/domain/calendar/ports.py

"""Calendar port interfaces."""

from dataclasses import dataclass
from datetime import datetime
from typing import Protocol


@dataclass
class CalendarEvent:
    """A calendar event."""

    id: str
    title: str
    start_time: datetime
    end_time: datetime
    attendees: list[str]
    location: str | None = None
    description: str | None = None
    is_recurring: bool = False
    meeting_url: str | None = None


class CalendarPort(Protocol):
    """Port for calendar operations."""

    async def list_events(
        self,
        from_time: datetime,
        to_time: datetime,
        limit: int = 50,
    ) -> list[CalendarEvent]:
        """List calendar events in time range."""
        ...

    async def refresh_token(self) -> bool:
        """Refresh OAuth token if needed."""
        ...
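Because `CalendarPort` is a Protocol, an in-memory fake satisfies it for tests and for wiring `UpcomingMeetings` before the Google adapter exists. The sketch re-declares a trimmed `CalendarEvent` locally so it runs standalone; `InMemoryCalendar` is a test double, not proposed production code:

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class CalendarEvent:
    """Trimmed local copy of the port's event type, so the sketch is self-contained."""

    id: str
    title: str
    start_time: datetime
    end_time: datetime
    attendees: list[str] = field(default_factory=list)


class InMemoryCalendar:
    """Fake CalendarPort: filters a fixed event list by time range."""

    def __init__(self, events: list[CalendarEvent]):
        self._events = events

    async def list_events(self, from_time, to_time, limit=50):
        hits = [e for e in self._events if from_time <= e.start_time < to_time]
        return sorted(hits, key=lambda e: e.start_time)[:limit]

    async def refresh_token(self) -> bool:
        return True  # nothing to refresh in the fake
```

Structural typing means neither the fake nor `GoogleCalendarAdapter` needs to inherit from anything; both just match the `CalendarPort` method shapes.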

Step 2: Create Google Calendar Adapter

File: src/noteflow/infrastructure/integrations/google_calendar.py

"""Google Calendar integration."""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

from noteflow.domain.calendar.ports import CalendarEvent, CalendarPort

if TYPE_CHECKING:
    from noteflow.infrastructure.security.keystore import KeyStore


class GoogleCalendarAdapter(CalendarPort):
    """Google Calendar API adapter."""

    def __init__(
        self,
        keystore: KeyStore,
        client_id: str,
        client_secret: str,
    ) -> None:
        self._keystore = keystore
        self._client_id = client_id
        self._client_secret = client_secret
        self._service = None

    async def _ensure_service(self):
        """Initialize Google Calendar service with stored credentials."""
        if self._service is not None:
            return

        # Retrieve stored tokens from keystore
        tokens = await self._keystore.get("google_calendar_tokens")
        if not tokens:
            raise ValueError("Google Calendar not authenticated")

        creds = Credentials(
            token=tokens["access_token"],
            refresh_token=tokens.get("refresh_token"),
            client_id=self._client_id,
            client_secret=self._client_secret,
            token_uri="https://oauth2.googleapis.com/token",
        )

        self._service = build("calendar", "v3", credentials=creds)

    async def list_events(
        self,
        from_time: datetime,
        to_time: datetime,
        limit: int = 50,
    ) -> list[CalendarEvent]:
        """List calendar events from Google Calendar."""
        await self._ensure_service()

        # NOTE: google-api-python-client is synchronous; in production these
        # calls should be offloaded (e.g. via asyncio.to_thread) so they do
        # not block the event loop.
        events_result = (
            self._service.events()
            .list(
                calendarId="primary",
                # RFC 3339 timestamps; naive UTC datetimes are assumed here.
                timeMin=from_time.isoformat() + "Z",
                timeMax=to_time.isoformat() + "Z",
                maxResults=limit,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = []
        for item in events_result.get("items", []):
            start = item.get("start", {})
            end = item.get("end", {})

            events.append(
                CalendarEvent(
                    id=item["id"],
                    title=item.get("summary", "Untitled"),
                    start_time=datetime.fromisoformat(
                        start.get("dateTime", start.get("date"))
                    ),
                    end_time=datetime.fromisoformat(
                        end.get("dateTime", end.get("date"))
                    ),
                    attendees=[
                        a.get("email", "")
                        for a in item.get("attendees", [])
                    ],
                    location=item.get("location"),
                    description=item.get("description"),
                    meeting_url=item.get("hangoutLink"),
                )
            )

        return events

    async def refresh_token(self) -> bool:
        """Refresh OAuth token."""
        # TODO: refresh via creds.refresh(google.auth.transport.requests.Request())
        # and persist the rotated tokens back to the keystore.
        return True
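
The item-to-event mapping inside `list_events` is the part most worth unit-testing, and pulled out as a pure function it needs no network or credentials. A sketch (`event_from_item` is a hypothetical helper; the field names follow the Calendar v3 `events.list` response used above, and `CalendarEvent` is redeclared so the snippet runs standalone):

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class CalendarEvent:
    """Redeclared domain dataclass for a self-contained snippet."""

    id: str
    title: str
    start_time: datetime
    end_time: datetime
    attendees: list[str]
    location: str | None = None
    description: str | None = None
    meeting_url: str | None = None


def event_from_item(item: dict) -> CalendarEvent:
    """Map one Google Calendar v3 event item to the domain CalendarEvent.

    All-day events carry only a "date" key; timed events carry "dateTime"
    with a UTC offset, which datetime.fromisoformat parses directly.
    """
    start = item.get("start", {})
    end = item.get("end", {})
    return CalendarEvent(
        id=item["id"],
        title=item.get("summary", "Untitled"),
        start_time=datetime.fromisoformat(start.get("dateTime", start.get("date"))),
        end_time=datetime.fromisoformat(end.get("dateTime", end.get("date"))),
        attendees=[a.get("email", "") for a in item.get("attendees", [])],
        location=item.get("location"),
        description=item.get("description"),
        meeting_url=item.get("hangoutLink"),
    )
```

The adapter's loop then reduces to `[event_from_item(i) for i in events_result.get("items", [])]`.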

Step 3: Add Proto Definitions

File: src/noteflow/grpc/proto/noteflow.proto

// Calendar RPCs
rpc ListCalendarEvents(ListCalendarEventsRequest) returns (ListCalendarEventsResponse);

message CalendarEvent {
  string id = 1;
  string title = 2;
  int64 start_time = 3;  // Unix timestamp
  int64 end_time = 4;
  repeated string attendees = 5;
  string location = 6;
  string description = 7;
  string meeting_url = 8;
}

message ListCalendarEventsRequest {
  int64 from_time = 1;
  int64 to_time = 2;
  int32 limit = 3;
  string provider = 4;  // google, outlook
}

message ListCalendarEventsResponse {
  repeated CalendarEvent events = 1;
  string next_sync_token = 2;
}
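
At the gRPC boundary these `int64` Unix-second fields must be converted to and from `datetime`, and timezone handling is the usual trap. A small sketch of the conversion pair, assuming naive datetimes mean UTC (helper names are illustrative):

```python
from datetime import datetime, timezone


def to_unix_seconds(dt: datetime) -> int:
    """Domain datetime -> proto int64; naive values are assumed to be UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def from_unix_seconds(ts: int) -> datetime:
    """Proto int64 -> timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)
```

Round-tripping through the proto representation should be lossless at second precision.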

Step 4: Create Calendar Service

File: src/noteflow/application/services/calendar_service.py

"""Calendar application service."""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from noteflow.domain.calendar.ports import CalendarEvent, CalendarPort


class CalendarService:
    """Orchestrates calendar operations across providers."""

    def __init__(self) -> None:
        self._providers: dict[str, CalendarPort] = {}

    def register_provider(self, name: str, provider: CalendarPort) -> None:
        """Register a calendar provider."""
        self._providers[name] = provider

    async def list_upcoming_events(
        self,
        hours_ahead: int = 24,
        limit: int = 10,
        provider: str | None = None,
    ) -> list[CalendarEvent]:
        """List upcoming calendar events.

        Args:
            hours_ahead: How far ahead to look
            limit: Maximum events to return
            provider: Specific provider, or None for all

        Returns:
            List of upcoming calendar events
        """
        now = datetime.utcnow()
        until = now + timedelta(hours=hours_ahead)

        events: list[CalendarEvent] = []

        providers = (
            {provider: self._providers[provider]}
            if provider and provider in self._providers
            else self._providers
        )

        for cal in providers.values():
            try:
                provider_events = await cal.list_events(now, until, limit)
                events.extend(provider_events)
            except Exception:
                # Swallow per-provider failures so one bad provider does not
                # hide events from the others; log them in a real implementation.
                continue

        # Sort by start time and limit
        events.sort(key=lambda e: e.start_time)
        return events[:limit]
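
The service's core behavior is the fan-out/merge/sort/cap sequence, which is easy to verify with stub providers. A self-contained sketch (`StubProvider` and `list_upcoming` are hypothetical, mirroring `CalendarService.list_upcoming_events` above):

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Event:
    """Minimal stand-in for CalendarEvent; only start_time matters here."""

    title: str
    start_time: datetime


class StubProvider:
    """Hypothetical CalendarPort returning canned events."""

    def __init__(self, events: list[Event]) -> None:
        self._events = events

    async def list_events(self, from_time, to_time, limit=50):
        window = [e for e in self._events if from_time <= e.start_time <= to_time]
        return window[:limit]


async def list_upcoming(providers, now, hours_ahead=24, limit=10):
    """Core of CalendarService.list_upcoming_events: fan out, merge, sort, cap."""
    until = now + timedelta(hours=hours_ahead)
    events: list[Event] = []
    for provider in providers:
        try:
            events.extend(await provider.list_events(now, until, limit))
        except Exception:
            continue  # One failing provider must not hide the others.
    events.sort(key=lambda e: e.start_time)
    return events[:limit]
```

Events from different providers come back interleaved by start time, regardless of registration order.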

Step 5: Create gRPC Mixin

File: src/noteflow/grpc/_mixins/calendar.py (implements ListCalendarEvents by delegating to CalendarService and converting domain datetimes to the proto's Unix-second timestamps).
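
The mixin mostly translates between proto messages and the domain service. A hedged sketch of its shape (the generated proto classes are replaced by dataclass stand-ins so the snippet runs without grpc; every name except `CalendarService.list_upcoming_events` is an assumption):

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class PbCalendarEvent:
    """Stand-in for the generated noteflow_pb2.CalendarEvent."""

    id: str = ""
    title: str = ""
    start_time: int = 0
    end_time: int = 0


@dataclass
class PbListCalendarEventsResponse:
    """Stand-in for the generated response message."""

    events: list = field(default_factory=list)


def _to_unix(dt: datetime) -> int:
    # Naive domain datetimes are treated as UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


class CalendarMixin:
    """gRPC handler: translate proto <-> domain, delegate to the service."""

    def __init__(self, calendar_service) -> None:
        self._calendar_service = calendar_service

    async def ListCalendarEvents(self, request, context):
        domain_events = await self._calendar_service.list_upcoming_events(
            limit=request.limit or 10,
            provider=request.provider or None,
        )
        return PbListCalendarEventsResponse(
            events=[
                PbCalendarEvent(
                    id=e.id,
                    title=e.title,
                    start_time=_to_unix(e.start_time),
                    end_time=_to_unix(e.end_time),
                )
                for e in domain_events
            ]
        )
```

In the real mixin the stand-ins are replaced by the classes generated from Step 3's proto.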

Step 6: Frontend Integration

File: client/src-tauri/src/commands/calendar.rs - Create Tauri command
File: client/src/hooks/use-calendar-sync.ts - Wire to gRPC API
File: client/src/components/upcoming-meetings.tsx - Wire to real data

Dependencies

Add to pyproject.toml:

google-api-python-client = "^2.100"
google-auth = "^2.23"
google-auth-oauthlib = "^1.1"

Files Created/Modified

| File | Change Type |
| --- | --- |
| src/noteflow/domain/calendar/ports.py | Create |
| src/noteflow/infrastructure/integrations/__init__.py | Create |
| src/noteflow/infrastructure/integrations/google_calendar.py | Create |
| src/noteflow/infrastructure/integrations/outlook_calendar.py | Create (similar) |
| src/noteflow/application/services/calendar_service.py | Create |
| src/noteflow/grpc/proto/noteflow.proto | Modify - add RPCs |
| src/noteflow/grpc/_mixins/calendar.py | Create |
| client/src-tauri/src/commands/calendar.rs | Create |
| client/src/hooks/use-calendar-sync.ts | Modify - wire to API |
| client/src/components/upcoming-meetings.tsx | Modify - use real data |

Feature 6: Webhook Execution

Priority: 6 | Owner: Backend | Complexity: Medium

Current State

  • Frontend: IntegrationConfigPanel.tsx has webhook URL input and event selection
  • Backend: No webhook execution logic

Gap Analysis

Users can configure webhooks in Settings, but nothing ever fires them. Webhooks should trigger on meeting events (meeting completed, summary generated).

Implementation Plan

Step 1: Create Webhook Infrastructure

File: src/noteflow/infrastructure/integrations/webhooks.py

"""Webhook execution infrastructure."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any

import httpx


class WebhookEvent(Enum):
    """Webhook trigger events."""

    MEETING_COMPLETED = "meeting.completed"
    SUMMARY_GENERATED = "summary.generated"
    RECORDING_STARTED = "recording.started"
    RECORDING_STOPPED = "recording.stopped"


@dataclass
class WebhookConfig:
    """Webhook configuration."""

    url: str
    events: list[WebhookEvent]
    secret: str | None = None
    enabled: bool = True


@dataclass
class WebhookDelivery:
    """Record of webhook delivery attempt."""

    webhook_url: str
    event: WebhookEvent
    payload: dict[str, Any]
    status_code: int | None
    error: str | None
    delivered_at: datetime


class WebhookExecutor:
    """Execute webhooks with retry logic."""

    def __init__(
        self,
        max_retries: int = 3,
        timeout_seconds: float = 10.0,
    ) -> None:
        self._max_retries = max_retries
        self._timeout = timeout_seconds
        self._client = httpx.AsyncClient(timeout=self._timeout)

    async def deliver(
        self,
        config: WebhookConfig,
        event: WebhookEvent,
        payload: dict[str, Any],
    ) -> WebhookDelivery:
        """Deliver webhook with retries.

        Args:
            config: Webhook configuration
            event: Event type
            payload: Event payload

        Returns:
            Delivery record with status
        """
        if not config.enabled or event not in config.events:
            return WebhookDelivery(
                webhook_url=config.url,
                event=event,
                payload=payload,
                status_code=None,
                error="Webhook disabled or event not subscribed",
                delivered_at=datetime.utcnow(),
            )

        # Serialize once so the signed bytes are exactly the bytes delivered;
        # signing json.dumps(payload) while POSTing json=payload can mismatch
        # because httpx serializes JSON with different separators.
        import hashlib
        import hmac
        import json

        body = json.dumps(payload)

        headers = {
            "Content-Type": "application/json",
            "X-NoteFlow-Event": event.value,
        }

        if config.secret:
            # Add HMAC signature over the request body
            signature = hmac.new(
                config.secret.encode(),
                body.encode(),
                hashlib.sha256,
            ).hexdigest()
            headers["X-NoteFlow-Signature"] = f"sha256={signature}"

        last_error = None
        for attempt in range(self._max_retries):
            try:
                response = await self._client.post(
                    config.url,
                    content=body,
                    headers=headers,
                )
                return WebhookDelivery(
                    webhook_url=config.url,
                    event=event,
                    payload=payload,
                    status_code=response.status_code,
                    error=None if response.is_success else response.text,
                    delivered_at=datetime.utcnow(),
                )
            except Exception as e:
                last_error = str(e)
                if attempt < self._max_retries - 1:
                    await asyncio.sleep(2**attempt)  # Exponential backoff

        return WebhookDelivery(
            webhook_url=config.url,
            event=event,
            payload=payload,
            status_code=None,
            error=last_error,
            delivered_at=datetime.utcnow(),
        )

    async def close(self) -> None:
        """Close HTTP client."""
        await self._client.aclose()
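
On the receiving end, consumers should verify `X-NoteFlow-Signature` by recomputing the HMAC over the raw request body and comparing with a constant-time check. A sketch of the verification counterpart to the signing code above (header format as defined there; `verify_signature` is an illustrative name):

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Return True when the "sha256=<hex>" header matches the HMAC of body.

    hmac.compare_digest avoids leaking the correct signature through timing.
    """
    prefix = "sha256="
    if not signature_header.startswith(prefix):
        return False
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature_header[len(prefix):], expected)
```

This only works if the sender signs the exact bytes it delivers, which is why the executor should POST the same serialized body it signed.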

Step 2: Create Webhook Service

File: src/noteflow/application/services/webhook_service.py

"""Webhook application service."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from noteflow.infrastructure.integrations.webhooks import (
    WebhookConfig,
    WebhookDelivery,
    WebhookEvent,
    WebhookExecutor,
)

if TYPE_CHECKING:
    from noteflow.domain.entities.meeting import Meeting


class WebhookService:
    """Orchestrates webhook delivery."""

    def __init__(self, executor: WebhookExecutor | None = None) -> None:
        self._executor = executor or WebhookExecutor()
        self._configs: list[WebhookConfig] = []

    def register_webhook(self, config: WebhookConfig) -> None:
        """Register a webhook configuration."""
        self._configs.append(config)

    async def trigger_meeting_completed(
        self,
        meeting: Meeting,
    ) -> list[WebhookDelivery]:
        """Trigger webhooks for meeting completion."""
        payload = {
            "event": WebhookEvent.MEETING_COMPLETED.value,
            "meeting_id": str(meeting.id),
            "title": meeting.title,
            "duration_seconds": meeting.duration_seconds,
            "segment_count": len(meeting.segments),
            "has_summary": meeting.summary is not None,
        }

        return await self._deliver_to_all(WebhookEvent.MEETING_COMPLETED, payload)

    async def trigger_summary_generated(
        self,
        meeting: Meeting,
    ) -> list[WebhookDelivery]:
        """Trigger webhooks for summary generation."""
        payload = {
            "event": WebhookEvent.SUMMARY_GENERATED.value,
            "meeting_id": str(meeting.id),
            "title": meeting.title,
            "executive_summary": (
                meeting.summary.executive_summary if meeting.summary else None
            ),
            "key_points_count": (
                len(meeting.summary.key_points) if meeting.summary else 0
            ),
            "action_items_count": (
                len(meeting.summary.action_items) if meeting.summary else 0
            ),
        }

        return await self._deliver_to_all(WebhookEvent.SUMMARY_GENERATED, payload)

    async def _deliver_to_all(
        self,
        event: WebhookEvent,
        payload: dict[str, Any],
    ) -> list[WebhookDelivery]:
        """Deliver event to all registered webhooks."""
        deliveries = []
        for config in self._configs:
            delivery = await self._executor.deliver(config, event, payload)
            deliveries.append(delivery)
        return deliveries
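
`_deliver_to_all` runs deliveries sequentially, so one slow endpoint delays the rest. If that matters, the fan-out can be made concurrent with `asyncio.gather`; a sketch with a hypothetical stub in place of `WebhookExecutor`:

```python
import asyncio


class StubExecutor:
    """Records calls instead of POSTing; stands in for WebhookExecutor."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def deliver(self, config, event, payload):
        self.sent.append(config)
        return {"url": config, "event": event}


async def deliver_to_all_concurrent(executor, configs, event, payload):
    """Concurrent variant of WebhookService._deliver_to_all."""
    # gather preserves input order in its result list.
    return await asyncio.gather(
        *(executor.deliver(c, event, payload) for c in configs)
    )
```

Per-webhook timeouts in the executor bound the total latency to the slowest single endpoint instead of the sum.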

Step 3: Wire to gRPC Mixins

File: src/noteflow/grpc/_mixins/meeting.py

After StopMeeting completes successfully, call webhook_service.trigger_meeting_completed().

File: src/noteflow/grpc/_mixins/summarization.py

After GenerateSummary completes successfully, call webhook_service.trigger_summary_generated().
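
Delivery should not block the RPC response on slow webhook endpoints. One option (a sketch; the wrapper name and call site are assumptions) is to schedule the trigger as a background task at the end of the handler:

```python
import asyncio


def schedule_webhook(webhook_service, meeting) -> asyncio.Task:
    """Fire-and-forget scheduling for use at the end of StopMeeting.

    The caller should keep the returned Task referenced (e.g. in a set)
    until it finishes, so it is not garbage-collected mid-flight.
    """
    task = asyncio.create_task(
        webhook_service.trigger_meeting_completed(meeting)
    )
    # Retrieve the exception so asyncio does not warn that it was
    # "never retrieved"; a real implementation would log it.
    task.add_done_callback(lambda t: t.exception())
    return task
```

The same pattern applies to `trigger_summary_generated` after GenerateSummary.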

Files Created/Modified

| File | Change Type |
| --- | --- |
| src/noteflow/infrastructure/integrations/webhooks.py | Create |
| src/noteflow/application/services/webhook_service.py | Create |
| src/noteflow/grpc/server.py | Modify - initialize service |
| src/noteflow/grpc/_mixins/meeting.py | Modify - trigger on complete |
| src/noteflow/grpc/_mixins/summarization.py | Modify - trigger on summary |

Implementation Order & Dependencies

Sprint 0: Foundation (MUST complete first)
└── Proto & Schema Foundation ───────────────→ All proto + DB migrations + feature flags

Phase 1: Core Pipeline (Parallel, after Sprint 0):
├── Sprint 1: AI Templates ─────────────────┐
├── Sprint 3: PDF Export ───────────────────┤─→ Use proto from Sprint 0
└── Sprint 2: Diarization Service ──────────┘   (DB persistence, application layer)

Phase 2: Intelligence (after Phase 1):
└── Sprint 4: NER ──────────────────────────→ Uses NerService application layer

Phase 3: Integrations (Sequential, after Phase 2):
├── Sprint 5: Calendar Sync ────────────────→ Complete OAuth flow with PKCE
└── Sprint 6: Webhooks ─────────────────────→ HMAC signing, retry logic

Critical Path

  1. Sprint 0 is the mandatory prerequisite for all other sprints
  2. All proto/DB changes consolidated in Sprint 0 - no more scattered migrations
  3. Feature flags control feature availability before full rollout
  4. Application service layer required for Sprints 2, 4, 5 (hexagonal architecture)
  5. Sprint 5 OAuth now includes complete PKCE flow, token persistence, and refresh

Architectural Decisions (Updated)

| Sprint | Key Improvement |
| --- | --- |
| Sprint 0 | Consolidated proto + feature flags + Docker model downloads |
| Sprint 2 | Database persistence via repository (not in-memory _jobs dict) |
| Sprint 4 | NerService application layer (gRPC → Service → Engine) |
| Sprint 5 | Complete OAuth with PKCE, token storage, and auto-refresh |

Quality Gates

Each sprint must pass before merge:

pytest tests/quality/      # 23+ quality checks
ruff check src/noteflow    # Linting
basedpyright               # Type checking

See docs/sprints/QUALITY_STANDARDS.md for thresholds and reduction targets.


Testing Strategy

| Feature | Unit Tests | Integration Tests |
| --- | --- | --- |
| AI Templates | Prompt builder | Full summary with templates |
| Diarization Service | Service methods | Engine + DB integration |
| PDF Export | Exporter class | Full export + file validation |
| NER | Engine extraction | Full meeting extraction |
| Calendar | Adapter methods | OAuth flow (mock) |
| Webhooks | Executor delivery | Full trigger flow |

Risk Assessment

| Risk | Mitigation |
| --- | --- |
| weasyprint system deps | Document in README, test in CI |
| spaCy model size (~50MB) | Lazy loading, optional dependency |
| Google OAuth complexity | Use existing library patterns |
| Webhook reliability | Retry logic, delivery logging |
| Proto breaking changes | Careful versioning, staged rollout |