"""Tests for detecting magic values and numbers.
|
|
|
|
Detects:
|
|
- Hardcoded numeric literals that should be constants
|
|
- String literals that should be enums or constants
|
|
- Repeated literals that indicate missing abstraction
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import re
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
class MagicValue:
    """Represents a magic value in the code."""

    value: object  # the literal itself (int, float, or str)
    file_path: Path  # file in which the literal was found
    line_number: int  # 1-based line number reported by ast
    context: str  # short description of how the literal is used
|
|
|
|
|
|
# Numeric literals that may appear inline without being flagged by
# test_no_magic_numbers.  Anything outside this set that repeats more than
# twice across the source tree counts toward the violation budget.
ALLOWED_NUMBERS = {
    0, 1, 2, 3, 4, 5, -1,  # Small integers
    10, 20, 30, 50,  # Common timeout/limit values
    60, 100, 200, 255, 365, 1000, 1024,  # Common constants
    0.1, 0.3, 0.5,  # Common float values
    16000, 50051,  # Sample rate and gRPC port
}
|
|
# String literals that may repeat without being flagged by
# test_no_repeated_string_literals: trivial strings, protocol/spec-mandated
# names, and ORM identifiers whose repetition is intentional.
ALLOWED_STRINGS = {
    "",
    " ",
    "\n",
    "\t",
    "utf-8",
    "utf8",
    "w",
    "r",
    "rb",
    "wb",
    "a",
    "GET",
    "POST",
    "PUT",
    "DELETE",
    "PATCH",
    "HEAD",
    "OPTIONS",
    "True",
    "False",
    "None",
    "id",
    "name",
    "type",
    "value",
    # Common domain/infrastructure terms
    "__main__",
    "noteflow",
    "meeting",
    "segment",
    "summary",
    "annotation",
    "CASCADE",
    "selectin",
    "schema",
    "role",
    "user",
    "text",
    "title",
    "status",
    "content",
    "created_at",
    "updated_at",
    "start_time",
    "end_time",
    "meeting_id",
    "user_id",
    "request_id",
    # Domain enums
    "action_item",
    "decision",
    "note",
    "risk",
    "unknown",
    "completed",
    "failed",
    "pending",
    "running",
    "markdown",
    "html",
    # Common patterns
    "base",
    "auto",
    "cuda",
    "int8",
    "float32",
    # argparse actions
    "store_true",
    "store_false",
    # ORM table/column names (intentionally repeated across models/repos)
    "meetings",
    "segments",
    "summaries",
    "annotations",
    "key_points",
    "action_items",
    "word_timings",
    "sample_rate",
    "segment_ids",
    "summary_id",
    "wrapped_dek",
    "diarization_jobs",
    "user_preferences",
    "streamingdiarization_turns",
    # ORM cascade settings
    "all, delete-orphan",
    # Foreign key references
    "noteflow.meetings.id",
    "noteflow.summaries.id",
    # Error message patterns (intentional consistency)
    "UnitOfWork not in context",
    "Invalid meeting_id",
    "Invalid annotation_id",
    # File names (infrastructure constants)
    "manifest.json",
    "audio.enc",
    # HTML tags
    "</div>",
    "</dd>",
    # Model class names (ORM back_populates/relationships - required by SQLAlchemy)
    "ActionItemModel",
    "AnnotationModel",
    "CalendarEventModel",
    "DiarizationJobModel",
    "ExternalRefModel",
    "IntegrationModel",
    "IntegrationSecretModel",
    "IntegrationSyncRunModel",
    "KeyPointModel",
    "MeetingCalendarLinkModel",
    "MeetingModel",
    "MeetingSpeakerModel",
    "MeetingTagModel",
    "NamedEntityModel",
    "PersonModel",
    "SegmentModel",
    "SettingsModel",
    "StreamingDiarizationTurnModel",
    "SummaryModel",
    "TagModel",
    "TaskModel",
    "UserModel",
    "UserPreferencesModel",
    "WebhookConfigModel",
    "WebhookDeliveryModel",
    "WordTimingModel",
    "WorkspaceMembershipModel",
    "WorkspaceModel",
    # ORM relationship back_populates names
    "workspace",
    "memberships",
    "integration",
    "meeting_tags",
    "tasks",
    # Foreign key references (workspace/user/integration)
    "noteflow.workspaces.id",
    "noteflow.users.id",
    "noteflow.integrations.id",
    # Database ondelete actions
    "SET NULL",
    "RESTRICT",
    # Column names used in mappings
    "metadata",
    "workspace_id",
    # Database URL prefixes
    "postgres://",
    "postgresql://",
    "postgresql+asyncpg://",
    # OIDC standard claim names (RFC 7519 / OpenID Connect Core spec)
    "sub",
    "email",
    "email_verified",
    "preferred_username",
    "groups",
    "picture",
    "given_name",
    "family_name",
    "openid",
    "profile",
    "offline_access",
    # OIDC discovery document fields (OpenID Connect Discovery spec)
    "issuer",
    "authorization_endpoint",
    "token_endpoint",
    "userinfo_endpoint",
    "jwks_uri",
    "end_session_endpoint",
    "revocation_endpoint",
    "introspection_endpoint",
    "scopes_supported",
    "code_challenge_methods_supported",
    "claims_supported",
    "response_types_supported",
    # OIDC provider config fields
    "discovery",
    "discovery_refreshed_at",
    "issuer_url",
    "client_id",
    "client_secret",
    "claim_mapping",
    "scopes",
    "preset",
    "require_email_verified",
    "allowed_groups",
    "enabled",
    # Integration status values
    "success",
    "error",
    "calendar",
    "provider",
    # Common error message fragments
    " not found",
    # HTML markup
    "<li>",
    "</li>",
    # Logging levels
    "INFO",
    "DEBUG",
    "WARNING",
    "ERROR",
    # Domain terms
    "project",
    # Internal attribute names (used in multiple gRPC handlers)
    "_pending_chunks",
    # Sentinel UUIDs
    "00000000-0000-0000-0000-000000000001",
    # Repository type names (used in TYPE_CHECKING imports and annotations)
    "MeetingRepository",
    "SegmentRepository",
    "SummaryRepository",
    "AnnotationRepository",
}
|
|
|
|
|
|
def find_python_files(root: Path, exclude_migrations: bool = False) -> list[Path]:
    """Find Python source files under *root*.

    Skips virtualenv and bytecode-cache directories, test/conftest files,
    and generated protobuf modules.  When *exclude_migrations* is True,
    files inside a ``migrations`` directory are skipped as well.
    """
    excluded = {"*_pb2.py", "*_pb2_grpc.py", "*_pb2.pyi"}
    excluded_dirs: set[str] = {"migrations"} if exclude_migrations else set()

    files: list[Path] = []
    for py_file in root.rglob("*.py"):
        if ".venv" in py_file.parts or "__pycache__" in py_file.parts:
            continue
        if "test" in py_file.parts or "conftest" in py_file.name:
            continue
        if any(py_file.match(p) for p in excluded):
            continue
        # all() over an empty set is vacuously True, so the former
        # "not excluded_dirs or ..." guard was redundant.
        if all(d not in py_file.parts for d in excluded_dirs):
            files.append(py_file)

    return files
|
|
|
|
|
|
def test_no_magic_numbers() -> None:
    """Detect hardcoded numeric values that should be constants.

    Flags int/float literals outside ALLOWED_NUMBERS (large integers and
    all floats), except literals assigned directly to UPPER_CASE names,
    and fails when too many distinct values repeat more than twice.
    """
    src_root = Path(__file__).parent.parent.parent / "src" / "noteflow"

    magic_numbers: list[MagicValue] = []

    for py_file in find_python_files(src_root):
        source = py_file.read_text(encoding="utf-8")
        try:
            tree = ast.parse(source)
        except SyntaxError:
            continue

        # Build the child -> parent map once per file instead of re-walking
        # the entire tree for every candidate literal (was O(n^2)).
        parent_of: dict[ast.AST, ast.AST] = {
            child: parent
            for parent in ast.walk(tree)
            for child in ast.iter_child_nodes(parent)
        }

        for node in ast.walk(tree):
            if not (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))):
                continue
            if node.value in ALLOWED_NUMBERS:
                continue
            # Only large integers and any float literal are interesting.
            if not (abs(float(node.value)) > 2.0 or isinstance(node.value, float)):
                continue

            # Literals assigned to UPPER_CASE names are already constants.
            parent = parent_of.get(node)
            if isinstance(parent, ast.Assign):
                targets = [t.id for t in parent.targets if isinstance(t, ast.Name)]
                if any(t.isupper() for t in targets):
                    continue

            magic_numbers.append(
                MagicValue(
                    value=node.value,
                    file_path=py_file,
                    line_number=node.lineno,
                    context="numeric literal",
                )
            )

    occurrences: dict[object, list[MagicValue]] = defaultdict(list)
    for mv in magic_numbers:
        occurrences[mv.value].append(mv)

    repeated = [
        (value, mvs) for value, mvs in occurrences.items()
        if len(mvs) > 2
    ]

    violations = [
        f"Magic number {value} used {len(mvs)} times:\n"
        + "\n".join(f" {mv.file_path}:{mv.line_number}" for mv in mvs[:3])
        for value, mvs in repeated
    ]

    # Target: 11 repeated magic numbers max - common values need named constants:
    # - 10 (20x), 1024 (14x), 5 (13x), 50 (12x), 40, 24, 300, 10000, 500 should be constants
    # Note: 40 (model display width), 24 (hours), 300 (timeouts), 10000/500 (http codes) are repeated
    assert len(violations) <= 11, (
        f"Found {len(violations)} repeated magic numbers (max 11 allowed). "
        "Consider extracting to constants:\n\n" + "\n\n".join(violations[:5])
    )
|
|
|
|
|
|
def test_no_repeated_string_literals() -> None:
    """Detect repeated string literals that should be constants.

    Note: Excludes migration files as they are standalone scripts with
    intentional repetition of table/column names.
    """
    src_root = Path(__file__).parent.parent.parent / "src" / "noteflow"

    string_occurrences: dict[str, list[tuple[Path, int]]] = defaultdict(list)

    for py_file in find_python_files(src_root, exclude_migrations=True):
        source = py_file.read_text(encoding="utf-8")
        try:
            tree = ast.parse(source)
        except SyntaxError:
            continue

        for node in ast.walk(tree):
            if not (isinstance(node, ast.Constant) and isinstance(node.value, str)):
                continue
            value = node.value
            if value in ALLOWED_STRINGS or len(value) < 4:
                continue
            # Skip docstrings, SQL, format strings, and common patterns
            if value.startswith(("%", "{", "SELECT", "INSERT", "UPDATE", "DELETE FROM")):
                continue
            # Skip docstrings and common repeated phrases
            if value.endswith((".", ":")) or "\n" in value:
                continue
            string_occurrences[value].append((py_file, node.lineno))

    repeated = [
        (value, locs)
        for value, locs in string_occurrences.items()
        if len(locs) > 2
    ]

    violations = [
        f"String '{value[:50]}' repeated {len(locs)} times:\n"
        + "\n".join(f" {f}:{line}" for f, line in locs[:3])
        for value, locs in repeated
    ]

    # Target: 31 repeated strings max - many can be extracted to constants
    # - Error messages, schema names, log formats should be centralized
    assert len(violations) <= 31, (
        f"Found {len(violations)} repeated string literals (max 31 allowed). "
        "Consider using constants or enums:\n\n" + "\n\n".join(violations[:5])
    )
|
|
|
|
|
|
def test_no_hardcoded_paths() -> None:
    """Detect hardcoded file paths that should be configurable."""
    src_root = Path(__file__).parent.parent.parent / "src" / "noteflow"

    path_patterns = [
        r'["\'][A-Za-z]:\\',  # Windows drive-letter paths
        r'["\']\/(?:home|usr|var|etc|opt|tmp)\/\w+',  # absolute POSIX paths
        r'["\']\.\.\/\w+',  # parent-relative paths
        r'["\']~\/\w+',  # home-relative paths
    ]

    violations: list[str] = []

    for py_file in find_python_files(src_root):
        source_lines = py_file.read_text(encoding="utf-8").splitlines()

        for lineno, line in enumerate(source_lines, start=1):
            for pattern in path_patterns:
                match = re.search(pattern, line)
                if match is None:
                    continue
                # Skip if "test" appears in the line (test data)
                if "test" in line.lower():
                    continue
                # Skip if the path appears AFTER a # comment
                # (i.e., the comment marker is before the match)
                comment_pos = line.find("#")
                if comment_pos != -1 and comment_pos < match.start():
                    continue
                violations.append(f"{py_file}:{lineno}: hardcoded path detected")
                break

    assert not violations, (
        f"Found {len(violations)} hardcoded paths:\n" + "\n".join(violations[:10])
    )
|
|
|
|
|
|
def test_no_hardcoded_credentials() -> None:
    """Detect potential hardcoded credentials or secrets.

    Scans source lines for assignment patterns that look like passwords,
    API keys, secrets/tokens, or inline bearer tokens; lines mentioning
    "example" or "test" are treated as fixtures and skipped.
    """
    src_root = Path(__file__).parent.parent.parent / "src" / "noteflow"

    # (regex, human-readable label); all matched case-insensitively.
    credential_patterns = [
        (r'(?:password|passwd|pwd)\s*=\s*["\'][^"\']+["\']', "password"),
        (r'(?:api_?key|apikey)\s*=\s*["\'][^"\']+["\']', "API key"),
        (r'(?:secret|token)\s*=\s*["\'][a-zA-Z0-9]{20,}["\']', "secret/token"),
        (r'Bearer\s+[a-zA-Z0-9_\-\.]+', "bearer token"),
    ]

    violations: list[str] = []

    for py_file in find_python_files(src_root):
        lines = py_file.read_text(encoding="utf-8").splitlines()

        for i, line in enumerate(lines, start=1):
            lower_line = line.lower()
            for pattern, cred_type in credential_patterns:
                # re.IGNORECASE already makes the search case-insensitive;
                # the previous version also lowered the line first, which
                # was redundant double case-folding.
                if re.search(pattern, line, re.IGNORECASE):
                    # Allow obvious fixtures and examples.
                    if "example" not in lower_line and "test" not in lower_line:
                        violations.append(
                            f"{py_file}:{i}: potential hardcoded {cred_type}"
                        )

    assert not violations, (
        f"Found {len(violations)} potential hardcoded credentials:\n"
        + "\n".join(violations)
    )
|