diff --git a/backend/backend/constants.py b/backend/backend/constants.py index 53126ceb..26d944d9 100644 --- a/backend/backend/constants.py +++ b/backend/backend/constants.py @@ -34,4 +34,3 @@ class FeatureFlag: """Temporary feature flags.""" APP_DEPLOYMENT = "app_deployment" - REMOTE_FILE_STORAGE = "remote_file_storage" diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 76ef114a..94db8aa2 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -117,8 +117,6 @@ X2TEXT_PORT = os.environ.get("X2TEXT_PORT", 3004) STRUCTURE_TOOL_IMAGE_URL = get_required_setting("STRUCTURE_TOOL_IMAGE_URL") STRUCTURE_TOOL_IMAGE_NAME = get_required_setting("STRUCTURE_TOOL_IMAGE_NAME") STRUCTURE_TOOL_IMAGE_TAG = get_required_setting("STRUCTURE_TOOL_IMAGE_TAG") -WORKFLOW_DATA_DIR = os.environ.get("WORKFLOW_DATA_DIR") -API_STORAGE_DIR = os.environ.get("API_STORAGE_DIR") CACHE_TTL_SEC = os.environ.get("CACHE_TTL_SEC", 10800) DEFAULT_AUTH_USERNAME = os.environ.get("DEFAULT_AUTH_USERNAME", "unstract") diff --git a/backend/prompt_studio/prompt_studio_core_v2/models.py b/backend/prompt_studio/prompt_studio_core_v2/models.py index c4a71cf1..3a6cc9da 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/models.py +++ b/backend/prompt_studio/prompt_studio_core_v2/models.py @@ -1,5 +1,4 @@ import logging -import shutil import uuid from typing import Any @@ -7,7 +6,6 @@ from account_v2.models import User from adapter_processor_v2.models import AdapterInstance from django.db import models from django.db.models import QuerySet -from file_management.file_management_helper import FileManagerHelper from prompt_studio.prompt_studio_core_v2.constants import DefaultPrompts from unstract.sdk.file_storage.constants import StorageType from unstract.sdk.file_storage.env_helper import EnvHelper @@ -19,9 +17,6 @@ from utils.models.organization_mixin import ( DefaultOrganizationMixin, ) -from backend.constants import FeatureFlag -from 
unstract.flags.feature_flag import check_feature_flag_status - logger = logging.getLogger(__name__) @@ -140,38 +135,22 @@ class CustomTool(DefaultOrganizationMixin, BaseModel): def delete(self, organization_id=None, *args, **kwargs): # Delete the documents associated with the tool - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - organization_id, - is_create=False, - user_id=self.created_by.user_id, - tool_id=str(self.tool_id), - ) - if organization_id: - try: - shutil.rmtree(file_path) - except FileNotFoundError: - logger.error(f"The folder {file_path} does not exist.") - except OSError as e: - logger.error(f"Error: {file_path} : {e.strerror}") - # Continue with the deletion of the tool - else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - organization_id, - is_create=False, - user_id=self.created_by.user_id, - tool_id=str(self.tool_id), - ) - try: - fs_instance.rm(file_path, True) - except FileNotFoundError: - # Supressed to handle cases when the remote - # file is missing or already deleted - pass + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + organization_id, + is_create=False, + user_id=self.created_by.user_id, + tool_id=str(self.tool_id), + ) + try: + fs_instance.rm(file_path, True) + except FileNotFoundError: + # Supressed to handle cases when the remote + # file is missing or already deleted + pass super().delete(*args, **kwargs) class Meta: diff --git a/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py b/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py index 2ce14016..43b4b109 100644 --- 
a/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py +++ b/backend/prompt_studio/prompt_studio_core_v2/prompt_ide_base_tool.py @@ -3,15 +3,10 @@ import os from platform_settings_v2.platform_auth_service import PlatformAuthenticationService from prompt_studio.prompt_studio_core_v2.constants import ToolStudioKeys from unstract.sdk.constants import LogLevel +from unstract.sdk.file_storage.constants import StorageType +from unstract.sdk.file_storage.env_helper import EnvHelper from unstract.sdk.tool.stream import StreamMixin - -from backend.constants import FeatureFlag -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.sdk.file_storage.constants import StorageType - from unstract.sdk.file_storage.env_helper import EnvHelper - from utils.file_storage.constants import FileStorageKeys +from utils.file_storage.constants import FileStorageKeys class PromptIdeBaseTool(StreamMixin): @@ -24,12 +19,10 @@ class PromptIdeBaseTool(StreamMixin): """ self.log_level = log_level self.org_id = org_id - self.workflow_filestorage = None - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.workflow_filestorage = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) + self.workflow_filestorage = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) super().__init__(log_level=log_level) diff --git a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py index 3c604422..06a2fadb 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py +++ b/backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py @@ -12,7 +12,6 @@ from adapter_processor_v2.constants import AdapterKeys from adapter_processor_v2.models import 
AdapterInstance from django.conf import settings from django.db.models.manager import BaseManager -from file_management.file_management_helper import FileManagerHelper from prompt_studio.modifier_loader import ModifierConfig from prompt_studio.modifier_loader import load_plugins as load_modifier_plugins from prompt_studio.prompt_profile_manager_v2.models import ProfileManager @@ -60,14 +59,11 @@ from unstract.sdk.file_storage.constants import StorageType from unstract.sdk.file_storage.env_helper import EnvHelper from unstract.sdk.index import Index from unstract.sdk.prompt import PromptTool -from unstract.sdk.utils.tool_utils import ToolUtils from utils.file_storage.constants import FileStorageKeys from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper from utils.local_context import StateStore -from backend.constants import FeatureFlag from unstract.core.pubsub_helper import LogPublisher -from unstract.flags.feature_flag import check_feature_flag_status CHOICES_JSON = "/static/select_choices.json" ERROR_MSG = "User %s doesn't have access to adapter %s" @@ -343,22 +339,12 @@ class PromptStudioHelper: file_path = file_name else: default_profile = ProfileManager.get_default_llm_profile(tool) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id, - is_create=False, - user_id=user_id, - tool_id=tool_id, - ) - else: - file_path = ( - PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - org_id, - is_create=False, - user_id=user_id, - tool_id=tool_id, - ) - ) + file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id, + is_create=False, + user_id=user_id, + tool_id=tool_id, + ) file_path = str(Path(file_path) / file_name) if not tool: @@ -382,37 +368,24 @@ class PromptStudioHelper: process_text = None if text_processor: process_text = text_processor.process - if not 
check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - tool_id=tool_id, - file_path=file_path, - org_id=org_id, - document_id=document_id, - is_summary=is_summary, - reindex=True, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - doc_id = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - tool_id=tool_id, - file_path=file_path, - org_id=org_id, - document_id=document_id, - is_summary=is_summary, - reindex=True, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + doc_id = PromptStudioHelper.dynamic_indexer( + profile_manager=default_profile, + tool_id=tool_id, + file_path=file_path, + org_id=org_id, + document_id=document_id, + is_summary=is_summary, + reindex=True, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) elapsed_time = time.time() - start_time logger.info( @@ -655,40 +628,24 @@ class PromptStudioHelper: @staticmethod def _get_document_path(org_id, user_id, tool_id, doc_name): - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) - else: - doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) + doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id=org_id, + user_id=user_id, + tool_id=tool_id, + is_create=False, + ) return str(Path(doc_path) / doc_name) @staticmethod def 
_get_extract_or_summary_document_path( org_id, user_id, tool_id, doc_name, doc_type ) -> str: - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) - else: - doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( - org_id=org_id, - user_id=user_id, - tool_id=tool_id, - is_create=False, - ) + doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( + org_id=org_id, + user_id=user_id, + tool_id=tool_id, + is_create=False, + ) extracted_doc_name = Path(doc_name).stem + TSPKeys.TXT_EXTENTION return str(Path(doc_path) / doc_type / extracted_doc_name) @@ -789,35 +746,22 @@ class PromptStudioHelper: x2text = str(profile_manager.x2text.id) if not profile_manager: raise DefaultProfileError() - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=profile_manager, - file_path=doc_path, - tool_id=str(tool.tool_id), - org_id=org_id, - document_id=document_id, - is_summary=tool.summarize_as_source, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - else: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=profile_manager, - file_path=doc_path, - tool_id=str(tool.tool_id), - org_id=org_id, - document_id=document_id, - is_summary=tool.summarize_as_source, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + index_result = PromptStudioHelper.dynamic_indexer( + profile_manager=profile_manager, + file_path=doc_path, + tool_id=str(tool.tool_id), + org_id=org_id, + 
document_id=document_id, + is_summary=tool.summarize_as_source, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) if index_result.get("status") == IndexingStatus.PENDING_STATUS.value: return { "status": IndexingStatus.PENDING_STATUS.value, @@ -888,10 +832,7 @@ class PromptStudioHelper: tool_settings[TSPKeys.PLATFORM_POSTAMBLE] = getattr( settings, TSPKeys.PLATFORM_POSTAMBLE.upper(), "" ) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_hash = ToolUtils.get_hash_from_file(file_path=doc_path) - else: - file_hash = fs_instance.get_hash_from_file(path=doc_path) + file_hash = fs_instance.get_hash_from_file(path=doc_path) payload = { TSPKeys.TOOL_SETTINGS: tool_settings, @@ -1012,27 +953,16 @@ class PromptStudioHelper: usage_kwargs["file_name"] = filename util = PromptIdeBaseTool(log_level=LogLevel.INFO, org_id=org_id) tool_index = Index(tool=util) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id_key = tool_index.generate_index_key( - vector_db=vector_db, - embedding=embedding_model, - x2text=x2text_adapter, - chunk_size=str(profile_manager.chunk_size), - chunk_overlap=str(profile_manager.chunk_overlap), - file_path=file_path, - file_hash=None, - ) - else: - doc_id_key = tool_index.generate_index_key( - vector_db=vector_db, - embedding=embedding_model, - x2text=x2text_adapter, - chunk_size=str(profile_manager.chunk_size), - chunk_overlap=str(profile_manager.chunk_overlap), - file_path=file_path, - file_hash=None, - fs=fs, - ) + doc_id_key = tool_index.generate_index_key( + vector_db=vector_db, + embedding=embedding_model, + x2text=x2text_adapter, + chunk_size=str(profile_manager.chunk_size), + chunk_overlap=str(profile_manager.chunk_overlap), + file_path=file_path, + file_hash=None, + fs=fs, + ) if not reindex: indexed_doc_id = DocumentIndexingService.get_indexed_document_id( org_id=org_id, user_id=user_id, doc_id_key=doc_id_key @@ -1055,35 +985,20 @@ class PromptStudioHelper: 
DocumentIndexingService.set_document_indexing( org_id=org_id, user_id=user_id, doc_id_key=doc_id_key ) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - doc_id: str = tool_index.index( - tool_id=tool_id, - embedding_instance_id=embedding_model, - vector_db_instance_id=vector_db, - x2text_instance_id=x2text_adapter, - file_path=file_path, - chunk_size=profile_manager.chunk_size, - chunk_overlap=profile_manager.chunk_overlap, - reindex=reindex, - output_file_path=extract_file_path, - usage_kwargs=usage_kwargs.copy(), - process_text=process_text, - ) - else: - doc_id: str = tool_index.index( - tool_id=tool_id, - embedding_instance_id=embedding_model, - vector_db_instance_id=vector_db, - x2text_instance_id=x2text_adapter, - file_path=file_path, - chunk_size=profile_manager.chunk_size, - chunk_overlap=profile_manager.chunk_overlap, - reindex=reindex, - output_file_path=extract_file_path, - usage_kwargs=usage_kwargs.copy(), - process_text=process_text, - fs=fs, - ) + doc_id: str = tool_index.index( + tool_id=tool_id, + embedding_instance_id=embedding_model, + vector_db_instance_id=vector_db, + x2text_instance_id=x2text_adapter, + file_path=file_path, + chunk_size=profile_manager.chunk_size, + chunk_overlap=profile_manager.chunk_overlap, + reindex=reindex, + output_file_path=extract_file_path, + usage_kwargs=usage_kwargs.copy(), + process_text=process_text, + fs=fs, + ) PromptStudioIndexHelper.handle_index_manager( document_id=document_id, @@ -1145,35 +1060,22 @@ class PromptStudioHelper: if not default_profile: raise DefaultProfileError() - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - file_path=file_path, - tool_id=tool_id, - org_id=org_id, - is_summary=tool.summarize_as_source, - document_id=document_id, - run_id=run_id, - user_id=user_id, - process_text=process_text, - ) - else: - fs_instance = EnvHelper.get_storage( - 
storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - index_result = PromptStudioHelper.dynamic_indexer( - profile_manager=default_profile, - file_path=file_path, - tool_id=tool_id, - org_id=org_id, - is_summary=tool.summarize_as_source, - document_id=document_id, - run_id=run_id, - user_id=user_id, - process_text=process_text, - fs=fs_instance, - ) + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + index_result = PromptStudioHelper.dynamic_indexer( + profile_manager=default_profile, + file_path=file_path, + tool_id=tool_id, + org_id=org_id, + is_summary=tool.summarize_as_source, + document_id=document_id, + run_id=run_id, + user_id=user_id, + process_text=process_text, + fs=fs_instance, + ) if index_result.get("status") == IndexingStatus.PENDING_STATUS.value: return { "status": IndexingStatus.PENDING_STATUS.value, @@ -1214,10 +1116,7 @@ class PromptStudioHelper: if tool.summarize_as_source: path = Path(file_path) file_path = str(path.parent / TSPKeys.SUMMARIZE / (path.stem + ".txt")) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_hash = ToolUtils.get_hash_from_file(file_path=file_path) - else: - file_hash = fs_instance.get_hash_from_file(path=file_path) + file_hash = fs_instance.get_hash_from_file(path=file_path) payload = { TSPKeys.TOOL_SETTINGS: tool_settings, diff --git a/backend/prompt_studio/prompt_studio_core_v2/views.py b/backend/prompt_studio/prompt_studio_core_v2/views.py index c33d6ac9..fe3ad9f1 100644 --- a/backend/prompt_studio/prompt_studio_core_v2/views.py +++ b/backend/prompt_studio/prompt_studio_core_v2/views.py @@ -8,7 +8,6 @@ from django.db.models import QuerySet from django.http import HttpRequest from file_management.constants import FileInformationKey as FileKey from file_management.exceptions import FileNotFound -from file_management.file_management_helper import FileManagerHelper from 
permissions.permission import IsOwner, IsOwnerOrSharedUser from prompt_studio.processor_loader import get_plugin_class_by_name, load_plugins from prompt_studio.prompt_profile_manager_v2.constants import ( @@ -57,10 +56,6 @@ from unstract.sdk.utils.common_utils import CommonUtils from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper from utils.user_session import UserSessionUtils -from backend.constants import FeatureFlag -from unstract.connectors.filesystems.local_storage.local_storage import LocalStorageFS -from unstract.flags.feature_flag import check_feature_flag_status - from .models import CustomTool from .serializers import ( CustomToolSerializer, @@ -423,49 +418,16 @@ class PromptStudioCoreView(viewsets.ModelViewSet): f"{FileViewTypes.SUMMARIZE.lower()}/" f"{filename_without_extension}.txt" ) - - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - - file_path = file_path = FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, + try: + contents = PromptStudioFileHelper.fetch_file_contents( + file_name=file_name, + org_id=UserSessionUtils.get_organization_id(request), user_id=custom_tool.created_by.user_id, tool_id=str(custom_tool.tool_id), + allowed_content_types=allowed_content_types, ) - file_system = LocalStorageFS(settings={"path": file_path}) - if not file_path.endswith("/"): - file_path += "/" - file_path += file_name - # Temporary Hack for frictionless onboarding as the user id will be empty - try: - contents = FileManagerHelper.fetch_file_contents( - file_system, file_path, allowed_content_types - ) - except FileNotFound: - file_path = file_path = ( - FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, - user_id="", - tool_id=str(custom_tool.tool_id), - ) - ) - if not file_path.endswith("/"): - file_path += "/" - file_path += file_name - contents = 
FileManagerHelper.fetch_file_contents( - file_system, file_path, allowed_content_types - ) - else: - try: - contents = PromptStudioFileHelper.fetch_file_contents( - file_name=file_name, - org_id=UserSessionUtils.get_organization_id(request), - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - ) - except FileNotFoundError: - raise FileNotFound() + except FileNotFoundError: + raise FileNotFound() return Response({"data": contents}, status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) @@ -494,28 +456,14 @@ class PromptStudioCoreView(viewsets.ModelViewSet): logger.info( f"Uploading file: {file_name}" if file_name else "Uploading file" ) - if not check_feature_flag_status(flag_key=FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - UserSessionUtils.get_organization_id(request), - is_create=True, - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - ) - file_system = LocalStorageFS(settings={"path": file_path}) - FileManagerHelper.upload_file( - file_system, - file_path, - file_data, - file_name, - ) - else: - PromptStudioFileHelper.upload_for_ide( - org_id=UserSessionUtils.get_organization_id(request), - user_id=custom_tool.created_by.user_id, - tool_id=str(custom_tool.tool_id), - file_name=file_name, - file_data=file_data, - ) + + PromptStudioFileHelper.upload_for_ide( + org_id=UserSessionUtils.get_organization_id(request), + user_id=custom_tool.created_by.user_id, + tool_id=str(custom_tool.tool_id), + file_name=file_name, + file_data=file_data, + ) # Create a record in the db for the file document = PromptStudioDocumentHelper.create( @@ -554,28 +502,12 @@ class PromptStudioCoreView(viewsets.ModelViewSet): document.delete() # Delete the files file_name: str = document.document_name - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_path = FileManagerHelper.handle_sub_directory_for_tenants( - org_id=org_id, - is_create=False, - 
user_id=user_id, - tool_id=str(custom_tool.tool_id), - ) - path = file_path - file_system = LocalStorageFS(settings={"path": path}) - FileManagerHelper.delete_file(file_system, path, file_name) - # Directories to delete the text files - directories = ["extract/", "extract/metadata/", "summarize/"] - FileManagerHelper.delete_related_files( - file_system, path, file_name, directories - ) - else: - PromptStudioFileHelper.delete_for_ide( - org_id=org_id, - user_id=user_id, - tool_id=str(custom_tool.tool_id), - file_name=file_name, - ) + PromptStudioFileHelper.delete_for_ide( + org_id=org_id, + user_id=user_id, + tool_id=str(custom_tool.tool_id), + file_name=file_name, + ) return Response( {"data": "File deleted succesfully."}, status=status.HTTP_200_OK, diff --git a/backend/sample.env b/backend/sample.env index 7217fb80..569cb8dc 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -58,9 +58,6 @@ GOOGLE_STORAGE_ACCESS_KEY_ID= GOOGLE_STORAGE_SECRET_ACCESS_KEY= GOOGLE_STORAGE_BASE_URL=https://storage.googleapis.com -# API storage -API_STORAGE_DIR = "/data/api" - # Platform Service PLATFORM_SERVICE_HOST=http://unstract-platform-service PLATFORM_SERVICE_PORT=3001 @@ -69,16 +66,12 @@ PLATFORM_SERVICE_PORT=3001 UNSTRACT_RUNNER_HOST=http://unstract-runner UNSTRACT_RUNNER_PORT=5002 -# Workflow execution -WORKFLOW_DATA_DIR = "/data/execution" - # Prompt Service PROMPT_HOST=http://unstract-prompt-service PROMPT_PORT=3003 #Prompt Studio PROMPT_STUDIO_FILE_PATH=/app/prompt-studio-data -REMOTE_PROMPT_STUDIO_FILE_PATH= # Structure Tool Image (Runs prompt studio exported tools) # https://hub.docker.com/r/unstract/tool-structure @@ -156,14 +149,14 @@ API_EXECUTION_DIR_PREFIX="unstract/api" # Storage Provider for Workflow Execution # Valid options: MINIO, S3, etc.. 
-WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials": {"endpoint_url":"http://unstract-minio:9000","key":"XXX","secret":"XXX"}}' +WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' # Storage Provider for API Execution -API_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "XXX", "secret": "XXX"}}' +API_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' #Remote storage related envs -PERMANENT_REMOTE_STORAGE={"provider":"gcs","credentials":} -REMOTE_PROMPT_STUDIO_FILE_PATH="/prompt_studio_data/" +PERMANENT_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' +REMOTE_PROMPT_STUDIO_FILE_PATH="unstract/prompt-studio-data" # Storage Provider for Tool registry TOOL_REGISTRY_STORAGE_CREDENTIALS='{"provider":"local"}' diff --git a/backend/utils/file_storage/helpers/prompt_studio_file_helper.py b/backend/utils/file_storage/helpers/prompt_studio_file_helper.py index 86fdfa7b..39c33656 100644 --- a/backend/utils/file_storage/helpers/prompt_studio_file_helper.py +++ b/backend/utils/file_storage/helpers/prompt_studio_file_helper.py @@ -4,6 +4,7 @@ import os from pathlib import Path from typing import Any +from file_management.exceptions import InvalidFileType from file_management.file_management_helper import FileManagerHelper from unstract.sdk.file_storage import FileStorage from unstract.sdk.file_storage.constants import StorageType @@ -82,7 +83,11 @@ class PromptStudioFileHelper: @staticmethod def fetch_file_contents( - org_id: str, user_id: str, tool_id: str, file_name: str + org_id: str, + user_id: str, + tool_id: str, + file_name: str, + allowed_content_types: list[str], ) -> 
dict[str, Any]: """Method to fetch file contents from the remote location. The path is constructed in runtime based on the args""" @@ -110,7 +115,7 @@ class PromptStudioFileHelper: ) # TODO : Handle this with proper fix # Temporary Hack for frictionless onboarding as the user id will be empty - if not fs_instance.exists(file_system_path): + if not user_id and not fs_instance.exists(file_system_path): file_system_path = ( PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory( org_id=org_id, @@ -121,7 +126,9 @@ class PromptStudioFileHelper: ) file_path = str(Path(file_system_path) / file_name) legacy_file_path = str(Path(legacy_file_system_path) / file_name) - file_content_type = fs_instance.mime_type(file_path) + file_content_type = fs_instance.mime_type( + path=file_path, legacy_storage_path=legacy_file_path + ) if file_content_type == "application/pdf": # Read contents of PDF file into a string text_content_bytes: bytes = fs_instance.read( @@ -140,9 +147,14 @@ class PromptStudioFileHelper: legacy_storage_path=legacy_file_path, encoding="utf-8", ) - return {"data": text_content_string, "mime_type": file_content_type} + # Check if the file type is in the allowed list + elif file_content_type not in allowed_content_types: + raise InvalidFileType(f"File type '{file_content_type}' is not allowed.") + else: - raise ValueError(f"Unsupported file type: {file_content_type}") + logger.warning(f"File type '{file_content_type}' is not handled.") + + return {"data": text_content_string, "mime_type": file_content_type} @staticmethod def delete_for_ide(org_id: str, user_id: str, tool_id: str, file_name: str) -> bool: diff --git a/backend/workflow_manager/endpoint_v2/base_connector.py b/backend/workflow_manager/endpoint_v2/base_connector.py index 7077be73..a5ad644e 100644 --- a/backend/workflow_manager/endpoint_v2/base_connector.py +++ b/backend/workflow_manager/endpoint_v2/base_connector.py @@ -1,16 +1,13 @@ import json from typing import Any -from django.conf import 
settings from fsspec import AbstractFileSystem from unstract.workflow_execution.execution_file_handler import ExecutionFileHandler from utils.constants import Common from utils.user_context import UserContext -from backend.constants import FeatureFlag from unstract.connectors.filesystems import connectors from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem -from unstract.flags.feature_flag import check_feature_flag_status class BaseConnector(ExecutionFileHandler): @@ -25,13 +22,6 @@ class BaseConnector(ExecutionFileHandler): utilities. """ super().__init__(workflow_id, execution_id, organization_id) - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR): - raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR") - # Directory path for storing execution-related files for API - self.api_storage_dir: str = self.create_execution_dir_path( - workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR - ) def get_fsspec( self, settings: dict[str, Any], connector_id: str @@ -103,12 +93,7 @@ class BaseConnector(ExecutionFileHandler): str: The directory path for the execution. 
""" organization_id = UserContext.get_organization_identifier() - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - api_storage_dir: str = cls.get_api_execution_dir( - workflow_id, execution_id, organization_id - ) - else: - api_storage_dir: str = cls.create_execution_dir_path( - workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR - ) + api_storage_dir: str = cls.get_api_execution_dir( + workflow_id, execution_id, organization_id + ) return api_storage_dir diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 15f37f91..e1f00d27 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -5,13 +5,9 @@ import logging import os from typing import Any, Optional, Union -import fsspec -import magic from connector_v2.models import ConnectorInstance -from fsspec.implementations.local import LocalFileSystem from rest_framework.exceptions import APIException from unstract.sdk.constants import ToolExecKey -from unstract.sdk.file_storage.constants import FileOperationParams from unstract.sdk.tool.mime_types import EXT_MIME_MAP from unstract.workflow_execution.constants import ToolOutputType from utils.user_context import UserContext @@ -40,13 +36,9 @@ from workflow_manager.workflow_v2.models.file_history import FileHistory from workflow_manager.workflow_v2.models.workflow import Workflow from workflow_manager.workflow_v2.utils import WorkflowUtil -from backend.constants import FeatureFlag from backend.exceptions import UnstractFSException from unstract.connectors.exceptions import ConnectorError -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -240,12 +232,10 @@ class 
DestinationConnector(BaseConnector): destination_fs.create_dir_if_not_exists(input_dir=output_directory) # Traverse local directory and create the same structure in the # output_directory - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - fs = file_system.get_file_storage() - dir_path = fs.walk(str(destination_volume_path)) - else: - dir_path = os.walk(destination_volume_path) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + fs = file_system.get_file_storage() + dir_path = fs.walk(str(destination_volume_path)) + for root, dirs, files in dir_path: for dir_name in dirs: current_dir = os.path.join( @@ -417,40 +407,7 @@ class DestinationConnector(BaseConnector): Returns: Union[dict[str, Any], str]: Result data. """ - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - return self.get_result_with_file_storage(file_history=file_history) - if file_history and file_history.result: - return self.parse_string(file_history.result) - output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE) - metadata: dict[str, Any] = self.get_workflow_metadata() - output_type = self.get_output_type(metadata) - result: Union[dict[str, Any], str] = "" - try: - # TODO: SDK handles validation; consider removing here. 
- with open(output_file, "rb") as f: - file_type = magic.from_buffer(f.read(), mime=True) - if output_type == ToolOutputType.JSON: - if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]: - msg = f"Expected tool output type: JSON, got: '{file_type}'" - logger.error(msg) - raise ToolOutputTypeMismatch(detail=msg) - with open(output_file) as file: - result = json.load(file) - elif output_type == ToolOutputType.TXT: - if file_type == EXT_MIME_MAP[ToolOutputType.JSON.lower()]: - msg = f"Expected tool output type: TXT, got: '{file_type}'" - logger.error(msg) - raise ToolOutputTypeMismatch(detail=msg) - with open(output_file) as file: - result = file.read() - result = result.encode("utf-8").decode("unicode-escape") - else: - raise InvalidToolOutputType() - except (FileNotFoundError, json.JSONDecodeError) as err: - msg = f"Error while getting result from the tool: {err}" - logger.error(msg) - raise APIException(detail=msg) - return result + return self.get_result_with_file_storage(file_history=file_history) def get_result_with_file_storage( self, file_history: Optional[FileHistory] = None @@ -470,9 +427,7 @@ class DestinationConnector(BaseConnector): file_storage = file_system.get_file_storage() try: # TODO: SDK handles validation; consider removing here. 
- file_type = file_storage.mime_type( - path=output_file, read_length=FileOperationParams.READ_ENTIRE_LENGTH - ) + file_type = file_storage.mime_type(path=output_file) if output_type == ToolOutputType.JSON: if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]: msg = f"Expected tool output type: JSON, got: '{file_type}'" @@ -516,14 +471,10 @@ class DestinationConnector(BaseConnector): Returns: None """ - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - if file_storage.exists(self.execution_dir): - file_storage.rm(self.execution_dir, recursive=True) - else: - fs: LocalFileSystem = fsspec.filesystem("file") - fs.rm(self.execution_dir, recursive=True) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + if file_storage.exists(self.execution_dir): + file_storage.rm(self.execution_dir, recursive=True) self.delete_api_storage_dir(self.workflow_id, self.execution_id) @classmethod @@ -536,14 +487,10 @@ class DestinationConnector(BaseConnector): api_storage_dir = cls.get_api_storage_dir_path( workflow_id=workflow_id, execution_id=execution_id ) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.API_EXECUTION) - file_storage = file_system.get_file_storage() - if file_storage.exists(api_storage_dir): - file_storage.rm(api_storage_dir, recursive=True) - else: - fs: LocalFileSystem = fsspec.filesystem("file") - fs.rm(api_storage_dir, recursive=True) + file_system = FileSystem(FileStorageType.API_EXECUTION) + file_storage = file_system.get_file_storage() + if file_storage.exists(api_storage_dir): + file_storage.rm(api_storage_dir, recursive=True) @classmethod def create_endpoint_for_workflow( diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 633d4db6..a0393214 100644 --- 
a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -39,11 +39,7 @@ from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelpe from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper from workflow_manager.workflow_v2.models.workflow import Workflow -from backend.constants import FeatureFlag -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -508,17 +504,10 @@ class SourceConnector(BaseConnector): ) self.publish_input_file_content(input_file_path, input_log) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - file_storage.write(path=source_file_path, mode="wb", data=file_content) - file_storage.write(path=infile_path, mode="wb", data=file_content) - else: - with fsspec.open(source_file, "wb") as local_file: - local_file.write(file_content) - - # Copy file to infile directory - self.copy_file_to_infile_dir(source_file_path, infile_path) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + file_storage.write(path=source_file_path, mode="wb", data=file_content) + file_storage.write(path=infile_path, mode="wb", data=file_content) logger.info(f"{input_file_path} is added to execution directory") return hash_value_of_file_content @@ -527,20 +516,17 @@ class SourceConnector(BaseConnector): """Add input file to execution directory from api storage.""" infile_path = os.path.join(self.execution_dir, WorkflowFileType.INFILE) source_path = os.path.join(self.execution_dir, WorkflowFileType.SOURCE) - if 
check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - api_file_system = FileSystem(FileStorageType.API_EXECUTION) - api_file_storage = api_file_system.get_file_storage() - workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_file_storage = workflow_file_system.get_file_storage() - self._copy_file_to_destination( - source_storage=api_file_storage, - destination_storage=workflow_file_storage, - source_path=input_file_path, - destination_paths=[infile_path, source_path], - ) - else: - shutil.copyfile(input_file_path, infile_path) - shutil.copyfile(input_file_path, source_path) + + api_file_system = FileSystem(FileStorageType.API_EXECUTION) + api_file_storage = api_file_system.get_file_storage() + workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_file_storage = workflow_file_system.get_file_storage() + self._copy_file_to_destination( + source_storage=api_file_storage, + destination_storage=workflow_file_storage, + source_path=input_file_path, + destination_paths=[infile_path, source_path], + ) # TODO: replace it with method from SDK Utils def _copy_file_to_destination( @@ -701,20 +687,13 @@ class SourceConnector(BaseConnector): for file in file_objs: file_name = file.name destination_path = os.path.join(api_storage_dir, file_name) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.API_EXECUTION) - file_storage = file_system.get_file_storage() - buffer = bytearray() - for chunk in file.chunks(): - buffer.extend(chunk) - file_storage.write(path=destination_path, mode="wb", data=buffer) - else: - os.makedirs(os.path.dirname(destination_path), exist_ok=True) - with open(destination_path, "wb") as f: - buffer = bytearray() - for chunk in file.chunks(): - buffer.extend(chunk) - f.write(buffer) + + file_system = FileSystem(FileStorageType.API_EXECUTION) + file_storage = file_system.get_file_storage() + buffer = bytearray() + for chunk in file.chunks(): + 
buffer.extend(chunk) + file_storage.write(path=destination_path, mode="wb", data=buffer) file_hash = cls.hash_str(buffer) connection_type = WorkflowEndpoint.ConnectionType.API diff --git a/docker/docker-compose-dev-essentials.yaml b/docker/docker-compose-dev-essentials.yaml index 7ad99fdb..7b637838 100644 --- a/docker/docker-compose-dev-essentials.yaml +++ b/docker/docker-compose-dev-essentials.yaml @@ -46,6 +46,21 @@ services: - traefik.http.routers.minio.rule=Host(`minio.unstract.localhost`) - traefik.http.services.minio.loadbalancer.server.port=9001 + createbuckets: + image: minio/mc + depends_on: + - minio + entrypoint: > + /bin/sh -c " + sleep 5; + mc alias set minio http://unstract-minio:9000 minio minio123; + mc mb minio/unstract; + mc mirror /app/prompt-studio-data minio/unstract/prompt-studio-data; + exit 0; + " + volumes: + - prompt_studio_data:/app/prompt-studio-data + reverse-proxy: # The official v2 Traefik docker image image: traefik:v2.10 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 0ed380af..6fd95823 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -17,6 +17,7 @@ services: - redis - reverse-proxy - minio + - createbuckets - platform-service - prompt-service - x2text-service @@ -171,6 +172,8 @@ services: restart: unless-stopped depends_on: - db + - minio + - createbuckets ports: - "3003:3003" env_file: diff --git a/platform-service/sample.env b/platform-service/sample.env index b7de0f4c..54eec703 100644 --- a/platform-service/sample.env +++ b/platform-service/sample.env @@ -31,10 +31,10 @@ FLIPT_SERVICE_AVAILABLE=False # Cost calculation related ENVs MODEL_PRICES_URL="https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json" MODEL_PRICES_TTL_IN_DAYS=7 -MODEL_PRICES_FILE_PATH="/tmp/model_prices.json" +MODEL_PRICES_FILE_PATH="/cost/model_prices.json" #Remote storage config -FILE_STORAGE_CREDENTIALS='{"provider":"gcs","credentials": {"token": {token-value-json}}' 
-REMOTE_MODEL_PRICES_FILE_PATH="fsspec-test/cost/model_prices.json" +FILE_STORAGE_CREDENTIALS='{"provider":"local"}' +REMOTE_MODEL_PRICES_FILE_PATH="unstract/cost/model_prices.json" LOG_LEVEL=INFO diff --git a/platform-service/src/unstract/platform_service/constants.py b/platform-service/src/unstract/platform_service/constants.py index f286cc7d..89f01664 100644 --- a/platform-service/src/unstract/platform_service/constants.py +++ b/platform-service/src/unstract/platform_service/constants.py @@ -1,10 +1,3 @@ -class FeatureFlag: - """Temporary feature flags.""" - - # For enabling remote storage feature - REMOTE_FILE_STORAGE = "remote_file_storage" - - class DBTable: """Database tables.""" diff --git a/platform-service/src/unstract/platform_service/helper/cost_calculation.py b/platform-service/src/unstract/platform_service/helper/cost_calculation.py index 53b5cf06..d844f1fe 100644 --- a/platform-service/src/unstract/platform_service/helper/cost_calculation.py +++ b/platform-service/src/unstract/platform_service/helper/cost_calculation.py @@ -1,21 +1,13 @@ import json -import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Optional import requests from flask import current_app as app -from unstract.platform_service.constants import FeatureFlag from unstract.platform_service.env import Env from unstract.platform_service.utils import format_float_positional - -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from datetime import timezone - - from unstract.sdk.exceptions import FileStorageError - from unstract.sdk.file_storage import EnvHelper, StorageType +from unstract.sdk.exceptions import FileStorageError +from unstract.sdk.file_storage import EnvHelper, StorageType class CostCalculationHelper: @@ -29,25 +21,21 @@ class CostCalculationHelper: self.url = url self.file_path = file_path - if 
check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - try: - self.file_storage = EnvHelper.get_storage( - StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS" - ) - self.file_path = os.environ.get("REMOTE_MODEL_PRICES_FILE_PATH") - except KeyError as e: - app.logger.error( - f"Required credentials is missing in the env: {str(e)}" - ) - raise e - except FileStorageError as e: - app.logger.error( - "Error while initialising storage: %s", - e, - stack_info=True, - exc_info=True, - ) - raise e + try: + self.file_storage = EnvHelper.get_storage( + StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS" + ) + except KeyError as e: + app.logger.error(f"Required credentials is missing in the env: {str(e)}") + raise e + except FileStorageError as e: + app.logger.error( + "Error while initialising storage: %s", + e, + stack_info=True, + exc_info=True, + ) + raise e self.model_token_data = self._get_model_token_data() @@ -58,10 +46,7 @@ class CostCalculationHelper: item = None if not self.model_token_data: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - return json.loads(format_float_positional(cost)) - else: - return format_float_positional(cost) + return json.loads(format_float_positional(cost)) # Filter the model objects by model name filtered_models = { k: v for k, v in self.model_token_data.items() if k.endswith(model_name) @@ -80,43 +65,25 @@ class CostCalculationHelper: def _get_model_token_data(self) -> Optional[dict[str, Any]]: try: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - # File does not exist, fetch JSON data from API - if not self.file_storage.exists(self.file_path): - return self._fetch_and_save_json() + # File does not exist, fetch JSON data from API + if not self.file_storage.exists(self.file_path): + return self._fetch_and_save_json() - file_mtime = self.file_storage.modification_time(self.file_path) - file_expiry_date = file_mtime + timedelta(days=self.ttl_days) - file_expiry_date_utc = 
file_expiry_date.replace(tzinfo=timezone.utc) - now_utc = datetime.now().replace(tzinfo=timezone.utc) + file_mtime = self.file_storage.modification_time(self.file_path) + file_expiry_date = file_mtime + timedelta(days=self.ttl_days) + file_expiry_date_utc = file_expiry_date.replace(tzinfo=timezone.utc) + now_utc = datetime.now().replace(tzinfo=timezone.utc) - if now_utc < file_expiry_date_utc: - app.logger.info(f"Reading model token data from {self.file_path}") - # File exists and TTL has not expired, read and return content - file_contents = self.file_storage.read( - self.file_path, mode="r", encoding="utf-8" - ) - return json.loads(file_contents) - else: - # TTL expired, fetch updated JSON data from API - return self._fetch_and_save_json() - else: - # File does not exist, fetch JSON data from API - if not os.path.exists(self.file_path): - return self._fetch_and_save_json() - - file_mtime = os.path.getmtime(self.file_path) - file_expiry_date = datetime.fromtimestamp(file_mtime) + timedelta( - days=self.ttl_days + if now_utc < file_expiry_date_utc: + app.logger.info(f"Reading model token data from {self.file_path}") + # File exists and TTL has not expired, read and return content + file_contents = self.file_storage.read( + self.file_path, mode="r", encoding="utf-8" ) - if datetime.now() < file_expiry_date: - app.logger.info(f"Reading model token data from {self.file_path}") - # File exists and TTL has not expired, read and return content - with open(self.file_path, encoding="utf-8") as f: - return json.load(f) - else: - # TTL expired, fetch updated JSON data from API - return self._fetch_and_save_json() + return json.loads(file_contents) + else: + # TTL expired, fetch updated JSON data from API + return self._fetch_and_save_json() except Exception as e: app.logger.warning( "Error in calculate_cost: %s", e, stack_info=True, exc_info=True @@ -137,21 +104,12 @@ class CostCalculationHelper: response.raise_for_status() json_data = response.json() # Save JSON data to 
file - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.file_storage.json_dump( - path=self.file_path, - data=json_data, - ensure_ascii=False, - indent=4, - ) - else: - with open(self.file_path, "w", encoding="utf-8") as f: - json.dump(json_data, f, ensure_ascii=False, indent=4) - # Set the file's modification time to indicate TTL - expiry_date = datetime.now() + timedelta(days=self.ttl_days) - expiry_timestamp = expiry_date.timestamp() - os.utime(self.file_path, (expiry_timestamp, expiry_timestamp)) - + self.file_storage.json_dump( + path=self.file_path, + data=json_data, + ensure_ascii=False, + indent=4, + ) app.logger.info( "File '%s' updated successfully with TTL set to %d days.", self.file_path, diff --git a/prompt-service/sample.env b/prompt-service/sample.env index 4a447653..c59b082c 100644 --- a/prompt-service/sample.env +++ b/prompt-service/sample.env @@ -30,7 +30,7 @@ PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python # Flipt Service FLIPT_SERVICE_AVAILABLE=False - #Remote storage related envs -PERMANENT_REMOTE_STORAGE={"provider":"gcs","credentials":} -REMOTE_PROMPT_STUDIO_FILE_PATH="/prompt_studio_data/" +PERMANENT_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' +TEMPORARY_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' +REMOTE_PROMPT_STUDIO_FILE_PATH="unstract/prompt_studio_data/" diff --git a/prompt-service/src/unstract/prompt_service/constants.py b/prompt-service/src/unstract/prompt_service/constants.py index 88d33b08..6aed9e68 100644 --- a/prompt-service/src/unstract/prompt_service/constants.py +++ b/prompt-service/src/unstract/prompt_service/constants.py @@ -91,12 +91,6 @@ class RunLevel(Enum): TABLE_EXTRACTION = "TABLE_EXTRACTION" -class FeatureFlag: - """Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage" - - class DBTableV2: 
"""Database tables.""" diff --git a/prompt-service/src/unstract/prompt_service/helper.py b/prompt-service/src/unstract/prompt_service/helper.py index 1f538515..b1f4d70c 100644 --- a/prompt-service/src/unstract/prompt_service/helper.py +++ b/prompt-service/src/unstract/prompt_service/helper.py @@ -10,7 +10,6 @@ from unstract.prompt_service.config import db from unstract.prompt_service.constants import ( DBTableV2, ExecutionSource, - FeatureFlag, FileStorageKeys, ) from unstract.prompt_service.constants import PromptServiceContants as PSKeys @@ -19,15 +18,11 @@ from unstract.prompt_service.env_manager import EnvLoader from unstract.prompt_service.exceptions import APIError, RateLimitError from unstract.sdk.exceptions import RateLimitError as SdkRateLimitError from unstract.sdk.exceptions import SdkError +from unstract.sdk.file_storage import FileStorage, FileStorageProvider +from unstract.sdk.file_storage.constants import StorageType +from unstract.sdk.file_storage.env_helper import EnvHelper from unstract.sdk.llm import LLM -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.sdk.file_storage import FileStorage, FileStorageProvider - from unstract.sdk.file_storage.constants import StorageType - from unstract.sdk.file_storage.env_helper import EnvHelper - PAID_FEATURE_MSG = ( "It is a cloud / enterprise feature. 
If you have purchased a plan and still " "face this issue, please contact support" @@ -307,27 +302,23 @@ def run_completion( ) highlight_data = None if highlight_data_plugin and enable_highlight: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) - if execution_source == ExecutionSource.IDE.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - if execution_source == ExecutionSource.TOOL.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.SHARED_TEMPORARY, - env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, - ) - highlight_data = highlight_data_plugin["entrypoint_cls"]( - logger=current_app.logger, - file_path=file_path, - fs_instance=fs_instance, - ).run - else: - highlight_data = highlight_data_plugin["entrypoint_cls"]( - logger=current_app.logger, file_path=file_path - ).run + fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) + if execution_source == ExecutionSource.IDE.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + if execution_source == ExecutionSource.TOOL.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.SHARED_TEMPORARY, + env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, + ) + highlight_data = highlight_data_plugin["entrypoint_cls"]( + logger=current_app.logger, + file_path=file_path, + fs_instance=fs_instance, + ).run + completion = llm.complete( prompt=prompt, process_text=highlight_data, @@ -368,32 +359,24 @@ def extract_table( "Unable to extract table details. " "Please contact admin to resolve this issue." 
)
-    if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-        fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
-        if execution_source == ExecutionSource.IDE.value:
-            fs_instance = EnvHelper.get_storage(
-                storage_type=StorageType.PERMANENT,
-                env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
-            )
-        if execution_source == ExecutionSource.TOOL.value:
-            fs_instance = EnvHelper.get_storage(
-                storage_type=StorageType.SHARED_TEMPORARY,
-                env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
-            )
+    fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
+    if execution_source == ExecutionSource.IDE.value:
+        fs_instance = EnvHelper.get_storage(
+            storage_type=StorageType.PERMANENT,
+            env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
+        )
+    if execution_source == ExecutionSource.TOOL.value:
+        fs_instance = EnvHelper.get_storage(
+            storage_type=StorageType.SHARED_TEMPORARY,
+            env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
+        )
     try:
-        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
-            answer = table_extractor["entrypoint_cls"].extract_large_table(
-                llm=llm,
-                table_settings=table_settings,
-                enforce_type=enforce_type,
-                fs_instance=fs_instance,
-            )
-        else:
-            answer = table_extractor["entrypoint_cls"].extract_large_table(
-                llm=llm,
-                table_settings=table_settings,
-                enforce_type=enforce_type,
-            )
+        answer = table_extractor["entrypoint_cls"].extract_large_table(
+            llm=llm,
+            table_settings=table_settings,
+            enforce_type=enforce_type,
+            fs_instance=fs_instance,
+        )
         structured_output[output[PSKeys.NAME]] = answer
         # We do not support summary and eval for table.
# Hence returning the result @@ -428,32 +411,23 @@ def extract_line_item( ) # Read file content into context - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) - if execution_source == ExecutionSource.IDE.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.PERMANENT, - env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, - ) - if execution_source == ExecutionSource.TOOL.value: - fs_instance = EnvHelper.get_storage( - storage_type=StorageType.SHARED_TEMPORARY, - env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, - ) + fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL) + if execution_source == ExecutionSource.IDE.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.PERMANENT, + env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE, + ) + if execution_source == ExecutionSource.TOOL.value: + fs_instance = EnvHelper.get_storage( + storage_type=StorageType.SHARED_TEMPORARY, + env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE, + ) - if not fs_instance.exists(extract_file_path): - raise FileNotFoundError( - f"The file at path '{extract_file_path}' does not exist." - ) - context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb") - else: - if not os.path.exists(extract_file_path): - raise FileNotFoundError( - f"The file at path '{extract_file_path}' does not exist." - ) - - with open(extract_file_path, encoding="utf-8") as file: - context = file.read() + if not fs_instance.exists(extract_file_path): + raise FileNotFoundError( + f"The file at path '{extract_file_path}' does not exist." 
+ ) + context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb") prompt = construct_prompt( preamble=tool_settings.get(PSKeys.PREAMBLE, ""), diff --git a/runner/README.md b/runner/README.md index eda2dbdb..5dae3f38 100644 --- a/runner/README.md +++ b/runner/README.md @@ -37,12 +37,11 @@ sudo ln -s "$HOME/.docker/run/docker.sock" /var/run/docker.sock ## Required Environment Variables -| Variable | Description | -| -------------------------- | ---------------------------------------------------------------------------------------| -| `CELERY_BROKER_URL` | URL for Celery's message broker, used to queue tasks. Must match backend configuration.| -| `TOOL_CONTAINER_NETWORK` | Network used for running tool containers. | -| `TOOL_CONTAINER_LABELS` | Labels applied to tool containers for observability [Optional]. | -| `WORKFLOW_DATA_DIR` | Source mount bind directory for tool containers to access input files. | -| `TOOL_DATA_DIR` | Target mount directory within tool containers. (Default: "/data") | -| `LOG_LEVEL` | Log level for runner (Options: INFO, WARNING, ERROR, DEBUG, etc.) | +| Variable | Description | +| -------------------------- |-----------------------------------------------------------------------------------------------| +| `CELERY_BROKER_URL` | URL for Celery's message broker, used to queue tasks. Must match backend configuration. | +| `TOOL_CONTAINER_NETWORK` | Network used for running tool containers. | +| `TOOL_CONTAINER_LABELS` | Labels applied to tool containers for observability [Optional]. | +| `EXECUTION_DATA_DIR` | Target mount directory within tool containers. (Default: "/data") | +| `LOG_LEVEL` | Log level for runner (Options: INFO, WARNING, ERROR, DEBUG, etc.) | | `REMOVE_CONTAINER_ON_EXIT`| Flag to decide whether to clean up/ remove the tool container after execution. 
(Default: True) | diff --git a/runner/sample.env b/runner/sample.env index 0ff991e9..26aa344e 100644 --- a/runner/sample.env +++ b/runner/sample.env @@ -3,8 +3,6 @@ CELERY_BROKER_URL = "redis://unstract-redis:6379" TOOL_CONTAINER_NETWORK="unstract-network" TOOL_CONTAINER_LABELS="[]" -WORKFLOW_DATA_DIR="${PWD}/workflow_data/execution" -TOOL_DATA_DIR="/data" PRIVATE_REGISTRY_CREDENTIAL_PATH= PRIVATE_REGISTRY_USERNAME= PRIVATE_REGISTRY_URL= @@ -19,7 +17,6 @@ REMOVE_CONTAINER_ON_EXIT=True # Client module path of the container engine to be used. CONTAINER_CLIENT_PATH=unstract.runner.clients.docker -EXECUTION_RUN_DATA_FOLDER_PREFIX="/app/workflow_data" # Feature Flags FLIPT_SERVICE_AVAILABLE=False @@ -29,7 +26,7 @@ PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python # File System Configuration for Workflow and API Execution # Directory Prefixes for storing execution files -WORKFLOW_EXECUTION_DIR_PREFIX="/unstract/execution" +WORKFLOW_EXECUTION_DIR_PREFIX="unstract/execution" # Storage Provider for Workflow Execution # Valid options: MINIO, S3, etc.. 
-WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials": {"endpoint_url":"http://unstract-minio:9000","key":"XXX","secret":"XXX"}}' +WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' diff --git a/runner/src/unstract/runner/clients/docker.py b/runner/src/unstract/runner/clients/docker.py index 5676423b..483bb16f 100644 --- a/runner/src/unstract/runner/clients/docker.py +++ b/runner/src/unstract/runner/clients/docker.py @@ -9,12 +9,11 @@ from unstract.runner.clients.interface import ( ContainerClientInterface, ContainerInterface, ) -from unstract.runner.constants import Env, FeatureFlag +from unstract.runner.constants import Env from unstract.runner.utils import Utils from docker import DockerClient from unstract.core.utilities import UnstractUtils -from unstract.flags.feature_flag import check_feature_flag_status class DockerContainer(ContainerInterface): @@ -159,9 +158,6 @@ class Client(ContainerClientInterface): def get_container_run_config( self, command: list[str], - organization_id: str, - workflow_id: str, - execution_id: str, run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, @@ -169,27 +165,6 @@ class Client(ContainerClientInterface): if envs is None: envs = {} mounts = [] - if organization_id and workflow_id and execution_id: - if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - source_path = os.path.join( - os.getenv(Env.WORKFLOW_DATA_DIR, ""), - organization_id, - workflow_id, - execution_id, - ) - mounts.append( - { - "type": "bind", - "source": source_path, - "target": os.getenv(Env.TOOL_DATA_DIR, "/data"), - } - ) - envs[Env.EXECUTION_RUN_DATA_FOLDER] = os.path.join( - os.getenv(Env.EXECUTION_RUN_DATA_FOLDER_PREFIX, ""), - organization_id, - workflow_id, - execution_id, - ) return { "name": UnstractUtils.build_tool_container_name( tool_image=self.image_name, 
tool_version=self.image_tag, run_id=run_id diff --git a/runner/src/unstract/runner/clients/interface.py b/runner/src/unstract/runner/clients/interface.py index 231a573d..9d0876ba 100644 --- a/runner/src/unstract/runner/clients/interface.py +++ b/runner/src/unstract/runner/clients/interface.py @@ -62,9 +62,6 @@ class ContainerClientInterface(ABC): def get_container_run_config( self, command: list[str], - organization_id: str, - workflow_id: str, - execution_id: str, run_id: str, envs: Optional[dict[str, Any]] = None, auto_remove: bool = False, diff --git a/runner/src/unstract/runner/clients/test_docker.py b/runner/src/unstract/runner/clients/test_docker.py index 1a8dde0f..42d99b56 100644 --- a/runner/src/unstract/runner/clients/test_docker.py +++ b/runner/src/unstract/runner/clients/test_docker.py @@ -108,12 +108,7 @@ def test_get_image(docker_client, mocker): def test_get_container_run_config(docker_client, mocker): """Test the get_container_run_config method.""" - os.environ[Env.WORKFLOW_DATA_DIR] = "/source" - os.environ[Env.EXECUTION_RUN_DATA_FOLDER_PREFIX] = "/app/workflow_data" command = ["echo", "hello"] - organization_id = "org123" - workflow_id = "wf123" - execution_id = "ex123" run_id = "run123" mocker.patch.object(docker_client, "_Client__image_exists", return_value=True) @@ -122,13 +117,7 @@ def test_get_container_run_config(docker_client, mocker): return_value="test-image", ) config = docker_client.get_container_run_config( - command, - organization_id, - workflow_id, - execution_id, - run_id, - envs={"KEY": "VALUE"}, - auto_remove=True, + command, run_id, envs={"KEY": "VALUE"}, auto_remove=True ) mocker_normalize.assert_called_once_with( @@ -137,24 +126,14 @@ def test_get_container_run_config(docker_client, mocker): assert config["name"] == "test-image" assert config["image"] == "test-image:latest" assert config["command"] == ["echo", "hello"] - assert config["environment"] == { - "KEY": "VALUE", - "EXECUTION_RUN_DATA_FOLDER": 
("/app/workflow_data/org123/wf123/ex123"), - } - assert config["mounts"] == [ - { - "type": "bind", - "source": f"/source/{organization_id}/{workflow_id}/{execution_id}", - "target": "/data", - } - ] + assert config["environment"] == {"KEY": "VALUE"} + assert config["mounts"] == [] def test_get_container_run_config_without_mount(docker_client, mocker): """Test the get_container_run_config method.""" - os.environ[Env.WORKFLOW_DATA_DIR] = "/source" + os.environ[Env.EXECUTION_DATA_DIR] = "/source" command = ["echo", "hello"] - execution_id = "ex123" run_id = "run123" mocker.patch.object(docker_client, "_Client__image_exists", return_value=True) @@ -162,14 +141,7 @@ def test_get_container_run_config_without_mount(docker_client, mocker): "unstract.core.utilities.UnstractUtils.build_tool_container_name", return_value="test-image", ) - config = docker_client.get_container_run_config( - command, - None, - None, - execution_id, - run_id, - auto_remove=True, - ) + config = docker_client.get_container_run_config(command, run_id, auto_remove=True) mocker_normalize.assert_called_once_with( tool_image="test-image", tool_version="latest", run_id=run_id diff --git a/runner/src/unstract/runner/constants.py b/runner/src/unstract/runner/constants.py index 6db1d28c..d2533962 100644 --- a/runner/src/unstract/runner/constants.py +++ b/runner/src/unstract/runner/constants.py @@ -20,10 +20,6 @@ class ToolKey: class Env: TOOL_CONTAINER_NETWORK = "TOOL_CONTAINER_NETWORK" TOOL_CONTAINER_LABELS = "TOOL_CONTAINER_LABELS" - WORKFLOW_DATA_DIR = "WORKFLOW_DATA_DIR" - EXECUTION_RUN_DATA_FOLDER = "EXECUTION_RUN_DATA_FOLDER" - EXECUTION_RUN_DATA_FOLDER_PREFIX = "EXECUTION_RUN_DATA_FOLDER_PREFIX" - TOOL_DATA_DIR = "TOOL_DATA_DIR" PRIVATE_REGISTRY_CREDENTIAL_PATH = "PRIVATE_REGISTRY_CREDENTIAL_PATH" PRIVATE_REGISTRY_USERNAME = "PRIVATE_REGISTRY_USERNAME" PRIVATE_REGISTRY_URL = "PRIVATE_REGISTRY_URL" @@ -35,9 +31,3 @@ class Env: ) EXECUTION_DATA_DIR = "EXECUTION_DATA_DIR" FLIPT_SERVICE_AVAILABLE = 
"FLIPT_SERVICE_AVAILABLE" - - -class FeatureFlag: - """Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage" diff --git a/runner/src/unstract/runner/runner.py b/runner/src/unstract/runner/runner.py index eda4c652..a1e31b75 100644 --- a/runner/src/unstract/runner/runner.py +++ b/runner/src/unstract/runner/runner.py @@ -11,12 +11,11 @@ from unstract.runner.clients.interface import ( ContainerClientInterface, ContainerInterface, ) -from unstract.runner.constants import Env, FeatureFlag, LogLevel, LogType, ToolKey +from unstract.runner.constants import Env, LogLevel, LogType, ToolKey from unstract.runner.exception import ToolRunException from unstract.core.constants import LogFieldName from unstract.core.pubsub_helper import LogPublisher -from unstract.flags.feature_flag import check_feature_flag_status load_dotenv() # Loads the container clinet class. @@ -140,12 +139,7 @@ class UnstractRunner: """ command = command.upper() container_config = self.client.get_container_run_config( - command=["--command", command], - organization_id="", - workflow_id="", - execution_id="", - run_id="", - auto_remove=True, + command=["--command", command], run_id="", auto_remove=True ) container = None @@ -186,19 +180,16 @@ class UnstractRunner: Returns: Optional[Any]: _description_ """ - tool_data_dir = os.getenv(Env.TOOL_DATA_DIR, "/data") - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - envs[Env.EXECUTION_DATA_DIR] = os.path.join( - os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""), - organization_id, - workflow_id, - execution_id, - ) - envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv( - Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}" - ) - else: - envs[Env.TOOL_DATA_DIR] = tool_data_dir + + envs[Env.EXECUTION_DATA_DIR] = os.path.join( + os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""), + organization_id, + workflow_id, + execution_id, + ) + envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv( + 
Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}" + ) container_config = self.client.get_container_run_config( command=[ @@ -209,9 +200,6 @@ class UnstractRunner: "--log-level", "DEBUG", ], - organization_id=organization_id, - workflow_id=workflow_id, - execution_id=execution_id, run_id=run_id, envs=envs, ) diff --git a/tools/classifier/README.md b/tools/classifier/README.md index cdef5a18..34dd5a88 100644 --- a/tools/classifier/README.md +++ b/tools/classifier/README.md @@ -5,11 +5,11 @@ The document classifier tool classifies documents and copies them to a folder ba ## Required environment variables | Variable | Description | -| -------------------------- | --------------------------------------------------------------------- | +| -------------------------- |-----------------------------------------------------------------------| | `PLATFORM_SERVICE_HOST` | The host in which the platform service is running | | `PLATFORM_SERVICE_PORT` | The port in which the service is listening | | `PLATFORM_SERVICE_API_KEY` | The API key for the platform | -| `TOOL_DATA_DIR` | The directory in the filesystem which has contents for tool execution | +| `EXECUTION_DATA_DIR` | The directory in the filesystem which has contents for tool execution | ## Testing the tool locally @@ -41,7 +41,7 @@ Load the environment variables for the tool. Make a copy of the `sample.env` file and name it `.env`. Fill in the required values. They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK. -Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. +Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. 
### Run SPEC command @@ -99,7 +99,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with docker build -t unstract/tool-classifier:0.0.1 . ``` -Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and +Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and necessary services like the `platform-service` is up. To test the tool from its docker image, run the following command diff --git a/tools/classifier/sample.env b/tools/classifier/sample.env index fda10fc4..b1d6474d 100644 --- a/tools/classifier/sample.env +++ b/tools/classifier/sample.env @@ -1,7 +1,7 @@ PLATFORM_SERVICE_HOST=http://unstract-platform-service PLATFORM_SERVICE_PORT=3001 PLATFORM_SERVICE_API_KEY= -TOOL_DATA_DIR=../data_dir +EXECUTION_DATA_DIR=../data_dir X2TEXT_HOST=http://unstract-x2text-service X2TEXT_PORT=3004 diff --git a/tools/classifier/src/helper.py b/tools/classifier/src/helper.py index 66179d68..8e27d699 100644 --- a/tools/classifier/src/helper.py +++ b/tools/classifier/src/helper.py @@ -1,5 +1,4 @@ import re -import shutil from pathlib import Path from typing import Any, Optional @@ -24,7 +23,7 @@ class ClassifierHelper: Args: tool (BaseTool): Base tool instance - output_dir (str): Output directory in TOOL_DATA_DIR + output_dir (str): Output directory in EXECUTION_DATA_DIR """ self.tool = tool self.output_dir = output_dir @@ -67,20 +66,13 @@ class ClassifierHelper: """ try: output_folder_bin = Path(self.output_dir) / classification - if self.tool.workflow_filestorage: - output_file = output_folder_bin / source_name - self._copy_file( - source_fs=self.tool.workflow_filestorage, - destination_fs=self.tool.workflow_filestorage, - source_path=source_file, - destination_path=str(output_file), - ) - else: - if not output_folder_bin.is_dir(): - output_folder_bin.mkdir(parents=True, exist_ok=True) - - output_file = output_folder_bin / source_name - 
shutil.copyfile(source_file, output_file) + output_file = output_folder_bin / source_name + self._copy_file( + source_fs=self.tool.workflow_filestorage, + destination_fs=self.tool.workflow_filestorage, + source_path=source_file, + destination_path=str(output_file), + ) except Exception as e: self.tool.stream_error_and_exit(f"Error creating output file: {e}") @@ -150,16 +142,11 @@ class ClassifierHelper: self.tool.stream_log("Text extraction adapter has been created successfully.") try: - if self.tool.workflow_filestorage: - extraction_result: TextExtractionResult = x2text.process( - input_file_path=file, - fs=self.tool.workflow_filestorage, - tags=self.tool.tags, - ) - else: - extraction_result: TextExtractionResult = x2text.process( - input_file_path=file, tags=self.tool.tags - ) + extraction_result: TextExtractionResult = x2text.process( + input_file_path=file, + fs=self.tool.workflow_filestorage, + tags=self.tool.tags, + ) extracted_text: str = extraction_result.extracted_text return extracted_text except Exception as e: @@ -176,13 +163,9 @@ class ClassifierHelper: """ self.tool.stream_log("Extracting text from file") try: - if self.tool.workflow_filestorage: - text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode( - "utf-8" - ) - else: - with open(file, "rb") as f: - text = f.read().decode("utf-8") + text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode( + "utf-8" + ) except Exception as e: self.tool.stream_log(f"File error: {e}") return None diff --git a/tools/structure/README.md b/tools/structure/README.md index cddcf798..458f5589 100644 --- a/tools/structure/README.md +++ b/tools/structure/README.md @@ -5,11 +5,11 @@ This is a helper tool that needs to be used along with `Prompt Studio`. 
It helps ## Required environment variables | Variable | Description | -| -------------------------- | --------------------------------------------------------------------- | +| -------------------------- |-----------------------------------------------------------------------| | `PLATFORM_SERVICE_HOST` | The host in which the platform service is running | | `PLATFORM_SERVICE_PORT` | The port in which the service is listening | | `PLATFORM_SERVICE_API_KEY` | The API key for the platform | -| `TOOL_DATA_DIR` | The directory in the filesystem which has contents for tool execution | +| `EXECUTION_DATA_DIR` | The directory in the filesystem which has contents for tool execution | | `PROMPT_HOST` | The host in which the prompt service is running | | `PROMPT_PORT` | The port in which the prompt service is listening | | `PROMPT_PORT` | The port in which the prompt service is listening | @@ -46,7 +46,7 @@ Load the environment variables for the tool. Make a copy of the `sample.env` file and name it `.env`. Fill in the required values. They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK. -Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. +Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. ### Run SPEC command @@ -102,7 +102,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with docker build -t unstract/tool-structure:0.0.1 . ``` -Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and +Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and necessary services like the `platform-service` is up. 
To test the tool from its docker image, run the following command diff --git a/tools/structure/sample.env b/tools/structure/sample.env index 54057540..0e5f82c2 100644 --- a/tools/structure/sample.env +++ b/tools/structure/sample.env @@ -1,9 +1,7 @@ PLATFORM_SERVICE_HOST=http://unstract-platform-service PLATFORM_SERVICE_PORT=3001 PLATFORM_SERVICE_API_KEY= -TOOL_DATA_DIR=../data_dir -EXECUTION_RUN_DATA_FOLDER=../data_dir - +EXECUTION_DATA_DIR=../data_dir PROMPT_HOST=http://unstract-prompt-service PROMPT_PORT=3003 diff --git a/tools/structure/src/constants.py b/tools/structure/src/constants.py index 8f77d9ed..ecac9f1d 100644 --- a/tools/structure/src/constants.py +++ b/tools/structure/src/constants.py @@ -50,7 +50,6 @@ class SettingsKeys: SINGLE_PASS_EXTRACTION_MODE = "single_pass_extraction_mode" CHALLENGE_LLM_ADAPTER_ID = "challenge_llm_adapter_id" SUMMARIZE_AS_SOURCE = "summarize_as_source" - TOOL_DATA_DIR = "TOOL_DATA_DIR" SUMMARIZE_PROMPT = "summarize_prompt" CONTEXT = "context" ERROR = "error" @@ -73,7 +72,6 @@ class SettingsKeys: EPILOGUE = "epilogue" HIGHLIGHT_DATA = "highlight_data" CONFIDENCE_DATA = "confidence_data" - EXECUTION_RUN_DATA_FOLDER = "EXECUTION_RUN_DATA_FOLDER" FILE_PATH = "file_path" EXECUTION_SOURCE = "execution_source" TOOL = "tool" diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py index d7fb7dfa..d1351ed9 100644 --- a/tools/structure/src/main.py +++ b/tools/structure/src/main.py @@ -11,7 +11,6 @@ from unstract.sdk.index import Index from unstract.sdk.prompt import PromptTool from unstract.sdk.tool.base import BaseTool from unstract.sdk.tool.entrypoint import ToolEntrypoint -from unstract.sdk.utils import ToolUtils from utils import json_to_markdown logger = logging.getLogger(__name__) @@ -94,16 +93,10 @@ class StructureTool(BaseTool): _, file_name = os.path.split(input_file) if summarize_as_source: file_name = SettingsKeys.SUMMARIZE - if self.workflow_filestorage: - tool_data_dir = 
Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)) - execution_run_data_folder = Path( - self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR) - ) - else: - tool_data_dir = Path(self.get_env_or_die(SettingsKeys.TOOL_DATA_DIR)) - execution_run_data_folder = Path( - self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER) - ) + tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)) + execution_run_data_folder = Path( + self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR) + ) index = Index( tool=self, @@ -153,11 +146,7 @@ class StructureTool(BaseTool): usage_kwargs=usage_kwargs, process_text=process_text, tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) index_metrics = {SettingsKeys.INDEXING: index.get_metrics()} if summarize_as_source: @@ -196,11 +185,7 @@ class StructureTool(BaseTool): usage_kwargs=usage_kwargs, process_text=process_text, tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) index_metrics[output[SettingsKeys.NAME]] = { SettingsKeys.INDEXING: index.get_metrics() @@ -289,13 +274,9 @@ class StructureTool(BaseTool): try: self.stream_log("Writing parsed output...") output_path = Path(output_dir) / f"{Path(self.source_file_name).stem}.json" - if self.workflow_filestorage: - self.workflow_filestorage.json_dump( - path=output_path, data=structured_output_dict - ) - else: - with open(output_path, "w", encoding="utf-8") as f: - f.write(structured_output) + self.workflow_filestorage.json_dump( + path=output_path, data=structured_output_dict + ) except OSError as e: self.stream_error_and_exit(f"Error creating output file: {e}") except json.JSONDecodeError as e: @@ -336,23 +317,13 @@ class StructureTool(BaseTool): summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE summarized_context = "" - if self.workflow_filestorage: - if 
self.workflow_filestorage.exists(summarize_file_path): - summarized_context = self.workflow_filestorage.read( - path=summarize_file_path, mode="r" - ) - elif summarize_file_path.exists(): - with open(summarize_file_path, encoding="utf-8") as f: - summarized_context = f.read() + if self.workflow_filestorage.exists(summarize_file_path): + summarized_context = self.workflow_filestorage.read( + path=summarize_file_path, mode="r" + ) if not summarized_context: context = "" - if self.workflow_filestorage: - context = self.workflow_filestorage.read( - path=extract_file_path, mode="r" - ) - else: - with open(extract_file_path, encoding="utf-8") as file: - context = file.read() + context = self.workflow_filestorage.read(path=extract_file_path, mode="r") prompt_keys = [] for output in outputs: prompt_keys.append(output[SettingsKeys.NAME]) @@ -377,23 +348,14 @@ class StructureTool(BaseTool): structure_output = json.loads(response[SettingsKeys.STRUCTURE_OUTPUT]) summarized_context = structure_output.get(SettingsKeys.DATA, "") self.stream_log("Writing summarized context to a file") - if self.workflow_filestorage: - self.workflow_filestorage.write( - path=summarize_file_path, mode="w", data=summarized_context - ) - else: - with open(summarize_file_path, "w", encoding="utf-8") as f: - f.write(summarized_context) + self.workflow_filestorage.write( + path=summarize_file_path, mode="w", data=summarized_context + ) self.stream_log("Indexing summarized context") - if self.workflow_filestorage: - summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file( - path=summarize_file_path - ) - else: - summarize_file_hash: str = ToolUtils.get_hash_from_file( - file_path=summarize_file_path - ) + summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file( + path=summarize_file_path + ) index.index( tool_id=tool_id, embedding_instance_id=embedding_instance_id, @@ -405,11 +367,7 @@ class StructureTool(BaseTool): chunk_overlap=0, usage_kwargs=usage_kwargs, 
tags=self.tags, - **( - {"fs": self.workflow_filestorage} - if self.workflow_filestorage is not None - else {} - ), + **({"fs": self.workflow_filestorage}), ) return summarize_file_hash diff --git a/tools/text_extractor/README.md b/tools/text_extractor/README.md index 687fda4d..65ef0843 100644 --- a/tools/text_extractor/README.md +++ b/tools/text_extractor/README.md @@ -7,11 +7,11 @@ For example, it can convert PDF to text, image to text, etc. ## Required Environment Variables | Variable | Description | -| -------------------------- | -------------------------------------------------------------------------- | +| -------------------------- |----------------------------------------------------------------------------| | `PLATFORM_SERVICE_HOST` | The host where the platform service is running | | `PLATFORM_SERVICE_PORT` | The port where the service is listening | | `PLATFORM_SERVICE_API_KEY` | The API key for the platform | -| `TOOL_DATA_DIR` | The directory in the filesystem which contains contents for tool execution | +| `EXECUTION_DATA_DIR` | The directory in the filesystem which contains contents for tool execution | | `X2TEXT_HOST` | The host where the x2text service is running | | `X2TEXT_PORT` | The port where the x2text service is listening | @@ -42,7 +42,7 @@ source .venv/bin/activate Make a copy of the `sample.env` file and name it `.env`. Fill in the required values. They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK. -2. Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. +2. Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`. 
#### Run SPEC command @@ -94,7 +94,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with docker build -t unstract/tool-example:0.0.1 . ``` -Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and +Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and necessary services like the `platform-service` is up. To test the tool from its docker image, run the following command diff --git a/tools/text_extractor/sample.env b/tools/text_extractor/sample.env index f6b2fc3c..de3ab3e9 100644 --- a/tools/text_extractor/sample.env +++ b/tools/text_extractor/sample.env @@ -1,8 +1,7 @@ PLATFORM_SERVICE_HOST= PLATFORM_SERVICE_PORT= PLATFORM_SERVICE_API_KEY= -TOOL_DATA_DIR= - +EXECUTION_DATA_DIR= # X2TEXT service X2TEXT_HOST= X2TEXT_PORT= diff --git a/tools/text_extractor/src/main.py b/tools/text_extractor/src/main.py index 65f3e916..4315ab4c 100644 --- a/tools/text_extractor/src/main.py +++ b/tools/text_extractor/src/main.py @@ -63,14 +63,9 @@ class TextExtractor(BaseTool): usage_kwargs=usage_kwargs, ) self.stream_log("Text extraction adapter has been created successfully.") - if self.workflow_filestorage: - extraction_result: TextExtractionResult = text_extraction_adapter.process( - input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags - ) - else: - extraction_result: TextExtractionResult = text_extraction_adapter.process( - input_file_path=input_file, tags=self.tags - ) + extraction_result: TextExtractionResult = text_extraction_adapter.process( + input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags + ) extracted_text = self.convert_to_actual_string(extraction_result.extracted_text) self.stream_log("Text has been extracted successfully.") @@ -85,13 +80,9 @@ class TextExtractor(BaseTool): output_path = ( Path(output_dir) / f"{Path(self.source_file_name).stem}.txt" ) - if self.workflow_filestorage: - 
self.workflow_filestorage.write( - path=output_path, mode="w", data=extracted_text - ) - else: - with open(output_path, "w", encoding="utf-8") as file: - file.write(extracted_text) + self.workflow_filestorage.write( + path=output_path, mode="w", data=extracted_text + ) self.stream_log("Tool output written successfully.") else: diff --git a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py index c9898343..e4cc47ab 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py @@ -5,13 +5,9 @@ from typing import Any import azure.core.exceptions as AzureException from adlfs import AzureBlobFileSystem -from backend.constants import FeatureFlag from unstract.connectors.exceptions import AzureHttpError, ConnectorError from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logging.getLogger("azurefs").setLevel(logging.ERROR) logger = logging.getLogger(__name__) @@ -97,13 +93,9 @@ class AzureCloudStorageFS(UnstractFileSystem): normalized_path = os.path.normpath(destination_path) destination_connector_fs = self.get_fsspec_fs() try: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_fs = file_system.get_file_storage() - data = workflow_fs.read(path=source_path, mode="rb") - else: - with open(source_path, "rb") as source_file: - data = source_file.read() + file_system = 
FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") destination_connector_fs.write_bytes(normalized_path, data) except AzureException.HttpResponseError as e: self.raise_http_exception(e=e, path=normalized_path) diff --git a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py index 51542357..b2162420 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py @@ -5,13 +5,9 @@ from typing import Any from fsspec import AbstractFileSystem -from backend.constants import FeatureFlag from unstract.connectors.base import UnstractConnector from unstract.connectors.enums import ConnectorMode -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -106,11 +102,7 @@ class UnstractFileSystem(UnstractConnector, ABC): """ normalized_path = os.path.normpath(destination_path) destination_connector_fs = self.get_fsspec_fs() - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - workflow_fs = file_system.get_file_storage() - data = workflow_fs.read(path=source_path, mode="rb") - else: - with open(source_path, "rb") as source_file: - data = source_file.read() + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") destination_connector_fs.write_bytes(normalized_path, data) diff --git a/unstract/tool-registry/src/unstract/tool_registry/constants.py 
b/unstract/tool-registry/src/unstract/tool_registry/constants.py index 405a4013..334fc744 100644 --- a/unstract/tool-registry/src/unstract/tool_registry/constants.py +++ b/unstract/tool-registry/src/unstract/tool_registry/constants.py @@ -1,13 +1,6 @@ from typing import Any -class FeatureFlag: - """Temporary feature flags.""" - - # For enabling remote storage feature - REMOTE_FILE_STORAGE = "remote_file_storage" - - class Tools: TOOLS_DIRECTORY = "tools" IMAGE_LATEST_TAG = "latest" diff --git a/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py b/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py index 4a060a3a..907fbdc1 100644 --- a/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py +++ b/unstract/tool-registry/src/unstract/tool_registry/tool_registry.py @@ -1,26 +1,16 @@ -import json import logging import os from typing import Any, Optional -from unstract.tool_registry.constants import ( - FeatureFlag, - PropKey, - ToolJsonField, - ToolKey, -) +from unstract.sdk.exceptions import FileStorageError +from unstract.sdk.file_storage import EnvHelper, FileStorage, StorageType +from unstract.tool_registry.constants import PropKey, ToolJsonField, ToolKey from unstract.tool_registry.dto import Tool from unstract.tool_registry.exceptions import InvalidToolURLException from unstract.tool_registry.helper import ToolRegistryHelper from unstract.tool_registry.schema_validator import JsonSchemaValidator from unstract.tool_registry.tool_utils import ToolUtils -from unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.sdk.exceptions import FileStorageError - from unstract.sdk.file_storage import FileStorageProvider, PermanentFileStorage - logger = logging.getLogger(__name__) @@ -58,10 +48,8 @@ class ToolRegistry: "registry JSONs and YAML to a directory and set the env." 
) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.fs = self._get_storage_credentials() - else: - self.fs = None + self.fs = self._get_storage_credentials() + self.helper = ToolRegistryHelper( registry=os.path.join(directory, registry_file), private_tools_file=os.path.join(directory, private_tools), @@ -69,30 +57,23 @@ class ToolRegistry: fs=self.fs, ) - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - - def _get_storage_credentials(self) -> PermanentFileStorage: - try: - # Not creating constants for now for the keywords below as this - # logic ought to change in the near future to maintain unformity - # across services - file_storage = json.loads( - os.environ.get("TOOL_REGISTRY_STORAGE_CREDENTIALS", {}) - ) - provider = FileStorageProvider(file_storage["provider"]) - credentials = file_storage.get("credentials", {}) - return PermanentFileStorage(provider, **credentials) - except KeyError as e: - logger.error(f"Required credentials is missing in the env: {str(e)}") - raise e - except FileStorageError as e: - logger.error( - "Error while initialising storage: %s", - e, - stack_info=True, - exc_info=True, - ) - raise e + def _get_storage_credentials(self) -> FileStorage: + try: + fs = EnvHelper.get_storage( + StorageType.PERMANENT, "TOOL_REGISTRY_STORAGE_CREDENTIALS" + ) + return fs + except KeyError as e: + logger.error(f"Required credentials is missing in the env: {str(e)}") + raise e + except FileStorageError as e: + logger.error( + "Error while initialising storage: %s", + e, + stack_info=True, + exc_info=True, + ) + raise e def load_all_tools_to_disk(self) -> None: self.helper.load_all_tools_to_disk() diff --git a/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py b/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py index f2b7f7cf..916fdb76 100644 --- a/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py +++ b/unstract/tool-registry/src/unstract/tool_registry/tool_utils.py @@ -3,15 
+3,12 @@ import logging import re from typing import Any, Optional -import yaml from unstract.sdk.adapters.enums import AdapterTypes from unstract.sdk.file_storage import FileStorage, FileStorageProvider -from unstract.tool_registry.constants import AdapterPropertyKey, FeatureFlag, Tools +from unstract.tool_registry.constants import AdapterPropertyKey, Tools from unstract.tool_registry.dto import AdapterProperties, Spec, Tool, ToolMeta from unstract.tool_registry.exceptions import InvalidToolURLException, RegistryNotFound -from unstract.flags.feature_flag import check_feature_flag_status - logger = logging.getLogger(__name__) @@ -63,22 +60,14 @@ class ToolUtils: data: dict[str, Any], fs: FileStorage = FileStorage(FileStorageProvider.LOCAL), ) -> None: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data) - else: - with open(file_path, "w") as json_file: - json.dump(data, json_file) + fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data) @staticmethod def get_all_tools_from_disk( file_path: str, fs: FileStorage = FileStorage(FileStorageProvider.LOCAL) ) -> dict[str, Any]: try: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - data = fs.json_load(file_path) - else: - with open(file_path) as json_file: - data: dict[str, Any] = json.load(json_file) + data = fs.json_load(file_path) return data except json.JSONDecodeError as e: logger.warning(f"Error loading tools from {file_path}: {e}") @@ -90,11 +79,7 @@ class ToolUtils: data: dict[str, Any], fs: FileStorage = FileStorage(FileStorageProvider.LOCAL), ) -> None: - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data) - else: - with open(file_path, "w") as file: - yaml.dump(data, file, default_flow_style=False) + fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data) @staticmethod def get_registry( @@ -114,16 +99,12 @@ 
class ToolUtils: yml_data: dict[str, Any] = {} try: logger.debug(f"Reading tool registry YAML: {file_path}") - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - yml_data = fs.yaml_load(file_path) - else: - with open(file_path) as file: - yml_data = yaml.safe_load(file) + yml_data = fs.yaml_load(file_path) + except FileNotFoundError: logger.warning(f"Could not find tool registry YAML: {str(file_path)}") if raise_exc: raise RegistryNotFound() - pass except Exception as error: logger.error(f"Error while loading {str(file_path)}: {error}") if raise_exc: diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py index f7e309c9..04dd880f 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/constants.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/constants.py @@ -55,9 +55,3 @@ class ToolMetadataKey: class ToolOutputType: TXT = "TXT" JSON = "JSON" - - -class FeatureFlag: - """Temporary feature flags.""" - - REMOTE_FILE_STORAGE = "remote_file_storage" diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py index 2c699b94..74145d47 100644 --- a/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py +++ b/unstract/workflow-execution/src/unstract/workflow_execution/execution_file_handler.py @@ -2,11 +2,9 @@ import json import logging import os from pathlib import Path -from typing import Any, Optional +from typing import Any -import fsspec from unstract.workflow_execution.constants import ( - FeatureFlag, MetaDataKey, ToolMetadataKey, ToolOutputType, @@ -16,10 +14,7 @@ from unstract.workflow_execution.constants import ( from unstract.workflow_execution.exceptions import ToolMetadataNotFound from unstract.workflow_execution.tools_utils import ToolsUtils -from 
unstract.flags.feature_flag import check_feature_flag_status - -if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - from unstract.filesystem import FileStorageType, FileSystem +from unstract.filesystem import FileStorageType, FileSystem logger = logging.getLogger(__name__) @@ -31,14 +26,9 @@ class ExecutionFileHandler: self.organization_id = organization_id self.workflow_id = workflow_id self.execution_id = execution_id - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - self.execution_dir = self.get_execution_dir( - workflow_id, execution_id, organization_id - ) - else: - self.execution_dir = self.create_execution_dir_path( - workflow_id, execution_id, organization_id - ) + self.execution_dir = self.get_execution_dir( + workflow_id, execution_id, organization_id + ) self.source_file = os.path.join(self.execution_dir, WorkflowFileType.SOURCE) self.infile = os.path.join(self.execution_dir, WorkflowFileType.INFILE) self.metadata_file = os.path.join( @@ -51,14 +41,10 @@ class ExecutionFileHandler: Returns: dict[str, Any]: Workflow metadata. 
""" - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - metadata_content = file_storage.read(path=self.metadata_file, mode="r") - metadata = json.loads(metadata_content) - else: - with open(self.metadata_file) as file: - metadata: dict[str, Any] = json.load(file) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + metadata_content = file_storage.read(path=self.metadata_file, mode="r") + metadata = json.loads(metadata_content) return metadata def get_list_of_tool_metadata( @@ -138,47 +124,14 @@ class ExecutionFileHandler: MetaDataKey.FILE_EXECUTION_ID: str(file_execution_id), MetaDataKey.TAGS: tags, } - if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): - file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) - file_storage = file_system.get_file_storage() - file_storage.json_dump(path=metadata_path, data=content) - else: - with fsspec.open(f"file://{metadata_path}", "w") as local_file: - json.dump(content, local_file) + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + file_storage = file_system.get_file_storage() + file_storage.json_dump(path=metadata_path, data=content) logger.info( f"metadata for {input_file_path} is " "added in to execution directory" ) - @classmethod - def create_execution_dir_path( - cls, - workflow_id: str, - execution_id: str, - organization_id: str, - data_volume: Optional[str] = None, - ) -> str: - """Create the directory path for storing execution-related files. - - Parameters: - - workflow_id (str): Identifier for the workflow. - - execution_id (str): Identifier for the execution. - - organization_id (Optional[str]): - Identifier for the organization (default: None). - - Returns: - str: The directory path for the execution. 
- """ - workflow_data_dir = os.getenv("WORKFLOW_DATA_DIR") - data_volume = data_volume if data_volume else workflow_data_dir - if not data_volume: - raise ValueError("Missed data_volume") - execution_dir = Path( - data_volume, organization_id, str(workflow_id), str(execution_id) - ) - execution_dir.mkdir(parents=True, exist_ok=True) - return str(execution_dir) - @classmethod def get_execution_dir( cls, workflow_id: str, execution_id: str, organization_id: str