From ff5dd19b09c3aeb95f1553d1a1cfc4f2d3fa06ce Mon Sep 17 00:00:00 2001 From: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Date: Thu, 24 Jul 2025 19:03:36 +0530 Subject: [PATCH] UN-2585 [MISC] Deprecate connector instance fields and refactor endpoint handling (#1454) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [MISC] Deprecate connector instance fields and migrate to workflow endpoints - Updated ConnectorInstance __str__ method to use connector_id and connector_mode - Migrated PipelineSerializer to use WorkflowEndpoint instead of ConnectorInstance - Enhanced WorkflowEndpointSerializer with backward compatibility fields and CRUD operations - Updated WorkflowEndpointViewSet with proper filtering and moved serializer import - Modified frontend components to use workflow-endpoint API instead of connector API - Added gitignore entries for development tools and prompting files - Removed duplicate WorkflowEndpointSerializer from workflow_v2/serializers.py 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude * [MISC] Deprecate connector instance fields and refactor endpoint handling - Add migration to make connector_type and workflow fields nullable in ConnectorInstance - Introduce EncryptedBinaryField for secure metadata storage with automatic encryption/decryption - Add deprecation warning to ConnectorInstance.metadata property - Refactor ConnectorInstanceSerializer to use new encrypted field and remove manual encryption - Update WorkflowEndpointSerializer to use connector_instance_id field instead of nested data - Fix frontend DsSettingsCard to handle both object and string connector_instance formats - Update destination/source connectors to use connector_metadata directly instead of deprecated metadata property - Add proper connector instance extraction logic in handleUpdate to prevent validation errors - Ensure backward compatibility while transitioning to new workflow endpoint architecture 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude * Minor PR comments addressed * Moved back oauth token refresh and connector metadata addition back to serializer * Replace hardcoded URLs with useRequestUrl hook in frontend components Updated InputOutput.jsx and DsSettingsCard.jsx to use the useRequestUrl hook instead of hardcoded API URLs for better maintainability and consistency. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude * UN-2585 [FIX] Fix connector instance queryset organization filtering in serializer - Add get_fields() method to WorkflowEndpointSerializer to dynamically set connector_instance_id queryset - Fix issue where connector instances were not visible due to organization context timing in integration environment - Change clearDestination method from PUT to PATCH for consistency 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude * Update backend/utils/fields.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> * Added docstring for get_fields method * Fixed endpoint serializer to include workflow name * Minor code smell fix * minor: Refactored sample.env comments --------- Signed-off-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com> Co-authored-by: Claude Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .gitignore | 14 +- ...torinstance_connector_metadata_and_more.py | 37 ++++ backend/connector_v2/models.py | 66 ++---- backend/connector_v2/serializers.py | 43 ++-- .../file_management/file_management_helper.py | 2 +- backend/pipeline_v2/serializers/crud.py | 58 ++--- backend/sample.env | 30 +-- backend/utils/fields.py | 199 ++++++++++++++++++ .../endpoint_v2/destination.py | 12 +- .../endpoint_v2/serializers.py | 26 ++- .../workflow_manager/endpoint_v2/source.py | 4 - backend/workflow_manager/endpoint_v2/urls.py | 2 +- backend/workflow_manager/endpoint_v2/views.py | 9 +- .../workflow_v2/serializers.py | 9 - backend/workflow_manager/workflow_v2/views.py | 5 +- .../ds-settings-card/DsSettingsCard.jsx | 50 +++-- .../components/api/prompt-studio-service.js | 1 + .../CreateApiDeploymentFromPromptStudio.jsx | 3 +- .../input-output/configure-ds/ConfigureDs.jsx | 13 +- .../input-output/input-output/InputOutput.jsx | 26 ++- frontend/src/hooks/usePostHogEvents.js | 1 + frontend/src/store/alert-store.js | 3 +- 22 files changed, 428 insertions(+), 185 deletions(-) create mode 100644 backend/connector_v2/migrations/0002_alter_connectorinstance_connector_metadata_and_more.py create mode 100644 backend/utils/fields.py diff --git a/.gitignore b/.gitignore index b197d162..53b1a840 100644 --- a/.gitignore +++ b/.gitignore @@ -680,7 +680,15 @@ backend/backend/*_urls.py backend/backend/*_urls_v2.py !backend/backend/public_urls_v2.py -# Adding claude prompting directories +## Vibe coding / Prompting-related files ## +prompting/ +.prompting/ + +# Claude CLAUDE.md -CONTRIBUTION_GUIDE.md -prompting/* +.claude/* + +# Windsurf +.qodo +.windsurfrules +.windsurf/rules diff --git a/backend/connector_v2/migrations/0002_alter_connectorinstance_connector_metadata_and_more.py b/backend/connector_v2/migrations/0002_alter_connectorinstance_connector_metadata_and_more.py new file mode 100644 index 00000000..267bcb87 --- /dev/null +++ b/backend/connector_v2/migrations/0002_alter_connectorinstance_connector_metadata_and_more.py @@ -0,0 +1,37 @@ +# Generated by Django 4.2.1 on 2025-07-16 10:26 + +import django.db.models.deletion +import utils.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflow_v2", "0015_executionlog_idx_wf_execution_event_time_and_more"), + ("connector_v2", "0001_initial"), + ] + + operations = [ + migrations.AlterField( + model_name="connectorinstance", + name="connector_metadata", + field=utils.fields.EncryptedBinaryField(null=True), + ), + migrations.AlterField( + model_name="connectorinstance", + name="connector_type", + field=models.CharField( + choices=[("INPUT", "Input"), ("OUTPUT", "Output")], null=True + ), + ), + migrations.AlterField( + model_name="connectorinstance", + name="workflow", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="connector_workflow", + to="workflow_v2.workflow", + ), + ), + ] diff --git a/backend/connector_v2/models.py b/backend/connector_v2/models.py index dbeaeb01..d7276aa1 100644 --- a/backend/connector_v2/models.py +++ b/backend/connector_v2/models.py @@ -1,4 +1,4 @@ -import json +import logging import uuid from typing import Any @@ -6,10 +6,8 @@ from account_v2.models import User from connector_auth_v2.models import ConnectorAuth from connector_processor.connector_processor import ConnectorProcessor from connector_processor.constants import ConnectorKeys -from cryptography.fernet import Fernet, InvalidToken -from django.conf import settings from django.db import models -from utils.exceptions import InvalidEncryptionKey +from utils.fields import EncryptedBinaryField from utils.models.base_model import BaseModel from utils.models.organization_mixin import ( DefaultOrganizationManagerMixin, @@ -21,6 +19,8 @@ from backend.constants import FieldLengthConstants as FLC CONNECTOR_NAME_SIZE = 128 VERSION_NAME_SIZE = 64 +logger = logging.getLogger(__name__) + class ConnectorInstanceModelManager(DefaultOrganizationManagerMixin, models.Manager): pass @@ -45,13 +45,13 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel): "workflow_v2.Workflow", on_delete=models.CASCADE, related_name="connector_workflow", - null=False, + null=True, blank=False, ) connector_id = models.CharField(max_length=FLC.CONNECTOR_ID_LENGTH, default="") - connector_metadata = models.BinaryField(null=True) + connector_metadata = EncryptedBinaryField(null=True) connector_version = models.CharField(max_length=VERSION_NAME_SIZE, default="") - connector_type = models.CharField(choices=ConnectorType.choices) + connector_type = models.CharField(choices=ConnectorType.choices, null=True) # TODO: handle connector_auth cascade deletion connector_auth = models.ForeignKey( ConnectorAuth, @@ -84,21 +84,6 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel): # Manager objects = ConnectorInstanceModelManager() - # TODO: Remove if unused - def get_connector_metadata(self) -> dict[str, str]: - """Gets connector metadata and refreshes the tokens if needed in case - of OAuth. - """ - tokens_refreshed = False - if self.connector_auth: - ( - self.connector_metadata, - tokens_refreshed, - ) = self.connector_auth.get_and_refresh_tokens() - if tokens_refreshed: - self.save() - return self.connector_metadata - @staticmethod def supportsOAuth(connector_id: str) -> bool: return bool( @@ -109,19 +94,9 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel): def __str__(self) -> str: return ( - f"Connector({self.id}, type{self.connector_type}, workflow: {self.workflow})" + f"Connector({self.id}, ID={self.connector_id}, mode: {self.connector_mode})" ) - def get_connector_metadata_bytes(self): - """Convert connector_metadata to bytes if it is a memoryview. - - Returns: - bytes: The connector_metadata as bytes. - """ - if isinstance(self.connector_metadata, memoryview): - return self.connector_metadata.tobytes() - return self.connector_metadata - @property def metadata(self) -> Any: """Decrypt and return the connector metadata as a dictionary. @@ -132,21 +107,20 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel): Returns: dict: The decrypted connector metadata. + + .. deprecated:: + This property is deprecated. Use `connector_metadata` field directly instead. + This property will be removed in a future version. """ - try: - connector_metadata_bytes = self.get_connector_metadata_bytes() + import warnings - if connector_metadata_bytes is None: - return None - - if isinstance(connector_metadata_bytes, (dict)): - return connector_metadata_bytes - encryption_secret: str = settings.ENCRYPTION_KEY - cipher_suite: Fernet = Fernet(encryption_secret.encode("utf-8")) - decrypted_value = cipher_suite.decrypt(connector_metadata_bytes) - except InvalidToken: - raise InvalidEncryptionKey(entity=InvalidEncryptionKey.Entity.CONNECTOR) - return json.loads(decrypted_value.decode("utf-8")) + warnings.warn( + "The 'metadata' property is deprecated. Use 'connector_metadata' field directly instead. " + "This property will be removed in a future version.", + DeprecationWarning, + stacklevel=2, + ) + return self.connector_metadata class Meta: db_table = "connector_instance" diff --git a/backend/connector_v2/serializers.py b/backend/connector_v2/serializers.py index 2d274c19..c88598e3 100644 --- a/backend/connector_v2/serializers.py +++ b/backend/connector_v2/serializers.py @@ -1,4 +1,3 @@ -import json import logging from collections import OrderedDict from typing import Any @@ -8,8 +7,8 @@ from connector_auth_v2.pipeline.common import ConnectorAuthHelper from connector_processor.connector_processor import ConnectorProcessor from connector_processor.constants import ConnectorKeys from connector_processor.exceptions import OAuthTimeOut -from cryptography.fernet import Fernet -from django.conf import settings +from rest_framework.serializers import SerializerMethodField +from utils.fields import EncryptedBinaryFieldSerializer from utils.serializer_utils import SerializerUtils from backend.serializers import AuditSerializer @@ -22,22 +21,13 @@ logger = logging.getLogger(__name__) class ConnectorInstanceSerializer(AuditSerializer): + connector_metadata = EncryptedBinaryFieldSerializer(required=False, allow_null=True) + icon = SerializerMethodField() + class Meta: model = ConnectorInstance fields = "__all__" - def validate_connector_metadata(self, value: dict[Any]) -> dict[Any]: - """Validating Json metadata This custom validation is to avoid conflict - with user input and db binary data. - - Args: - value (Any): dict of metadata - - Returns: - dict[Any]: dict of metadata - """ - return value - def save(self, **kwargs): # type: ignore user = self.context.get("request").user or None connector_id: str = kwargs[CIKey.CONNECTOR_ID] @@ -68,15 +58,19 @@ class ConnectorInstanceSerializer(AuditSerializer): ) kwargs[CIKey.CONNECTOR_MODE] = connector_mode.value - encryption_secret: str = settings.ENCRYPTION_KEY - f: Fernet = Fernet(encryption_secret.encode("utf-8")) - json_string: str = json.dumps(kwargs.get(CIKey.CONNECTOR_METADATA)) - - kwargs[CIKey.CONNECTOR_METADATA] = f.encrypt(json_string.encode("utf-8")) - instance = super().save(**kwargs) return instance + def get_icon(self, obj: ConnectorInstance) -> str: + """Get connector icon from ConnectorProcessor.""" + icon_path = ConnectorProcessor.get_connector_data_with_key( + obj.connector_id, ConnectorKeys.ICON + ) + # Ensure icon path is properly formatted for frontend + if icon_path and not icon_path.startswith("/"): + return f"/{icon_path}" + return icon_path + def to_representation(self, instance: ConnectorInstance) -> dict[str, str]: # to remove the sensitive fields being returned rep: OrderedDict[str, Any] = super().to_representation(instance) @@ -84,11 +78,6 @@ class ConnectorInstanceSerializer(AuditSerializer): rep[CIKey.CONNECTOR_METADATA] = {} if SerializerUtils.check_context_for_GET_or_POST(context=self.context): rep.pop(CIKey.CONNECTOR_AUTH) - # set icon fields for UI - rep[ConnectorKeys.ICON] = ConnectorProcessor.get_connector_data_with_key( - instance.connector_id, ConnectorKeys.ICON - ) + rep[ConnectorKeys.ICON] = self.get_icon(instance) - if instance.connector_metadata: - rep[CIKey.CONNECTOR_METADATA] = instance.metadata return rep diff --git a/backend/file_management/file_management_helper.py b/backend/file_management/file_management_helper.py index cd6ab2d6..ab56c4db 100644 --- a/backend/file_management/file_management_helper.py +++ b/backend/file_management/file_management_helper.py @@ -35,7 +35,7 @@ class FileManagerHelper: @staticmethod def get_file_system(connector: ConnectorInstance) -> UnstractFileSystem: """Creates the `UnstractFileSystem` for the corresponding connector.""" - metadata = connector.metadata + metadata = connector.connector_metadata if connector.connector_id in fs_connectors: connector = fs_connectors[connector.connector_id]["metadata"]["connector"] connector_class: UnstractFileSystem = connector(metadata) diff --git a/backend/pipeline_v2/serializers/crud.py b/backend/pipeline_v2/serializers/crud.py index d69f616f..ab4f9705 100644 --- a/backend/pipeline_v2/serializers/crud.py +++ b/backend/pipeline_v2/serializers/crud.py @@ -224,44 +224,42 @@ class PipelineSerializer(IntegrityErrorMixin, AuditSerializer): def _add_connector_data( self, repr: OrderedDict[str, Any], - connector_instance_list: list[Any], + workflow_endpoints: list[Any], connectors: list[Any], ) -> OrderedDict[str, Any]: """Adds connector Input/Output data. Args: - sef (_type_): _description_ - repr (OrderedDict[str, Any]): _description_ + repr (OrderedDict[str, Any]): The representation dictionary + workflow_endpoints (list[Any]): List of WorkflowEndpoint objects + connectors (list[Any]): List of available connectors Returns: - OrderedDict[str, Any]: _description_ + OrderedDict[str, Any]: Updated representation with connector data """ repr[PC.SOURCE_NAME] = PC.NOT_CONFIGURED repr[PC.DESTINATION_NAME] = PC.NOT_CONFIGURED - for instance in connector_instance_list: - if instance.connector_type == "INPUT": - repr[PC.SOURCE_NAME], repr[PC.SOURCE_ICON] = self._get_name_and_icon( - connectors=connectors, - connector_id=instance.connector_id, - ) - if instance.connector_type == "OUTPUT": - repr[PC.DESTINATION_NAME], repr[PC.DESTINATION_ICON] = ( - self._get_name_and_icon( + + for endpoint in workflow_endpoints: + if endpoint.endpoint_type == WorkflowEndpoint.EndpointType.SOURCE: + if endpoint.connector_instance: + repr[PC.SOURCE_NAME], repr[PC.SOURCE_ICON] = self._get_name_and_icon( connectors=connectors, - connector_id=instance.connector_id, + connector_id=endpoint.connector_instance.connector_id, ) - ) - if repr[PC.DESTINATION_NAME] == PC.NOT_CONFIGURED: - try: - check_manual_review = WorkflowEndpoint.objects.get( - workflow=instance.workflow, - endpoint_type=WorkflowEndpoint.EndpointType.DESTINATION, - connection_type=WorkflowEndpoint.ConnectionType.MANUALREVIEW, + elif endpoint.endpoint_type == WorkflowEndpoint.EndpointType.DESTINATION: + if ( + endpoint.connection_type + == WorkflowEndpoint.ConnectionType.MANUALREVIEW + ): + repr[PC.DESTINATION_NAME] = "Manual Review" + elif endpoint.connector_instance: + repr[PC.DESTINATION_NAME], repr[PC.DESTINATION_ICON] = ( + self._get_name_and_icon( + connectors=connectors, + connector_id=endpoint.connector_instance.connector_id, + ) ) - if check_manual_review: - repr[PC.DESTINATION_NAME] = "Manual Review" - except Exception as ex: - logger.debug(f"Not a Manual review destination: {ex}") return repr def to_representation(self, instance: Pipeline) -> OrderedDict[str, Any]: @@ -273,15 +271,17 @@ class PipelineSerializer(IntegrityErrorMixin, AuditSerializer): if SerializerUtils.check_context_for_GET_or_POST(context=self.context): workflow = instance.workflow - connector_instance_list = ConnectorInstance.objects.filter( - workflow=workflow.id - ).all() + workflow_endpoints = ( + WorkflowEndpoint.objects.filter(workflow=workflow.id) + .select_related("connector_instance") + .all() + ) repr[PK.WORKFLOW_ID] = workflow.id repr[PK.WORKFLOW_NAME] = workflow.workflow_name repr[PK.CRON_STRING] = repr.pop(PK.CRON_STRING) repr = self._add_connector_data( repr=repr, - connector_instance_list=connector_instance_list, + workflow_endpoints=workflow_endpoints, connectors=connectors, ) diff --git a/backend/sample.env b/backend/sample.env index 9c55b203..1e0076cd 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -169,25 +169,27 @@ TOOL_REGISTRY_STORAGE_CREDENTIALS='{"provider":"local"}' ENABLE_HIGHLIGHT_API_DEPLOYMENT=False # Execution result and cache expire time -# For API results cached per workflow execution -EXECUTION_RESULT_TTL_SECONDS=86400 # 24 hours -# For execution metadata cached per workflow execution -EXECUTION_CACHE_TTL_SECONDS=86400 # 24 hours -# Instant workflow polling timeout in seconds -INSTANT_WF_POLLING_TIMEOUT=300 # 5 minutes +# For API results cached per workflow execution (24 hours) +EXECUTION_RESULT_TTL_SECONDS=86400 +# For execution metadata cached per workflow execution (24 hours) +EXECUTION_CACHE_TTL_SECONDS=86400 +# Instant workflow polling timeout in seconds (5 minutes) +INSTANT_WF_POLLING_TIMEOUT=300 -# Maximum number of batches (i.e., parallel tasks) created for a single workflow execution -MAX_PARALLEL_FILE_BATCHES=1 # 1 file at a time +# Maximum number of batches (i.e., parallel tasks) created for a single workflow execution (1 file at a time) +MAX_PARALLEL_FILE_BATCHES=1 # Maximum allowed value for MAX_PARALLEL_FILE_BATCHES (upper limit for validation) MAX_PARALLEL_FILE_BATCHES_MAX_VALUE=100 -# File execution tracker ttl in seconds -FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000 # 5 hours -FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 # 10 minutes +# File execution tracker TTL in seconds (5 hours) +FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000 +# File execution tracker completed TTL in seconds (10 minutes) +FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 -# Runner polling timeout -MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # 3 hours -RUNNER_POLLING_INTERVAL_SECONDS=2 # 2 seconds +# Runner polling timeout (3 hours) +MAX_RUNNER_POLLING_WAIT_SECONDS=10800 +# Runner polling interval (2 seconds) +RUNNER_POLLING_INTERVAL_SECONDS=2 # ETL Pipeline minimum schedule interval (in seconds) # Default: 1800 seconds (30 minutes) diff --git a/backend/utils/fields.py b/backend/utils/fields.py new file mode 100644 index 00000000..e64f0412 --- /dev/null +++ b/backend/utils/fields.py @@ -0,0 +1,199 @@ +"""Custom Django model fields and serializer fields for common use cases.""" + +import json +import logging +from typing import Any + +from cryptography.fernet import Fernet, InvalidToken +from django.conf import settings +from django.db import models +from rest_framework import serializers + +from utils.exceptions import InvalidEncryptionKey + +logger = logging.getLogger(__name__) + + +class EncryptedBinaryField(models.BinaryField): + """A BinaryField that automatically encrypts/decrypts JSON data. + + This field transparently handles encryption when storing data to the database + and decryption when retrieving data from the database. It expects dictionary + data as input and returns dictionary data as output. + + Features: + - Automatic encryption/decryption using Fernet symmetric encryption + - JSON serialization/deserialization + - Proper error handling for encryption failures + - Null value support + - Compatible with Django ORM and DRF serializers + + Usage: + class MyModel(models.Model): + encrypted_data = EncryptedBinaryField(null=True) + + # Usage is transparent + instance = MyModel.objects.create(encrypted_data={'key': 'value'}) + print(instance.encrypted_data) # {'key': 'value'} + """ + + description = "A binary field that automatically encrypts/decrypts JSON data" + + def __init__(self, *args, **kwargs): + # Remove any JSON-specific kwargs that might be passed + kwargs.pop("encoder", None) + kwargs.pop("decoder", None) + super().__init__(*args, **kwargs) + + def _get_encryption_key(self) -> bytes: + """Get the encryption key from Django settings.""" + encryption_key = getattr(settings, "ENCRYPTION_KEY", None) + if not encryption_key: + raise ValueError("ENCRYPTION_KEY not found in Django settings") + return encryption_key.encode("utf-8") + + def _encrypt_value(self, value: Any) -> bytes | None: + """Encrypt a Python value (typically dict) to bytes.""" + if value is None: + return None + + try: + # Serialize to JSON string + json_string = json.dumps(value) + + # Encrypt the JSON string + cipher_suite = Fernet(self._get_encryption_key()) + encrypted_bytes = cipher_suite.encrypt(json_string.encode("utf-8")) + + return encrypted_bytes + except Exception as e: + logger.error(f"Failed to encrypt value: {e}") + raise + + def _decrypt_value(self, value: bytes) -> dict | None: + """Decrypt bytes to a Python value (typically dict).""" + if value is None: + return None + + try: + # Handle memoryview objects from database + if isinstance(value, memoryview): + value = value.tobytes() + + # If it's already a dict, return as-is (for backward compatibility) + if isinstance(value, dict): + logger.warning( + "Detected unencrypted dict in EncryptedBinaryField – " + "legacy data will remain unencrypted. " + "Run a migration to secure all records." + ) + return value + # Decrypt the bytes + cipher_suite = Fernet(self._get_encryption_key()) + decrypted_bytes = cipher_suite.decrypt(value) + + # Parse JSON string back to Python object + json_string = decrypted_bytes.decode("utf-8") + return json.loads(json_string) + + except InvalidToken: + logger.error("Invalid encryption token - possibly wrong encryption key") + raise InvalidEncryptionKey(entity=InvalidEncryptionKey.Entity.CONNECTOR) + except Exception as e: + logger.error(f"Failed to decrypt value: {e}") + raise + + def from_db_value(self, value, expression, connection): + """Convert database value to Python value. + Called when data is loaded from the database. + """ + return self._decrypt_value(value) + + def to_python(self, value): + """Convert input value to Python value. + Called during model validation and form processing. + """ + if value is None: + return value + + # If it's already a Python object (dict), return as-is + if isinstance(value, dict): + return value + + # If it's bytes (encrypted), decrypt it + if isinstance(value, (bytes, memoryview)): + return self._decrypt_value(value) + + # If it's a string, try to parse as JSON + if isinstance(value, str): + try: + return json.loads(value) + except json.JSONDecodeError: + pass + + return value + + def get_prep_value(self, value): + """Convert Python value to database value. + Called before saving to the database. + """ + if value is None: + return None + + # If it's already bytes (encrypted), return as-is + if isinstance(value, (bytes, memoryview)): + return value + + # Encrypt the value + return self._encrypt_value(value) + + def get_db_prep_value(self, value, connection, prepared=False): + """Convert Python value to database-specific value.""" + if not prepared: + value = self.get_prep_value(value) + return super().get_db_prep_value(value, connection, prepared) + + def value_to_string(self, obj): + """Convert field value to string for serialization.""" + value = self.value_from_object(obj) + if value is None: + return None + return json.dumps(value) + + def formfield(self, **kwargs): + """Return a form field for this model field.""" + # Use a CharField for forms since we want JSON input + from django import forms + + defaults = {"widget": forms.Textarea} + defaults.update(kwargs) + return forms.CharField(**defaults) + + +class EncryptedBinaryFieldSerializer(serializers.Field): + """Custom serializer field for EncryptedBinaryField. + + This field handles serialization/deserialization of encrypted binary data + as JSON dictionaries, making it compatible with DRF's serialization process. + """ + + def to_representation(self, value): + """Convert encrypted binary data to JSON dictionary for API responses.""" + if value is None: + return None + + # The EncryptedBinaryField already decrypts the value when accessed + # so we just need to return it as-is + return value + + def to_internal_value(self, data): + """Convert JSON dictionary to internal value for database storage.""" + if data is None: + return None + + # Validate that the data is a dictionary + if not isinstance(data, dict): + raise serializers.ValidationError("Expected a dictionary.") + + # Return the data as-is; the EncryptedBinaryField will handle encryption + return data diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index b83d8621..ab3c7fd4 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -100,10 +100,6 @@ class DestinationConnector(BaseConnector): workflow=workflow, endpoint_type=WorkflowEndpoint.EndpointType.DESTINATION, ) - if endpoint.connector_instance: - endpoint.connector_instance.connector_metadata = ( - endpoint.connector_instance.metadata - ) return endpoint def _get_source_endpoint_for_workflow( @@ -123,10 +119,6 @@ class DestinationConnector(BaseConnector): workflow=workflow, endpoint_type=WorkflowEndpoint.EndpointType.SOURCE, ) - if endpoint.connector_instance: - endpoint.connector_instance.connector_metadata = ( - endpoint.connector_instance.metadata - ) return endpoint def validate(self) -> None: @@ -149,7 +141,7 @@ class DestinationConnector(BaseConnector): # Get database class and test connection db_class = DatabaseUtils.get_db_class( connector_id=connector.connector_id, - connector_settings=connector.metadata, + connector_settings=connector.connector_metadata, ) engine = db_class.get_engine() if hasattr(engine, "close"): @@ -319,7 +311,7 @@ class DestinationConnector(BaseConnector): def insert_into_db(self, input_file_path: str) -> None: """Insert data into the database.""" connector_instance: ConnectorInstance = self.endpoint.connector_instance - connector_settings: dict[str, Any] = connector_instance.metadata + connector_settings: dict[str, Any] = connector_instance.connector_metadata destination_configurations: dict[str, Any] = self.endpoint.configuration table_name: str = str(destination_configurations.get(DestinationKey.TABLE)) include_agent: bool = bool( diff --git a/backend/workflow_manager/endpoint_v2/serializers.py b/backend/workflow_manager/endpoint_v2/serializers.py index 2ce3d4a8..27990f40 100644 --- a/backend/workflow_manager/endpoint_v2/serializers.py +++ b/backend/workflow_manager/endpoint_v2/serializers.py @@ -1,12 +1,36 @@ import logging -from rest_framework.serializers import ModelSerializer +from connector_v2.models import ConnectorInstance +from connector_v2.serializers import ConnectorInstanceSerializer +from rest_framework import serializers +from rest_framework.serializers import ( + ModelSerializer, +) from workflow_manager.endpoint_v2.models import WorkflowEndpoint logger = logging.getLogger(__name__) class WorkflowEndpointSerializer(ModelSerializer): + connector_instance = ConnectorInstanceSerializer(read_only=True) + connector_instance_id = serializers.PrimaryKeyRelatedField( + queryset=ConnectorInstance.objects.all(), # To not make DRF shout + source="connector_instance", + write_only=True, + allow_null=True, + ) + workflow_name = serializers.CharField(source="workflow.workflow_name", read_only=True) + class Meta: model = WorkflowEndpoint fields = "__all__" + + def get_fields(self): + """Override get_fields to dynamically set the connector_instance_id queryset. + + This is needed to ensure that the queryset is set after the organization + context is available. + """ + fields = super().get_fields() + fields["connector_instance_id"].queryset = ConnectorInstance.objects.all() + return fields diff --git a/backend/workflow_manager/endpoint_v2/source.py b/backend/workflow_manager/endpoint_v2/source.py index 4098963d..f1cc4d72 100644 --- a/backend/workflow_manager/endpoint_v2/source.py +++ b/backend/workflow_manager/endpoint_v2/source.py @@ -118,10 +118,6 @@ class SourceConnector(BaseConnector): workflow=workflow, endpoint_type=WorkflowEndpoint.EndpointType.SOURCE, ) - if endpoint.connector_instance: - endpoint.connector_instance.connector_metadata = ( - endpoint.connector_instance.metadata - ) return endpoint def validate(self) -> None: diff --git a/backend/workflow_manager/endpoint_v2/urls.py b/backend/workflow_manager/endpoint_v2/urls.py index 522859e2..d4dec7c3 100644 --- a/backend/workflow_manager/endpoint_v2/urls.py +++ b/backend/workflow_manager/endpoint_v2/urls.py @@ -6,7 +6,7 @@ workflow_endpoint_list = WorkflowEndpointViewSet.as_view( ) endpoint_list = WorkflowEndpointViewSet.as_view({"get": "list"}) workflow_endpoint_detail = WorkflowEndpointViewSet.as_view( - {"get": "retrieve", "put": "update"} + {"get": "retrieve", "put": "update", "patch": "partial_update"} ) endpoint_settings_detail = WorkflowEndpointViewSet.as_view( {"get": WorkflowEndpointViewSet.get_settings.__name__} diff --git a/backend/workflow_manager/endpoint_v2/views.py b/backend/workflow_manager/endpoint_v2/views.py index 1766cf45..986803e8 100644 --- a/backend/workflow_manager/endpoint_v2/views.py +++ b/backend/workflow_manager/endpoint_v2/views.py @@ -6,8 +6,8 @@ from rest_framework.response import Response from workflow_manager.endpoint_v2.destination import DestinationConnector from workflow_manager.endpoint_v2.endpoint_utils import WorkflowEndpointUtils from workflow_manager.endpoint_v2.models import WorkflowEndpoint +from workflow_manager.endpoint_v2.serializers import WorkflowEndpointSerializer from workflow_manager.endpoint_v2.source import SourceConnector -from workflow_manager.workflow_v2.serializers import WorkflowEndpointSerializer class WorkflowEndpointViewSet(viewsets.ModelViewSet): @@ -19,10 +19,15 @@ class WorkflowEndpointViewSet(viewsets.ModelViewSet): .select_related("workflow") .filter(workflow__created_by=self.request.user) ) + workflow_filter = self.request.query_params.get("workflow", None) + if workflow_filter: + queryset = queryset.filter(workflow_id=workflow_filter) + endpoint_type_filter = self.request.query_params.get("endpoint_type", None) - connection_type_filter = self.request.query_params.get("connection_type", None) if endpoint_type_filter: queryset = queryset.filter(endpoint_type=endpoint_type_filter) + + connection_type_filter = self.request.query_params.get("connection_type", None) if connection_type_filter: queryset = queryset.filter(connection_type=connection_type_filter) return queryset diff --git a/backend/workflow_manager/workflow_v2/serializers.py b/backend/workflow_manager/workflow_v2/serializers.py index b076b889..1ec9a559 100644 --- a/backend/workflow_manager/workflow_v2/serializers.py +++ b/backend/workflow_manager/workflow_v2/serializers.py @@ -16,7 +16,6 @@ from utils.serializer.integrity_error_mixin import IntegrityErrorMixin from backend.constants import RequestKey from backend.serializers import AuditSerializer -from workflow_manager.endpoint_v2.models import WorkflowEndpoint from workflow_manager.workflow_v2.constants import WorkflowExecutionKey, WorkflowKey from workflow_manager.workflow_v2.models.execution import WorkflowExecution from workflow_manager.workflow_v2.models.execution_log import ExecutionLog @@ -104,14 +103,6 @@ class ExecuteWorkflowResponseSerializer(Serializer): result = JSONField() -class WorkflowEndpointSerializer(ModelSerializer): - workflow_name = CharField(source="workflow.workflow_name", read_only=True) - - class Meta: - model = WorkflowEndpoint - fields = "__all__" - - class WorkflowExecutionSerializer(ModelSerializer): class Meta: model = WorkflowExecution diff --git a/backend/workflow_manager/workflow_v2/views.py b/backend/workflow_manager/workflow_v2/views.py index a0d60772..01f75034 100644 --- a/backend/workflow_manager/workflow_v2/views.py +++ b/backend/workflow_manager/workflow_v2/views.py @@ -100,10 +100,11 @@ class WorkflowViewSet(viewsets.ModelViewSet): is_active=True, ) try: + # Create empty WorkflowEndpoints for UI compatibility + # ConnectorInstances will be created when users actually configure connectors WorkflowEndpointUtils.create_endpoints_for_workflow(workflow) - # NOTE: Add default connector here if needed except Exception as e: - logger.error(f"Error saving workflow to DB: {e}") + logger.error(f"Error creating workflow endpoints: {e}") raise WorkflowGenerationError return workflow diff --git a/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx b/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx index d41064a6..8c1e34b8 100644 --- a/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx +++ b/frontend/src/components/agency/ds-settings-card/DsSettingsCard.jsx @@ -23,6 +23,7 @@ import { useAxiosPrivate } from "../../../hooks/useAxiosPrivate"; import { useAlertStore } from "../../../store/alert-store"; import { useSessionStore } from "../../../store/session-store"; import { useWorkflowStore } from "../../../store/workflow-store"; +import useRequestUrl from "../../../hooks/useRequestUrl"; import SpaceWrapper from "../../widgets/space-wrapper/SpaceWrapper"; import { ConfigureConnectorModal } from "../configure-connector-modal/ConfigureConnectorModal"; import { useExceptionHandler } from "../../../hooks/useExceptionHandler"; @@ -82,6 +83,7 @@ function DsSettingsCard({ type, endpointDetails, message }) { const axiosPrivate = useAxiosPrivate(); const handleException = useExceptionHandler(); const { flags } = sessionDetails; + const { getUrl } = useRequestUrl(); const icons = { input: , @@ -144,16 +146,30 @@ function DsSettingsCard({ type, endpointDetails, message }) { setConnType(endpointDetails?.connection_type); } - if (!endpointDetails?.connector_instance?.length) { + if (!endpointDetails?.connector_instance) { setConnDetails({}); return; } - if (connDetails?.id === endpointDetails?.connector_instance) { + // Use connector_instance data directly from endpointDetails if it's an object + if (typeof endpointDetails?.connector_instance === "object") { + const connectorData = endpointDetails.connector_instance; + connectorData.connector_metadata = connectorData.connector_metadata || {}; + connectorData.connector_metadata.connectorName = + connectorData?.connector_name || ""; + setConnDetails(connectorData); + setSelectedId(connectorData?.connector_id); return; } - getSourceDetails(); + // Fallback for legacy connector_instance ID format (string) + if (typeof endpointDetails?.connector_instance === "string") { + // Only call getSourceDetails if we haven't already loaded this connector + if (connDetails?.id !== endpointDetails?.connector_instance) { + getSourceDetails(); + } + return; + } }, [endpointDetails]); useEffect(() => { @@ -187,7 +203,7 @@ function DsSettingsCard({ type, endpointDetails, message }) { setFormDataConfig(endpointDetails.configuration || {}); const requestOptions = { method: "GET", - url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${endpointDetails?.id}/settings/`, + url: getUrl(`workflow/endpoint/${endpointDetails?.id}/settings/`), }; setIsSpecConfigLoading(true); @@ -211,9 +227,7 @@ function DsSettingsCard({ type, endpointDetails, message }) { const requestOptions = { method: "GET", - url: `/api/v1/unstract/${ - sessionDetails?.orgId - }/supported_connectors/?type=${type.toUpperCase()}`, + url: getUrl(`supported_connectors/?type=${type.toUpperCase()}`), }; axiosPrivate(requestOptions) @@ -250,16 +264,14 @@ function DsSettingsCard({ type, endpointDetails, message }) { }; const clearDestination = (updatedData) => { - const body = { ...destination, ...updatedData }; - const requestOptions = { - method: "PUT", - url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${destination?.id}/`, + method: "PATCH", + url: getUrl(`workflow/endpoint/${destination?.id}/`), headers: { "X-CSRFToken": sessionDetails?.csrfToken, "Content-Type": "application/json", }, - data: body, + data: updatedData, }; axiosPrivate(requestOptions) @@ -279,22 +291,20 @@ function DsSettingsCard({ type, endpointDetails, message }) { if (type === "input") { clearDestination({ connection_type: "", - connector_instance: null, + connector_instance_id: null, }); } }; const handleUpdate = (updatedData, showSuccess) => { - const body = { ...endpointDetails, ...updatedData }; - const requestOptions = { - method: "PUT", - url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${endpointDetails?.id}/`, + method: "PATCH", + url: getUrl(`workflow/endpoint/${endpointDetails?.id}/`), headers: { "X-CSRFToken": sessionDetails?.csrfToken, "Content-Type": "application/json", }, - data: body, + data: updatedData, }; axiosPrivate(requestOptions) .then((res) => { @@ -321,7 +331,7 @@ function DsSettingsCard({ type, endpointDetails, message }) { const getSourceDetails = () => { const requestOptions = { method: "GET", - url: `/api/v1/unstract/${sessionDetails?.orgId}/connector/${endpointDetails?.connector_instance}/`, + url: getUrl(`connector/${endpointDetails?.connector_instance}/`), }; axiosPrivate(requestOptions) @@ -362,7 +372,7 @@ function DsSettingsCard({ type, endpointDetails, message }) { onChange={(value) => { handleUpdate({ connection_type: value, - connector_instance: null, + connector_instance_id: null, }); updateDestination(); }} diff --git a/frontend/src/components/api/prompt-studio-service.js b/frontend/src/components/api/prompt-studio-service.js index 407726fa..8a2a7c17 100644 --- a/frontend/src/components/api/prompt-studio-service.js +++ b/frontend/src/components/api/prompt-studio-service.js @@ -1,4 +1,5 @@ import { useCallback } from "react"; + import useRequestUrl from "../../hooks/useRequestUrl"; import { useAxiosPrivate } from "../../hooks/useAxiosPrivate"; diff --git a/frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx b/frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx index dcb5bddf..b65630f6 100644 --- a/frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx +++ b/frontend/src/components/deployments/create-api-deployment-from-prompt-studio/CreateApiDeploymentFromPromptStudio.jsx @@ -460,14 +460,13 @@ const CreateApiDeploymentFromPromptStudio = ({ // Update each endpoint to set connection_type to API for (const endpoint of endpoints) { await axiosPrivate({ - method: "PUT", + method: "PATCH", url: getUrl(`workflow/endpoint/${endpoint.id}/`), headers: { "X-CSRFToken": sessionDetails?.csrfToken, "Content-Type": "application/json", }, data: { - ...endpoint, connection_type: "API", configuration: endpoint.configuration || {}, }, diff --git a/frontend/src/components/input-output/configure-ds/ConfigureDs.jsx b/frontend/src/components/input-output/configure-ds/ConfigureDs.jsx index 75ecb2e1..e4a527b4 100644 --- a/frontend/src/components/input-output/configure-ds/ConfigureDs.jsx +++ b/frontend/src/components/input-output/configure-ds/ConfigureDs.jsx @@ -184,14 +184,16 @@ function ConfigureDs({ const connectorMetadata = { ...formData }; const connectorName = connectorMetadata?.connectorName; delete connectorMetadata.connectorName; + body = { - workflow: id, - created_by: sessionDetails?.id, connector_id: selectedSourceId, connector_metadata: connectorMetadata, - connector_type: type.toUpperCase(), connector_name: connectorName, + created_by: sessionDetails?.id, + workflow: id, + connector_type: type.toUpperCase(), }; + url += "connector/"; try { @@ -254,7 +256,10 @@ function ConfigureDs({ const data = res?.data; if (sourceTypes.connectors.includes(type)) { handleUpdate( - { connector_instance: data?.id, configuration: formDataConfig }, + { + connector_instance_id: data?.id, + configuration: formDataConfig, + }, true ); setIsTcSuccessful(false); diff --git a/frontend/src/components/input-output/input-output/InputOutput.jsx b/frontend/src/components/input-output/input-output/InputOutput.jsx index a8059ce0..95e37b97 100644 --- a/frontend/src/components/input-output/input-output/InputOutput.jsx +++ b/frontend/src/components/input-output/input-output/InputOutput.jsx @@ -6,6 +6,7 @@ import { CONNECTOR_TYPE_MAP } from "../../../helpers/GetStaticData.js"; import { useAxiosPrivate } from "../../../hooks/useAxiosPrivate"; import { useAlertStore } from "../../../store/alert-store"; import { useSessionStore } from "../../../store/session-store"; +import useRequestUrl from "../../../hooks/useRequestUrl"; import { AddSourceModal } from "../add-source-modal/AddSourceModal.jsx"; import { ManageFiles } from "../manage-files/ManageFiles.jsx"; import { Sidebar } from "../sidebar/Sidebar.jsx"; @@ -26,6 +27,7 @@ function InputOutput() { const { id } = useParams(); const axiosPrivate = useAxiosPrivate(); const handleException = useExceptionHandler(); + const { getUrl } = useRequestUrl(); const location = useLocation(); const currentPath = location.pathname; @@ -45,25 +47,31 @@ function InputOutput() { setConnectorType(type); + const endpointType = CONNECTOR_TYPE_MAP[type]?.toUpperCase(); + const requestOptions = { method: "GET", - url: `/api/v1/unstract/${ - sessionDetails?.orgId - }/connector/?workflow=${id}&connector_type=${type.toUpperCase()}`, + url: getUrl( + `workflow/endpoint/?workflow=${id}&endpoint_type=${endpointType}` + ), }; axiosPrivate(requestOptions) .then((res) => { - const sources = res?.data; - if (sources?.length === 0) { + const endpoints = res?.data; + if (endpoints?.length === 0) { setSelectedItem(""); setListOfItems([]); return; } - const menuItems = sources.map((item) => - getItem(item?.connector_name, item?.id, sourceIcon(item?.icon)) + const menuItems = endpoints.map((item) => + getItem( + item?.connector_instance?.connector_name, + item?.connector_instance?.id, + sourceIcon(item?.connector_instance?.connector_icon) + ) ); - const firstId = sources[0].id.toString(); + const firstId = endpoints[0]?.connector_instance?.id?.toString(); setSelectedItem(firstId); setListOfItems(menuItems); }) @@ -92,7 +100,7 @@ function InputOutput() { const handleDelete = () => { const requestOptions = { method: "DELETE", - url: `/api/v1/unstract/${sessionDetails?.orgId}/connector/${selectedItem}/`, + url: getUrl(`connector/${selectedItem}/`), headers: { "X-CSRFToken": sessionDetails?.csrfToken, }, diff --git a/frontend/src/hooks/usePostHogEvents.js b/frontend/src/hooks/usePostHogEvents.js index 33a5ada0..e890ef29 100644 --- a/frontend/src/hooks/usePostHogEvents.js +++ b/frontend/src/hooks/usePostHogEvents.js @@ -1,4 +1,5 @@ import { usePostHog } from "posthog-js/react"; + import { useSessionStore } from "../store/session-store"; const usePostHogEvents = () => { diff --git a/frontend/src/store/alert-store.js b/frontend/src/store/alert-store.js index 0e42186b..95dec9b7 100644 --- a/frontend/src/store/alert-store.js +++ b/frontend/src/store/alert-store.js @@ -1,7 +1,8 @@ import { create } from "zustand"; -import { isNonNegativeNumber } from "../helpers/GetStaticData"; import { uniqueId } from "lodash"; +import { isNonNegativeNumber } from "../helpers/GetStaticData"; + const DEFAULT_DURATION = 6; const STORE_VARIABLES = {