Files
noteflow/src/noteflow/grpc/_mixins/converters/_oidc.py
2026-01-02 10:11:45 +00:00

99 lines
3.8 KiB
Python

"""OIDC-related converters for gRPC service."""
from __future__ import annotations
from typing import Protocol, cast
from noteflow.domain.auth.oidc import ClaimMapping, OidcProviderConfig
from ...proto import noteflow_pb2
def claim_mapping_to_proto(mapping: ClaimMapping) -> noteflow_pb2.ClaimMappingProto:
    """Serialise a domain ``ClaimMapping`` into a ``ClaimMappingProto`` message.

    Args:
        mapping: Domain claim mapping to convert.

    Returns:
        A proto message carrying the same claim names. The first-name,
        last-name and phone claims are written as ``""`` when they are
        falsy (for example ``None``) on the domain object.
    """
    # These three claims may be unset on the domain side; the proto gets "" instead.
    first_name = mapping.first_name_claim or ""
    last_name = mapping.last_name_claim or ""
    phone = mapping.phone_claim or ""

    return noteflow_pb2.ClaimMappingProto(
        subject_claim=mapping.subject_claim,
        email_claim=mapping.email_claim,
        email_verified_claim=mapping.email_verified_claim,
        name_claim=mapping.name_claim,
        preferred_username_claim=mapping.preferred_username_claim,
        groups_claim=mapping.groups_claim,
        picture_claim=mapping.picture_claim,
        first_name_claim=first_name,
        last_name_claim=last_name,
        phone_claim=phone,
    )
def proto_to_claim_mapping(proto: noteflow_pb2.ClaimMappingProto) -> ClaimMapping:
    """Build a domain ``ClaimMapping`` from a ``ClaimMappingProto`` message.

    Args:
        proto: Proto message to read claim names from.

    Returns:
        A ``ClaimMapping`` in which every falsy (e.g. empty-string) proto
        field is replaced by its fallback: a fixed claim name for the
        core claims, ``None`` for first name, last name and phone.
    """
    # Core claims: an empty value on the wire falls back to a fixed claim name.
    subject = proto.subject_claim or "sub"
    email = proto.email_claim or "email"
    email_verified = proto.email_verified_claim or "email_verified"
    name = proto.name_claim or "name"
    preferred_username = proto.preferred_username_claim or "preferred_username"
    groups = proto.groups_claim or "groups"
    picture = proto.picture_claim or "picture"

    # Optional claims: an empty value becomes None rather than a default name.
    first_name = proto.first_name_claim or None
    last_name = proto.last_name_claim or None
    phone = proto.phone_claim or None

    return ClaimMapping(
        subject_claim=subject,
        email_claim=email,
        email_verified_claim=email_verified,
        name_claim=name,
        preferred_username_claim=preferred_username,
        groups_claim=groups,
        picture_claim=picture,
        first_name_claim=first_name,
        last_name_claim=last_name,
        phone_claim=phone,
    )
def discovery_to_proto(
    provider: OidcProviderConfig,
) -> noteflow_pb2.OidcDiscoveryProto | None:
    """Convert a provider's discovery configuration to a proto message.

    Args:
        provider: Provider whose ``discovery`` attribute is converted.

    Returns:
        ``None`` when ``provider.discovery`` is ``None``; otherwise an
        ``OidcDiscoveryProto`` populated from it. Falsy optional endpoints
        are written as ``""``, and ``supports_pkce`` holds the result of
        calling ``supports_pkce()`` on the discovery object.
    """
    config = provider.discovery
    if config is None:
        return None

    return noteflow_pb2.OidcDiscoveryProto(
        issuer=config.issuer,
        authorization_endpoint=config.authorization_endpoint,
        token_endpoint=config.token_endpoint,
        # Optional endpoints: falsy values (e.g. None) become "".
        userinfo_endpoint=config.userinfo_endpoint or "",
        jwks_uri=config.jwks_uri or "",
        end_session_endpoint=config.end_session_endpoint or "",
        revocation_endpoint=config.revocation_endpoint or "",
        # Copy the collections into fresh lists for the repeated fields.
        scopes_supported=[*config.scopes_supported],
        claims_supported=[*config.claims_supported],
        supports_pkce=config.supports_pkce(),
    )
def oidc_provider_to_proto(
    provider: OidcProviderConfig,
    warnings: list[str] | None = None,
) -> noteflow_pb2.OidcProviderProto:
    """Convert a domain ``OidcProviderConfig`` to an ``OidcProviderProto``.

    Args:
        provider: Domain provider configuration to serialise.
        warnings: Optional messages placed in the proto's ``warnings``
            field; ``None`` or an empty list yields an empty field.

    Returns:
        The populated proto message. ``discovery`` and
        ``discovery_refreshed_at`` are only set when the provider has
        values for them. Timestamps are written as whole seconds taken
        from ``datetime.timestamp()``.
    """
    # Convert discovery first; None means the provider has no discovery data.
    discovery_msg = discovery_to_proto(provider)

    message = noteflow_pb2.OidcProviderProto(
        id=str(provider.id),
        workspace_id=str(provider.workspace_id),
        name=provider.name,
        preset=provider.preset.value,
        issuer_url=provider.issuer_url,
        client_id=provider.client_id,
        enabled=provider.enabled,
        claim_mapping=claim_mapping_to_proto(provider.claim_mapping),
        scopes=[*provider.scopes],
        require_email_verified=provider.require_email_verified,
        allowed_groups=[*provider.allowed_groups],
        created_at=int(provider.created_at.timestamp()),
        updated_at=int(provider.updated_at.timestamp()),
        warnings=warnings if warnings else [],
    )

    if discovery_msg is not None:
        # The casts exist only so static type checkers accept the CopyFrom
        # call; at runtime cast() returns its argument unchanged.
        target = cast(_Copyable, message.discovery)
        target.CopyFrom(cast(_Copyable, discovery_msg))

    refreshed_at = provider.discovery_refreshed_at
    if refreshed_at is not None:
        message.discovery_refreshed_at = int(refreshed_at.timestamp())

    return message
class _Copyable(Protocol):
    """Structural type for objects that expose a ``CopyFrom`` method.

    Serves as a ``cast`` target so that static type checkers accept
    ``CopyFrom`` calls on message objects.
    """

    def CopyFrom(self, other: "_Copyable") -> None:
        """Accept another copyable object; implementations define the copy."""