Files
noteflow/spikes/spike_04_encryption/demo.py
Travis Vasceannie af1285b181 Add initial project structure and files
- Introduced .python-version for Python version management.
- Added AGENTS.md for documentation on agent usage and best practices.
- Created alembic.ini for database migration configurations.
- Implemented main.py as the entry point for the application.
- Established pyproject.toml for project dependencies and configurations.
- Initialized README.md for project overview.
- Generated uv.lock for dependency locking.
- Documented milestones and specifications in docs/milestones.md and docs/spec.md.
- Created logs/status_line.json for logging status information.
- Added initial spike implementations for UI tray hotkeys, audio capture, ASR latency, and encryption validation.
- Set up NoteFlow core structure in src/noteflow with necessary modules and services.
- Developed test suite in tests directory for application, domain, infrastructure, and integration testing.
- Included initial migration scripts in infrastructure/persistence/migrations for database setup.
- Established security protocols in infrastructure/security for key management and encryption.
- Implemented audio infrastructure for capturing and processing audio data.
- Created converters for ASR and ORM in infrastructure/converters.
- Added export functionality for different formats in infrastructure/export.
- Ensured all new files are included in the repository for future development.
2025-12-17 18:28:59 +00:00

306 lines
10 KiB
Python

"""Interactive encryption demo for Spike 4.
Run with: python -m spikes.spike_04_encryption.demo
Features:
- Creates/retrieves master key from OS keychain
- Generates and wraps/unwraps DEKs
- Encrypts a sample file in chunks
- Decrypts and verifies integrity
- Demonstrates DEK deletion renders file unreadable
- Reports encryption/decryption throughput
"""
from __future__ import annotations
import argparse
import logging
import secrets
import time
from pathlib import Path
from .crypto_impl import AesGcmCryptoBox, ChunkedAssetReader, ChunkedAssetWriter
from .keystore_impl import InMemoryKeyStore, KeyringKeyStore
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
def format_size(size_bytes: float) -> str:
"""Format byte size as human-readable string."""
current_size: float = size_bytes
for unit in ["B", "KB", "MB", "GB"]:
if current_size < 1024:
return f"{current_size:.2f} {unit}"
current_size /= 1024
return f"{current_size:.2f} TB"
def format_speed(bytes_per_sec: float) -> str:
"""Format speed as human-readable string."""
return f"{format_size(int(bytes_per_sec))}/s"
class EncryptionDemo:
"""Interactive encryption demonstration."""
def __init__(self, use_keyring: bool = False) -> None:
"""Initialize the demo.
Args:
use_keyring: If True, use OS keyring; otherwise use in-memory storage.
"""
if use_keyring:
self.keystore = KeyringKeyStore(service_name="noteflow-demo")
print("Using OS keyring for key storage")
else:
self.keystore = InMemoryKeyStore()
print("Using in-memory key storage (keys lost on exit)")
self.crypto = AesGcmCryptoBox(self.keystore)
def demo_key_storage(self) -> None:
"""Demonstrate key storage operations."""
print("\n=== Key Storage Demo ===")
# Check if key exists
has_key = self.keystore.has_master_key()
print(f"Master key exists: {has_key}")
# Get or create key
print("Getting/creating master key...")
start = time.perf_counter()
key = self.keystore.get_or_create_master_key()
elapsed = time.perf_counter() - start
print(f" Key retrieved in {elapsed * 1000:.2f}ms")
print(f" Key size: {len(key)} bytes ({len(key) * 8} bits)")
# Verify same key is returned
key2 = self.keystore.get_or_create_master_key()
print(f" Same key returned: {key == key2}")
def demo_dek_operations(self) -> None:
"""Demonstrate DEK generation and wrapping."""
print("\n=== DEK Operations Demo ===")
# Generate DEK
print("Generating DEK...")
dek = self.crypto.generate_dek()
print(f" DEK size: {len(dek)} bytes")
# Wrap DEK
print("Wrapping DEK with master key...")
start = time.perf_counter()
wrapped = self.crypto.wrap_dek(dek)
wrap_time = time.perf_counter() - start
print(f" Wrapped DEK size: {len(wrapped)} bytes")
print(f" Wrap time: {wrap_time * 1000:.3f}ms")
# Unwrap DEK
print("Unwrapping DEK...")
start = time.perf_counter()
unwrapped = self.crypto.unwrap_dek(wrapped)
unwrap_time = time.perf_counter() - start
print(f" Unwrap time: {unwrap_time * 1000:.3f}ms")
print(f" DEK matches original: {dek == unwrapped}")
def demo_chunk_encryption(self, chunk_size: int = 16384) -> None:
"""Demonstrate chunk encryption/decryption."""
print("\n=== Chunk Encryption Demo ===")
dek = self.crypto.generate_dek()
plaintext = secrets.token_bytes(chunk_size)
print(f"Encrypting {format_size(chunk_size)} chunk...")
start = time.perf_counter()
chunk = self.crypto.encrypt_chunk(plaintext, dek)
encrypt_time = time.perf_counter() - start
overhead = len(chunk.nonce) + len(chunk.tag)
print(f" Nonce size: {len(chunk.nonce)} bytes")
print(f" Ciphertext size: {len(chunk.ciphertext)} bytes")
print(f" Tag size: {len(chunk.tag)} bytes")
print(f" Overhead: {overhead} bytes ({overhead / float(chunk_size) * 100:.1f}%)")
print(f" Encrypt time: {encrypt_time * 1000:.3f}ms")
print(f" Throughput: {format_speed(chunk_size / encrypt_time)}")
print("Decrypting chunk...")
start = time.perf_counter()
decrypted = self.crypto.decrypt_chunk(chunk, dek)
decrypt_time = time.perf_counter() - start
print(f" Decrypt time: {decrypt_time * 1000:.3f}ms")
print(f" Throughput: {format_speed(chunk_size / decrypt_time)}")
print(f" Data matches: {plaintext == decrypted}")
def demo_file_encryption(
self,
output_path: Path,
total_size: int = 1024 * 1024, # 1MB
chunk_size: int = 16384, # 16KB
) -> tuple[bytes, list[bytes]]:
"""Demonstrate file encryption and return the DEK and chunks.
Args:
output_path: Path to write encrypted file.
total_size: Total data size to encrypt.
chunk_size: Size of each chunk.
Returns:
Tuple of (DEK used for encryption, list of original chunks).
"""
print(f"\n=== File Encryption Demo ({format_size(total_size)}) ===")
dek = self.crypto.generate_dek()
writer = ChunkedAssetWriter(self.crypto)
# Generate test data
print("Generating test data...")
chunks = []
remaining = total_size
while remaining > 0:
size = min(chunk_size, remaining)
chunks.append(secrets.token_bytes(size))
remaining -= size
print(f"Writing {len(chunks)} chunks to {output_path}...")
start = time.perf_counter()
writer.open(output_path, dek)
for chunk in chunks:
writer.write_chunk(chunk)
writer.close()
elapsed = time.perf_counter() - start
file_size = output_path.stat().st_size
print(f" File size: {format_size(file_size)}")
print(f" Overhead: {format_size(file_size - total_size)} ({(file_size / total_size - 1) * 100:.1f}%)")
print(f" Time: {elapsed:.3f}s")
print(f" Throughput: {format_speed(total_size / float(elapsed))}")
return dek, chunks
def demo_file_decryption(
self,
input_path: Path,
dek: bytes,
original_chunks: list[bytes],
) -> None:
"""Demonstrate file decryption.
Args:
input_path: Path to encrypted file.
dek: DEK used for encryption.
original_chunks: Original plaintext chunks for verification.
"""
print("\n=== File Decryption Demo ===")
reader = ChunkedAssetReader(self.crypto)
print(f"Reading from {input_path}...")
start = time.perf_counter()
reader.open(input_path, dek)
decrypted_chunks = list(reader.read_chunks())
reader.close()
elapsed = time.perf_counter() - start
total_size = sum(len(c) for c in decrypted_chunks)
print(f" Chunks read: {len(decrypted_chunks)}")
print(f" Total data: {format_size(total_size)}")
print(f" Time: {elapsed:.3f}s")
print(f" Throughput: {format_speed(total_size / elapsed)}")
# Verify integrity
if len(decrypted_chunks) != len(original_chunks):
print(" INTEGRITY FAIL: chunk count mismatch")
else:
all_match = all(d == o for d, o in zip(decrypted_chunks, original_chunks, strict=True))
print(f" Integrity verified: {all_match}")
def demo_dek_deletion(self, input_path: Path, dek: bytes) -> None:
"""Demonstrate that deleting DEK renders file unreadable."""
print("\n=== DEK Deletion Demo ===")
print("Attempting to read file with correct DEK...")
reader = ChunkedAssetReader(self.crypto)
reader.open(input_path, dek)
first_chunk = next(reader.read_chunks())
reader.close()
print(f" Success: read {format_size(len(first_chunk))}")
print("\nSimulating DEK deletion (using wrong key)...")
wrong_dek = secrets.token_bytes(32)
reader = ChunkedAssetReader(self.crypto)
reader.open(input_path, wrong_dek)
try:
list(reader.read_chunks())
print(" FAIL: Should have raised error!")
except ValueError as e:
print(" Success: Decryption failed as expected")
print(f" Error: {e}")
finally:
reader.close()
def run(self, output_path: Path) -> None:
"""Run all demos."""
print("=" * 60)
print("NoteFlow Encryption Demo - Spike 4")
print("=" * 60)
self.demo_key_storage()
self.demo_dek_operations()
self.demo_chunk_encryption()
dek, chunks = self.demo_file_encryption(output_path)
self.demo_file_decryption(output_path, dek, chunks)
self.demo_dek_deletion(output_path, dek)
# Cleanup
print("\n=== Cleanup ===")
if output_path.exists():
output_path.unlink()
print(f"Deleted test file: {output_path}")
print("\nDemo complete!")
def main() -> None:
"""Run the encryption demo."""
parser = argparse.ArgumentParser(description="Encryption Demo - Spike 4")
parser.add_argument(
"-o",
"--output",
type=Path,
default=Path("demo_encrypted.bin"),
help="Output file path for encryption demo (default: demo_encrypted.bin)",
)
parser.add_argument(
"-k",
"--keyring",
action="store_true",
help="Use OS keyring instead of in-memory key storage",
)
parser.add_argument(
"-s",
"--size",
type=int,
default=1024 * 1024,
help="Total data size to encrypt in bytes (default: 1MB)",
)
args = parser.parse_args()
demo = EncryptionDemo(use_keyring=args.keyring)
demo.run(args.output)
if __name__ == "__main__":
main()