Files
noteflow/tests/stress/test_audio_integrity.py
Travis Vasceannie b333ea5b23 Add initial Docker and development environment setup
- Created .dockerignore to exclude unnecessary files from Docker builds.
- Added .repomixignore for managing ignored patterns in Repomix.
- Introduced Dockerfile.dev for development environment setup with Python 3.12.
- Configured docker-compose.yaml to define services, including a PostgreSQL database.
- Established a devcontainer.json for Visual Studio Code integration.
- Implemented postCreate.sh for automatic dependency installation in the dev container.
- Added constants.py to centralize configuration constants for the project.
- Updated pyproject.toml to include new development dependencies.
- Created initial documentation files for project overview and style conventions.
- Added tests for new functionalities to ensure reliability and correctness.
2025-12-19 05:02:16 +00:00

494 lines
17 KiB
Python

"""Stress tests for encrypted audio file format (NFAE) resilience.
Tests truncation recovery, missing manifest handling, and corruption detection.
"""
from __future__ import annotations
import json
import struct
from pathlib import Path
from uuid import uuid4
import numpy as np
import pytest
from numpy.typing import NDArray
from noteflow.infrastructure.audio.reader import MeetingAudioReader
from noteflow.infrastructure.audio.writer import MeetingAudioWriter
from noteflow.infrastructure.security.crypto import (
FILE_MAGIC,
FILE_VERSION,
AesGcmCryptoBox,
ChunkedAssetReader,
ChunkedAssetWriter,
)
from noteflow.infrastructure.security.keystore import InMemoryKeyStore
@pytest.fixture
def crypto() -> AesGcmCryptoBox:
    """Provide an ``AesGcmCryptoBox`` built on a fresh ``InMemoryKeyStore``."""
    keystore = InMemoryKeyStore()
    return AesGcmCryptoBox(keystore)
@pytest.fixture
def meetings_dir(tmp_path: Path) -> Path:
    """Return the ``meetings`` path under pytest's ``tmp_path``.

    The path is only computed here; nothing is created on disk by this
    fixture.
    """
    return tmp_path.joinpath("meetings")
def make_audio(samples: int = 1600) -> NDArray[np.float32]:
    """Return ``samples`` random float32 values drawn uniformly from [-0.5, 0.5).

    Values come from NumPy's global random state, so ``np.random.seed``
    controls reproducibility.
    """
    raw = np.random.uniform(low=-0.5, high=0.5, size=samples)
    return raw.astype(np.float32)
class TestTruncatedWriteRecovery:
    """Test behavior when audio file is truncated (power loss simulation).

    Each test builds an ``audio.enc`` file that is cut off at a different
    point (inside the magic bytes, before the version byte, inside a chunk
    length, inside chunk data) and checks how ``ChunkedAssetReader`` reacts.
    Readers that were opened successfully are always closed in a ``finally``
    block so a failing assertion does not leak the file handle.
    """

    @staticmethod
    def _new_audio_path(meetings_dir: Path) -> Path:
        """Create a fresh per-meeting directory and return its ``audio.enc`` path."""
        meeting_dir = meetings_dir / str(uuid4())
        meeting_dir.mkdir(parents=True)
        return meeting_dir / "audio.enc"

    @pytest.mark.stress
    def test_truncated_header_partial_magic(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """Truncated file (only partial magic bytes) raises on read."""
        audio_path = self._new_audio_path(meetings_dir)
        audio_path.write_bytes(FILE_MAGIC[:2])

        reader = ChunkedAssetReader(crypto)
        dek = crypto.generate_dek()
        with pytest.raises(ValueError, match="Invalid file format"):
            reader.open(audio_path, dek)

    @pytest.mark.stress
    def test_truncated_header_missing_version(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """File with magic but truncated before version byte."""
        audio_path = self._new_audio_path(meetings_dir)
        audio_path.write_bytes(FILE_MAGIC)

        reader = ChunkedAssetReader(crypto)
        dek = crypto.generate_dek()
        # Either exception type is accepted for a missing version byte.
        with pytest.raises((struct.error, ValueError)):
            reader.open(audio_path, dek)

    @pytest.mark.stress
    def test_truncated_chunk_length_partial(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """File with complete header but truncated chunk length."""
        audio_path = self._new_audio_path(meetings_dir)
        with audio_path.open("wb") as f:
            f.write(FILE_MAGIC)
            f.write(struct.pack("B", FILE_VERSION))
            # Only the first 2 of the 4 length bytes reach the file.
            f.write(struct.pack(">I", 1000)[:2])

        dek = crypto.generate_dek()
        reader = ChunkedAssetReader(crypto)
        reader.open(audio_path, dek)
        try:
            chunks = list(reader.read_chunks())
            # A partial length prefix is expected to yield no chunks at all.
            assert not chunks
        finally:
            reader.close()

    @pytest.mark.stress
    def test_truncated_chunk_data_raises(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """File with chunk length but truncated data raises ValueError."""
        audio_path = self._new_audio_path(meetings_dir)
        with audio_path.open("wb") as f:
            f.write(FILE_MAGIC)
            f.write(struct.pack("B", FILE_VERSION))
            # Declare 100 bytes of chunk data but supply only 5.
            f.write(struct.pack(">I", 100))
            f.write(b"short")

        dek = crypto.generate_dek()
        reader = ChunkedAssetReader(crypto)
        reader.open(audio_path, dek)
        try:
            with pytest.raises(ValueError, match="Truncated chunk"):
                list(reader.read_chunks())
        finally:
            reader.close()

    @pytest.mark.stress
    def test_valid_chunks_before_truncation_preserved(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """Valid chunks before truncation can still be read."""
        audio_path = self._new_audio_path(meetings_dir)
        dek = crypto.generate_dek()
        test_data = b"valid audio chunk data 1"

        writer = ChunkedAssetWriter(crypto)
        writer.open(audio_path, dek)
        try:
            writer.write_chunk(test_data)
        finally:
            writer.close()

        # Append a record that declares 500 bytes but carries only 9.
        with audio_path.open("ab") as f:
            f.write(struct.pack(">I", 500))
            f.write(b"truncated")

        reader = ChunkedAssetReader(crypto)
        reader.open(audio_path, dek)
        chunks = []
        try:
            # Collect chunk by chunk so everything yielded before the failure
            # is kept when the truncated record aborts iteration.
            for chunk in reader.read_chunks():
                chunks.append(chunk)
        except ValueError:
            # Deliberately tolerated: the trailing record is incomplete and
            # this test only cares about the chunks recovered before it.
            pass
        finally:
            reader.close()

        assert len(chunks) == 1
        assert chunks[0] == test_data
class TestMissingManifest:
    """Checks for meetings where ``manifest.json`` or ``audio.enc`` is absent."""

    @pytest.mark.stress
    def test_audio_exists_false_without_manifest(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """A lone ``audio.enc`` (no manifest) is reported as not existing."""
        mid = str(uuid4())
        target = meetings_dir / mid
        target.mkdir(parents=True)
        header_only = FILE_MAGIC + bytes([FILE_VERSION])
        target.joinpath("audio.enc").write_bytes(header_only)

        exists = MeetingAudioReader(crypto, meetings_dir).audio_exists(mid)
        assert exists is False

    @pytest.mark.stress
    def test_audio_exists_false_without_audio(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """A lone manifest (no ``audio.enc``) is reported as not existing."""
        mid = str(uuid4())
        target = meetings_dir / mid
        target.mkdir(parents=True)

        wrapped = crypto.wrap_dek(crypto.generate_dek())
        payload = json.dumps(
            {
                "meeting_id": mid,
                "sample_rate": 16000,
                "wrapped_dek": wrapped.hex(),
            }
        )
        target.joinpath("manifest.json").write_text(payload)

        exists = MeetingAudioReader(crypto, meetings_dir).audio_exists(mid)
        assert exists is False

    @pytest.mark.stress
    def test_audio_exists_true_when_both_exist(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """A meeting written through ``MeetingAudioWriter`` is reported as existing."""
        mid = str(uuid4())
        key = crypto.generate_dek()
        wrapped = crypto.wrap_dek(key)

        recorder = MeetingAudioWriter(crypto, meetings_dir)
        recorder.open(mid, key, wrapped)
        recorder.write_chunk(make_audio())
        recorder.close()

        exists = MeetingAudioReader(crypto, meetings_dir).audio_exists(mid)
        assert exists is True

    @pytest.mark.stress
    def test_load_audio_raises_without_manifest(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """``load_meeting_audio`` raises ``FileNotFoundError`` when no manifest exists."""
        mid = str(uuid4())
        target = meetings_dir / mid
        target.mkdir(parents=True)
        header_only = FILE_MAGIC + bytes([FILE_VERSION])
        target.joinpath("audio.enc").write_bytes(header_only)

        loader = MeetingAudioReader(crypto, meetings_dir)
        with pytest.raises(FileNotFoundError, match="Manifest not found"):
            loader.load_meeting_audio(mid)
class TestCorruptedCiphertextDetection:
    """Test corrupted ciphertext/tag detection.

    Every test writes one encrypted chunk through ``MeetingAudioWriter``,
    tampers with the file (or uses the wrong key), and expects
    ``ChunkedAssetReader.read_chunks`` to raise ``ValueError`` matching
    "Chunk decryption failed".
    """

    @staticmethod
    def _write_encrypted_audio(crypto: AesGcmCryptoBox, meetings_dir: Path) -> tuple:
        """Write a one-chunk meeting and return ``(audio_path, dek)``."""
        meeting_id = str(uuid4())
        dek = crypto.generate_dek()
        wrapped_dek = crypto.wrap_dek(dek)
        writer = MeetingAudioWriter(crypto, meetings_dir)
        writer.open(meeting_id, dek, wrapped_dek)
        try:
            writer.write_chunk(make_audio(1600))
        finally:
            writer.close()
        return meetings_dir / meeting_id / "audio.enc", dek

    @staticmethod
    def _assert_chunk_decryption_fails(crypto: AesGcmCryptoBox, audio_path: Path, dek) -> None:
        """Open ``audio_path`` with ``dek`` and require a decryption failure on read."""
        reader = ChunkedAssetReader(crypto)
        reader.open(audio_path, dek)
        try:
            with pytest.raises(ValueError, match="Chunk decryption failed"):
                list(reader.read_chunks())
        finally:
            # Close even when the expected exception was not raised.
            reader.close()

    @pytest.mark.stress
    def test_bit_flip_in_ciphertext_detected(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """Single bit flip in ciphertext causes decryption failure."""
        audio_path, dek = self._write_encrypted_audio(crypto, meetings_dir)
        data = bytearray(audio_path.read_bytes())

        # Header is the magic bytes plus one version byte, followed by a
        # 4-byte chunk length. NOTE(review): the offset assumes a 12-byte
        # nonce precedes the ciphertext -- confirm against the writer.
        header_size = len(FILE_MAGIC) + 1
        length_size = 4
        nonce_size = 12
        corrupt_offset = header_size + length_size + nonce_size + 5

        # Fail loudly instead of silently skipping the corruption step, which
        # would otherwise surface as a misleading "DID NOT RAISE".
        assert len(data) > corrupt_offset, "encrypted file too short to corrupt"
        data[corrupt_offset] ^= 0x01
        audio_path.write_bytes(bytes(data))

        self._assert_chunk_decryption_fails(crypto, audio_path, dek)

    @pytest.mark.stress
    def test_bit_flip_in_tag_detected(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """Bit flip in authentication tag causes decryption failure."""
        audio_path, dek = self._write_encrypted_audio(crypto, meetings_dir)
        data = bytearray(audio_path.read_bytes())
        # Flip one bit five bytes from the end of the file; presumably this
        # lands inside the trailing authentication tag -- verify against the
        # chunk layout.
        data[-5] ^= 0x01
        audio_path.write_bytes(bytes(data))

        self._assert_chunk_decryption_fails(crypto, audio_path, dek)

    @pytest.mark.stress
    def test_wrong_dek_detected(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """Using wrong DEK fails decryption."""
        audio_path, _ = self._write_encrypted_audio(crypto, meetings_dir)
        wrong_dek = crypto.generate_dek()

        self._assert_chunk_decryption_fails(crypto, audio_path, wrong_dek)
class TestInvalidManifest:
    """Checks how ``load_meeting_audio`` reacts to malformed ``manifest.json`` data."""

    @staticmethod
    def _lay_out_meeting(meetings_dir: Path, meeting_id: str, manifest: dict) -> None:
        """Create the meeting directory holding ``manifest`` and a header-only ``audio.enc``."""
        folder = meetings_dir / meeting_id
        folder.mkdir(parents=True)
        folder.joinpath("manifest.json").write_text(json.dumps(manifest))
        folder.joinpath("audio.enc").write_bytes(FILE_MAGIC + bytes([FILE_VERSION]))

    @pytest.mark.stress
    def test_missing_wrapped_dek_raises(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """A manifest lacking ``wrapped_dek`` makes loading raise ``ValueError``."""
        mid = str(uuid4())
        self._lay_out_meeting(
            meetings_dir,
            mid,
            {"meeting_id": mid, "sample_rate": 16000},
        )

        loader = MeetingAudioReader(crypto, meetings_dir)
        with pytest.raises(ValueError, match="missing wrapped_dek"):
            loader.load_meeting_audio(mid)

    @pytest.mark.stress
    def test_invalid_wrapped_dek_hex_raises(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """A ``wrapped_dek`` that is not a hex string makes loading raise ``ValueError``."""
        mid = str(uuid4())
        self._lay_out_meeting(
            meetings_dir,
            mid,
            {
                "meeting_id": mid,
                "sample_rate": 16000,
                "wrapped_dek": "not_valid_hex_!!!",
            },
        )

        loader = MeetingAudioReader(crypto, meetings_dir)
        with pytest.raises(ValueError):
            loader.load_meeting_audio(mid)

    @pytest.mark.stress
    def test_corrupted_wrapped_dek_raises(
        self, crypto: AesGcmCryptoBox, meetings_dir: Path
    ) -> None:
        """A well-formed hex ``wrapped_dek`` with an altered byte fails to unwrap."""
        mid = str(uuid4())
        tampered = bytearray(crypto.wrap_dek(crypto.generate_dek()))
        # Invert every bit of the byte at index 10 of the wrapped key.
        tampered[10] ^= 0xFF
        self._lay_out_meeting(
            meetings_dir,
            mid,
            {
                "meeting_id": mid,
                "sample_rate": 16000,
                "wrapped_dek": bytes(tampered).hex(),
            },
        )

        loader = MeetingAudioReader(crypto, meetings_dir)
        with pytest.raises(ValueError, match="unwrap failed"):
            loader.load_meeting_audio(mid)
class TestWriterReaderRoundTrip:
    """Round-trip checks: audio written by ``MeetingAudioWriter`` is read back intact."""

    @staticmethod
    def _start_recording(crypto: AesGcmCryptoBox, meetings_dir: Path) -> tuple:
        """Open a writer for a new random meeting and return ``(meeting_id, writer)``."""
        mid = str(uuid4())
        key = crypto.generate_dek()
        wrapped = crypto.wrap_dek(key)
        recorder = MeetingAudioWriter(crypto, meetings_dir)
        recorder.open(mid, key, wrapped)
        return mid, recorder

    @pytest.mark.stress
    def test_single_chunk_roundtrip(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """One written chunk is loaded back as one chunk with matching frames."""
        source = make_audio(1600)
        mid, recorder = self._start_recording(crypto, meetings_dir)
        recorder.write_chunk(source)
        recorder.close()

        loaded = MeetingAudioReader(crypto, meetings_dir).load_meeting_audio(mid)

        assert len(loaded) == 1
        first = loaded[0]
        np.testing.assert_array_almost_equal(first.frames, source, decimal=4)

    @pytest.mark.stress
    def test_multiple_chunks_roundtrip(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """Ten written chunks are loaded back in order with matching frames."""
        sources = [make_audio(1600) for _ in range(10)]
        mid, recorder = self._start_recording(crypto, meetings_dir)
        for piece in sources:
            recorder.write_chunk(piece)
        recorder.close()

        loaded = MeetingAudioReader(crypto, meetings_dir).load_meeting_audio(mid)

        assert len(loaded) == len(sources)
        for expected, actual in zip(sources, loaded, strict=True):
            np.testing.assert_array_almost_equal(actual.frames, expected, decimal=4)

    @pytest.mark.stress
    @pytest.mark.slow
    def test_large_audio_roundtrip(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """1000 chunks round-trip with the expected count and total duration."""
        chunk_count = 1000
        mid, recorder = self._start_recording(crypto, meetings_dir)
        # Seed NumPy's global RNG so the generated audio is reproducible.
        np.random.seed(42)
        written = 0
        while written < chunk_count:
            recorder.write_chunk(make_audio(1600))
            written += 1
        recorder.close()

        loaded = MeetingAudioReader(crypto, meetings_dir).load_meeting_audio(mid)
        assert len(loaded) == chunk_count

        # Each chunk holds 1600 samples; the expected value divides by 16000.
        expected_duration = chunk_count * (1600 / 16000)
        total_duration = sum(item.duration for item in loaded)
        assert abs(total_duration - expected_duration) < 0.01
class TestFileVersionHandling:
    """Checks that ``ChunkedAssetReader.open`` rejects bad header bytes."""

    @staticmethod
    def _fresh_audio_path(meetings_dir: Path) -> Path:
        """Create a new random meeting directory and return its ``audio.enc`` path."""
        folder = meetings_dir / str(uuid4())
        folder.mkdir(parents=True)
        return folder / "audio.enc"

    @pytest.mark.stress
    def test_unsupported_version_raises(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """A header carrying version byte 99 is rejected with ``ValueError``."""
        target = self._fresh_audio_path(meetings_dir)
        target.write_bytes(FILE_MAGIC + struct.pack("B", 99))

        key = crypto.generate_dek()
        opener = ChunkedAssetReader(crypto)
        with pytest.raises(ValueError, match="Unsupported file version"):
            opener.open(target, key)

    @pytest.mark.stress
    def test_wrong_magic_raises(self, crypto: AesGcmCryptoBox, meetings_dir: Path) -> None:
        """A header starting with ``XXXX`` instead of the magic is rejected."""
        target = self._fresh_audio_path(meetings_dir)
        bogus_header = b"XXXX" + bytes([FILE_VERSION])
        target.write_bytes(bogus_header)

        key = crypto.generate_dek()
        opener = ChunkedAssetReader(crypto)
        with pytest.raises(ValueError, match="Invalid file format"):
            opener.open(target, key)