feat: Introduce sync error codes, client-side sync notifications, and update gRPC definitions for integration synchronization.

2026-01-18 20:55:43 +00:00
parent 021451aa70
commit 796f13b358
36 changed files with 6993 additions and 2539 deletions
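
After this commit, the same five error codes exist in three layers: a Python StrEnum in the domain model, a proto enum on the wire, and a TypeScript union type on the client. A minimal parity sketch (the class body mirrors the domain enum diffed below; the wire integers come from the generated code):

```python
# Mirrors noteflow.domain.entities.integration.SyncErrorCode (diffed below);
# redefined here only so the sketch is self-contained.
from enum import StrEnum

class SyncErrorCode(StrEnum):
    UNSPECIFIED = "unspecified"
    AUTH_REQUIRED = "auth_required"
    PROVIDER_ERROR = "provider_error"
    INTERNAL_ERROR = "internal_error"
    UNKNOWN = "unknown"

# Wire values 0..4 follow definition order, matching the proto enum below.
PROTO_VALUES = {code: i for i, code in enumerate(SyncErrorCode)}
assert PROTO_VALUES[SyncErrorCode.AUTH_REQUIRED] == 1  # SYNC_ERROR_CODE_AUTH_REQUIRED
```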

View File

@@ -10,6 +10,18 @@ use tonic::Code;
use super::core::GrpcClient;
/// Convert proto SyncErrorCode enum integer to domain string code.
fn sync_error_code_to_string(code: i32) -> Option<String> {
match code {
0 => None, // UNSPECIFIED maps to None
1 => Some("auth_required".to_string()),
2 => Some("provider_error".to_string()),
3 => Some("internal_error".to_string()),
4 => Some("unknown".to_string()),
_ => Some("unknown".to_string()),
}
}
/// Map gRPC status to appropriate error, extracting NOT_FOUND for integrations.
fn map_sync_grpc_error(status: tonic::Status, integration_id: &str) -> Error {
if status.code() == Code::NotFound {
@@ -54,8 +66,8 @@ impl GrpcClient {
status: response.status,
items_synced: response.items_synced,
items_total: response.items_total,
error_message: response.error_message,
duration_ms: response.duration_ms,
error_code: sync_error_code_to_string(response.error_code),
})
}
@@ -109,10 +121,10 @@ fn convert_sync_run(run: pb::SyncRunProto) -> SyncRunRecord {
integration_id: run.integration_id,
status: run.status,
items_synced: run.items_synced,
error_message: run.error_message,
duration_ms: run.duration_ms,
started_at: run.started_at,
completed_at: run.completed_at,
error_code: sync_error_code_to_string(run.error_code),
}
}

View File

@@ -1544,12 +1544,13 @@ pub struct GetSyncStatusResponse {
/// Total items to sync (if known)
#[prost(int32, tag = "3")]
pub items_total: i32,
/// Error message if status is "error"
#[prost(string, tag = "4")]
pub error_message: ::prost::alloc::string::String,
/// Duration in milliseconds (when completed)
#[prost(int64, tag = "5")]
pub duration_ms: i64,
/// Structured error code for programmatic error handling
/// (enables client to distinguish auth failures from other errors)
#[prost(enumeration = "SyncErrorCode", tag = "6")]
pub error_code: i32,
/// When this sync run expires from cache (ISO 8601 timestamp)
/// (Sprint GAP-002: State Synchronization)
#[prost(string, optional, tag = "10")]
@@ -1594,9 +1595,6 @@ pub struct SyncRunProto {
/// Number of items synced
#[prost(int32, tag = "4")]
pub items_synced: i32,
/// Error message if failed
#[prost(string, tag = "5")]
pub error_message: ::prost::alloc::string::String,
/// Duration in milliseconds
#[prost(int64, tag = "6")]
pub duration_ms: i64,
@@ -1606,6 +1604,9 @@ pub struct SyncRunProto {
/// Completion timestamp (ISO 8601, empty if running)
#[prost(string, tag = "8")]
pub completed_at: ::prost::alloc::string::String,
/// Structured error code for programmatic error handling
#[prost(enumeration = "SyncErrorCode", tag = "9")]
pub error_code: i32,
}
/// Empty - uses identity context for user/workspace filtering
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
@@ -2694,6 +2695,47 @@ impl JobStatus {
}
}
}
/// Structured error codes for sync failures (enables programmatic error handling)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SyncErrorCode {
/// Default/unspecified
Unspecified = 0,
/// Authentication required - token expired or revoked, needs re-auth
AuthRequired = 1,
/// External provider error - API failure, transient error
ProviderError = 2,
/// Internal server error - unexpected failure in sync logic
InternalError = 3,
/// Unknown error - fallback for unclassified errors
Unknown = 4,
}
impl SyncErrorCode {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Unspecified => "SYNC_ERROR_CODE_UNSPECIFIED",
Self::AuthRequired => "SYNC_ERROR_CODE_AUTH_REQUIRED",
Self::ProviderError => "SYNC_ERROR_CODE_PROVIDER_ERROR",
Self::InternalError => "SYNC_ERROR_CODE_INTERNAL_ERROR",
Self::Unknown => "SYNC_ERROR_CODE_UNKNOWN",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"SYNC_ERROR_CODE_UNSPECIFIED" => Some(Self::Unspecified),
"SYNC_ERROR_CODE_AUTH_REQUIRED" => Some(Self::AuthRequired),
"SYNC_ERROR_CODE_PROVIDER_ERROR" => Some(Self::ProviderError),
"SYNC_ERROR_CODE_INTERNAL_ERROR" => Some(Self::InternalError),
"SYNC_ERROR_CODE_UNKNOWN" => Some(Self::Unknown),
_ => None,
}
}
}
/// Status of an individual processing step (summary, entities, diarization)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]

View File

@@ -37,8 +37,10 @@ pub struct GetSyncStatusResult {
pub status: String,
pub items_synced: i32,
pub items_total: i32,
pub error_message: String,
pub duration_ms: i64,
/// Structured error code for programmatic error handling.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_code: Option<String>,
}
/// A single sync run record.
@@ -48,10 +50,12 @@ pub struct SyncRunRecord {
pub integration_id: String,
pub status: String,
pub items_synced: i32,
pub error_message: String,
pub duration_ms: i64,
pub started_at: String,
pub completed_at: String,
/// Structured error code for programmatic error handling.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_code: Option<String>,
}
/// Response from listing sync history.

View File

@@ -5,6 +5,55 @@
*/
export type SyncRunStatus = 'running' | 'success' | 'error';
/**
* Structured error codes for sync failures.
* Enables programmatic detection of specific error types,
* particularly authentication failures that require user action.
*/
export type SyncErrorCode =
| 'unspecified'
| 'auth_required'
| 'provider_error'
| 'internal_error'
| 'unknown';
/**
* Map proto enum values to domain error codes
*/
export const SYNC_ERROR_CODE_MAP: Record<number, SyncErrorCode> = {
0: 'unspecified',
1: 'auth_required',
2: 'provider_error',
3: 'internal_error',
4: 'unknown',
};
/**
* Check if the error code indicates an authentication failure
* that requires the user to re-authenticate.
*/
export function isSyncAuthError(errorCode: SyncErrorCode | undefined): boolean {
return errorCode === 'auth_required';
}
/**
* Convert sync error code to a user-friendly message.
*/
export function getSyncErrorMessage(errorCode: SyncErrorCode | undefined): string {
switch (errorCode) {
case 'auth_required':
return 'Authentication expired - please reconnect';
case 'provider_error':
return 'External service error - please try again later';
case 'internal_error':
return 'Internal error - please contact support';
case 'unknown':
case 'unspecified':
default:
return 'Sync failed';
}
}
/**
* Request to start an integration sync
*/
@@ -34,8 +83,12 @@ export interface GetSyncStatusResponse {
status: SyncRunStatus;
items_synced: number;
items_total: number;
error_message: string;
duration_ms: number;
/**
* Structured error code for programmatic error handling.
* Enables client to distinguish auth failures from other errors.
*/
error_code?: SyncErrorCode;
/**
* When this sync run expires from cache (ISO 8601 timestamp).
* (Sprint GAP-002: State Synchronization)
@@ -65,10 +118,13 @@ export interface SyncRunRecord {
integration_id: string;
status: SyncRunStatus;
items_synced: number;
error_message: string;
duration_ms: number;
started_at: string; // ISO 8601
completed_at: string; // ISO 8601, empty if running
/**
* Structured error code for programmatic error handling.
*/
error_code?: SyncErrorCode;
}
/**

View File

@@ -75,7 +75,8 @@ export interface Integration {
type: 'auth' | 'email' | 'calendar' | 'pkm' | 'custom' | 'oidc';
status: IntegrationStatus;
last_sync?: number;
error_message?: string;
/** Structured error code for programmatic error handling. */
error_code?: import('../features/sync').SyncErrorCode;
/** Whether OAuth override credentials should be used (calendar only). */
oauth_override_enabled?: boolean;
/** Whether server has a stored OAuth override secret (calendar only). */

View File

@@ -0,0 +1,68 @@
/**
* Sync notification utilities for integration sync scheduler.
* Handles quiet hours detection and notification dispatch.
*/
import type { SyncNotificationPreferences } from '@/api/types';
import { toast } from '@/hooks/use-toast';
import { preferences } from '@/lib/preferences';
/**
* Check if current time is within configured quiet hours.
* Handles overnight spans (e.g., 22:00 to 06:00).
*/
export function isInQuietHours(prefs: SyncNotificationPreferences): boolean {
if (!prefs.quiet_hours_enabled || !prefs.quiet_hours_start || !prefs.quiet_hours_end) {
return false;
}
const now = new Date();
const currentMinutes = now.getHours() * 60 + now.getMinutes();
const [startHour, startMin] = prefs.quiet_hours_start.split(':').map(Number);
const [endHour, endMin] = prefs.quiet_hours_end.split(':').map(Number);
const startMinutes = startHour * 60 + startMin;
const endMinutes = endHour * 60 + endMin;
if (startMinutes > endMinutes) {
return currentMinutes >= startMinutes || currentMinutes < endMinutes;
}
return currentMinutes >= startMinutes && currentMinutes < endMinutes;
}
/**
* Send a sync notification based on user preferences.
* Respects enabled state, notification types, and quiet hours.
*/
export function sendSyncNotification(
type: 'success' | 'error',
integrationName: string,
message?: string
): void {
const prefs = preferences.getSyncNotifications();
if (!prefs.enabled) {
return;
}
if (type === 'success' && !prefs.notify_on_success) {
return;
}
if (type === 'error' && !prefs.notify_on_error) {
return;
}
if (isInQuietHours(prefs)) {
return;
}
if (prefs.notify_via_toast) {
toast({
title: type === 'success' ? `${integrationName} synced` : `${integrationName} sync failed`,
description:
message ||
(type === 'success' ? 'Sync completed successfully' : 'An error occurred during sync'),
variant: type === 'error' ? 'destructive' : 'default',
});
}
}
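
The overnight branch is the subtle part: when the configured start is later than the end (e.g. 22:00 to 06:00), the window wraps midnight, so the current time matches if it is at or after the start or before the end. A quick Python check of the same minute-of-day arithmetic (illustration only; the shipped logic is the TypeScript above):

```python
def in_quiet_hours(now_hm: str, start: str, end: str) -> bool:
    """Same comparison as isInQuietHours above, on "HH:MM" strings."""
    def to_minutes(hm: str) -> int:
        hour, minute = hm.split(":")
        return int(hour) * 60 + int(minute)

    now, s, e = to_minutes(now_hm), to_minutes(start), to_minutes(end)
    if s > e:  # overnight span, e.g. 22:00 -> 06:00
        return now >= s or now < e
    return s <= now < e

assert in_quiet_hours("23:30", "22:00", "06:00")      # after start, wraps past midnight
assert in_quiet_hours("05:59", "22:00", "06:00")      # before end, same span
assert not in_quiet_hours("07:00", "22:00", "06:00")  # outside the window
```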

View File

@@ -2,7 +2,10 @@ import { useCallback, useEffect, useRef, useState } from 'react';
import { extractErrorMessage, isIntegrationNotFoundError } from '@/api/helpers';
import { getAPI } from '@/api/interface';
import { generateId } from '@/api/mock-data';
import type { Integration, SyncHistoryEvent, SyncNotificationPreferences } from '@/api/types';
import type { Integration, SyncHistoryEvent } from '@/api/types';
import type { SyncErrorCode } from '@/api/types/features/sync';
import { getSyncErrorMessage, isSyncAuthError } from '@/api/types/features/sync';
import { sendSyncNotification } from '@/hooks/sync-notifications';
import { toast } from '@/hooks/use-toast';
import { preferences } from '@/lib/preferences';
import type { SyncStatus } from '@/lib/status-constants';
@@ -38,62 +41,17 @@ interface UseSyncSchedulerReturn {
isPaused: boolean;
}
function isInQuietHours(prefs: SyncNotificationPreferences): boolean {
if (!prefs.quiet_hours_enabled || !prefs.quiet_hours_start || !prefs.quiet_hours_end) {
return false;
}
const now = new Date();
const currentMinutes = now.getHours() * 60 + now.getMinutes();
const [startHour, startMin] = prefs.quiet_hours_start.split(':').map(Number);
const [endHour, endMin] = prefs.quiet_hours_end.split(':').map(Number);
const startMinutes = startHour * 60 + startMin;
const endMinutes = endHour * 60 + endMin;
if (startMinutes > endMinutes) {
return currentMinutes >= startMinutes || currentMinutes < endMinutes;
}
return currentMinutes >= startMinutes && currentMinutes < endMinutes;
}
function sendNotification(type: 'success' | 'error', integrationName: string, message?: string) {
const prefs = preferences.getSyncNotifications();
if (!prefs.enabled) {
return;
}
if (type === 'success' && !prefs.notify_on_success) {
return;
}
if (type === 'error' && !prefs.notify_on_error) {
return;
}
if (isInQuietHours(prefs)) {
return;
}
if (prefs.notify_via_toast) {
toast({
title: type === 'success' ? `${integrationName} synced` : `${integrationName} sync failed`,
description:
message ||
(type === 'success' ? 'Sync completed successfully' : 'An error occurred during sync'),
variant: type === 'error' ? 'destructive' : 'default',
});
}
}
/** Result of a sync operation with optional not-found flag for cache invalidation. */
interface SyncResult {
success: boolean;
error?: string;
errorCode?: SyncErrorCode;
itemsSynced?: number;
durationMs?: number;
/** Set to true when the integration was not found on the server (stale cache). */
integrationNotFound?: boolean;
/** Set to true when authentication is required (token expired/revoked). */
authRequired?: boolean;
}
async function performSync(integrationId: string): Promise<SyncResult> {
@@ -118,10 +76,13 @@ async function performSync(integrationId: string): Promise<SyncResult> {
}
if (status.status === 'error') {
const errorCode = status.error_code;
return {
success: false,
error: status.error_message || 'Sync failed',
error: getSyncErrorMessage(errorCode),
errorCode,
durationMs: status.duration_ms,
authRequired: isSyncAuthError(errorCode),
};
}
@@ -218,7 +179,7 @@ export function useIntegrationSync(): UseSyncSchedulerReturn {
preferences.updateIntegration(integrationId, {
status: 'disconnected',
integration_id: undefined,
error_message: 'Integration removed - no longer exists on server',
error_code: 'unknown',
});
const interval = intervalsRef.current.get(integrationId);
@@ -257,6 +218,53 @@ export function useIntegrationSync(): UseSyncSchedulerReturn {
return;
}
// Handle authentication errors - token expired or revoked
if (result.authRequired) {
preferences.updateIntegration(integrationId, {
status: 'disconnected',
error_code: 'auth_required',
});
// Stop the scheduler for this integration
const interval = intervalsRef.current.get(integrationId);
if (interval) {
clearInterval(interval);
intervalsRef.current.delete(integrationId);
}
setSyncStates((prev) => ({
...prev,
[integrationId]: {
status: 'error',
lastSync: now,
nextSync: null,
error: 'Authentication expired',
integrationName: integration.name,
},
}));
const historyEvent: SyncHistoryEvent = {
id: generateId(),
integrationId,
integrationName: integration.name,
integrationType: integration.type as 'calendar' | 'pkm',
status: 'error',
timestamp: now,
duration,
error: 'Authentication expired - please reconnect',
};
preferences.addSyncHistoryEvent(historyEvent);
toast({
title: `${integration.name} needs reconnection`,
description:
'Your authentication has expired or been revoked. Please reconnect your account in Settings.',
variant: 'destructive',
});
inFlightRef.current.delete(integrationId);
return;
}
let intervalMinutes = 15; // default
if (integration.type === 'calendar' && integration.calendar_config?.sync_interval_minutes) {
intervalMinutes = integration.calendar_config.sync_interval_minutes;
@@ -289,9 +297,9 @@ export function useIntegrationSync(): UseSyncSchedulerReturn {
if (result.success) {
preferences.updateIntegration(integrationId, { last_sync: now });
sendNotification('success', integration.name);
sendSyncNotification('success', integration.name);
} else {
sendNotification('error', integration.name, result.error);
sendSyncNotification('error', integration.name, result.error);
}
inFlightRef.current.delete(integrationId);
} catch (error) {
@@ -325,7 +333,7 @@ export function useIntegrationSync(): UseSyncSchedulerReturn {
};
preferences.addSyncHistoryEvent(historyEvent);
sendNotification('error', integration.name, errorMessage);
sendSyncNotification('error', integration.name, errorMessage);
}
}, []);

View File

@@ -47,7 +47,7 @@ export function mergeIntegrationsWithDefaults(incoming: Integration[]): Integrat
* Reset integration state for server switch while preserving configs.
* - Clears integration_id (server-assigned)
* - Sets status to 'disconnected'
* - Clears error_message and last_sync
* - Clears error_code and last_sync
* - Preserves all config objects (OAuth, calendar, etc.)
*/
export function resetIntegrationsForServerSwitch(integrations: Integration[]): Integration[] {
@@ -62,7 +62,7 @@ export function resetIntegrationsForServerSwitch(integrations: Integration[]): I
...existing,
integration_id: undefined,
status: 'disconnected' as const,
error_message: undefined,
error_code: undefined,
last_sync: undefined,
};
}
@@ -76,7 +76,7 @@ export function resetIntegrationsForServerSwitch(integrations: Integration[]): I
...custom,
integration_id: undefined,
status: 'disconnected' as const,
error_message: undefined,
error_code: undefined,
last_sync: undefined,
}));

View File

@@ -89,8 +89,8 @@ services:
# =============================================================================
# Application Services
# =============================================================================
# Note: 'server', 'server-gpu', 'server-rocm', and 'server-full' are mutually exclusive (same port 50051).
# Use ONE of: 'server' (CPU), 'server-gpu' (NVIDIA CUDA), 'server-rocm' (AMD ROCm), or 'server-full'.
# Note: 'server', 'server-gpu', 'server-rocm', 'server-rocm-dev', and 'server-full' are mutually exclusive (same port 50051).
# Use ONE of: 'server' (CPU), 'server-gpu' (NVIDIA CUDA), 'server-rocm' (AMD ROCm), 'server-rocm-dev' (ROCm hot reload), or 'server-full'.
#
# GPU Support:
# - NVIDIA CUDA: use profile 'server-gpu' (or 'full-gpu')
@@ -143,65 +143,121 @@ services:
# GPU-enabled server (NVIDIA CUDA)
# Build: docker buildx bake server-gpu
server-gpu:
container_name: noteflow-server
image: git.baked.rocks/vasceannie/noteflow-server-gpu:latest
build:
context: .
dockerfile: docker/server-gpu.Dockerfile
target: server
restart: unless-stopped
ports:
- "50051:50051"
extra_hosts:
- "host.docker.internal:host-gateway"
env_file:
- .env
environment:
NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow}
NOTEFLOW_REDIS_URL: redis://redis:6379/0
NOTEFLOW_QDRANT_URL: http://qdrant:6333
NOTEFLOW_LOG_FORMAT: console
# Enable CUDA device auto-detection
NOTEFLOW_ASR_DEVICE: cuda
NOTEFLOW_DIARIZATION_DEVICE: cuda
volumes:
- .:/workspace
- server_venv_gpu:/workspace/.venv
# NVIDIA GPU configuration (cross-platform compatible)
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
qdrant:
condition: service_healthy
networks:
- noteflow-net
profiles:
- server-gpu
- full-gpu
- gpu
# server-gpu:
# container_name: noteflow-server
# image: git.baked.rocks/vasceannie/noteflow-server-gpu:latest
# build:
# context: .
# dockerfile: docker/server-gpu.Dockerfile
# target: server
# restart: unless-stopped
# ports:
# - "50051:50051"
# extra_hosts:
# - "host.docker.internal:host-gateway"
# env_file:
# - .env
# environment:
# NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow}
# NOTEFLOW_REDIS_URL: redis://redis:6379/0
# NOTEFLOW_QDRANT_URL: http://qdrant:6333
# NOTEFLOW_LOG_FORMAT: console
# # Enable CUDA device auto-detection
# NOTEFLOW_ASR_DEVICE: cuda
# NOTEFLOW_DIARIZATION_DEVICE: cuda
# volumes:
# - .:/workspace
# - server_venv_gpu:/workspace/.venv
# # NVIDIA GPU configuration (cross-platform compatible)
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: 1
# capabilities: [gpu]
# depends_on:
# db:
# condition: service_healthy
# redis:
# condition: service_healthy
# qdrant:
# condition: service_healthy
# networks:
# - noteflow-net
# profiles:
# - server-gpu
# - full-gpu
# - gpu
# GPU-enabled server (AMD ROCm)
# Build: docker buildx bake server-rocm
server-rocm:
container_name: noteflow-server
image: git.baked.rocks/vasceannie/noteflow-server-rocm:latest
# server-rocm:
# container_name: noteflow-server
# image: git.baked.rocks/vasceannie/noteflow-server-rocm:latest
# build:
# context: .
# dockerfile: docker/Dockerfile.rocm
# target: server
# args:
# ROCM_VERSION: ${ROCM_VERSION:-6.4.1}
# ROCM_PYTORCH_RELEASE: ${ROCM_PYTORCH_RELEASE:-2.6.0}
# SPACY_MODEL_URL: ${SPACY_MODEL_URL:-https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl}
# restart: unless-stopped
# ports:
# - "50051:50051"
# extra_hosts:
# - "host.docker.internal:host-gateway"
# env_file:
# - .env
# environment:
# NOTEFLOW_DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-noteflow}:${POSTGRES_PASSWORD:-noteflow}@db:5432/${POSTGRES_DB:-noteflow}
# NOTEFLOW_REDIS_URL: redis://redis:6379/0
# NOTEFLOW_QDRANT_URL: http://qdrant:6333
# NOTEFLOW_LOG_FORMAT: console
# # Enable ROCm device auto-detection
# NOTEFLOW_ASR_DEVICE: rocm
# NOTEFLOW_DIARIZATION_DEVICE: auto
# NOTEFLOW_FEATURE_ROCM_ENABLED: "true"
# volumes:
# - .:/workspace
# devices:
# - /dev/kfd
# - /dev/dri
# group_add:
# - ${VIDEO_GID:-44}
# - ${RENDER_GID:-993}
# security_opt:
# - seccomp=unconfined
# ulimits:
# memlock: -1
# stack: 67108864
# depends_on:
# db:
# condition: service_healthy
# redis:
# condition: service_healthy
# qdrant:
# condition: service_healthy
# networks:
# - noteflow-net
# profiles:
# - server-rocm
# - full-gpu
# - gpu
# GPU-enabled dev server (AMD ROCm with hot reload)
# Build: docker buildx bake server-rocm-dev
server-rocm-dev:
container_name: noteflow-server-dev
image: git.baked.rocks/vasceannie/noteflow-server-rocm-dev:latest
build:
context: .
dockerfile: docker/Dockerfile.rocm
target: server
target: server-dev
args:
ROCM_VERSION: ${ROCM_VERSION:-7.1.1}
ROCM_PYTORCH_RELEASE: ${ROCM_PYTORCH_RELEASE:-2.9.1}
ROCM_VERSION: ${ROCM_VERSION:-6.4.1}
ROCM_PYTORCH_RELEASE: ${ROCM_PYTORCH_RELEASE:-2.6.0}
SPACY_MODEL_URL: ${SPACY_MODEL_URL:-https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl}
restart: unless-stopped
ports:
@@ -215,9 +271,8 @@ services:
NOTEFLOW_REDIS_URL: redis://redis:6379/0
NOTEFLOW_QDRANT_URL: http://qdrant:6333
NOTEFLOW_LOG_FORMAT: console
# Enable ROCm device auto-detection
NOTEFLOW_ASR_DEVICE: rocm
NOTEFLOW_DIARIZATION_DEVICE: rocm
NOTEFLOW_DIARIZATION_DEVICE: auto
NOTEFLOW_FEATURE_ROCM_ENABLED: "true"
volumes:
- .:/workspace
@@ -225,10 +280,13 @@ services:
- /dev/kfd
- /dev/dri
group_add:
- video
- render
- ${VIDEO_GID:-44}
- ${RENDER_GID:-993}
security_opt:
- seccomp=unconfined
ulimits:
memlock: -1
stack: 67108864
depends_on:
db:
condition: service_healthy
@@ -239,7 +297,8 @@ services:
networks:
- noteflow-net
profiles:
- server-rocm
- server-rocm-dev
- full-gpu
- gpu
# server-full:

View File

@@ -6,6 +6,7 @@
# docker buildx bake server # Build CPU server only
# docker buildx bake server-gpu # Build GPU server only
# docker buildx bake server-rocm # Build ROCm GPU server only
# docker buildx bake server-rocm-dev # Build ROCm GPU dev server (hot reload)
# docker buildx bake servers # Build all server variants (parallel)
# docker buildx bake servers-gpu # Build GPU variants only (CUDA + ROCm)
# docker buildx bake client # Build client targets
@@ -41,11 +42,11 @@ variable "CUDA_VERSION" {
}
variable "ROCM_VERSION" {
default = "7.1.1"
default = "6.4.1"
}
variable "ROCM_PYTORCH_RELEASE" {
default = "2.9.1"
default = "2.6.0"
}
variable "SPACY_MODEL_URL" {
@@ -279,6 +280,18 @@ target "server-rocm-full" {
}
}
target "server-rocm-dev" {
inherits = ["_server-rocm-common"]
target = "server-dev"
tags = tags("server-rocm-dev")
platforms = ["linux/amd64"]
labels = {
"org.opencontainers.image.title" = "NoteFlow Server Dev (ROCm)"
"org.opencontainers.image.description" = "NoteFlow development server - AMD ROCm GPU build"
"ai.noteflow.rocm.version" = ROCM_VERSION
}
}
# =============================================================================
# Client Targets
# =============================================================================

View File

@@ -15,6 +15,7 @@ NoteFlow supports both CPU-only and GPU-accelerated Docker deployments:
| `server` | CPU-only server (default) | No |
| `server-gpu` | NVIDIA CUDA-enabled server | Yes |
| `server-rocm` | AMD ROCm-enabled server | Yes |
| `server-rocm-dev` | AMD ROCm-enabled server (hot reload) | Yes |
| `full` | CPU server + frontend | No |
| `full-gpu` | GPU server + frontend | Yes |
| `gpu` | Generic GPU profile (use with server-gpu or server-rocm) | Yes |
@@ -31,6 +32,9 @@ docker compose --profile server-gpu --profile infra up -d
# GPU-enabled (AMD ROCm, Linux only)
docker compose --profile server-rocm --profile infra up -d
# GPU-enabled dev (AMD ROCm with hot reload)
docker compose --profile server-rocm-dev --profile infra up -d
# Full stack with GPU
docker compose --profile full-gpu --profile infra up -d
@@ -84,6 +88,7 @@ NoteFlow uses Docker Buildx Bake for efficient parallel builds. Configuration is
| `server-gpu-full` | GPU server with all extras | linux/amd64 |
| `server-rocm` | AMD ROCm GPU server | linux/amd64 |
| `server-rocm-full` | ROCm server with all extras | linux/amd64 |
| `server-rocm-dev` | ROCm server with hot reload | linux/amd64 |
| `client-build` | Tauri client build | linux/amd64 |
| `client-dev` | Client development env | linux/amd64 |
@@ -110,6 +115,9 @@ docker buildx bake server-gpu
# Build ROCm server only
docker buildx bake server-rocm
# Build ROCm dev server
docker buildx bake server-rocm-dev
# Build CPU and GPU servers in parallel (CUDA + ROCm)
docker buildx bake servers
@@ -144,7 +152,7 @@ Override at build time with `--set`:
docker buildx bake --set server-gpu.args.CUDA_VERSION=12.5.0 server-gpu
# Use different ROCm base version
docker buildx bake --set server-rocm.args.ROCM_VERSION=7.1.1 --set server-rocm.args.ROCM_PYTORCH_RELEASE=2.9.1 server-rocm
docker buildx bake --set server-rocm.args.ROCM_VERSION=6.4.1 --set server-rocm.args.ROCM_PYTORCH_RELEASE=2.6.0 server-rocm
# Use custom registry
docker buildx bake --set "*.tags=ghcr.io/myorg/noteflow:sha-abc123" all
@@ -156,8 +164,8 @@ docker buildx bake --set "*.tags=ghcr.io/myorg/noteflow:sha-abc123" all
| `TAG` | latest | Image tag |
| `PYTHON_VERSION` | 3.12 | Python version |
| `CUDA_VERSION` | 12.4.1 | CUDA version for GPU builds |
| `ROCM_VERSION` | 7.1.1 | ROCm version for AMD GPU builds |
| `ROCM_PYTORCH_RELEASE` | 2.9.1 | PyTorch release in ROCm base image |
| `ROCM_VERSION` | 6.4.1 | ROCm version for AMD GPU builds |
| `ROCM_PYTORCH_RELEASE` | 2.6.0 | PyTorch release in ROCm base image |
### Integration with Compose

View File

@@ -11,8 +11,8 @@
# -v /path/to/models:/workspace/models \
# noteflow:rocm
ARG ROCM_VERSION=7.1.1
ARG ROCM_PYTORCH_RELEASE=2.9.1
ARG ROCM_VERSION=6.4.1
ARG ROCM_PYTORCH_RELEASE=2.6.0
ARG SPACY_MODEL_URL=https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl
# =============================================================================
@@ -36,6 +36,7 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
pkg-config \
python3-venv \
portaudio19-dev \
libsndfile1 \
ffmpeg \
@@ -51,14 +52,22 @@ WORKDIR /workspace
# Copy dependency files first for better layer caching
COPY pyproject.toml uv.lock* ./
COPY README.md ./
COPY src ./src/
# Install NoteFlow with ROCm extras
RUN uv pip install --system -e ".[rocm]"
# Create venv with access to system site-packages (ROCm torch)
ENV VIRTUAL_ENV=/opt/venv
RUN uv venv --system-site-packages ${VIRTUAL_ENV}
ENV PATH="${VIRTUAL_ENV}/bin:$PATH"
# Install NoteFlow with ROCm extras (into venv)
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python -e ".[rocm,optional]"
# Improve redis client performance and silence hiredis warning.
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python hiredis
# Install spaCy small English model for NER (baked into image)
ARG SPACY_MODEL_URL
RUN uv pip install --system ${SPACY_MODEL_URL}
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python ${SPACY_MODEL_URL}
# Copy remaining files (scripts, configs, etc.)
COPY . .
@@ -80,6 +89,40 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
# Run gRPC server
CMD ["python", "-m", "noteflow.grpc.server"]
# =============================================================================
# Server Dev Stage - ROCm (hot reload)
# =============================================================================
FROM base AS server-dev
WORKDIR /workspace
COPY pyproject.toml uv.lock* ./
COPY README.md ./
COPY src ./src/
ENV VIRTUAL_ENV=/opt/venv
RUN uv venv --system-site-packages ${VIRTUAL_ENV}
ENV PATH="${VIRTUAL_ENV}/bin:$PATH"
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python -e ".[rocm,optional]"
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python hiredis
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python watchfiles
ARG SPACY_MODEL_URL
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python ${SPACY_MODEL_URL}
COPY . .
ENV ROCM_PATH=/opt/rocm \
HIP_VISIBLE_DEVICES=0 \
HSA_OVERRIDE_GFX_VERSION="" \
NOTEFLOW_ASR_DEVICE=rocm \
NOTEFLOW_FEATURE_ROCM_ENABLED=true
EXPOSE 50051
CMD ["python", "scripts/dev_watch_server.py"]
# =============================================================================
# Server Full Stage - ROCm (optional extras)
# =============================================================================
@@ -88,12 +131,17 @@ FROM base AS server-full
WORKDIR /workspace
COPY pyproject.toml uv.lock* ./
COPY README.md ./
COPY src ./src/
RUN uv pip install --system -e ".[rocm,optional]"
ENV VIRTUAL_ENV=/opt/venv
RUN uv venv --system-site-packages ${VIRTUAL_ENV}
ENV PATH="${VIRTUAL_ENV}/bin:$PATH"
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python -e ".[rocm,optional]"
ARG SPACY_MODEL_URL
RUN uv pip install --system ${SPACY_MODEL_URL}
RUN uv pip install --python ${VIRTUAL_ENV}/bin/python ${SPACY_MODEL_URL}
COPY . .

View File

@@ -47,7 +47,11 @@ export HSA_OVERRIDE_GFX_VERSION=10.3.0
1. **AMD GPU** with ROCm support
2. **Linux** (Ubuntu 24.04 recommended)
3. **Python 3.12+**
4. **ROCm 7.1.1+** installed
4. **ROCm 6.4+** installed
**Note**: NoteFlow requires Python 3.12. The ROCm PyTorch images that ship Python 3.12 and
match our default Docker builds are ROCm 6.4.1 / PyTorch 2.6.0, so the Docker defaults are
pinned to those versions.
### Step 1: Install ROCm
@@ -72,8 +76,8 @@ Follow AMD's ROCm PyTorch install guide for your ROCm version. AMD recommends us
ROCm wheels from repo.radeon.com over PyTorch.org ROCm wheels.
```bash
# Example (ROCm 7.1.x, Python 3.12)
wget https://repo.radeon.com/rocm/manylinux/rocm-rel-7.1/torch-<version>-cp312-cp312-linux_x86_64.whl
# Example (ROCm 6.4.x, Python 3.12)
wget https://repo.radeon.com/rocm/manylinux/rocm-rel-6.4/torch-<version>-cp312-cp312-linux_x86_64.whl
pip install ./torch-<version>-cp312-cp312-linux_x86_64.whl
```
@@ -145,13 +149,16 @@ export AMD_LOG_LEVEL=1 # 0=off, 1=errors, 2=warnings, 3=info
# Build with Buildx Bake (recommended)
docker buildx bake server-rocm
# Build dev image (hot reload)
docker buildx bake server-rocm-dev
# Build and push to a registry
REGISTRY=git.baked.rocks/vasceannie docker buildx bake --push server-rocm
# Classic docker build (override versions as needed)
docker build -f docker/Dockerfile.rocm \
--build-arg ROCM_VERSION=7.1.1 \
--build-arg ROCM_PYTORCH_RELEASE=2.9.1 \
--build-arg ROCM_VERSION=6.4.1 \
--build-arg ROCM_PYTORCH_RELEASE=2.6.0 \
-t noteflow:rocm .
```
@@ -161,6 +168,9 @@ docker build -f docker/Dockerfile.rocm \
# Start AMD ROCm server + infra
docker compose --profile server-rocm --profile infra up -d
# Start AMD ROCm dev server + infra (hot reload)
docker compose --profile server-rocm-dev --profile infra up -d
# Generic GPU profile (choose the variant explicitly)
docker compose --profile gpu --profile infra up server-rocm
```
@@ -173,6 +183,8 @@ docker run \
--device=/dev/dri \
--group-add video \
--security-opt seccomp=unconfined \
--ulimit memlock=-1 \
--ulimit stack=67108864 \
-p 50051:50051 \
-v /path/to/models:/workspace/models \
noteflow:rocm

View File

@@ -60,6 +60,7 @@ from noteflow.config.constants.domain import (
# Error constants
from noteflow.config.constants.errors import (
AUTH_ERROR_PATTERNS,
ERR_API_PREFIX,
ERR_HF_TOKEN_REQUIRED,
ERR_SERVER_RESTARTED,
@@ -118,6 +119,7 @@ __all__ = [
# Core
"APP_DIR_NAME",
"AUDIO_BUFFER_SIZE_BYTES",
"AUTH_ERROR_PATTERNS",
"DEFAULT_GRPC_PORT",
# Domain
"DEFAULT_MEETING_TITLE",

View File

@@ -18,6 +18,20 @@ ERR_SERVER_RESTARTED: Final[str] = "Server restarted"
ERR_TOKEN_EXPIRED: Final[str] = "Access token expired or invalid"
"""Error for expired or invalid OAuth tokens."""
# Patterns in error messages that indicate authentication failures
AUTH_ERROR_PATTERNS: Final[frozenset[str]] = frozenset({
"expired",
"revoked",
"invalid_grant",
"authentication",
"unauthorized",
"token expired",
"invalid token",
"access_denied",
"invalid_client",
})
"""Lowercase substrings that indicate auth-related errors when found in error messages."""
ERR_API_PREFIX: Final[str] = "API error: "
"""Prefix for API error messages."""

View File

@@ -139,6 +139,20 @@ class SyncRunStatus(StrEnum):
ERROR = "error"
class SyncErrorCode(StrEnum):
"""Structured error codes for sync failures.
Enables programmatic detection of specific error types,
particularly authentication failures that require user action.
"""
UNSPECIFIED = "unspecified"
AUTH_REQUIRED = "auth_required"
PROVIDER_ERROR = "provider_error"
INTERNAL_ERROR = "internal_error"
UNKNOWN = "unknown"
@dataclass
class SyncRun:
"""Track a single sync operation for an integration.
@@ -153,7 +167,7 @@ class SyncRun:
started_at: datetime = field(default_factory=utc_now)
ended_at: datetime | None = None
duration_ms: int | None = None
error_message: str | None = None
error_code: SyncErrorCode | None = None
stats: dict[str, object] = field(default_factory=dict)
@classmethod
@@ -194,18 +208,18 @@ class SyncRun:
"sync_run", str(self.id), old_status, self.status, items_synced=items_synced
)
def fail(self, error_message: str) -> None:
def fail(self, error_code: SyncErrorCode) -> None:
"""Mark sync as failed.
Args:
error_message: Description of what went wrong.
error_code: Structured error code indicating failure type.
"""
old_status = self.status
now = utc_now()
self.status = SyncRunStatus.ERROR
self.ended_at = now
self.duration_ms = int((now - self.started_at).total_seconds() * 1000)
self.error_message = error_message
self.error_code = error_code
log_state_transition("sync_run", str(self.id), old_status, self.status, reason="error")
@property
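
For illustration, fail() now records status, timing, and the structured code in one step. A sketch assuming integration_id is the only required constructor field (the tests at the bottom of this commit pass more kwargs explicitly):

```python
from uuid import uuid4

from noteflow.domain.entities.integration import SyncErrorCode, SyncRun, SyncRunStatus

run = SyncRun(integration_id=uuid4())  # other fields assumed to default
run.fail(SyncErrorCode.AUTH_REQUIRED)

assert run.status == SyncRunStatus.ERROR
assert run.error_code is SyncErrorCode.AUTH_REQUIRED
assert run.duration_ms is not None  # computed from started_at at fail() time
```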

View File

@@ -12,6 +12,10 @@ from uuid import UUID
from opentelemetry.trace import Span
from noteflow.application.services.calendar import CalendarServiceError
from noteflow.config.constants.errors import AUTH_ERROR_PATTERNS
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.infrastructure.calendar.oauth import OAuthError
from noteflow.infrastructure.logging import get_logger
if TYPE_CHECKING:
@@ -20,6 +24,38 @@ if TYPE_CHECKING:
logger = get_logger(__name__)
def classify_sync_error(exc: Exception) -> SyncErrorCode:
"""Classify an exception into a structured sync error code.
Args:
exc: The exception that caused the sync failure.
Returns:
The appropriate SyncErrorCode for the failure type.
"""
error_lower = str(exc).lower()
# OAuthError: Check for auth-specific patterns
if isinstance(exc, OAuthError):
if any(pattern in error_lower for pattern in AUTH_ERROR_PATTERNS):
return SyncErrorCode.AUTH_REQUIRED
return SyncErrorCode.PROVIDER_ERROR
# CalendarServiceError: External provider failure
if isinstance(exc, CalendarServiceError):
# Check if it's an auth issue wrapped in CalendarServiceError
if any(pattern in error_lower for pattern in AUTH_ERROR_PATTERNS):
return SyncErrorCode.AUTH_REQUIRED
return SyncErrorCode.PROVIDER_ERROR
# Check message patterns for other exceptions
if any(pattern in error_lower for pattern in AUTH_ERROR_PATTERNS):
return SyncErrorCode.AUTH_REQUIRED
# Default to unknown for unclassified errors
return SyncErrorCode.UNKNOWN
@dataclass(frozen=True, slots=True)
class SyncContext:
"""Context for executing a sync operation.
@@ -62,9 +98,11 @@ async def execute_sync_with_context(ctx: SyncContext) -> None:
items_synced,
)
except Exception as e:
error_code = classify_sync_error(e)
ctx.span.record_exception(e)
ctx.span.set_attribute("sync.error", str(e))
logger.exception("Sync run %s failed: %s", ctx.sync_run_id, e)
sync_run = await ctx.host.fail_sync_run(ctx.sync_run_id, str(e))
ctx.span.set_attribute("sync.error_code", error_code.value)
logger.exception("Sync run %s failed: %s (code=%s)", ctx.sync_run_id, e, error_code)
sync_run = await ctx.host.fail_sync_run(ctx.sync_run_id, error_code)
if sync_run:
ctx.host.cache_sync_run(sync_run)
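
A few concrete classifications as a sketch, assuming classify_sync_error is imported from this module (its file path isn't shown in this view) and that OAuthError accepts a plain message string:

```python
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.infrastructure.calendar.oauth import OAuthError

# OAuthError message matching an auth pattern -> re-auth needed
assert classify_sync_error(OAuthError("invalid_grant: token revoked")) is SyncErrorCode.AUTH_REQUIRED
# OAuthError without an auth pattern -> transient provider failure
assert classify_sync_error(OAuthError("HTTP 503 from provider")) is SyncErrorCode.PROVIDER_ERROR
# Any exception whose message matches a pattern is still treated as auth
assert classify_sync_error(ValueError("request unauthorized")) is SyncErrorCode.AUTH_REQUIRED
# Everything else falls through to UNKNOWN
assert classify_sync_error(RuntimeError("disk full")) is SyncErrorCode.UNKNOWN
```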

View File

@@ -38,6 +38,7 @@ from ._external import (
entity_to_proto,
log_entry_to_proto,
metrics_to_proto,
sync_error_code_to_proto,
sync_run_to_proto,
webhook_config_to_proto,
webhook_delivery_to_proto,
@@ -105,6 +106,7 @@ __all__ = [
"summarization_template_to_proto",
"summarization_template_version_to_proto",
"summary_to_proto",
"sync_error_code_to_proto",
"sync_run_to_proto",
"webhook_config_to_proto",
"webhook_delivery_to_proto",

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from noteflow.domain.entities import SyncRun
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.domain.entities.named_entity import NamedEntity
from noteflow.domain.webhooks import WebhookConfig, WebhookDelivery
from noteflow.infrastructure.logging import LogEntry
@@ -76,6 +77,33 @@ def webhook_delivery_to_proto(
# Sync Converters
# -----------------------------------------------------------------------------
# Map domain SyncErrorCode to proto enum values
_SYNC_ERROR_CODE_TO_PROTO: dict[SyncErrorCode, noteflow_pb2.SyncErrorCode.ValueType] = {
SyncErrorCode.UNSPECIFIED: noteflow_pb2.SYNC_ERROR_CODE_UNSPECIFIED,
SyncErrorCode.AUTH_REQUIRED: noteflow_pb2.SYNC_ERROR_CODE_AUTH_REQUIRED,
SyncErrorCode.PROVIDER_ERROR: noteflow_pb2.SYNC_ERROR_CODE_PROVIDER_ERROR,
SyncErrorCode.INTERNAL_ERROR: noteflow_pb2.SYNC_ERROR_CODE_INTERNAL_ERROR,
SyncErrorCode.UNKNOWN: noteflow_pb2.SYNC_ERROR_CODE_UNKNOWN,
}
def sync_error_code_to_proto(
error_code: SyncErrorCode | None,
) -> noteflow_pb2.SyncErrorCode.ValueType:
"""Convert domain SyncErrorCode to proto enum value.
Args:
error_code: Domain error code, or None for unspecified.
Returns:
Proto enum integer value.
"""
if error_code is None:
return noteflow_pb2.SYNC_ERROR_CODE_UNSPECIFIED
return _SYNC_ERROR_CODE_TO_PROTO.get(
error_code, noteflow_pb2.SYNC_ERROR_CODE_UNKNOWN
)
def sync_run_to_proto(run: SyncRun) -> noteflow_pb2.SyncRunProto:
"""Convert a SyncRun domain entity to protobuf message."""
@@ -84,10 +112,10 @@ def sync_run_to_proto(run: SyncRun) -> noteflow_pb2.SyncRunProto:
integration_id=str(run.integration_id),
status=run.status.value,
items_synced=run.items_synced,
error_message=run.error_message or "",
duration_ms=run.duration_ms or 0,
started_at=run.started_at.isoformat(),
completed_at=run.ended_at.isoformat() if run.ended_at else "",
error_code=sync_error_code_to_proto(run.error_code),
)
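
Round-tripping through the wire format then lines up with the client-side mappings (the Rust sync_error_code_to_string and the TS SYNC_ERROR_CODE_MAP earlier in this commit). A sketch, with import paths assumed:

```python
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.grpc.proto import noteflow_pb2  # generated module; path assumed

assert sync_error_code_to_proto(SyncErrorCode.AUTH_REQUIRED) == noteflow_pb2.SYNC_ERROR_CODE_AUTH_REQUIRED
assert sync_error_code_to_proto(None) == noteflow_pb2.SYNC_ERROR_CODE_UNSPECIFIED
# On the clients, wire value 1 maps back to the string "auth_required".
```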

View File

@@ -104,4 +104,4 @@ async def parse_project_id_from_request(
if not (request.HasField(PROJECT_ID) and request.project_id):
return None
return await parse_optional_uuid_or_abort(request.project_id, context, "project_id")
return await parse_optional_uuid_or_abort(request.project_id, context, PROJECT_ID)

View File

@@ -9,6 +9,7 @@ if TYPE_CHECKING:
from uuid import UUID
from noteflow.domain.entities import Integration, Meeting, Segment, Summary, SyncRun
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.domain.ports.unit_of_work import UnitOfWork
from noteflow.domain.value_objects import MeetingId
from noteflow.grpc.mixins.preferences import PreferencesRepositoryProvider
@@ -186,8 +187,9 @@ class ServicerSyncMethods(Protocol):
self,
sync_run_id: UUID,
error_message: str,
error_code: SyncErrorCode | None = None,
) -> SyncRun | None:
"""Mark sync run as failed with error message."""
"""Mark sync run as failed with error message and structured error code."""
...

View File

@@ -15,6 +15,7 @@ from noteflow.config.constants import (
STREAM_INIT_LOCK_TIMEOUT_SECONDS,
)
from noteflow.domain.constants.fields import STATE
from noteflow.domain.webhooks.events import WebhookEventType
from noteflow.infrastructure.logging import get_logger
from .._types import GrpcContext
@@ -66,7 +67,7 @@ async def _trigger_recording_webhook(
meeting_id=meeting_id,
title=title,
),
"recording.started",
WebhookEventType.RECORDING_STARTED.value,
),
name=f"webhook-recording-started-{meeting_id}",
)

View File

@@ -8,6 +8,7 @@ from uuid import UUID
from noteflow.config.constants.core import DAYS_PER_WEEK, HOURS_PER_DAY
from noteflow.domain.constants.fields import CALENDAR, PROVIDER
from noteflow.domain.entities import Integration, SyncRun
from noteflow.domain.entities.integration import SyncErrorCode
from noteflow.domain.ports.unit_of_work import UnitOfWork
from noteflow.infrastructure.logging import get_logger
from noteflow.infrastructure.persistence.constants import DEFAULT_LIST_LIMIT
@@ -15,7 +16,7 @@ from noteflow.infrastructure.persistence.constants import DEFAULT_LIST_LIMIT
from ..proto import noteflow_pb2
from ._sync_execution import SyncContext, execute_sync_with_context, set_sync_span_attributes
from ._types import GrpcContext
from .converters import sync_run_to_proto
from .converters import sync_error_code_to_proto, sync_run_to_proto
from .errors import (
ENTITY_INTEGRATION,
ENTITY_SYNC_RUN,
@@ -64,6 +65,21 @@ def _format_enum_value(value: str | None) -> str:
return "" if value is None else value
def _sync_run_to_status_response(
sync_run: SyncRun,
expires_at: str | None,
) -> noteflow_pb2.GetSyncStatusResponse:
"""Build GetSyncStatusResponse from domain SyncRun."""
return noteflow_pb2.GetSyncStatusResponse(
status=sync_run.status.value,
items_synced=sync_run.items_synced,
items_total=sync_run.items_total or 0,
duration_ms=sync_run.duration_ms or 0,
error_code=sync_error_code_to_proto(sync_run.error_code),
expires_at=expires_at,
)
def _integration_to_proto(integration: Integration) -> noteflow_pb2.IntegrationInfo:
return noteflow_pb2.IntegrationInfo(
id=str(integration.id),
@@ -251,7 +267,7 @@ class SyncMixin:
async def fail_sync_run(
self: ServicerHost,
sync_run_id: UUID,
error_message: str,
error_code: SyncErrorCode,
) -> SyncRun | None:
async with self.create_repository_provider() as uow:
repo = uow.integrations
@@ -259,7 +275,7 @@ class SyncMixin:
if sync_run is None:
return None
sync_run.fail(error_message)
sync_run.fail(error_code)
await repo.update_sync_run(sync_run)
await uow.commit()
return sync_run
@@ -287,14 +303,7 @@ class SyncMixin:
# Sprint GAP-002: Include expiry metadata
expires_at = self.get_sync_run_expires_at(sync_run_id)
return noteflow_pb2.GetSyncStatusResponse(
status=sync_run.status.value,
items_synced=sync_run.items_synced,
items_total=sync_run.items_total or 0,
error_message=sync_run.error_message or "",
duration_ms=sync_run.duration_ms or 0,
expires_at=expires_at,
)
return _sync_run_to_status_response(sync_run, expires_at)
async def ListSyncHistory(
self: ServicerHost,
@@ -319,8 +328,8 @@ class SyncMixin:
async def GetUserIntegrations(
self: ServicerHost,
request: noteflow_pb2.GetUserIntegrationsRequest,
context: GrpcContext,
_request: noteflow_pb2.GetUserIntegrationsRequest,
_context: GrpcContext,
) -> noteflow_pb2.GetUserIntegrationsResponse:
async with self.create_repository_provider() as uow:
# Get all integrations (workspace filtering handled by repository)

View File

@@ -924,6 +924,20 @@ enum JobStatus {
JOB_STATUS_CANCELLED = 5;
}
// Structured error codes for sync failures (enables programmatic error handling)
enum SyncErrorCode {
// Default/unspecified
SYNC_ERROR_CODE_UNSPECIFIED = 0;
// Authentication required - token expired or revoked, needs re-auth
SYNC_ERROR_CODE_AUTH_REQUIRED = 1;
// External provider error - API failure, transient error
SYNC_ERROR_CODE_PROVIDER_ERROR = 2;
// Internal server error - unexpected failure in sync logic
SYNC_ERROR_CODE_INTERNAL_ERROR = 3;
// Unknown error - fallback for unclassified errors
SYNC_ERROR_CODE_UNKNOWN = 4;
}
// =============================================================================
// Post-Processing Status Messages (GAP-W05)
// =============================================================================
@@ -1695,12 +1709,13 @@ message GetSyncStatusResponse {
// Total items to sync (if known)
int32 items_total = 3;
// Error message if status is "error"
string error_message = 4;
// Duration in milliseconds (when completed)
int64 duration_ms = 5;
// Structured error code for programmatic error handling
// (enables client to distinguish auth failures from other errors)
SyncErrorCode error_code = 6;
// When this sync run expires from cache (ISO 8601 timestamp)
// (Sprint GAP-002: State Synchronization)
optional string expires_at = 10;
@@ -1742,9 +1757,6 @@ message SyncRunProto {
// Number of items synced
int32 items_synced = 4;
// Error message if failed
string error_message = 5;
// Duration in milliseconds
int64 duration_ms = 6;
@@ -1753,6 +1765,9 @@ message SyncRunProto {
// Completion timestamp (ISO 8601, empty if running)
string completed_at = 8;
// Structured error code for programmatic error handling
SyncErrorCode error_code = 9;
}
// =============================================================================

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

View File

@@ -70,6 +70,9 @@ class NoteFlowServiceStub:
"""ASR Configuration Management (Sprint 19)"""
UpdateAsrConfiguration: grpc.UnaryUnaryMultiCallable[noteflow_pb2.UpdateAsrConfigurationRequest, noteflow_pb2.UpdateAsrConfigurationResponse]
GetAsrConfigurationJobStatus: grpc.UnaryUnaryMultiCallable[noteflow_pb2.GetAsrConfigurationJobStatusRequest, noteflow_pb2.AsrConfigurationJobStatus]
GetStreamingConfiguration: grpc.UnaryUnaryMultiCallable[noteflow_pb2.GetStreamingConfigurationRequest, noteflow_pb2.GetStreamingConfigurationResponse]
"""Streaming configuration (Sprint 20)"""
UpdateStreamingConfiguration: grpc.UnaryUnaryMultiCallable[noteflow_pb2.UpdateStreamingConfigurationRequest, noteflow_pb2.UpdateStreamingConfigurationResponse]
ExtractEntities: grpc.UnaryUnaryMultiCallable[noteflow_pb2.ExtractEntitiesRequest, noteflow_pb2.ExtractEntitiesResponse]
"""Named entity extraction (Sprint 4) + mutations (Sprint 8)"""
UpdateEntity: grpc.UnaryUnaryMultiCallable[noteflow_pb2.UpdateEntityRequest, noteflow_pb2.UpdateEntityResponse]
@@ -82,6 +85,8 @@ class NoteFlowServiceStub:
CompleteOAuth: grpc.UnaryUnaryMultiCallable[noteflow_pb2.CompleteOAuthRequest, noteflow_pb2.CompleteOAuthResponse]
GetOAuthConnectionStatus: grpc.UnaryUnaryMultiCallable[noteflow_pb2.GetOAuthConnectionStatusRequest, noteflow_pb2.GetOAuthConnectionStatusResponse]
DisconnectOAuth: grpc.UnaryUnaryMultiCallable[noteflow_pb2.DisconnectOAuthRequest, noteflow_pb2.DisconnectOAuthResponse]
GetOAuthClientConfig: grpc.UnaryUnaryMultiCallable[noteflow_pb2.GetOAuthClientConfigRequest, noteflow_pb2.GetOAuthClientConfigResponse]
SetOAuthClientConfig: grpc.UnaryUnaryMultiCallable[noteflow_pb2.SetOAuthClientConfigRequest, noteflow_pb2.SetOAuthClientConfigResponse]
RegisterWebhook: grpc.UnaryUnaryMultiCallable[noteflow_pb2.RegisterWebhookRequest, noteflow_pb2.WebhookConfigProto]
"""Webhook management (Sprint 6)"""
ListWebhooks: grpc.UnaryUnaryMultiCallable[noteflow_pb2.ListWebhooksRequest, noteflow_pb2.ListWebhooksResponse]
@@ -187,6 +192,9 @@ class NoteFlowServiceAsyncStub(NoteFlowServiceStub):
"""ASR Configuration Management (Sprint 19)"""
UpdateAsrConfiguration: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.UpdateAsrConfigurationRequest, noteflow_pb2.UpdateAsrConfigurationResponse] # type: ignore[assignment]
GetAsrConfigurationJobStatus: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.GetAsrConfigurationJobStatusRequest, noteflow_pb2.AsrConfigurationJobStatus] # type: ignore[assignment]
GetStreamingConfiguration: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.GetStreamingConfigurationRequest, noteflow_pb2.GetStreamingConfigurationResponse] # type: ignore[assignment]
"""Streaming configuration (Sprint 20)"""
UpdateStreamingConfiguration: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.UpdateStreamingConfigurationRequest, noteflow_pb2.UpdateStreamingConfigurationResponse] # type: ignore[assignment]
ExtractEntities: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.ExtractEntitiesRequest, noteflow_pb2.ExtractEntitiesResponse] # type: ignore[assignment]
"""Named entity extraction (Sprint 4) + mutations (Sprint 8)"""
UpdateEntity: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.UpdateEntityRequest, noteflow_pb2.UpdateEntityResponse] # type: ignore[assignment]
@@ -199,6 +207,8 @@ class NoteFlowServiceAsyncStub(NoteFlowServiceStub):
CompleteOAuth: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.CompleteOAuthRequest, noteflow_pb2.CompleteOAuthResponse] # type: ignore[assignment]
GetOAuthConnectionStatus: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.GetOAuthConnectionStatusRequest, noteflow_pb2.GetOAuthConnectionStatusResponse] # type: ignore[assignment]
DisconnectOAuth: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.DisconnectOAuthRequest, noteflow_pb2.DisconnectOAuthResponse] # type: ignore[assignment]
GetOAuthClientConfig: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.GetOAuthClientConfigRequest, noteflow_pb2.GetOAuthClientConfigResponse] # type: ignore[assignment]
SetOAuthClientConfig: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.SetOAuthClientConfigRequest, noteflow_pb2.SetOAuthClientConfigResponse] # type: ignore[assignment]
RegisterWebhook: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.RegisterWebhookRequest, noteflow_pb2.WebhookConfigProto] # type: ignore[assignment]
"""Webhook management (Sprint 6)"""
ListWebhooks: grpc.aio.UnaryUnaryMultiCallable[noteflow_pb2.ListWebhooksRequest, noteflow_pb2.ListWebhooksResponse] # type: ignore[assignment]
@@ -476,6 +486,21 @@ class NoteFlowServiceServicer(metaclass=abc.ABCMeta):
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.AsrConfigurationJobStatus, collections.abc.Awaitable[noteflow_pb2.AsrConfigurationJobStatus]]: ...
@abc.abstractmethod
def GetStreamingConfiguration(
self,
request: noteflow_pb2.GetStreamingConfigurationRequest,
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.GetStreamingConfigurationResponse, collections.abc.Awaitable[noteflow_pb2.GetStreamingConfigurationResponse]]:
"""Streaming configuration (Sprint 20)"""
@abc.abstractmethod
def UpdateStreamingConfiguration(
self,
request: noteflow_pb2.UpdateStreamingConfigurationRequest,
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.UpdateStreamingConfigurationResponse, collections.abc.Awaitable[noteflow_pb2.UpdateStreamingConfigurationResponse]]: ...
@abc.abstractmethod
def ExtractEntities(
self,
@@ -542,6 +567,20 @@ class NoteFlowServiceServicer(metaclass=abc.ABCMeta):
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.DisconnectOAuthResponse, collections.abc.Awaitable[noteflow_pb2.DisconnectOAuthResponse]]: ...
@abc.abstractmethod
def GetOAuthClientConfig(
self,
request: noteflow_pb2.GetOAuthClientConfigRequest,
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.GetOAuthClientConfigResponse, collections.abc.Awaitable[noteflow_pb2.GetOAuthClientConfigResponse]]: ...
@abc.abstractmethod
def SetOAuthClientConfig(
self,
request: noteflow_pb2.SetOAuthClientConfigRequest,
context: _ServicerContext,
) -> typing.Union[noteflow_pb2.SetOAuthClientConfigResponse, collections.abc.Awaitable[noteflow_pb2.SetOAuthClientConfigResponse]]: ...
@abc.abstractmethod
def RegisterWebhook(
self,

View File

@@ -284,12 +284,21 @@ async def run_server_with_config(config: GrpcServerConfig) -> None:
def main() -> None:
"""Entry point for NoteFlow gRPC server."""
import warnings
# Configure platform-specific settings BEFORE any torch imports
# This must happen first to suppress NNPACK warnings on unsupported hardware
from noteflow.infrastructure.platform import configure_pytorch_for_platform
configure_pytorch_for_platform()
warnings.filterwarnings(
"ignore",
message=r"torchaudio\._backend\.set_audio_backend has been deprecated.*",
category=UserWarning,
module=r"diart\.audio",
)
args = parse_args()
# Configure centralized logging with structlog

View File

@@ -10,10 +10,10 @@ from noteflow.application.services.calendar import CalendarService
from noteflow.application.services.ner import NerService
from noteflow.application.services.webhooks import WebhookService
from noteflow.config.settings import Settings, get_calendar_settings, get_feature_flags
from noteflow.domain.constants.fields import CALENDAR
from noteflow.domain.constants.fields import CALENDAR, DEVICE
from noteflow.domain.entities.integration import IntegrationStatus
from noteflow.domain.ports.gpu import GpuBackend
from noteflow.infrastructure.diarization import DiarizationEngine
from noteflow.domain.constants.fields import DEVICE
from noteflow.infrastructure.logging import get_logger
from noteflow.infrastructure.ner import NerEngine
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
@@ -134,6 +134,13 @@ async def create_calendar_service(
return calendar_service
def _normalize_diarization_device(device: str) -> str:
normalized = device.strip().lower()
if normalized == GpuBackend.ROCM.value:
return GpuBackend.CUDA.value
return normalized
def create_diarization_engine(diarization: DiarizationConfigLike) -> DiarizationEngine | None:
"""Create diarization engine if enabled and configured.
@@ -153,9 +160,16 @@ def create_diarization_engine(diarization: DiarizationConfigLike) -> Diarization
)
return None
logger.info("Initializing diarization engine on %s...", diarization.device)
normalized_device = _normalize_diarization_device(diarization.device)
if normalized_device != diarization.device:
logger.info(
"Normalizing diarization device '%s' to '%s'",
diarization.device,
normalized_device,
)
logger.info("Initializing diarization engine on %s...", normalized_device)
diarization_kwargs: DiarizationEngineKwargs = {
DEVICE: diarization.device,
DEVICE: normalized_device,
"hf_token": diarization.hf_token,
}
if diarization.streaming_latency is not None:
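
PyTorch exposes ROCm GPUs through its CUDA backend, so a configured "rocm" device is rewritten to "cuda" before the engine is constructed; other values are just trimmed and lowercased. For example (assuming GpuBackend values are the lowercase backend names):

```python
assert _normalize_diarization_device("rocm") == "cuda"    # ROCm rides the CUDA backend
assert _normalize_diarization_device(" CUDA ") == "cuda"  # trimmed and lowercased
assert _normalize_diarization_device("cpu") == "cpu"      # passed through unchanged
```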

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
from noteflow.config.constants.errors import AUTH_ERROR_PATTERNS
from noteflow.domain.value_objects import OAuthClientConfig, OAuthProvider, OAuthState, OAuthTokens
from noteflow.infrastructure.calendar.oauth_flow import (
OAuthFlowConfig,
@@ -126,9 +127,10 @@ class OAuthManagerFlowMixin(OAuthManagerBase):
exc: ValueError, provider: OAuthProvider, oauth_state: OAuthState
) -> None:
"""Log warning for state validation failure."""
exc_lower = str(exc).lower()
event = (
"oauth_state_expired"
if "expired" in str(exc).lower()
if any(p in exc_lower for p in AUTH_ERROR_PATTERNS)
else "oauth_provider_mismatch"
)
logger.warning(

View File

@@ -89,6 +89,10 @@ class SyncRunConverter:
"""
status = SyncRunStatus(model.status)
stats = dict(model.stats) if model.stats else {}
error_code = None
if model.error_code:
from noteflow.domain.entities.integration import SyncErrorCode
error_code = SyncErrorCode(model.error_code)
return SyncRun(
id=model.id,
integration_id=model.integration_id,
@@ -96,7 +100,7 @@ class SyncRunConverter:
started_at=model.started_at,
ended_at=model.ended_at,
duration_ms=model.duration_ms,
error_message=model.error_message,
error_code=error_code,
stats=stats,
)
@@ -117,6 +121,6 @@ class SyncRunConverter:
"started_at": entity.started_at,
ENDED_AT: entity.ended_at,
DURATION_MS: entity.duration_ms,
"error_message": entity.error_message,
"error_code": entity.error_code.value if entity.error_code else None,
"stats": entity.stats,
}

View File

@@ -9,6 +9,39 @@ from .engine_base import DiarizationEngineBase
logger = get_logger(__name__)
def _cuda_device_available() -> bool:
import torch
if not torch.cuda.is_available():
return False
try:
device_count = torch.cuda.device_count()
except RuntimeError as exc:
logger.warning(
"CUDA/ROCm device detection failed (%s); falling back to CPU",
exc,
)
return False
if device_count < 1:
logger.warning(
"CUDA/ROCm runtime reported available but no GPUs detected; falling back to CPU"
)
return False
# Explicitly log ROCm detection for debugging clarity
if hasattr(torch.version, "hip") and torch.version.hip:
logger.info("Detected ROCm environment (using cuda backend)")
return True
def _mps_available() -> bool:
import torch
return hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
class DiarizationEngineDeviceMixin(DiarizationEngineBase):
"""Mixin for device resolution."""
@@ -29,11 +62,8 @@ class DiarizationEngineDeviceMixin(DiarizationEngineBase):
if self._device_preference != "auto":
return self._device_preference
import torch
if torch.cuda.is_available():
# Explicitly log ROCm detection for debugging clarity
if hasattr(torch.version, "hip") and torch.version.hip:
logger.info("Detected ROCm environment (using cuda backend)")
if _cuda_device_available():
return "cuda"
return "mps" if torch.backends.mps.is_available() else "cpu"
if _mps_available():
return "mps"
return "cpu"

View File

@@ -0,0 +1,37 @@
"""rename_sync_run_error_message_to_error_code
Revision ID: u5v6w7x8y9z0
Revises: t4u5v6w7x8y9
Create Date: 2026-01-18 00:00:00.000000
"""
from collections.abc import Sequence
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "u5v6w7x8y9z0"
down_revision: str | Sequence[str] | None = "t4u5v6w7x8y9"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Rename error_message column to error_code in integration_sync_runs."""
op.alter_column(
"integration_sync_runs",
"error_message",
new_column_name="error_code",
schema="noteflow",
)
def downgrade() -> None:
"""Restore error_code column back to error_message in integration_sync_runs."""
op.alter_column(
"integration_sync_runs",
"error_code",
new_column_name="error_message",
schema="noteflow",
)
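
Note that the rename is schema-only: rows written before this commit still hold free-text messages in the renamed column, which SyncRunConverter would reject when it calls SyncErrorCode(model.error_code). A follow-up data fix (not part of this commit) might null out legacy values; a hedged sketch:

```python
def _clear_legacy_error_values() -> None:
    # Hypothetical cleanup to run inside upgrade(): drop values that are not
    # valid SyncErrorCode strings so old free-text messages don't break
    # conversion back to the domain enum.
    op.execute(
        "UPDATE noteflow.integration_sync_runs SET error_code = NULL "
        "WHERE error_code NOT IN ('unspecified', 'auth_required', "
        "'provider_error', 'internal_error', 'unknown')"
    )
```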

View File

@@ -18,6 +18,8 @@ from sqlalchemy import (
from sqlalchemy.dialects.postgresql import ARRAY, UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from noteflow.domain.entities.integration import IntegrationStatus
from .._base import Base
from .._columns import jsonb_dict_column, utc_now_column, workspace_id_fk_column
from .._mixins import CreatedAtMixin, UpdatedAtMixin, UuidPrimaryKeyMixin
@@ -62,7 +64,9 @@ class IntegrationModel(UuidPrimaryKeyMixin, CreatedAtMixin, UpdatedAtMixin, Base
workspace_id: Mapped[PyUUID] = workspace_id_fk_column()
name: Mapped[str] = mapped_column(Text, nullable=False)
type: Mapped[str] = mapped_column(Text, nullable=False)
status: Mapped[str] = mapped_column(Text, nullable=False, default="disconnected")
status: Mapped[str] = mapped_column(
Text, nullable=False, default=IntegrationStatus.DISCONNECTED.value
)
config: Mapped[dict[str, object]] = jsonb_dict_column()
last_sync: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True),
@@ -140,7 +144,7 @@ class IntegrationSyncRunModel(UuidPrimaryKeyMixin, Base):
nullable=True,
)
duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
error_code: Mapped[str | None] = mapped_column(Text, nullable=True)
stats: Mapped[dict[str, object]] = jsonb_dict_column()
# Relationships

View File

@@ -79,7 +79,7 @@ async def update_sync_run(
model.status = sync_run.status.value
model.ended_at = sync_run.ended_at
model.duration_ms = sync_run.duration_ms
model.error_message = sync_run.error_message
model.error_code = sync_run.error_code.value if sync_run.error_code else None
model.stats = sync_run.stats
await session.flush()

View File

@@ -13,6 +13,7 @@ from noteflow.domain.entities.integration import (
Integration,
IntegrationStatus,
IntegrationType,
SyncErrorCode,
SyncRun,
SyncRunStatus,
)
@@ -268,7 +269,7 @@ class TestSyncRunConverterOrmToDomain:
model.started_at = datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC)
model.ended_at = datetime(2024, 1, 15, 12, 0, 5, tzinfo=UTC)
model.duration_ms = SYNC_RUN_DURATION_MS_SHORT
model.error_message = None
model.error_code = None
model.stats = {"items_synced": 10, "items_total": SYNC_RUN_ITEMS_SYNCED}
return model
@@ -283,7 +284,7 @@ class TestSyncRunConverterOrmToDomain:
assert result.integration_id == mock_sync_run_model.integration_id, "Integration ID should match"
assert result.status == SyncRunStatus.SUCCESS, "Status should be enum"
assert result.duration_ms == SYNC_RUN_DURATION_MS_SHORT, "Duration should match"
assert result.error_message is None, "Error message should be None"
assert result.error_code is None, "Error code should be None"
@pytest.mark.parametrize(
("status_string", "expected_enum"),
@@ -318,14 +319,14 @@ class TestSyncRunConverterOrmToDomain:
assert result.stats == {}, "None stats should become empty dict"
def test_sync_run_handles_error_status(self, mock_sync_run_model: MagicMock) -> None:
"""Error status with error message is converted correctly."""
"""Error status with error code is converted correctly."""
mock_sync_run_model.status = "error"
mock_sync_run_model.error_message = "API rate limit exceeded"
mock_sync_run_model.error_code = "provider_error"
result = SyncRunConverter.orm_to_domain(mock_sync_run_model)
assert result.status == SyncRunStatus.ERROR, "Status should be ERROR enum"
assert result.error_message == "API rate limit exceeded", "Error message should be preserved"
assert result.error_code == SyncErrorCode.PROVIDER_ERROR, "Error code should be preserved"
class TestSyncRunConverterToOrmKwargs:
@@ -340,7 +341,7 @@ class TestSyncRunConverterToOrmKwargs:
started_at=datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC),
ended_at=datetime(2024, 1, 15, 12, 0, 10, tzinfo=UTC),
duration_ms=SYNC_RUN_DURATION_MS_MEDIUM,
error_message=None,
error_code=None,
stats={"items_synced": SYNC_RUN_ITEMS_COMPLETE},
)
@@ -403,7 +404,7 @@ class TestIntegrationConverterRoundTrip:
started_at=datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC),
ended_at=datetime(2024, 1, 15, 12, 0, 15, tzinfo=UTC),
duration_ms=15000,
error_message=None,
error_code=None,
stats={"items_synced": 50, "items_total": 50},
)