refactor: optimize state updates and reduce unnecessary re-renders across client and server
- Added change detection in audio device preference updates to skip redundant disk writes when values haven't changed - Implemented PartialEq for AudioDevicePrefs to enable equality comparison - Reduced logging verbosity: changed get_preferences and persist_preferences_to_disk from info to trace/debug, and gRPC identity interceptor from debug to trace - Fixed ProjectProvider oscillation loop by using ref
This commit is contained in:
@@ -42,7 +42,12 @@ where
|
||||
{
|
||||
let snapshot = {
|
||||
let mut prefs = state.preferences.write();
|
||||
let before = prefs.audio_devices.clone();
|
||||
updater(&mut prefs.audio_devices);
|
||||
// Skip persistence if nothing changed (avoids redundant disk writes)
|
||||
if prefs.audio_devices == before {
|
||||
return Ok(());
|
||||
}
|
||||
prefs.clone()
|
||||
};
|
||||
persist_preferences_to_disk(&snapshot)?;
|
||||
|
||||
@@ -13,7 +13,7 @@ use super::audio::normalize_audio_device_id;
|
||||
#[tauri::command(rename_all = "snake_case")]
|
||||
pub fn get_preferences(state: State<'_, Arc<AppState>>) -> UserPreferences {
|
||||
let prefs = state.preferences.read().clone();
|
||||
tracing::info!(
|
||||
tracing::trace!(
|
||||
input_device_id = %prefs.audio_devices.input_device_id,
|
||||
output_device_id = %prefs.audio_devices.output_device_id,
|
||||
system_device_id = %prefs.audio_devices.system_device_id,
|
||||
@@ -105,7 +105,7 @@ pub(crate) fn persist_preferences_to_disk(preferences: &UserPreferences) -> Resu
|
||||
let prefs_path = noteflow_dir.join("preferences.json");
|
||||
let json = serde_json::to_string_pretty(preferences)?;
|
||||
std::fs::write(prefs_path, json)?;
|
||||
tracing::info!(
|
||||
tracing::debug!(
|
||||
input_device_id = %preferences.audio_devices.input_device_id,
|
||||
output_device_id = %preferences.audio_devices.output_device_id,
|
||||
system_device_id = %preferences.audio_devices.system_device_id,
|
||||
|
||||
@@ -7,7 +7,7 @@ use tonic::service::interceptor::InterceptedService;
|
||||
use tonic::service::Interceptor;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tonic::{Request, Status};
|
||||
use tracing::{debug, info, warn};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
use crate::constants::grpc as grpc_config;
|
||||
use crate::error::{Error, Result};
|
||||
@@ -67,7 +67,7 @@ impl Interceptor for IdentityInterceptor {
|
||||
let workspace_id = self.identity.workspace_id();
|
||||
let access_token = self.identity.access_token();
|
||||
|
||||
debug!(
|
||||
trace!(
|
||||
request_id = %request_id,
|
||||
user_id = %user_id,
|
||||
workspace_id = %workspace_id,
|
||||
@@ -110,7 +110,7 @@ impl Interceptor for IdentityInterceptor {
|
||||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
trace!(
|
||||
request_id = %request_id,
|
||||
header_count = metadata.len(),
|
||||
"identity_interceptor_headers_added"
|
||||
|
||||
@@ -115,7 +115,7 @@ impl UserPreferences {
|
||||
}
|
||||
|
||||
/// Audio device preferences
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
|
||||
pub struct AudioDevicePrefs {
|
||||
/// Input device ID (microphone)
|
||||
pub input_device_id: String,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Project context for managing active project selection and project data
|
||||
|
||||
import { useCallback, useEffect, useMemo, useState } from 'react';
|
||||
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
|
||||
import { IdentityDefaults } from '@/api/constants';
|
||||
import { extractErrorMessage } from '@/api/helpers';
|
||||
import { getAPI } from '@/api/interface';
|
||||
@@ -59,23 +59,29 @@ export function ProjectProvider({ children }: { children: React.ReactNode }) {
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
// Use ref to avoid recreating loadProjects when currentWorkspace changes
|
||||
// This prevents the oscillation loop where workspace changes → loadProjects recreates → effect runs
|
||||
const workspaceRef = useRef(currentWorkspace);
|
||||
workspaceRef.current = currentWorkspace;
|
||||
|
||||
const loadProjects = useCallback(async () => {
|
||||
if (!currentWorkspace) {
|
||||
const workspace = workspaceRef.current;
|
||||
if (!workspace) {
|
||||
return;
|
||||
}
|
||||
setIsLoading(true);
|
||||
setError(null);
|
||||
try {
|
||||
const response = await getAPI().listProjects({
|
||||
workspace_id: currentWorkspace.id,
|
||||
workspace_id: workspace.id,
|
||||
include_archived: true,
|
||||
limit: 200,
|
||||
offset: 0,
|
||||
});
|
||||
let preferredId = readStoredProjectId(currentWorkspace.id);
|
||||
let preferredId = readStoredProjectId(workspace.id);
|
||||
try {
|
||||
const activeResponse = await getAPI().getActiveProject({
|
||||
workspace_id: currentWorkspace.id,
|
||||
workspace_id: workspace.id,
|
||||
});
|
||||
const activeId = activeResponse.project_id ?? activeResponse.project?.id;
|
||||
if (activeId) {
|
||||
@@ -86,27 +92,29 @@ export function ProjectProvider({ children }: { children: React.ReactNode }) {
|
||||
}
|
||||
const available = response.projects.length
|
||||
? response.projects
|
||||
: [fallbackProject(currentWorkspace.id)];
|
||||
: [fallbackProject(workspace.id)];
|
||||
setProjects(available);
|
||||
const resolved = resolveActiveProject(available, preferredId);
|
||||
setActiveProjectId(resolved?.id ?? null);
|
||||
if (resolved) {
|
||||
persistProjectId(currentWorkspace.id, resolved.id);
|
||||
persistProjectId(workspace.id, resolved.id);
|
||||
}
|
||||
} catch (err) {
|
||||
setError(extractErrorMessage(err, 'Failed to load projects'));
|
||||
const fallback = fallbackProject(currentWorkspace.id);
|
||||
const fallback = fallbackProject(workspace.id);
|
||||
setProjects([fallback]);
|
||||
setActiveProjectId(fallback.id);
|
||||
persistProjectId(currentWorkspace.id, fallback.id);
|
||||
persistProjectId(workspace.id, fallback.id);
|
||||
} finally {
|
||||
setIsLoading(false);
|
||||
}
|
||||
}, [currentWorkspace]);
|
||||
}, []);
|
||||
|
||||
// Reload projects when workspace ID changes (not on every workspace object reference change)
|
||||
const workspaceId = currentWorkspace?.id;
|
||||
useEffect(() => {
|
||||
void loadProjects();
|
||||
}, [loadProjects]);
|
||||
}, [loadProjects, workspaceId]);
|
||||
|
||||
const switchProject = useCallback(
|
||||
(projectId: string) => {
|
||||
|
||||
@@ -98,10 +98,19 @@ export function useAsyncData<T>(
|
||||
// Track current fetch to handle race conditions
|
||||
const fetchIdRef = useRef(0);
|
||||
|
||||
// Use refs for options that shouldn't trigger refetches when they change
|
||||
// This prevents oscillation loops when skip/callbacks change during loading transitions
|
||||
const skipRef = useRef(skip);
|
||||
skipRef.current = skip;
|
||||
const onSuccessRef = useRef(onSuccess);
|
||||
onSuccessRef.current = onSuccess;
|
||||
const onErrorRef = useRef(onError);
|
||||
onErrorRef.current = onError;
|
||||
|
||||
// The fetcher is intentionally excluded - deps are controlled by caller
|
||||
// biome-ignore lint/correctness/useExhaustiveDependencies: fetcher is intentionally excluded, user controls deps
|
||||
const doFetch = useCallback(async () => {
|
||||
if (skip) {
|
||||
if (skipRef.current) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -117,17 +126,17 @@ export function useAsyncData<T>(
|
||||
if (isMountedRef.current && currentFetchId === fetchIdRef.current) {
|
||||
setData(result);
|
||||
setIsLoading(false);
|
||||
onSuccess?.(result);
|
||||
onSuccessRef.current?.(result);
|
||||
}
|
||||
} catch (err) {
|
||||
if (isMountedRef.current && currentFetchId === fetchIdRef.current) {
|
||||
const errorMessage = extractErrorMessage(err);
|
||||
setError(errorMessage);
|
||||
setIsLoading(false);
|
||||
onError?.(errorMessage);
|
||||
onErrorRef.current?.(errorMessage);
|
||||
}
|
||||
}
|
||||
}, [skip, onSuccess, onError, ...deps]);
|
||||
}, deps);
|
||||
|
||||
const refetch = useCallback(() => {
|
||||
void doFetch();
|
||||
|
||||
@@ -161,15 +161,39 @@ export function useDiarization(options: UseDiarizationOptions = {}): UseDiarizat
|
||||
// Reset retry count on successful poll
|
||||
retryCountRef.current = 0;
|
||||
|
||||
// Update state based on status
|
||||
setState((prev) => ({
|
||||
...prev,
|
||||
status: status.status,
|
||||
progress: status.progress_percent ?? 0,
|
||||
speakerIds: status.speaker_ids ?? [],
|
||||
segmentsUpdated: status.segments_updated ?? 0,
|
||||
error: status.error_message || null,
|
||||
}));
|
||||
// Only update state if values have actually changed to avoid unnecessary re-renders
|
||||
setState((prev) => {
|
||||
const newStatus = status.status;
|
||||
const newProgress = status.progress_percent ?? 0;
|
||||
const newSpeakerIds = status.speaker_ids ?? [];
|
||||
const newSegmentsUpdated = status.segments_updated ?? 0;
|
||||
const newError = status.error_message || null;
|
||||
|
||||
// Check if anything changed
|
||||
const speakerIdsChanged =
|
||||
prev.speakerIds.length !== newSpeakerIds.length ||
|
||||
prev.speakerIds.some((id, i) => id !== newSpeakerIds[i]);
|
||||
|
||||
if (
|
||||
prev.status === newStatus &&
|
||||
prev.progress === newProgress &&
|
||||
prev.segmentsUpdated === newSegmentsUpdated &&
|
||||
prev.error === newError &&
|
||||
!speakerIdsChanged
|
||||
) {
|
||||
// No changes - return previous state to avoid re-render
|
||||
return prev;
|
||||
}
|
||||
|
||||
return {
|
||||
...prev,
|
||||
status: newStatus,
|
||||
progress: newProgress,
|
||||
speakerIds: newSpeakerIds,
|
||||
segmentsUpdated: newSegmentsUpdated,
|
||||
error: newError,
|
||||
};
|
||||
});
|
||||
|
||||
// Check terminal states
|
||||
if (status.status === 'completed') {
|
||||
|
||||
@@ -60,7 +60,7 @@ export default function MeetingsPage() {
|
||||
project_ids: projectScope === 'selected' ? selectedProjectIds : undefined,
|
||||
})
|
||||
.then((r) => r.meetings),
|
||||
[stateFilter, resolvedProjectId, projectScope, selectedProjectIds, projectsLoading],
|
||||
[stateFilter, resolvedProjectId, projectScope, selectedProjectIds],
|
||||
{
|
||||
initialData: [],
|
||||
skip: shouldSkipFetch,
|
||||
|
||||
@@ -64,7 +64,7 @@ export default function TasksPage() {
|
||||
project_ids: projectScope === 'selected' ? selectedProjectIds : undefined,
|
||||
})
|
||||
.then((r) => r.meetings),
|
||||
[projectScope, selectedProjectIds, resolvedProjectId, projectsLoading],
|
||||
[projectScope, selectedProjectIds, resolvedProjectId],
|
||||
{
|
||||
initialData: [],
|
||||
skip: shouldSkipFetch,
|
||||
|
||||
@@ -141,6 +141,16 @@ async def _build_segments_from_results(
|
||||
"""
|
||||
segments_to_add: list[tuple[Segment, noteflow_pb2.TranscriptUpdate]] = []
|
||||
for result in results:
|
||||
# Skip results with empty or whitespace-only text
|
||||
if not result.text or not result.text.strip():
|
||||
logger.debug(
|
||||
"Skipping empty ASR result",
|
||||
meeting_id=ctx.meeting_id,
|
||||
start=result.start,
|
||||
end=result.end,
|
||||
)
|
||||
continue
|
||||
|
||||
segment_id = ctx.host.next_segment_id(ctx.meeting_id, fallback=ctx.meeting.next_segment_id)
|
||||
segment = create_segment_from_asr(
|
||||
ctx.meeting.id, segment_id, result, ctx.segment_start_time
|
||||
|
||||
@@ -27,9 +27,9 @@ from noteflow.infrastructure.logging import LoggingConfig, configure_logging, ge
|
||||
|
||||
from ..config.cli import build_config_from_args, parse_args
|
||||
from ..config.config import DEFAULT_BIND_ADDRESS, AsrConfig, GrpcServerConfig, ServicesConfig
|
||||
from ..startup.startup import StartupServices
|
||||
from ..startup.banner import print_startup_banner
|
||||
from ..service import NoteFlowServicer
|
||||
from ..startup.banner import print_startup_banner
|
||||
from ..startup.startup import StartupServices
|
||||
from .internal.bootstrap import (
|
||||
create_services,
|
||||
init_db,
|
||||
@@ -284,6 +284,12 @@ async def run_server_with_config(config: GrpcServerConfig) -> None:
|
||||
|
||||
def main() -> None:
|
||||
"""Entry point for NoteFlow gRPC server."""
|
||||
# Configure platform-specific settings BEFORE any torch imports
|
||||
# This must happen first to suppress NNPACK warnings on unsupported hardware
|
||||
from noteflow.infrastructure.platform import configure_pytorch_for_platform
|
||||
|
||||
configure_pytorch_for_platform()
|
||||
|
||||
args = parse_args()
|
||||
|
||||
# Configure centralized logging with structlog
|
||||
|
||||
@@ -29,8 +29,8 @@ class PartialAudioBuffer:
|
||||
per meeting in the async gRPC streaming context.
|
||||
"""
|
||||
|
||||
# Default buffer capacity (5 seconds should cover any partial window)
|
||||
DEFAULT_MAX_DURATION: Final[float] = 5.0
|
||||
# Default buffer capacity (10 seconds provides margin for slower transcription)
|
||||
DEFAULT_MAX_DURATION: Final[float] = 10.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -181,7 +181,24 @@ class DiarizationSession:
|
||||
|
||||
duration = len(audio) / sample_rate
|
||||
current_time = self._stream_time + duration
|
||||
waveform = self._build_waveform(audio, duration)
|
||||
|
||||
# Skip pipeline processing for silent/near-silent or constant audio to avoid
|
||||
# division-by-zero errors in diart's internal normalization calculations.
|
||||
# RMS threshold -60 dB (0.001) filters silence.
|
||||
# Std threshold (0.001) filters constant signals that would cause std=0.
|
||||
rms_energy = float(np.sqrt(np.mean(audio**2)))
|
||||
std_deviation = float(np.std(audio))
|
||||
if rms_energy < 0.001 or std_deviation < 0.001:
|
||||
logger.debug(
|
||||
"Skipping diarization chunk with low energy or variance",
|
||||
rms_energy=round(rms_energy, 6),
|
||||
std_deviation=round(std_deviation, 6),
|
||||
stream_time=round(self._stream_time, 2),
|
||||
)
|
||||
self._stream_time = current_time
|
||||
return []
|
||||
|
||||
waveform = self._build_waveform(audio, sample_rate)
|
||||
new_turns = self._run_pipeline(waveform)
|
||||
if new_turns:
|
||||
self._turns.extend(new_turns)
|
||||
@@ -192,13 +209,28 @@ class DiarizationSession:
|
||||
def _build_waveform(
|
||||
self,
|
||||
audio: NDArray[np.float32],
|
||||
duration: float,
|
||||
sample_rate: int,
|
||||
) -> SlidingWindowFeature:
|
||||
"""Build a SlidingWindowFeature for diarization."""
|
||||
"""Build a SlidingWindowFeature for diarization.
|
||||
|
||||
Args:
|
||||
audio: 1D audio samples array (mono).
|
||||
sample_rate: Audio sample rate in Hz.
|
||||
|
||||
Returns:
|
||||
SlidingWindowFeature with correct temporal window configuration
|
||||
for pyannote/diart processing.
|
||||
"""
|
||||
from pyannote.core import SlidingWindow, SlidingWindowFeature
|
||||
|
||||
audio_2d = audio.reshape(-1, 1)
|
||||
window = SlidingWindow(start=0.0, duration=duration, step=duration)
|
||||
# Reshape to (channels, samples) - pyannote expects channels-first format
|
||||
audio_2d = audio.reshape(1, -1)
|
||||
|
||||
# Configure window for per-sample temporal resolution.
|
||||
# Each row in the data array represents one audio sample with duration 1/sample_rate.
|
||||
# Using chunk duration here was incorrect and caused frames/weights mismatch warnings.
|
||||
sample_duration = 1.0 / sample_rate
|
||||
window = SlidingWindow(start=0.0, duration=sample_duration, step=sample_duration)
|
||||
return SlidingWindowFeature(audio_2d, window)
|
||||
|
||||
def _run_pipeline(self, waveform: SlidingWindowFeature) -> list[SpeakerTurn]:
|
||||
|
||||
59
src/noteflow/infrastructure/platform/__init__.py
Normal file
59
src/noteflow/infrastructure/platform/__init__.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Platform detection and configuration utilities.
|
||||
|
||||
Provides utilities for detecting hardware capabilities and configuring
|
||||
runtime optimizations based on the host platform.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from functools import cache
|
||||
|
||||
|
||||
@cache
|
||||
def has_avx2_support() -> bool:
|
||||
"""Check if the CPU supports AVX2 instructions.
|
||||
|
||||
AVX2 is required for NNPACK neural network acceleration.
|
||||
Returns cached result after first call.
|
||||
|
||||
Returns:
|
||||
True if AVX2 is supported, False otherwise.
|
||||
"""
|
||||
try:
|
||||
# Linux: check /proc/cpuinfo
|
||||
if os.path.exists("/proc/cpuinfo"):
|
||||
with open("/proc/cpuinfo", encoding="utf-8") as f:
|
||||
cpuinfo = f.read()
|
||||
return "avx2" in cpuinfo.lower()
|
||||
|
||||
# macOS/BSD: use sysctl
|
||||
result = subprocess.run(
|
||||
["sysctl", "-n", "machdep.cpu.features"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return "avx2" in result.stdout.lower()
|
||||
|
||||
except (OSError, subprocess.SubprocessError):
|
||||
pass
|
||||
|
||||
# Default to False (conservative - will disable NNPACK)
|
||||
return False
|
||||
|
||||
|
||||
def configure_pytorch_for_platform() -> None:
|
||||
"""Configure PyTorch environment variables based on platform capabilities.
|
||||
|
||||
Must be called BEFORE importing torch to take effect.
|
||||
|
||||
Configures:
|
||||
- NNPACK: Disabled if AVX2 not supported (prevents warning spam)
|
||||
"""
|
||||
# Disable NNPACK if hardware doesn't support it
|
||||
# This prevents the "Could not initialize NNPACK" warnings
|
||||
if not has_avx2_support():
|
||||
os.environ.setdefault("PYTORCH_DISABLE_NNPACK", "1")
|
||||
Reference in New Issue
Block a user