[FEATURE] Remove remote storage feature flag (#1101)

* Remove feature flag condition

* Remove feature flag condition

* mode=r missed in read

* Missed out changes in merge

* Missed out changes in merge

* Remove unwanted env

* Changes to rename platform service env var for costing

* Precommit failure fix

* Fix unit tests

* Fix unit tests

* Fix docker test cases

* Change mime read length to use default

* Support remote storage for OSS users (#1136)

* OSS support with remote storage

* Add data sync

* Remove unwanted dependencies
Gayathri
2025-02-19 11:47:28 +05:30
committed by GitHub
parent 44d63d8e02
commit 13bb36301c
43 changed files with 451 additions and 1067 deletions

View File

@@ -34,4 +34,3 @@ class FeatureFlag:
"""Temporary feature flags."""
APP_DEPLOYMENT = "app_deployment"
REMOTE_FILE_STORAGE = "remote_file_storage"
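
For orientation, this is the call-site pattern the commit removes across the hunks below, reconstructed for illustration from the diffs themselves (imports as they appear in the affected files):

```python
from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from utils.file_storage.constants import FileStorageKeys

# Typical call site guarded by the temporary flag; with the flag gone,
# only the remote-storage branch survives (see the hunks that follow).
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
    fs_instance = EnvHelper.get_storage(
        storage_type=StorageType.PERMANENT,
        env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
    )
else:
    ...  # legacy local-filesystem handling, deleted by this commit
```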

View File

@@ -117,8 +117,6 @@ X2TEXT_PORT = os.environ.get("X2TEXT_PORT", 3004)
STRUCTURE_TOOL_IMAGE_URL = get_required_setting("STRUCTURE_TOOL_IMAGE_URL")
STRUCTURE_TOOL_IMAGE_NAME = get_required_setting("STRUCTURE_TOOL_IMAGE_NAME")
STRUCTURE_TOOL_IMAGE_TAG = get_required_setting("STRUCTURE_TOOL_IMAGE_TAG")
WORKFLOW_DATA_DIR = os.environ.get("WORKFLOW_DATA_DIR")
API_STORAGE_DIR = os.environ.get("API_STORAGE_DIR")
CACHE_TTL_SEC = os.environ.get("CACHE_TTL_SEC", 10800)
DEFAULT_AUTH_USERNAME = os.environ.get("DEFAULT_AUTH_USERNAME", "unstract")

View File

@@ -1,5 +1,4 @@
import logging
import shutil
import uuid
from typing import Any
@@ -7,7 +6,6 @@ from account_v2.models import User
from adapter_processor_v2.models import AdapterInstance
from django.db import models
from django.db.models import QuerySet
from file_management.file_management_helper import FileManagerHelper
from prompt_studio.prompt_studio_core_v2.constants import DefaultPrompts
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
@@ -19,9 +17,6 @@ from utils.models.organization_mixin import (
DefaultOrganizationMixin,
)
from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status
logger = logging.getLogger(__name__)
@@ -140,38 +135,22 @@ class CustomTool(DefaultOrganizationMixin, BaseModel):
def delete(self, organization_id=None, *args, **kwargs):
# Delete the documents associated with the tool
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_path = FileManagerHelper.handle_sub_directory_for_tenants(
organization_id,
is_create=False,
user_id=self.created_by.user_id,
tool_id=str(self.tool_id),
)
if organization_id:
try:
shutil.rmtree(file_path)
except FileNotFoundError:
logger.error(f"The folder {file_path} does not exist.")
except OSError as e:
logger.error(f"Error: {file_path} : {e.strerror}")
# Continue with the deletion of the tool
else:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
organization_id,
is_create=False,
user_id=self.created_by.user_id,
tool_id=str(self.tool_id),
)
try:
fs_instance.rm(file_path, True)
except FileNotFoundError:
# Suppressed to handle cases when the remote
# file is missing or already deleted
pass
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
organization_id,
is_create=False,
user_id=self.created_by.user_id,
tool_id=str(self.tool_id),
)
try:
fs_instance.rm(file_path, True)
except FileNotFoundError:
# Suppressed to handle cases when the remote
# file is missing or already deleted
pass
super().delete(*args, **kwargs)
class Meta:

View File

@@ -3,15 +3,10 @@ import os
from platform_settings_v2.platform_auth_service import PlatformAuthenticationService
from prompt_studio.prompt_studio_core_v2.constants import ToolStudioKeys
from unstract.sdk.constants import LogLevel
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from unstract.sdk.tool.stream import StreamMixin
from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from utils.file_storage.constants import FileStorageKeys
from utils.file_storage.constants import FileStorageKeys
class PromptIdeBaseTool(StreamMixin):
@@ -24,12 +19,10 @@ class PromptIdeBaseTool(StreamMixin):
"""
self.log_level = log_level
self.org_id = org_id
self.workflow_filestorage = None
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
self.workflow_filestorage = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
self.workflow_filestorage = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
super().__init__(log_level=log_level)

View File

@@ -12,7 +12,6 @@ from adapter_processor_v2.constants import AdapterKeys
from adapter_processor_v2.models import AdapterInstance
from django.conf import settings
from django.db.models.manager import BaseManager
from file_management.file_management_helper import FileManagerHelper
from prompt_studio.modifier_loader import ModifierConfig
from prompt_studio.modifier_loader import load_plugins as load_modifier_plugins
from prompt_studio.prompt_profile_manager_v2.models import ProfileManager
@@ -60,14 +59,11 @@ from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from unstract.sdk.index import Index
from unstract.sdk.prompt import PromptTool
from unstract.sdk.utils.tool_utils import ToolUtils
from utils.file_storage.constants import FileStorageKeys
from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper
from utils.local_context import StateStore
from backend.constants import FeatureFlag
from unstract.core.pubsub_helper import LogPublisher
from unstract.flags.feature_flag import check_feature_flag_status
CHOICES_JSON = "/static/select_choices.json"
ERROR_MSG = "User %s doesn't have access to adapter %s"
@@ -343,22 +339,12 @@ class PromptStudioHelper:
file_path = file_name
else:
default_profile = ProfileManager.get_default_llm_profile(tool)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_path = FileManagerHelper.handle_sub_directory_for_tenants(
org_id,
is_create=False,
user_id=user_id,
tool_id=tool_id,
)
else:
file_path = (
PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id,
is_create=False,
user_id=user_id,
tool_id=tool_id,
)
)
file_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id,
is_create=False,
user_id=user_id,
tool_id=tool_id,
)
file_path = str(Path(file_path) / file_name)
if not tool:
@@ -382,37 +368,24 @@ class PromptStudioHelper:
process_text = None
if text_processor:
process_text = text_processor.process
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
else:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
elapsed_time = time.time() - start_time
logger.info(
@@ -655,40 +628,24 @@ class PromptStudioHelper:
@staticmethod
def _get_document_path(org_id, user_id, tool_id, doc_name):
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_path = FileManagerHelper.handle_sub_directory_for_tenants(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
else:
doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
return str(Path(doc_path) / doc_name)
@staticmethod
def _get_extract_or_summary_document_path(
org_id, user_id, tool_id, doc_name, doc_type
) -> str:
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_path = FileManagerHelper.handle_sub_directory_for_tenants(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
else:
doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
doc_path = PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id=org_id,
user_id=user_id,
tool_id=tool_id,
is_create=False,
)
extracted_doc_name = Path(doc_name).stem + TSPKeys.TXT_EXTENTION
return str(Path(doc_path) / doc_type / extracted_doc_name)
@@ -789,35 +746,22 @@ class PromptStudioHelper:
x2text = str(profile_manager.x2text.id)
if not profile_manager:
raise DefaultProfileError()
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=profile_manager,
file_path=doc_path,
tool_id=str(tool.tool_id),
org_id=org_id,
document_id=document_id,
is_summary=tool.summarize_as_source,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
else:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=profile_manager,
file_path=doc_path,
tool_id=str(tool.tool_id),
org_id=org_id,
document_id=document_id,
is_summary=tool.summarize_as_source,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=profile_manager,
file_path=doc_path,
tool_id=str(tool.tool_id),
org_id=org_id,
document_id=document_id,
is_summary=tool.summarize_as_source,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
if index_result.get("status") == IndexingStatus.PENDING_STATUS.value:
return {
"status": IndexingStatus.PENDING_STATUS.value,
@@ -888,10 +832,7 @@ class PromptStudioHelper:
tool_settings[TSPKeys.PLATFORM_POSTAMBLE] = getattr(
settings, TSPKeys.PLATFORM_POSTAMBLE.upper(), ""
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_hash = ToolUtils.get_hash_from_file(file_path=doc_path)
else:
file_hash = fs_instance.get_hash_from_file(path=doc_path)
file_hash = fs_instance.get_hash_from_file(path=doc_path)
payload = {
TSPKeys.TOOL_SETTINGS: tool_settings,
@@ -1012,27 +953,16 @@ class PromptStudioHelper:
usage_kwargs["file_name"] = filename
util = PromptIdeBaseTool(log_level=LogLevel.INFO, org_id=org_id)
tool_index = Index(tool=util)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_id_key = tool_index.generate_index_key(
vector_db=vector_db,
embedding=embedding_model,
x2text=x2text_adapter,
chunk_size=str(profile_manager.chunk_size),
chunk_overlap=str(profile_manager.chunk_overlap),
file_path=file_path,
file_hash=None,
)
else:
doc_id_key = tool_index.generate_index_key(
vector_db=vector_db,
embedding=embedding_model,
x2text=x2text_adapter,
chunk_size=str(profile_manager.chunk_size),
chunk_overlap=str(profile_manager.chunk_overlap),
file_path=file_path,
file_hash=None,
fs=fs,
)
doc_id_key = tool_index.generate_index_key(
vector_db=vector_db,
embedding=embedding_model,
x2text=x2text_adapter,
chunk_size=str(profile_manager.chunk_size),
chunk_overlap=str(profile_manager.chunk_overlap),
file_path=file_path,
file_hash=None,
fs=fs,
)
if not reindex:
indexed_doc_id = DocumentIndexingService.get_indexed_document_id(
org_id=org_id, user_id=user_id, doc_id_key=doc_id_key
@@ -1055,35 +985,20 @@ class PromptStudioHelper:
DocumentIndexingService.set_document_indexing(
org_id=org_id, user_id=user_id, doc_id_key=doc_id_key
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_id: str = tool_index.index(
tool_id=tool_id,
embedding_instance_id=embedding_model,
vector_db_instance_id=vector_db,
x2text_instance_id=x2text_adapter,
file_path=file_path,
chunk_size=profile_manager.chunk_size,
chunk_overlap=profile_manager.chunk_overlap,
reindex=reindex,
output_file_path=extract_file_path,
usage_kwargs=usage_kwargs.copy(),
process_text=process_text,
)
else:
doc_id: str = tool_index.index(
tool_id=tool_id,
embedding_instance_id=embedding_model,
vector_db_instance_id=vector_db,
x2text_instance_id=x2text_adapter,
file_path=file_path,
chunk_size=profile_manager.chunk_size,
chunk_overlap=profile_manager.chunk_overlap,
reindex=reindex,
output_file_path=extract_file_path,
usage_kwargs=usage_kwargs.copy(),
process_text=process_text,
fs=fs,
)
doc_id: str = tool_index.index(
tool_id=tool_id,
embedding_instance_id=embedding_model,
vector_db_instance_id=vector_db,
x2text_instance_id=x2text_adapter,
file_path=file_path,
chunk_size=profile_manager.chunk_size,
chunk_overlap=profile_manager.chunk_overlap,
reindex=reindex,
output_file_path=extract_file_path,
usage_kwargs=usage_kwargs.copy(),
process_text=process_text,
fs=fs,
)
PromptStudioIndexHelper.handle_index_manager(
document_id=document_id,
@@ -1145,35 +1060,22 @@ class PromptStudioHelper:
if not default_profile:
raise DefaultProfileError()
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
else:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
if index_result.get("status") == IndexingStatus.PENDING_STATUS.value:
return {
"status": IndexingStatus.PENDING_STATUS.value,
@@ -1214,10 +1116,7 @@ class PromptStudioHelper:
if tool.summarize_as_source:
path = Path(file_path)
file_path = str(path.parent / TSPKeys.SUMMARIZE / (path.stem + ".txt"))
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_hash = ToolUtils.get_hash_from_file(file_path=file_path)
else:
file_hash = fs_instance.get_hash_from_file(path=file_path)
file_hash = fs_instance.get_hash_from_file(path=file_path)
payload = {
TSPKeys.TOOL_SETTINGS: tool_settings,

View File

@@ -8,7 +8,6 @@ from django.db.models import QuerySet
from django.http import HttpRequest
from file_management.constants import FileInformationKey as FileKey
from file_management.exceptions import FileNotFound
from file_management.file_management_helper import FileManagerHelper
from permissions.permission import IsOwner, IsOwnerOrSharedUser
from prompt_studio.processor_loader import get_plugin_class_by_name, load_plugins
from prompt_studio.prompt_profile_manager_v2.constants import (
@@ -57,10 +56,6 @@ from unstract.sdk.utils.common_utils import CommonUtils
from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper
from utils.user_session import UserSessionUtils
from backend.constants import FeatureFlag
from unstract.connectors.filesystems.local_storage.local_storage import LocalStorageFS
from unstract.flags.feature_flag import check_feature_flag_status
from .models import CustomTool
from .serializers import (
CustomToolSerializer,
@@ -423,49 +418,16 @@ class PromptStudioCoreView(viewsets.ModelViewSet):
f"{FileViewTypes.SUMMARIZE.lower()}/"
f"{filename_without_extension}.txt"
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_path = file_path = FileManagerHelper.handle_sub_directory_for_tenants(
UserSessionUtils.get_organization_id(request),
is_create=True,
try:
contents = PromptStudioFileHelper.fetch_file_contents(
file_name=file_name,
org_id=UserSessionUtils.get_organization_id(request),
user_id=custom_tool.created_by.user_id,
tool_id=str(custom_tool.tool_id),
allowed_content_types=allowed_content_types,
)
file_system = LocalStorageFS(settings={"path": file_path})
if not file_path.endswith("/"):
file_path += "/"
file_path += file_name
# Temporary Hack for frictionless onboarding as the user id will be empty
try:
contents = FileManagerHelper.fetch_file_contents(
file_system, file_path, allowed_content_types
)
except FileNotFound:
file_path = file_path = (
FileManagerHelper.handle_sub_directory_for_tenants(
UserSessionUtils.get_organization_id(request),
is_create=True,
user_id="",
tool_id=str(custom_tool.tool_id),
)
)
if not file_path.endswith("/"):
file_path += "/"
file_path += file_name
contents = FileManagerHelper.fetch_file_contents(
file_system, file_path, allowed_content_types
)
else:
try:
contents = PromptStudioFileHelper.fetch_file_contents(
file_name=file_name,
org_id=UserSessionUtils.get_organization_id(request),
user_id=custom_tool.created_by.user_id,
tool_id=str(custom_tool.tool_id),
)
except FileNotFoundError:
raise FileNotFound()
except FileNotFoundError:
raise FileNotFound()
return Response({"data": contents}, status=status.HTTP_200_OK)
@action(detail=True, methods=["post"])
@@ -494,28 +456,14 @@ class PromptStudioCoreView(viewsets.ModelViewSet):
logger.info(
f"Uploading file: {file_name}" if file_name else "Uploading file"
)
if not check_feature_flag_status(flag_key=FeatureFlag.REMOTE_FILE_STORAGE):
file_path = FileManagerHelper.handle_sub_directory_for_tenants(
UserSessionUtils.get_organization_id(request),
is_create=True,
user_id=custom_tool.created_by.user_id,
tool_id=str(custom_tool.tool_id),
)
file_system = LocalStorageFS(settings={"path": file_path})
FileManagerHelper.upload_file(
file_system,
file_path,
file_data,
file_name,
)
else:
PromptStudioFileHelper.upload_for_ide(
org_id=UserSessionUtils.get_organization_id(request),
user_id=custom_tool.created_by.user_id,
tool_id=str(custom_tool.tool_id),
file_name=file_name,
file_data=file_data,
)
PromptStudioFileHelper.upload_for_ide(
org_id=UserSessionUtils.get_organization_id(request),
user_id=custom_tool.created_by.user_id,
tool_id=str(custom_tool.tool_id),
file_name=file_name,
file_data=file_data,
)
# Create a record in the db for the file
document = PromptStudioDocumentHelper.create(
@@ -554,28 +502,12 @@ class PromptStudioCoreView(viewsets.ModelViewSet):
document.delete()
# Delete the files
file_name: str = document.document_name
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_path = FileManagerHelper.handle_sub_directory_for_tenants(
org_id=org_id,
is_create=False,
user_id=user_id,
tool_id=str(custom_tool.tool_id),
)
path = file_path
file_system = LocalStorageFS(settings={"path": path})
FileManagerHelper.delete_file(file_system, path, file_name)
# Directories to delete the text files
directories = ["extract/", "extract/metadata/", "summarize/"]
FileManagerHelper.delete_related_files(
file_system, path, file_name, directories
)
else:
PromptStudioFileHelper.delete_for_ide(
org_id=org_id,
user_id=user_id,
tool_id=str(custom_tool.tool_id),
file_name=file_name,
)
PromptStudioFileHelper.delete_for_ide(
org_id=org_id,
user_id=user_id,
tool_id=str(custom_tool.tool_id),
file_name=file_name,
)
return Response(
{"data": "File deleted successfully."},
status=status.HTTP_200_OK,

View File

@@ -58,9 +58,6 @@ GOOGLE_STORAGE_ACCESS_KEY_ID=
GOOGLE_STORAGE_SECRET_ACCESS_KEY=
GOOGLE_STORAGE_BASE_URL=https://storage.googleapis.com
# API storage
API_STORAGE_DIR = "/data/api"
# Platform Service
PLATFORM_SERVICE_HOST=http://unstract-platform-service
PLATFORM_SERVICE_PORT=3001
@@ -69,16 +66,12 @@ PLATFORM_SERVICE_PORT=3001
UNSTRACT_RUNNER_HOST=http://unstract-runner
UNSTRACT_RUNNER_PORT=5002
# Workflow execution
WORKFLOW_DATA_DIR = "/data/execution"
# Prompt Service
PROMPT_HOST=http://unstract-prompt-service
PROMPT_PORT=3003
#Prompt Studio
PROMPT_STUDIO_FILE_PATH=/app/prompt-studio-data
REMOTE_PROMPT_STUDIO_FILE_PATH=
# Structure Tool Image (Runs prompt studio exported tools)
# https://hub.docker.com/r/unstract/tool-structure
@@ -156,14 +149,14 @@ API_EXECUTION_DIR_PREFIX="unstract/api"
# Storage Provider for Workflow Execution
# Valid options: MINIO, S3, etc.
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials": {"endpoint_url":"http://unstract-minio:9000","key":"XXX","secret":"XXX"}}'
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
# Storage Provider for API Execution
API_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "XXX", "secret": "XXX"}}'
API_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
#Remote storage related envs
PERMANENT_REMOTE_STORAGE={"provider":"gcs","credentials":<credentials>}
REMOTE_PROMPT_STUDIO_FILE_PATH="<bucket_name>/prompt_studio_data/"
PERMANENT_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
REMOTE_PROMPT_STUDIO_FILE_PATH="unstract/prompt-studio-data"
# Storage Provider for Tool registry
TOOL_REGISTRY_STORAGE_CREDENTIALS='{"provider":"local"}'
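
The storage credential variables above hold a JSON object with a provider name and provider-specific credentials. A minimal sketch of that shape, assuming nothing about the SDK's actual parsing beyond what the samples show:

```python
import json
import os

# Hedged sketch: PERMANENT_REMOTE_STORAGE is a JSON object with a
# "provider" name and provider-specific "credentials".
raw = os.environ.get(
    "PERMANENT_REMOTE_STORAGE",
    '{"provider": "minio", "credentials": '
    '{"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}',
)
config = json.loads(raw)
print(config["provider"])                     # "minio"
print(sorted(config.get("credentials", {})))  # ['endpoint_url', 'key', 'secret']
```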

View File

@@ -4,6 +4,7 @@ import os
from pathlib import Path
from typing import Any
from file_management.exceptions import InvalidFileType
from file_management.file_management_helper import FileManagerHelper
from unstract.sdk.file_storage import FileStorage
from unstract.sdk.file_storage.constants import StorageType
@@ -82,7 +83,11 @@ class PromptStudioFileHelper:
@staticmethod
def fetch_file_contents(
org_id: str, user_id: str, tool_id: str, file_name: str
org_id: str,
user_id: str,
tool_id: str,
file_name: str,
allowed_content_types: list[str],
) -> dict[str, Any]:
"""Method to fetch file contents from the remote location.
The path is constructed in runtime based on the args"""
@@ -110,7 +115,7 @@ class PromptStudioFileHelper:
)
# TODO : Handle this with proper fix
# Temporary Hack for frictionless onboarding as the user id will be empty
if not fs_instance.exists(file_system_path):
if not user_id and not fs_instance.exists(file_system_path):
file_system_path = (
PromptStudioFileHelper.get_or_create_prompt_studio_subdirectory(
org_id=org_id,
@@ -121,7 +126,9 @@ class PromptStudioFileHelper:
)
file_path = str(Path(file_system_path) / file_name)
legacy_file_path = str(Path(legacy_file_system_path) / file_name)
file_content_type = fs_instance.mime_type(file_path)
file_content_type = fs_instance.mime_type(
path=file_path, legacy_storage_path=legacy_file_path
)
if file_content_type == "application/pdf":
# Read contents of PDF file into a string
text_content_bytes: bytes = fs_instance.read(
@@ -140,9 +147,14 @@ class PromptStudioFileHelper:
legacy_storage_path=legacy_file_path,
encoding="utf-8",
)
return {"data": text_content_string, "mime_type": file_content_type}
# Check if the file type is in the allowed list
elif file_content_type not in allowed_content_types:
raise InvalidFileType(f"File type '{file_content_type}' is not allowed.")
else:
raise ValueError(f"Unsupported file type: {file_content_type}")
logger.warning(f"File type '{file_content_type}' is not handled.")
return {"data": text_content_string, "mime_type": file_content_type}
@staticmethod
def delete_for_ide(org_id: str, user_id: str, tool_id: str, file_name: str) -> bool:
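
A minimal call sketch against the updated `fetch_file_contents` signature, using placeholder IDs; the exception handling mirrors the view changes earlier in this commit:

```python
from file_management.exceptions import InvalidFileType
from utils.file_storage.helpers.prompt_studio_file_helper import PromptStudioFileHelper

# Placeholder IDs throughout; allowed_content_types mirrors the new
# parameter introduced above.
try:
    contents = PromptStudioFileHelper.fetch_file_contents(
        org_id="org-001",
        user_id="user-001",
        tool_id="<tool-uuid>",
        file_name="sample.pdf",
        allowed_content_types=["application/pdf", "text/plain"],
    )
    print(contents["mime_type"], len(contents["data"]))
except FileNotFoundError:
    pass  # the view layer re-raises this as FileNotFound
except InvalidFileType:
    pass  # raised when the detected MIME type is not in the allowed list
```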

View File

@@ -1,16 +1,13 @@
import json
from typing import Any
from django.conf import settings
from fsspec import AbstractFileSystem
from unstract.workflow_execution.execution_file_handler import ExecutionFileHandler
from utils.constants import Common
from utils.user_context import UserContext
from backend.constants import FeatureFlag
from unstract.connectors.filesystems import connectors
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status
class BaseConnector(ExecutionFileHandler):
@@ -25,13 +22,6 @@ class BaseConnector(ExecutionFileHandler):
utilities.
"""
super().__init__(workflow_id, execution_id, organization_id)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR):
raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR")
# Directory path for storing execution-related files for API
self.api_storage_dir: str = self.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
def get_fsspec(
self, settings: dict[str, Any], connector_id: str
@@ -103,12 +93,7 @@ class BaseConnector(ExecutionFileHandler):
str: The directory path for the execution.
"""
organization_id = UserContext.get_organization_identifier()
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
api_storage_dir: str = cls.get_api_execution_dir(
workflow_id, execution_id, organization_id
)
else:
api_storage_dir: str = cls.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
api_storage_dir: str = cls.get_api_execution_dir(
workflow_id, execution_id, organization_id
)
return api_storage_dir

View File

@@ -5,13 +5,9 @@ import logging
import os
from typing import Any, Optional, Union
import fsspec
import magic
from connector_v2.models import ConnectorInstance
from fsspec.implementations.local import LocalFileSystem
from rest_framework.exceptions import APIException
from unstract.sdk.constants import ToolExecKey
from unstract.sdk.file_storage.constants import FileOperationParams
from unstract.sdk.tool.mime_types import EXT_MIME_MAP
from unstract.workflow_execution.constants import ToolOutputType
from utils.user_context import UserContext
@@ -40,13 +36,9 @@ from workflow_manager.workflow_v2.models.file_history import FileHistory
from workflow_manager.workflow_v2.models.workflow import Workflow
from workflow_manager.workflow_v2.utils import WorkflowUtil
from backend.constants import FeatureFlag
from backend.exceptions import UnstractFSException
from unstract.connectors.exceptions import ConnectorError
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem
from unstract.filesystem import FileStorageType, FileSystem
logger = logging.getLogger(__name__)
@@ -240,12 +232,10 @@ class DestinationConnector(BaseConnector):
destination_fs.create_dir_if_not_exists(input_dir=output_directory)
# Traverse local directory and create the same structure in the
# output_directory
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
fs = file_system.get_file_storage()
dir_path = fs.walk(str(destination_volume_path))
else:
dir_path = os.walk(destination_volume_path)
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
fs = file_system.get_file_storage()
dir_path = fs.walk(str(destination_volume_path))
for root, dirs, files in dir_path:
for dir_name in dirs:
current_dir = os.path.join(
@@ -417,40 +407,7 @@ class DestinationConnector(BaseConnector):
Returns:
Union[dict[str, Any], str]: Result data.
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
return self.get_result_with_file_storage(file_history=file_history)
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
metadata: dict[str, Any] = self.get_workflow_metadata()
output_type = self.get_output_type(metadata)
result: Union[dict[str, Any], str] = ""
try:
# TODO: SDK handles validation; consider removing here.
with open(output_file, "rb") as f:
file_type = magic.from_buffer(f.read(), mime=True)
if output_type == ToolOutputType.JSON:
if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
msg = f"Expected tool output type: JSON, got: '{file_type}'"
logger.error(msg)
raise ToolOutputTypeMismatch(detail=msg)
with open(output_file) as file:
result = json.load(file)
elif output_type == ToolOutputType.TXT:
if file_type == EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
msg = f"Expected tool output type: TXT, got: '{file_type}'"
logger.error(msg)
raise ToolOutputTypeMismatch(detail=msg)
with open(output_file) as file:
result = file.read()
result = result.encode("utf-8").decode("unicode-escape")
else:
raise InvalidToolOutputType()
except (FileNotFoundError, json.JSONDecodeError) as err:
msg = f"Error while getting result from the tool: {err}"
logger.error(msg)
raise APIException(detail=msg)
return result
return self.get_result_with_file_storage(file_history=file_history)
def get_result_with_file_storage(
self, file_history: Optional[FileHistory] = None
@@ -470,9 +427,7 @@ class DestinationConnector(BaseConnector):
file_storage = file_system.get_file_storage()
try:
# TODO: SDK handles validation; consider removing here.
file_type = file_storage.mime_type(
path=output_file, read_length=FileOperationParams.READ_ENTIRE_LENGTH
)
file_type = file_storage.mime_type(path=output_file)
if output_type == ToolOutputType.JSON:
if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
msg = f"Expected tool output type: JSON, got: '{file_type}'"
@@ -516,14 +471,10 @@ class DestinationConnector(BaseConnector):
Returns:
None
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
if file_storage.exists(self.execution_dir):
file_storage.rm(self.execution_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
if file_storage.exists(self.execution_dir):
file_storage.rm(self.execution_dir, recursive=True)
self.delete_api_storage_dir(self.workflow_id, self.execution_id)
@classmethod
@@ -536,14 +487,10 @@ class DestinationConnector(BaseConnector):
api_storage_dir = cls.get_api_storage_dir_path(
workflow_id=workflow_id, execution_id=execution_id
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
if file_storage.exists(api_storage_dir):
file_storage.rm(api_storage_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
if file_storage.exists(api_storage_dir):
file_storage.rm(api_storage_dir, recursive=True)
@classmethod
def create_endpoint_for_workflow(

View File

@@ -39,11 +39,7 @@ from workflow_manager.workflow_v2.execution import WorkflowExecutionServiceHelpe
from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper
from workflow_manager.workflow_v2.models.workflow import Workflow
from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem
from unstract.filesystem import FileStorageType, FileSystem
logger = logging.getLogger(__name__)
@@ -508,17 +504,10 @@ class SourceConnector(BaseConnector):
)
self.publish_input_file_content(input_file_path, input_log)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.write(path=source_file_path, mode="wb", data=file_content)
file_storage.write(path=infile_path, mode="wb", data=file_content)
else:
with fsspec.open(source_file, "wb") as local_file:
local_file.write(file_content)
# Copy file to infile directory
self.copy_file_to_infile_dir(source_file_path, infile_path)
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.write(path=source_file_path, mode="wb", data=file_content)
file_storage.write(path=infile_path, mode="wb", data=file_content)
logger.info(f"{input_file_path} is added to execution directory")
return hash_value_of_file_content
@@ -527,20 +516,17 @@ class SourceConnector(BaseConnector):
"""Add input file to execution directory from api storage."""
infile_path = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
source_path = os.path.join(self.execution_dir, WorkflowFileType.SOURCE)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
api_file_system = FileSystem(FileStorageType.API_EXECUTION)
api_file_storage = api_file_system.get_file_storage()
workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_file_storage = workflow_file_system.get_file_storage()
self._copy_file_to_destination(
source_storage=api_file_storage,
destination_storage=workflow_file_storage,
source_path=input_file_path,
destination_paths=[infile_path, source_path],
)
else:
shutil.copyfile(input_file_path, infile_path)
shutil.copyfile(input_file_path, source_path)
api_file_system = FileSystem(FileStorageType.API_EXECUTION)
api_file_storage = api_file_system.get_file_storage()
workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_file_storage = workflow_file_system.get_file_storage()
self._copy_file_to_destination(
source_storage=api_file_storage,
destination_storage=workflow_file_storage,
source_path=input_file_path,
destination_paths=[infile_path, source_path],
)
# TODO: replace it with method from SDK Utils
def _copy_file_to_destination(
@@ -701,20 +687,13 @@ class SourceConnector(BaseConnector):
for file in file_objs:
file_name = file.name
destination_path = os.path.join(api_storage_dir, file_name)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
buffer = bytearray()
for chunk in file.chunks():
buffer.extend(chunk)
file_storage.write(path=destination_path, mode="wb", data=buffer)
else:
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, "wb") as f:
buffer = bytearray()
for chunk in file.chunks():
buffer.extend(chunk)
f.write(buffer)
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
buffer = bytearray()
for chunk in file.chunks():
buffer.extend(chunk)
file_storage.write(path=destination_path, mode="wb", data=buffer)
file_hash = cls.hash_str(buffer)
connection_type = WorkflowEndpoint.ConnectionType.API
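
The API upload path above buffers Django file chunks in memory before a single remote write. In isolation, a sketch of that pattern (assuming a Django-style `UploadedFile`; not code from this commit):

```python
def buffer_and_write(uploaded_file, file_storage, destination_path: str) -> bytes:
    """Sketch of the chunk-buffering pattern above.

    `uploaded_file` is assumed to be a Django-style UploadedFile whose
    .chunks() yields bytes; `file_storage.write` follows the signature
    used in the diff.
    """
    buffer = bytearray()
    for chunk in uploaded_file.chunks():
        buffer.extend(chunk)
    data = bytes(buffer)
    file_storage.write(path=destination_path, mode="wb", data=data)
    return data  # caller hashes these exact bytes (cls.hash_str in the diff)
```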

View File

@@ -46,6 +46,21 @@ services:
- traefik.http.routers.minio.rule=Host(`minio.unstract.localhost`)
- traefik.http.services.minio.loadbalancer.server.port=9001
createbuckets:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
sleep 5;
mc alias set minio http://unstract-minio:9000 minio minio123;
mc mb minio/unstract;
mc mirror /app/prompt-studio-data minio/unstract/prompt-studio-data;
exit 0;
"
volumes:
- prompt_studio_data:/app/prompt-studio-data
reverse-proxy:
# The official v2 Traefik docker image
image: traefik:v2.10
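
A quick way to confirm the `createbuckets` bootstrap succeeded, assuming the `minio` Python client is installed and MinIO's S3 port 9000 is published to the host:

```python
from minio import Minio  # assumes the `minio` client package is installed

# Development credentials taken from the compose snippet above.
client = Minio(
    "localhost:9000",
    access_key="minio",
    secret_key="minio123",
    secure=False,
)
print(client.bucket_exists("unstract"))  # True once `createbuckets` has run
```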

View File

@@ -17,6 +17,7 @@ services:
- redis
- reverse-proxy
- minio
- createbuckets
- platform-service
- prompt-service
- x2text-service
@@ -171,6 +172,8 @@ services:
restart: unless-stopped
depends_on:
- db
- minio
- createbuckets
ports:
- "3003:3003"
env_file:

View File

@@ -31,10 +31,10 @@ FLIPT_SERVICE_AVAILABLE=False
# Cost calculation related ENVs
MODEL_PRICES_URL="https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
MODEL_PRICES_TTL_IN_DAYS=7
MODEL_PRICES_FILE_PATH="/tmp/model_prices.json"
MODEL_PRICES_FILE_PATH="<bucket-name>/cost/model_prices.json"
#Remote storage config
FILE_STORAGE_CREDENTIALS='{"provider":"gcs","credentials": {"token": {token-value-json}}'
REMOTE_MODEL_PRICES_FILE_PATH="fsspec-test/cost/model_prices.json"
FILE_STORAGE_CREDENTIALS='{"provider":"local"}'
REMOTE_MODEL_PRICES_FILE_PATH="unstract/cost/model_prices.json"
LOG_LEVEL=INFO

View File

@@ -1,10 +1,3 @@
class FeatureFlag:
"""Temporary feature flags."""
# For enabling remote storage feature
REMOTE_FILE_STORAGE = "remote_file_storage"
class DBTable:
"""Database tables."""

View File

@@ -1,21 +1,13 @@
import json
import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import requests
from flask import current_app as app
from unstract.platform_service.constants import FeatureFlag
from unstract.platform_service.env import Env
from unstract.platform_service.utils import format_float_positional
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from datetime import timezone
from unstract.sdk.exceptions import FileStorageError
from unstract.sdk.file_storage import EnvHelper, StorageType
from unstract.sdk.exceptions import FileStorageError
from unstract.sdk.file_storage import EnvHelper, StorageType
class CostCalculationHelper:
@@ -29,25 +21,21 @@ class CostCalculationHelper:
self.url = url
self.file_path = file_path
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
try:
self.file_storage = EnvHelper.get_storage(
StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS"
)
self.file_path = os.environ.get("REMOTE_MODEL_PRICES_FILE_PATH")
except KeyError as e:
app.logger.error(
f"Required credentials are missing in the env: {str(e)}"
)
raise e
except FileStorageError as e:
app.logger.error(
"Error while initialising storage: %s",
e,
stack_info=True,
exc_info=True,
)
raise e
try:
self.file_storage = EnvHelper.get_storage(
StorageType.PERMANENT, "FILE_STORAGE_CREDENTIALS"
)
except KeyError as e:
app.logger.error(f"Required credentials are missing in the env: {str(e)}")
raise e
except FileStorageError as e:
app.logger.error(
"Error while initialising storage: %s",
e,
stack_info=True,
exc_info=True,
)
raise e
self.model_token_data = self._get_model_token_data()
@@ -58,10 +46,7 @@ class CostCalculationHelper:
item = None
if not self.model_token_data:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
return json.loads(format_float_positional(cost))
else:
return format_float_positional(cost)
return json.loads(format_float_positional(cost))
# Filter the model objects by model name
filtered_models = {
k: v for k, v in self.model_token_data.items() if k.endswith(model_name)
@@ -80,43 +65,25 @@ class CostCalculationHelper:
def _get_model_token_data(self) -> Optional[dict[str, Any]]:
try:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
# File does not exist, fetch JSON data from API
if not self.file_storage.exists(self.file_path):
return self._fetch_and_save_json()
# File does not exist, fetch JSON data from API
if not self.file_storage.exists(self.file_path):
return self._fetch_and_save_json()
file_mtime = self.file_storage.modification_time(self.file_path)
file_expiry_date = file_mtime + timedelta(days=self.ttl_days)
file_expiry_date_utc = file_expiry_date.replace(tzinfo=timezone.utc)
now_utc = datetime.now().replace(tzinfo=timezone.utc)
file_mtime = self.file_storage.modification_time(self.file_path)
file_expiry_date = file_mtime + timedelta(days=self.ttl_days)
file_expiry_date_utc = file_expiry_date.replace(tzinfo=timezone.utc)
now_utc = datetime.now().replace(tzinfo=timezone.utc)
if now_utc < file_expiry_date_utc:
app.logger.info(f"Reading model token data from {self.file_path}")
# File exists and TTL has not expired, read and return content
file_contents = self.file_storage.read(
self.file_path, mode="r", encoding="utf-8"
)
return json.loads(file_contents)
else:
# TTL expired, fetch updated JSON data from API
return self._fetch_and_save_json()
else:
# File does not exist, fetch JSON data from API
if not os.path.exists(self.file_path):
return self._fetch_and_save_json()
file_mtime = os.path.getmtime(self.file_path)
file_expiry_date = datetime.fromtimestamp(file_mtime) + timedelta(
days=self.ttl_days
if now_utc < file_expiry_date_utc:
app.logger.info(f"Reading model token data from {self.file_path}")
# File exists and TTL has not expired, read and return content
file_contents = self.file_storage.read(
self.file_path, mode="r", encoding="utf-8"
)
if datetime.now() < file_expiry_date:
app.logger.info(f"Reading model token data from {self.file_path}")
# File exists and TTL has not expired, read and return content
with open(self.file_path, encoding="utf-8") as f:
return json.load(f)
else:
# TTL expired, fetch updated JSON data from API
return self._fetch_and_save_json()
return json.loads(file_contents)
else:
# TTL expired, fetch updated JSON data from API
return self._fetch_and_save_json()
except Exception as e:
app.logger.warning(
"Error in calculate_cost: %s", e, stack_info=True, exc_info=True
@@ -137,21 +104,12 @@ class CostCalculationHelper:
response.raise_for_status()
json_data = response.json()
# Save JSON data to file
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
self.file_storage.json_dump(
path=self.file_path,
data=json_data,
ensure_ascii=False,
indent=4,
)
else:
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
# Set the file's modification time to indicate TTL
expiry_date = datetime.now() + timedelta(days=self.ttl_days)
expiry_timestamp = expiry_date.timestamp()
os.utime(self.file_path, (expiry_timestamp, expiry_timestamp))
self.file_storage.json_dump(
path=self.file_path,
data=json_data,
ensure_ascii=False,
indent=4,
)
app.logger.info(
"File '%s' updated successfully with TTL set to %d days.",
self.file_path,
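
A stand-alone sketch of the freshness check above: the cached prices file is considered fresh while its modification time plus the TTL is still in the future (the mtime value here is hypothetical):

```python
from datetime import datetime, timedelta, timezone

ttl_days = 7
file_mtime = datetime(2025, 2, 12, 9, 0)  # hypothetical modification time
expiry_utc = (file_mtime + timedelta(days=ttl_days)).replace(tzinfo=timezone.utc)
now_utc = datetime.now().replace(tzinfo=timezone.utc)
print("read cached file" if now_utc < expiry_utc else "TTL expired: refetch")
```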

View File

@@ -30,7 +30,7 @@ PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
# Flipt Service
FLIPT_SERVICE_AVAILABLE=False
#Remote storage related envs
PERMANENT_REMOTE_STORAGE={"provider":"gcs","credentials":<credentials>}
REMOTE_PROMPT_STUDIO_FILE_PATH="<bucket_name>/prompt_studio_data/"
PERMANENT_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
TEMPORARY_REMOTE_STORAGE='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
REMOTE_PROMPT_STUDIO_FILE_PATH="unstract/prompt_studio_data/"

View File

@@ -91,12 +91,6 @@ class RunLevel(Enum):
TABLE_EXTRACTION = "TABLE_EXTRACTION"
class FeatureFlag:
"""Temporary feature flags."""
REMOTE_FILE_STORAGE = "remote_file_storage"
class DBTableV2:
"""Database tables."""

View File

@@ -10,7 +10,6 @@ from unstract.prompt_service.config import db
from unstract.prompt_service.constants import (
DBTableV2,
ExecutionSource,
FeatureFlag,
FileStorageKeys,
)
from unstract.prompt_service.constants import PromptServiceContants as PSKeys
@@ -19,15 +18,11 @@ from unstract.prompt_service.env_manager import EnvLoader
from unstract.prompt_service.exceptions import APIError, RateLimitError
from unstract.sdk.exceptions import RateLimitError as SdkRateLimitError
from unstract.sdk.exceptions import SdkError
from unstract.sdk.file_storage import FileStorage, FileStorageProvider
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
from unstract.sdk.llm import LLM
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.sdk.file_storage import FileStorage, FileStorageProvider
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper
PAID_FEATURE_MSG = (
"It is a cloud / enterprise feature. If you have purchased a plan and still "
"face this issue, please contact support"
@@ -307,27 +302,23 @@ def run_completion(
)
highlight_data = None
if highlight_data_plugin and enable_highlight:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.SHARED_TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
highlight_data = highlight_data_plugin["entrypoint_cls"](
logger=current_app.logger,
file_path=file_path,
fs_instance=fs_instance,
).run
else:
highlight_data = highlight_data_plugin["entrypoint_cls"](
logger=current_app.logger, file_path=file_path
).run
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.SHARED_TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
highlight_data = highlight_data_plugin["entrypoint_cls"](
logger=current_app.logger,
file_path=file_path,
fs_instance=fs_instance,
).run
completion = llm.complete(
prompt=prompt,
process_text=highlight_data,
@@ -368,32 +359,24 @@ def extract_table(
"Unable to extract table details. "
"Please contact admin to resolve this issue."
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.SHARED_TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
try:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
answer = table_extractor["entrypoint_cls"].extract_large_table(
llm=llm,
table_settings=table_settings,
enforce_type=enforce_type,
fs_instance=fs_instance,
)
else:
answer = table_extractor["entrypoint_cls"].extract_large_table(
llm=llm,
table_settings=table_settings,
enforce_type=enforce_type,
)
answer = table_extractor["entrypoint_cls"].extract_large_table(
llm=llm,
table_settings=table_settings,
enforce_type=enforce_type,
fs_instance=fs_instance,
)
structured_output[output[PSKeys.NAME]] = answer
# We do not support summary and eval for table.
# Hence returning the result
@@ -428,32 +411,23 @@ def extract_line_item(
)
# Read file content into context
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.SHARED_TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
fs_instance: FileStorage = FileStorage(FileStorageProvider.LOCAL)
if execution_source == ExecutionSource.IDE.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.PERMANENT,
env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
)
if execution_source == ExecutionSource.TOOL.value:
fs_instance = EnvHelper.get_storage(
storage_type=StorageType.SHARED_TEMPORARY,
env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
)
if not fs_instance.exists(extract_file_path):
raise FileNotFoundError(
f"The file at path '{extract_file_path}' does not exist."
)
context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb")
else:
if not os.path.exists(extract_file_path):
raise FileNotFoundError(
f"The file at path '{extract_file_path}' does not exist."
)
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
if not fs_instance.exists(extract_file_path):
raise FileNotFoundError(
f"The file at path '{extract_file_path}' does not exist."
)
context = fs_instance.read(path=extract_file_path, encoding="utf-8", mode="rb")
prompt = construct_prompt(
preamble=tool_settings.get(PSKeys.PREAMBLE, ""),
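
The same three-way storage selection now appears in `run_completion`, `extract_table`, and `extract_line_item`. A hedged refactoring sketch of the shared logic, using the names from the diff (illustrative only, not code from this commit):

```python
from unstract.prompt_service.constants import ExecutionSource, FileStorageKeys
from unstract.sdk.file_storage import FileStorage, FileStorageProvider
from unstract.sdk.file_storage.constants import StorageType
from unstract.sdk.file_storage.env_helper import EnvHelper


def select_file_storage(execution_source: str) -> FileStorage:
    """Pick the storage backend by execution source (sketch)."""
    if execution_source == ExecutionSource.IDE.value:
        return EnvHelper.get_storage(
            storage_type=StorageType.PERMANENT,
            env_name=FileStorageKeys.PERMANENT_REMOTE_STORAGE,
        )
    if execution_source == ExecutionSource.TOOL.value:
        return EnvHelper.get_storage(
            storage_type=StorageType.SHARED_TEMPORARY,
            env_name=FileStorageKeys.TEMPORARY_REMOTE_STORAGE,
        )
    return FileStorage(FileStorageProvider.LOCAL)
```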

View File

@@ -37,12 +37,11 @@ sudo ln -s "$HOME/.docker/run/docker.sock" /var/run/docker.sock
## Required Environment Variables
| Variable | Description |
| -------------------------- | ---------------------------------------------------------------------------------------|
| `CELERY_BROKER_URL` | URL for Celery's message broker, used to queue tasks. Must match backend configuration.|
| `TOOL_CONTAINER_NETWORK` | Network used for running tool containers. |
| `TOOL_CONTAINER_LABELS` | Labels applied to tool containers for observability [Optional]. |
| `WORKFLOW_DATA_DIR` | Source mount bind directory for tool containers to access input files. |
| `TOOL_DATA_DIR` | Target mount directory within tool containers. (Default: "/data") |
| `LOG_LEVEL` | Log level for runner (Options: INFO, WARNING, ERROR, DEBUG, etc.) |
| Variable | Description |
| -------------------------- |-----------------------------------------------------------------------------------------------|
| `CELERY_BROKER_URL` | URL for Celery's message broker, used to queue tasks. Must match backend configuration. |
| `TOOL_CONTAINER_NETWORK` | Network used for running tool containers. |
| `TOOL_CONTAINER_LABELS` | Labels applied to tool containers for observability [Optional]. |
| `EXECUTION_DATA_DIR` | Target mount directory within tool containers. (Default: "/data") |
| `LOG_LEVEL` | Log level for runner (Options: INFO, WARNING, ERROR, DEBUG, etc.) |
| `REMOVE_CONTAINER_ON_EXIT` | Flag to decide whether to clean up/remove the tool container after execution. (Default: True) |
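
Illustrative only, a sketch of how a runner process might consume the variables in the table above (defaults here are assumptions, not authoritative):

```python
import os

celery_broker_url = os.environ["CELERY_BROKER_URL"]  # required
tool_network = os.environ["TOOL_CONTAINER_NETWORK"]  # required
execution_data_dir = os.getenv("EXECUTION_DATA_DIR", "/data")
remove_on_exit = os.getenv("REMOVE_CONTAINER_ON_EXIT", "True").lower() == "true"
log_level = os.getenv("LOG_LEVEL", "INFO")
```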

View File

@@ -3,8 +3,6 @@ CELERY_BROKER_URL = "redis://unstract-redis:6379"
TOOL_CONTAINER_NETWORK="unstract-network"
TOOL_CONTAINER_LABELS="[]"
WORKFLOW_DATA_DIR="${PWD}/workflow_data/execution"
TOOL_DATA_DIR="/data"
PRIVATE_REGISTRY_CREDENTIAL_PATH=
PRIVATE_REGISTRY_USERNAME=
PRIVATE_REGISTRY_URL=
@@ -19,7 +17,6 @@ REMOVE_CONTAINER_ON_EXIT=True
# Client module path of the container engine to be used.
CONTAINER_CLIENT_PATH=unstract.runner.clients.docker
EXECUTION_RUN_DATA_FOLDER_PREFIX="/app/workflow_data"
# Feature Flags
FLIPT_SERVICE_AVAILABLE=False
@@ -29,7 +26,7 @@ PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
# File System Configuration for Workflow and API Execution
# Directory Prefixes for storing execution files
WORKFLOW_EXECUTION_DIR_PREFIX="/unstract/execution"
WORKFLOW_EXECUTION_DIR_PREFIX="unstract/execution"
# Storage Provider for Workflow Execution
# Valid options: MINIO, S3, etc..
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials": {"endpoint_url":"http://unstract-minio:9000","key":"XXX","secret":"XXX"}}'
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'

View File

@@ -9,12 +9,11 @@ from unstract.runner.clients.interface import (
ContainerClientInterface,
ContainerInterface,
)
from unstract.runner.constants import Env, FeatureFlag
from unstract.runner.constants import Env
from unstract.runner.utils import Utils
from docker import DockerClient
from unstract.core.utilities import UnstractUtils
from unstract.flags.feature_flag import check_feature_flag_status
class DockerContainer(ContainerInterface):
@@ -159,9 +158,6 @@ class Client(ContainerClientInterface):
def get_container_run_config(
self,
command: list[str],
organization_id: str,
workflow_id: str,
execution_id: str,
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,
@@ -169,27 +165,6 @@ class Client(ContainerClientInterface):
if envs is None:
envs = {}
mounts = []
if organization_id and workflow_id and execution_id:
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
source_path = os.path.join(
os.getenv(Env.WORKFLOW_DATA_DIR, ""),
organization_id,
workflow_id,
execution_id,
)
mounts.append(
{
"type": "bind",
"source": source_path,
"target": os.getenv(Env.TOOL_DATA_DIR, "/data"),
}
)
envs[Env.EXECUTION_RUN_DATA_FOLDER] = os.path.join(
os.getenv(Env.EXECUTION_RUN_DATA_FOLDER_PREFIX, ""),
organization_id,
workflow_id,
execution_id,
)
return {
"name": UnstractUtils.build_tool_container_name(
tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id

View File

@@ -62,9 +62,6 @@ class ContainerClientInterface(ABC):
def get_container_run_config(
self,
command: list[str],
organization_id: str,
workflow_id: str,
execution_id: str,
run_id: str,
envs: Optional[dict[str, Any]] = None,
auto_remove: bool = False,

View File

@@ -108,12 +108,7 @@ def test_get_image(docker_client, mocker):
def test_get_container_run_config(docker_client, mocker):
"""Test the get_container_run_config method."""
os.environ[Env.WORKFLOW_DATA_DIR] = "/source"
os.environ[Env.EXECUTION_RUN_DATA_FOLDER_PREFIX] = "/app/workflow_data"
command = ["echo", "hello"]
organization_id = "org123"
workflow_id = "wf123"
execution_id = "ex123"
run_id = "run123"
mocker.patch.object(docker_client, "_Client__image_exists", return_value=True)
@@ -122,13 +117,7 @@ def test_get_container_run_config(docker_client, mocker):
return_value="test-image",
)
config = docker_client.get_container_run_config(
command,
organization_id,
workflow_id,
execution_id,
run_id,
envs={"KEY": "VALUE"},
auto_remove=True,
command, run_id, envs={"KEY": "VALUE"}, auto_remove=True
)
mocker_normalize.assert_called_once_with(
@@ -137,24 +126,14 @@ def test_get_container_run_config(docker_client, mocker):
assert config["name"] == "test-image"
assert config["image"] == "test-image:latest"
assert config["command"] == ["echo", "hello"]
assert config["environment"] == {
"KEY": "VALUE",
"EXECUTION_RUN_DATA_FOLDER": ("/app/workflow_data/org123/wf123/ex123"),
}
assert config["mounts"] == [
{
"type": "bind",
"source": f"/source/{organization_id}/{workflow_id}/{execution_id}",
"target": "/data",
}
]
assert config["environment"] == {"KEY": "VALUE"}
assert config["mounts"] == []
def test_get_container_run_config_without_mount(docker_client, mocker):
"""Test the get_container_run_config method."""
os.environ[Env.WORKFLOW_DATA_DIR] = "/source"
os.environ[Env.EXECUTION_DATA_DIR] = "/source"
command = ["echo", "hello"]
execution_id = "ex123"
run_id = "run123"
mocker.patch.object(docker_client, "_Client__image_exists", return_value=True)
@@ -162,14 +141,7 @@ def test_get_container_run_config_without_mount(docker_client, mocker):
"unstract.core.utilities.UnstractUtils.build_tool_container_name",
return_value="test-image",
)
config = docker_client.get_container_run_config(
command,
None,
None,
execution_id,
run_id,
auto_remove=True,
)
config = docker_client.get_container_run_config(command, run_id, auto_remove=True)
mocker_normalize.assert_called_once_with(
tool_image="test-image", tool_version="latest", run_id=run_id

View File

@@ -20,10 +20,6 @@ class ToolKey:
class Env:
TOOL_CONTAINER_NETWORK = "TOOL_CONTAINER_NETWORK"
TOOL_CONTAINER_LABELS = "TOOL_CONTAINER_LABELS"
WORKFLOW_DATA_DIR = "WORKFLOW_DATA_DIR"
EXECUTION_RUN_DATA_FOLDER = "EXECUTION_RUN_DATA_FOLDER"
EXECUTION_RUN_DATA_FOLDER_PREFIX = "EXECUTION_RUN_DATA_FOLDER_PREFIX"
TOOL_DATA_DIR = "TOOL_DATA_DIR"
PRIVATE_REGISTRY_CREDENTIAL_PATH = "PRIVATE_REGISTRY_CREDENTIAL_PATH"
PRIVATE_REGISTRY_USERNAME = "PRIVATE_REGISTRY_USERNAME"
PRIVATE_REGISTRY_URL = "PRIVATE_REGISTRY_URL"
@@ -35,9 +31,3 @@ class Env:
)
EXECUTION_DATA_DIR = "EXECUTION_DATA_DIR"
FLIPT_SERVICE_AVAILABLE = "FLIPT_SERVICE_AVAILABLE"
class FeatureFlag:
"""Temporary feature flags."""
REMOTE_FILE_STORAGE = "remote_file_storage"

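With `WORKFLOW_DATA_DIR`, the run-data-folder envs, and the temporary flag removed, `EXECUTION_DATA_DIR` is the single data-directory knob left; a minimal sketch of reading it (the `/data` fallback mirrors the old `TOOL_DATA_DIR` default and is an assumption here, not documented runner behaviour):

```
import os

from unstract.runner.constants import Env

# EXECUTION_DATA_DIR is the only data-directory env the runner sets now.
data_dir = os.getenv(Env.EXECUTION_DATA_DIR, "/data")
```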
View File

@@ -11,12 +11,11 @@ from unstract.runner.clients.interface import (
ContainerClientInterface,
ContainerInterface,
)
from unstract.runner.constants import Env, FeatureFlag, LogLevel, LogType, ToolKey
from unstract.runner.constants import Env, LogLevel, LogType, ToolKey
from unstract.runner.exception import ToolRunException
from unstract.core.constants import LogFieldName
from unstract.core.pubsub_helper import LogPublisher
from unstract.flags.feature_flag import check_feature_flag_status
load_dotenv()
# Loads the container client class.
@@ -140,12 +139,7 @@ class UnstractRunner:
"""
command = command.upper()
container_config = self.client.get_container_run_config(
command=["--command", command],
organization_id="",
workflow_id="",
execution_id="",
run_id="",
auto_remove=True,
command=["--command", command], run_id="", auto_remove=True
)
container = None
@@ -186,19 +180,16 @@ class UnstractRunner:
Returns:
Optional[Any]: Result of running the tool container, if any.
"""
tool_data_dir = os.getenv(Env.TOOL_DATA_DIR, "/data")
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
envs[Env.EXECUTION_DATA_DIR] = os.path.join(
os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""),
organization_id,
workflow_id,
execution_id,
)
envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv(
Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}"
)
else:
envs[Env.TOOL_DATA_DIR] = tool_data_dir
envs[Env.EXECUTION_DATA_DIR] = os.path.join(
os.getenv(Env.WORKFLOW_EXECUTION_DIR_PREFIX, ""),
organization_id,
workflow_id,
execution_id,
)
envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv(
Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}"
)
container_config = self.client.get_container_run_config(
command=[
@@ -209,9 +200,6 @@ class UnstractRunner:
"--log-level",
"DEBUG",
],
organization_id=organization_id,
workflow_id=workflow_id,
execution_id=execution_id,
run_id=run_id,
envs=envs,
)

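The env assembly the runner now performs unconditionally, condensed into a standalone sketch (IDs and the prefix value are illustrative):

```
import os

organization_id, workflow_id, execution_id = "org123", "wf123", "ex123"

envs: dict[str, str] = {}
# Remote execution directory: <prefix>/<org>/<workflow>/<execution>
envs["EXECUTION_DATA_DIR"] = os.path.join(
    os.getenv("WORKFLOW_EXECUTION_DIR_PREFIX", ""),
    organization_id,
    workflow_id,
    execution_id,
)
# Credentials for the remote store travel to the container as a JSON blob.
envs["WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS"] = os.getenv(
    "WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS", "{}"
)
```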
View File

@@ -5,11 +5,11 @@ The document classifier tool classifies documents and copies them to a folder ba
## Required environment variables
| Variable | Description |
| -------------------------- | --------------------------------------------------------------------- |
| -------------------------- |-----------------------------------------------------------------------|
| `PLATFORM_SERVICE_HOST` | The host in which the platform service is running |
| `PLATFORM_SERVICE_PORT` | The port in which the service is listening |
| `PLATFORM_SERVICE_API_KEY` | The API key for the platform |
| `TOOL_DATA_DIR` | The directory in the filesystem which has contents for tool execution |
| `EXECUTION_DATA_DIR` | The directory in the filesystem which has contents for tool execution |
## Testing the tool locally
@@ -41,7 +41,7 @@ Load the environment variables for the tool.
Make a copy of the `sample.env` file and name it `.env`. Fill in the required values.
They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK.
Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
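A small pre-flight sketch of that step (paths and checks are illustrative), verifying the directory pointed to by `EXECUTION_DATA_DIR` before a run:

```
import os
from pathlib import Path

data_dir = Path(os.environ["EXECUTION_DATA_DIR"])
# The tool consumes and rewrites these on every execution.
for required in ("INFILE", "METADATA.json"):
    assert (data_dir / required).exists(), f"missing {required} in {data_dir}"
```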
### Run SPEC command
@@ -99,7 +99,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with
docker build -t unstract/tool-classifier:0.0.1 .
```
Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and
Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and
necessary services like the `platform-service` are up.
To test the tool from its docker image, run the following command

View File

@@ -1,7 +1,7 @@
PLATFORM_SERVICE_HOST=http://unstract-platform-service
PLATFORM_SERVICE_PORT=3001
PLATFORM_SERVICE_API_KEY=<add_platform_key_from_Unstract_frontend>
TOOL_DATA_DIR=../data_dir
EXECUTION_DATA_DIR=../data_dir
X2TEXT_HOST=http://unstract-x2text-service
X2TEXT_PORT=3004

View File

@@ -1,5 +1,4 @@
import re
import shutil
from pathlib import Path
from typing import Any, Optional
@@ -24,7 +23,7 @@ class ClassifierHelper:
Args:
tool (BaseTool): Base tool instance
output_dir (str): Output directory in TOOL_DATA_DIR
output_dir (str): Output directory in EXECUTION_DATA_DIR
"""
self.tool = tool
self.output_dir = output_dir
@@ -67,20 +66,13 @@ class ClassifierHelper:
"""
try:
output_folder_bin = Path(self.output_dir) / classification
if self.tool.workflow_filestorage:
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
else:
if not output_folder_bin.is_dir():
output_folder_bin.mkdir(parents=True, exist_ok=True)
output_file = output_folder_bin / source_name
shutil.copyfile(source_file, output_file)
output_file = output_folder_bin / source_name
self._copy_file(
source_fs=self.tool.workflow_filestorage,
destination_fs=self.tool.workflow_filestorage,
source_path=source_file,
destination_path=str(output_file),
)
except Exception as e:
self.tool.stream_error_and_exit(f"Error creating output file: {e}")
@@ -150,16 +142,11 @@ class ClassifierHelper:
self.tool.stream_log("Text extraction adapter has been created successfully.")
try:
if self.tool.workflow_filestorage:
extraction_result: TextExtractionResult = x2text.process(
input_file_path=file,
fs=self.tool.workflow_filestorage,
tags=self.tool.tags,
)
else:
extraction_result: TextExtractionResult = x2text.process(
input_file_path=file, tags=self.tool.tags
)
extraction_result: TextExtractionResult = x2text.process(
input_file_path=file,
fs=self.tool.workflow_filestorage,
tags=self.tool.tags,
)
extracted_text: str = extraction_result.extracted_text
return extracted_text
except Exception as e:
@@ -176,13 +163,9 @@ class ClassifierHelper:
"""
self.tool.stream_log("Extracting text from file")
try:
if self.tool.workflow_filestorage:
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
else:
with open(file, "rb") as f:
text = f.read().decode("utf-8")
text = self.tool.workflow_filestorage.read(path=file, mode="rb").decode(
"utf-8"
)
except Exception as e:
self.tool.stream_log(f"File error: {e}")
return None

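The unified read path, condensed into a sketch (assuming `tool` is a `BaseTool` whose `workflow_filestorage` is always initialised now that the flag is gone):

```
from typing import Optional


def read_text(tool, file: str) -> Optional[str]:
    """Read a file through the workflow file storage and decode it."""
    try:
        return tool.workflow_filestorage.read(path=file, mode="rb").decode("utf-8")
    except Exception as e:
        tool.stream_log(f"File error: {e}")
        return None
```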
View File

@@ -5,11 +5,11 @@ This is a helper tool that needs to be used along with `Prompt Studio`. It helps
## Required environment variables
| Variable | Description |
| -------------------------- | --------------------------------------------------------------------- |
| -------------------------- |-----------------------------------------------------------------------|
| `PLATFORM_SERVICE_HOST` | The host in which the platform service is running |
| `PLATFORM_SERVICE_PORT` | The port in which the service is listening |
| `PLATFORM_SERVICE_API_KEY` | The API key for the platform |
| `TOOL_DATA_DIR` | The directory in the filesystem which has contents for tool execution |
| `EXECUTION_DATA_DIR` | The directory in the filesystem which has contents for tool execution |
| `PROMPT_HOST` | The host in which the prompt service is running |
| `PROMPT_PORT` | The port in which the prompt service is listening |
@@ -46,7 +46,7 @@ Load the environment variables for the tool.
Make a copy of the `sample.env` file and name it `.env`. Fill in the required values.
They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK.
Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
### Run SPEC command
@@ -102,7 +102,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with
docker build -t unstract/tool-structure:0.0.1 .
```
Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and
Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and
necessary services like the `platform-service` are up.
To test the tool from its docker image, run the following command

View File

@@ -1,9 +1,7 @@
PLATFORM_SERVICE_HOST=http://unstract-platform-service
PLATFORM_SERVICE_PORT=3001
PLATFORM_SERVICE_API_KEY=<add_platform_key_from_Unstract_frontend>
TOOL_DATA_DIR=../data_dir
EXECUTION_RUN_DATA_FOLDER=../data_dir
EXECUTION_DATA_DIR=../data_dir
PROMPT_HOST=http://unstract-prompt-service
PROMPT_PORT=3003

View File

@@ -50,7 +50,6 @@ class SettingsKeys:
SINGLE_PASS_EXTRACTION_MODE = "single_pass_extraction_mode"
CHALLENGE_LLM_ADAPTER_ID = "challenge_llm_adapter_id"
SUMMARIZE_AS_SOURCE = "summarize_as_source"
TOOL_DATA_DIR = "TOOL_DATA_DIR"
SUMMARIZE_PROMPT = "summarize_prompt"
CONTEXT = "context"
ERROR = "error"
@@ -73,7 +72,6 @@ class SettingsKeys:
EPILOGUE = "epilogue"
HIGHLIGHT_DATA = "highlight_data"
CONFIDENCE_DATA = "confidence_data"
EXECUTION_RUN_DATA_FOLDER = "EXECUTION_RUN_DATA_FOLDER"
FILE_PATH = "file_path"
EXECUTION_SOURCE = "execution_source"
TOOL = "tool"

View File

@@ -11,7 +11,6 @@ from unstract.sdk.index import Index
from unstract.sdk.prompt import PromptTool
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.tool.entrypoint import ToolEntrypoint
from unstract.sdk.utils import ToolUtils
from utils import json_to_markdown
logger = logging.getLogger(__name__)
@@ -94,16 +93,10 @@ class StructureTool(BaseTool):
_, file_name = os.path.split(input_file)
if summarize_as_source:
file_name = SettingsKeys.SUMMARIZE
if self.workflow_filestorage:
tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR))
execution_run_data_folder = Path(
self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)
)
else:
tool_data_dir = Path(self.get_env_or_die(SettingsKeys.TOOL_DATA_DIR))
execution_run_data_folder = Path(
self.get_env_or_die(SettingsKeys.EXECUTION_RUN_DATA_FOLDER)
)
tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR))
execution_run_data_folder = Path(
self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR)
)
index = Index(
tool=self,
@@ -153,11 +146,7 @@ class StructureTool(BaseTool):
usage_kwargs=usage_kwargs,
process_text=process_text,
tags=self.tags,
**(
{"fs": self.workflow_filestorage}
if self.workflow_filestorage is not None
else {}
),
**({"fs": self.workflow_filestorage}),
)
index_metrics = {SettingsKeys.INDEXING: index.get_metrics()}
if summarize_as_source:
@@ -196,11 +185,7 @@ class StructureTool(BaseTool):
usage_kwargs=usage_kwargs,
process_text=process_text,
tags=self.tags,
**(
{"fs": self.workflow_filestorage}
if self.workflow_filestorage is not None
else {}
),
**({"fs": self.workflow_filestorage}),
)
index_metrics[output[SettingsKeys.NAME]] = {
SettingsKeys.INDEXING: index.get_metrics()
@@ -289,13 +274,9 @@ class StructureTool(BaseTool):
try:
self.stream_log("Writing parsed output...")
output_path = Path(output_dir) / f"{Path(self.source_file_name).stem}.json"
if self.workflow_filestorage:
self.workflow_filestorage.json_dump(
path=output_path, data=structured_output_dict
)
else:
with open(output_path, "w", encoding="utf-8") as f:
f.write(structured_output)
self.workflow_filestorage.json_dump(
path=output_path, data=structured_output_dict
)
except OSError as e:
self.stream_error_and_exit(f"Error creating output file: {e}")
except json.JSONDecodeError as e:
@@ -336,23 +317,13 @@ class StructureTool(BaseTool):
summarize_file_path = tool_data_dir / SettingsKeys.SUMMARIZE
summarized_context = ""
if self.workflow_filestorage:
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
elif summarize_file_path.exists():
with open(summarize_file_path, encoding="utf-8") as f:
summarized_context = f.read()
if self.workflow_filestorage.exists(summarize_file_path):
summarized_context = self.workflow_filestorage.read(
path=summarize_file_path, mode="r"
)
if not summarized_context:
context = ""
if self.workflow_filestorage:
context = self.workflow_filestorage.read(
path=extract_file_path, mode="r"
)
else:
with open(extract_file_path, encoding="utf-8") as file:
context = file.read()
context = self.workflow_filestorage.read(path=extract_file_path, mode="r")
prompt_keys = []
for output in outputs:
prompt_keys.append(output[SettingsKeys.NAME])
@@ -377,23 +348,14 @@ class StructureTool(BaseTool):
structure_output = json.loads(response[SettingsKeys.STRUCTURE_OUTPUT])
summarized_context = structure_output.get(SettingsKeys.DATA, "")
self.stream_log("Writing summarized context to a file")
if self.workflow_filestorage:
self.workflow_filestorage.write(
path=summarize_file_path, mode="w", data=summarized_context
)
else:
with open(summarize_file_path, "w", encoding="utf-8") as f:
f.write(summarized_context)
self.workflow_filestorage.write(
path=summarize_file_path, mode="w", data=summarized_context
)
self.stream_log("Indexing summarized context")
if self.workflow_filestorage:
summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
path=summarize_file_path
)
else:
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
path=summarize_file_path
)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
@@ -405,11 +367,7 @@ class StructureTool(BaseTool):
chunk_overlap=0,
usage_kwargs=usage_kwargs,
tags=self.tags,
**(
{"fs": self.workflow_filestorage}
if self.workflow_filestorage is not None
else {}
),
**({"fs": self.workflow_filestorage}),
)
return summarize_file_hash

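The persistence pattern the tool now follows everywhere, condensed into one hypothetical helper (names are illustrative; the calls mirror the hunks above):

```
from pathlib import Path
from typing import Any


def persist_output(tool, output_dir: str, stem: str, data: dict[str, Any]) -> str:
    """Write structured output via workflow_filestorage and return its hash."""
    output_path = Path(output_dir) / f"{stem}.json"
    tool.workflow_filestorage.json_dump(path=output_path, data=data)
    return tool.workflow_filestorage.get_hash_from_file(path=output_path)
```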
View File

@@ -7,11 +7,11 @@ For example, it can convert PDF to text, image to text, etc.
## Required Environment Variables
| Variable | Description |
| -------------------------- | -------------------------------------------------------------------------- |
| -------------------------- |----------------------------------------------------------------------------|
| `PLATFORM_SERVICE_HOST` | The host where the platform service is running |
| `PLATFORM_SERVICE_PORT` | The port where the service is listening |
| `PLATFORM_SERVICE_API_KEY` | The API key for the platform |
| `TOOL_DATA_DIR` | The directory in the filesystem which contains contents for tool execution |
| `EXECUTION_DATA_DIR` | The directory in the filesystem which contains contents for tool execution |
| `X2TEXT_HOST` | The host where the x2text service is running |
| `X2TEXT_PORT` | The port where the x2text service is listening |
@@ -42,7 +42,7 @@ source .venv/bin/activate
Make a copy of the `sample.env` file and name it `.env`. Fill in the required values.
They get loaded with [python-dotenv](https://pypi.org/project/python-dotenv/) through the SDK.
2. Update the tool's `data_dir` marked by the `TOOL_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
2. Update the tool's `data_dir` marked by the `EXECUTION_DATA_DIR` env. This has to be done before each tool execution since the tool updates the `INFILE` and `METADATA.json`.
#### Run SPEC command
@@ -94,7 +94,7 @@ Build the tool docker image from the folder containing the `Dockerfile` with
docker build -t unstract/tool-example:0.0.1 .
```
Make sure the directory pointed by `TOOL_DATA_DIR` has the required information for the tool to run and
Make sure the directory pointed by `EXECUTION_DATA_DIR` has the required information for the tool to run and
necessary services like the `platform-service` are up.
To test the tool from its docker image, run the following command

View File

@@ -1,8 +1,7 @@
PLATFORM_SERVICE_HOST=
PLATFORM_SERVICE_PORT=
PLATFORM_SERVICE_API_KEY=
TOOL_DATA_DIR=
EXECUTION_DATA_DIR=
# X2TEXT service
X2TEXT_HOST=
X2TEXT_PORT=

View File

@@ -63,14 +63,9 @@ class TextExtractor(BaseTool):
usage_kwargs=usage_kwargs,
)
self.stream_log("Text extraction adapter has been created successfully.")
if self.workflow_filestorage:
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags
)
else:
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file, tags=self.tags
)
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file, fs=self.workflow_filestorage, tags=self.tags
)
extracted_text = self.convert_to_actual_string(extraction_result.extracted_text)
self.stream_log("Text has been extracted successfully.")
@@ -85,13 +80,9 @@ class TextExtractor(BaseTool):
output_path = (
Path(output_dir) / f"{Path(self.source_file_name).stem}.txt"
)
if self.workflow_filestorage:
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
else:
with open(output_path, "w", encoding="utf-8") as file:
file.write(extracted_text)
self.workflow_filestorage.write(
path=output_path, mode="w", data=extracted_text
)
self.stream_log("Tool output written successfully.")
else:

View File

@@ -5,13 +5,9 @@ from typing import Any
import azure.core.exceptions as AzureException
from adlfs import AzureBlobFileSystem
from backend.constants import FeatureFlag
from unstract.connectors.exceptions import AzureHttpError, ConnectorError
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem
from unstract.filesystem import FileStorageType, FileSystem
logging.getLogger("azurefs").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
@@ -97,13 +93,9 @@ class AzureCloudStorageFS(UnstractFileSystem):
normalized_path = os.path.normpath(destination_path)
destination_connector_fs = self.get_fsspec_fs()
try:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
destination_connector_fs.write_bytes(normalized_path, data)
except AzureException.HttpResponseError as e:
self.raise_http_exception(e=e, path=normalized_path)

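The upload path now shared by connectors, condensed into a sketch (`destination_connector_fs` stands in for whatever fsspec filesystem `get_fsspec_fs()` returns):

```
import os

from unstract.filesystem import FileStorageType, FileSystem


def upload_file(source_path: str, destination_path: str, destination_connector_fs) -> None:
    """Read from workflow-execution storage, write to the destination connector."""
    normalized_path = os.path.normpath(destination_path)
    workflow_fs = FileSystem(FileStorageType.WORKFLOW_EXECUTION).get_file_storage()
    data = workflow_fs.read(path=source_path, mode="rb")
    destination_connector_fs.write_bytes(normalized_path, data)
```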
View File

@@ -5,13 +5,9 @@ from typing import Any
from fsspec import AbstractFileSystem
from backend.constants import FeatureFlag
from unstract.connectors.base import UnstractConnector
from unstract.connectors.enums import ConnectorMode
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem
from unstract.filesystem import FileStorageType, FileSystem
logger = logging.getLogger(__name__)
@@ -106,11 +102,7 @@ class UnstractFileSystem(UnstractConnector, ABC):
"""
normalized_path = os.path.normpath(destination_path)
destination_connector_fs = self.get_fsspec_fs()
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
else:
with open(source_path, "rb") as source_file:
data = source_file.read()
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_fs = file_system.get_file_storage()
data = workflow_fs.read(path=source_path, mode="rb")
destination_connector_fs.write_bytes(normalized_path, data)

View File

@@ -1,13 +1,6 @@
from typing import Any
class FeatureFlag:
"""Temporary feature flags."""
# For enabling remote storage feature
REMOTE_FILE_STORAGE = "remote_file_storage"
class Tools:
TOOLS_DIRECTORY = "tools"
IMAGE_LATEST_TAG = "latest"

View File

@@ -1,26 +1,16 @@
import json
import logging
import os
from typing import Any, Optional
from unstract.tool_registry.constants import (
FeatureFlag,
PropKey,
ToolJsonField,
ToolKey,
)
from unstract.sdk.exceptions import FileStorageError
from unstract.sdk.file_storage import EnvHelper, FileStorage, StorageType
from unstract.tool_registry.constants import PropKey, ToolJsonField, ToolKey
from unstract.tool_registry.dto import Tool
from unstract.tool_registry.exceptions import InvalidToolURLException
from unstract.tool_registry.helper import ToolRegistryHelper
from unstract.tool_registry.schema_validator import JsonSchemaValidator
from unstract.tool_registry.tool_utils import ToolUtils
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.sdk.exceptions import FileStorageError
from unstract.sdk.file_storage import FileStorageProvider, PermanentFileStorage
logger = logging.getLogger(__name__)
@@ -58,10 +48,8 @@ class ToolRegistry:
"registry JSONs and YAML to a directory and set the env."
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
self.fs = self._get_storage_credentials()
else:
self.fs = None
self.fs = self._get_storage_credentials()
self.helper = ToolRegistryHelper(
registry=os.path.join(directory, registry_file),
private_tools_file=os.path.join(directory, private_tools),
@@ -69,30 +57,23 @@ class ToolRegistry:
fs=self.fs,
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
def _get_storage_credentials(self) -> PermanentFileStorage:
try:
# Not creating constants for now for the keywords below as this
# logic ought to change in the near future to maintain uniformity
# across services
file_storage = json.loads(
os.environ.get("TOOL_REGISTRY_STORAGE_CREDENTIALS", {})
)
provider = FileStorageProvider(file_storage["provider"])
credentials = file_storage.get("credentials", {})
return PermanentFileStorage(provider, **credentials)
except KeyError as e:
logger.error(f"Required credentials is missing in the env: {str(e)}")
raise e
except FileStorageError as e:
logger.error(
"Error while initialising storage: %s",
e,
stack_info=True,
exc_info=True,
)
raise e
def _get_storage_credentials(self) -> FileStorage:
try:
fs = EnvHelper.get_storage(
StorageType.PERMANENT, "TOOL_REGISTRY_STORAGE_CREDENTIALS"
)
return fs
except KeyError as e:
logger.error(f"Required credentials is missing in the env: {str(e)}")
raise e
except FileStorageError as e:
logger.error(
"Error while initialising storage: %s",
e,
stack_info=True,
exc_info=True,
)
raise e
def load_all_tools_to_disk(self) -> None:
self.helper.load_all_tools_to_disk()

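For reference, a sketch of wiring up registry storage (the JSON shape follows the old parsing logic above, a `provider` key plus optional `credentials`; the `local` provider value is an assumption):

```
import os

from unstract.sdk.file_storage import EnvHelper, StorageType

# Illustrative payload; real deployments point this at a remote provider.
os.environ["TOOL_REGISTRY_STORAGE_CREDENTIALS"] = (
    '{"provider": "local", "credentials": {}}'
)
fs = EnvHelper.get_storage(StorageType.PERMANENT, "TOOL_REGISTRY_STORAGE_CREDENTIALS")
```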
View File

@@ -3,15 +3,12 @@ import logging
import re
from typing import Any, Optional
import yaml
from unstract.sdk.adapters.enums import AdapterTypes
from unstract.sdk.file_storage import FileStorage, FileStorageProvider
from unstract.tool_registry.constants import AdapterPropertyKey, FeatureFlag, Tools
from unstract.tool_registry.constants import AdapterPropertyKey, Tools
from unstract.tool_registry.dto import AdapterProperties, Spec, Tool, ToolMeta
from unstract.tool_registry.exceptions import InvalidToolURLException, RegistryNotFound
from unstract.flags.feature_flag import check_feature_flag_status
logger = logging.getLogger(__name__)
@@ -63,22 +60,14 @@ class ToolUtils:
data: dict[str, Any],
fs: FileStorage = FileStorage(FileStorageProvider.LOCAL),
) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data)
else:
with open(file_path, "w") as json_file:
json.dump(data, json_file)
fs.json_dump(path=file_path, mode="w", encoding="utf-8", data=data)
@staticmethod
def get_all_tools_from_disk(
file_path: str, fs: FileStorage = FileStorage(FileStorageProvider.LOCAL)
) -> dict[str, Any]:
try:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
data = fs.json_load(file_path)
else:
with open(file_path) as json_file:
data: dict[str, Any] = json.load(json_file)
data = fs.json_load(file_path)
return data
except json.JSONDecodeError as e:
logger.warning(f"Error loading tools from {file_path}: {e}")
@@ -90,11 +79,7 @@ class ToolUtils:
data: dict[str, Any],
fs: FileStorage = FileStorage(FileStorageProvider.LOCAL),
) -> None:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data)
else:
with open(file_path, "w") as file:
yaml.dump(data, file, default_flow_style=False)
fs.yaml_dump(path=file_path, mode="w", encoding="utf-8", data=data)
@staticmethod
def get_registry(
@@ -114,16 +99,12 @@ class ToolUtils:
yml_data: dict[str, Any] = {}
try:
logger.debug(f"Reading tool registry YAML: {file_path}")
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
yml_data = fs.yaml_load(file_path)
else:
with open(file_path) as file:
yml_data = yaml.safe_load(file)
yml_data = fs.yaml_load(file_path)
except FileNotFoundError:
logger.warning(f"Could not find tool registry YAML: {str(file_path)}")
if raise_exc:
raise RegistryNotFound()
except Exception as error:
logger.error(f"Error while loading {str(file_path)}: {error}")
if raise_exc:

View File
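Since the helpers now always go through `FileStorage`, local use simply means the local provider; a minimal sketch using the default shown above (path illustrative):

```
from unstract.sdk.file_storage import FileStorage, FileStorageProvider

fs = FileStorage(FileStorageProvider.LOCAL)
fs.json_dump(path="/tmp/tools.json", mode="w", encoding="utf-8", data={"tools": {}})
data = fs.json_load("/tmp/tools.json")
```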

@@ -55,9 +55,3 @@ class ToolMetadataKey:
class ToolOutputType:
TXT = "TXT"
JSON = "JSON"
class FeatureFlag:
"""Temporary feature flags."""
REMOTE_FILE_STORAGE = "remote_file_storage"

View File

@@ -2,11 +2,9 @@ import json
import logging
import os
from pathlib import Path
from typing import Any, Optional
from typing import Any
import fsspec
from unstract.workflow_execution.constants import (
FeatureFlag,
MetaDataKey,
ToolMetadataKey,
ToolOutputType,
@@ -16,10 +14,7 @@ from unstract.workflow_execution.constants import (
from unstract.workflow_execution.exceptions import ToolMetadataNotFound
from unstract.workflow_execution.tools_utils import ToolsUtils
from unstract.flags.feature_flag import check_feature_flag_status
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem
from unstract.filesystem import FileStorageType, FileSystem
logger = logging.getLogger(__name__)
@@ -31,14 +26,9 @@ class ExecutionFileHandler:
self.organization_id = organization_id
self.workflow_id = workflow_id
self.execution_id = execution_id
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
self.execution_dir = self.get_execution_dir(
workflow_id, execution_id, organization_id
)
else:
self.execution_dir = self.create_execution_dir_path(
workflow_id, execution_id, organization_id
)
self.execution_dir = self.get_execution_dir(
workflow_id, execution_id, organization_id
)
self.source_file = os.path.join(self.execution_dir, WorkflowFileType.SOURCE)
self.infile = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
self.metadata_file = os.path.join(
@@ -51,14 +41,10 @@ class ExecutionFileHandler:
Returns:
dict[str, Any]: Workflow metadata.
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
metadata_content = file_storage.read(path=self.metadata_file, mode="r")
metadata = json.loads(metadata_content)
else:
with open(self.metadata_file) as file:
metadata: dict[str, Any] = json.load(file)
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
metadata_content = file_storage.read(path=self.metadata_file, mode="r")
metadata = json.loads(metadata_content)
return metadata
def get_list_of_tool_metadata(
@@ -138,47 +124,14 @@ class ExecutionFileHandler:
MetaDataKey.FILE_EXECUTION_ID: str(file_execution_id),
MetaDataKey.TAGS: tags,
}
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.json_dump(path=metadata_path, data=content)
else:
with fsspec.open(f"file://{metadata_path}", "w") as local_file:
json.dump(content, local_file)
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.json_dump(path=metadata_path, data=content)
logger.info(
f"metadata for {input_file_path} is " "added in to execution directory"
)
@classmethod
def create_execution_dir_path(
cls,
workflow_id: str,
execution_id: str,
organization_id: str,
data_volume: Optional[str] = None,
) -> str:
"""Create the directory path for storing execution-related files.
Parameters:
- workflow_id (str): Identifier for the workflow.
- execution_id (str): Identifier for the execution.
- organization_id (Optional[str]):
Identifier for the organization (default: None).
Returns:
str: The directory path for the execution.
"""
workflow_data_dir = os.getenv("WORKFLOW_DATA_DIR")
data_volume = data_volume if data_volume else workflow_data_dir
if not data_volume:
raise ValueError("Missed data_volume")
execution_dir = Path(
data_volume, organization_id, str(workflow_id), str(execution_id)
)
execution_dir.mkdir(parents=True, exist_ok=True)
return str(execution_dir)
@classmethod
def get_execution_dir(
cls, workflow_id: str, execution_id: str, organization_id: str