Files
rag-manager/typings/prefect.pyi
2025-09-21 01:38:47 +00:00

113 lines
3.6 KiB
Python

"""Type stubs for Prefect to fix missing type information."""
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar, overload
from typing_extensions import ParamSpec
# Prefect-specific types for cache key functions
class TaskRunContext:
    """Prefect task run context.

    Opaque placeholder type: this stub declares no attributes or methods.
    It exists so that the ``cache_key_fn`` parameter of ``task`` can name
    the type of its first argument.
    """
# Parameter specification of the decorated callable; lets the decorator
# stubs below return a callable with the same parameters they were given.
P = ParamSpec("P")
# Return type of the decorated callable (or the awaited result type in the
# overloads that spell the return as ``Awaitable[T]``).
T = TypeVar("T")
# ``@task(...)`` -- configured form, keyword arguments only.
#
# Returns a decorator that hands back a callable with the same parameters
# and return type as the function it wraps.  A single overload covers both
# synchronous and ``async def`` functions: for a coroutine function ``T``
# simply binds to the coroutine type.  Two factory overloads with identical
# parameter lists cannot coexist, because overload resolution always selects
# the first match and the second would never be used.
@overload
def task(
    *,
    name: str | None = None,
    description: str | None = None,
    tags: list[str] | None = None,
    version: str | None = None,
    # Receives the task run context and the call arguments, returns the key.
    cache_key_fn: Callable[[TaskRunContext, dict[str, Any]], str] | None = None,
    cache_expiration: Any = None,
    task_run_name: Any = None,
    retries: int = 0,
    retry_delay_seconds: float | int | list[float] | None = None,
    retry_jitter_factor: float | None = None,
    persist_result: bool | None = None,
    result_storage: Any = None,
    result_storage_key: str | None = None,
    result_serializer: Any = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: int | float | None = None,
    log_prints: bool | None = None,
    refresh_cache: bool | None = None,
    on_completion: list[Any] | None = None,
    on_failure: list[Any] | None = None,
    retry_condition_fn: Any = None,
    viz_return_value: Any = None,
) -> Callable[[Callable[P, T]], Callable[P, T]]: ...


# Bare ``@task`` applied directly to a function returning an awaitable.
@overload
def task(
    fn: Callable[P, Awaitable[T]],
) -> Callable[P, Awaitable[T]]: ...


# Bare ``@task`` applied directly to any other (synchronous) function.
@overload
def task(
    fn: Callable[P, T],
) -> Callable[P, T]: ...
# ``@flow(...)`` -- configured form, keyword arguments only.
#
# Returns a decorator that hands back a callable with the same parameters
# and return type as the function it wraps.  The return type is generic in
# ``T`` rather than ``Awaitable[T]`` so that synchronous flows type-check as
# well as ``async def`` flows (for which ``T`` binds to the coroutine type).
@overload
def flow(
    *,
    name: str | None = None,
    description: str | None = None,
    version: str | None = None,
    flow_run_name: Any = None,
    task_runner: Any = None,
    timeout_seconds: int | float | None = None,
    validate_parameters: bool = True,
    retries: int = 0,
    retry_delay_seconds: float | int | list[float] | None = None,
    persist_result: bool | None = None,
    result_storage: Any = None,
    result_storage_key: str | None = None,
    result_serializer: Any = None,
    cache_result_in_memory: bool = True,
    log_prints: bool | None = None,
    on_completion: list[Any] | None = None,
    on_failure: list[Any] | None = None,
    on_crashed: list[Any] | None = None,
    on_cancellation: list[Any] | None = None,
    on_running: list[Any] | None = None,
) -> Callable[[Callable[P, T]], Callable[P, T]]: ...


# Bare ``@flow`` applied directly to a function returning an awaitable.
@overload
def flow(
    fn: Callable[P, Awaitable[T]],
) -> Callable[P, Awaitable[T]]: ...


# Bare ``@flow`` applied directly to any other (synchronous) function,
# matching the sync overload that ``task`` already declares.
@overload
def flow(
    fn: Callable[P, T],
) -> Callable[P, T]: ...
class Logger:
    """Logger interface stub.

    Declares the leveled logging methods available on the object returned
    by ``get_run_logger``.  Every method takes a message, optional
    positional arguments and optional keyword arguments, and returns
    ``None``.
    """

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def info(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def error(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None: ...
# Returns the logger for the current run.  ``context`` and ``**kwargs`` are
# both optional, so the zero-argument call keeps working; they are declared
# so that calls passing an explicit run context or extra keyword arguments
# (accepted by Prefect's documented signature) are not rejected.
def get_run_logger(context: Any = None, **kwargs: Any) -> Logger: ...