UN-2585 [MISC] Deprecate connector instance fields and refactor endpoint handling (#1454)

* [MISC] Deprecate connector instance fields and migrate to workflow endpoints

- Updated ConnectorInstance __str__ method to use connector_id and connector_mode
- Migrated PipelineSerializer to use WorkflowEndpoint instead of ConnectorInstance
- Enhanced WorkflowEndpointSerializer with backward compatibility fields and CRUD operations
- Updated WorkflowEndpointViewSet with proper filtering and moved serializer import
- Modified frontend components to use workflow-endpoint API instead of connector API
- Added gitignore entries for development tools and prompting files
- Removed duplicate WorkflowEndpointSerializer from workflow_v2/serializers.py

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* [MISC] Deprecate connector instance fields and refactor endpoint handling

- Add migration to make connector_type and workflow fields nullable in ConnectorInstance
- Introduce EncryptedBinaryField for secure metadata storage with automatic encryption/decryption
- Add deprecation warning to ConnectorInstance.metadata property
- Refactor ConnectorInstanceSerializer to use new encrypted field and remove manual encryption
- Update WorkflowEndpointSerializer to use connector_instance_id field instead of nested data
- Fix frontend DsSettingsCard to handle both object and string connector_instance formats
- Update destination/source connectors to use connector_metadata directly instead of deprecated metadata property
- Add proper connector instance extraction logic in handleUpdate to prevent validation errors
- Ensure backward compatibility while transitioning to new workflow endpoint architecture

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Minor PR comments addressed

* Moved back oauth token refresh and connector metadata addition back to serializer

* Replace hardcoded URLs with useRequestUrl hook in frontend components

Updated InputOutput.jsx and DsSettingsCard.jsx to use the useRequestUrl hook
instead of hardcoded API URLs for better maintainability and consistency.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* UN-2585 [FIX] Fix connector instance queryset organization filtering in serializer

- Add get_fields() method to WorkflowEndpointSerializer to dynamically set connector_instance_id queryset
- Fix issue where connector instances were not visible due to organization context timing in integration environment
- Change clearDestination method from PUT to PATCH for consistency

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Update backend/utils/fields.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Signed-off-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>

* Added docstring for get_fields method

* Fixed endpoint serializer to include workflow name

* Minor code smell fix

* minor: Refactored sample.env comments

---------

Signed-off-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
Chandrasekharan M
2025-07-24 19:03:36 +05:30
committed by GitHub
parent 26fa15a826
commit ff5dd19b09
22 changed files with 428 additions and 185 deletions

14
.gitignore vendored
View File

@@ -680,7 +680,15 @@ backend/backend/*_urls.py
backend/backend/*_urls_v2.py
!backend/backend/public_urls_v2.py
# Adding claude prompting directories
## Vibe coding / Prompting-related files ##
prompting/
.prompting/
# Claude
CLAUDE.md
CONTRIBUTION_GUIDE.md
prompting/*
.claude/*
# Windsurf
.qodo
.windsurfrules
.windsurf/rules

View File

@@ -0,0 +1,37 @@
# Generated by Django 4.2.1 on 2025-07-16 10:26
import django.db.models.deletion
import utils.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("workflow_v2", "0015_executionlog_idx_wf_execution_event_time_and_more"),
("connector_v2", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name="connectorinstance",
name="connector_metadata",
field=utils.fields.EncryptedBinaryField(null=True),
),
migrations.AlterField(
model_name="connectorinstance",
name="connector_type",
field=models.CharField(
choices=[("INPUT", "Input"), ("OUTPUT", "Output")], null=True
),
),
migrations.AlterField(
model_name="connectorinstance",
name="workflow",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="connector_workflow",
to="workflow_v2.workflow",
),
),
]

View File

@@ -1,4 +1,4 @@
import json
import logging
import uuid
from typing import Any
@@ -6,10 +6,8 @@ from account_v2.models import User
from connector_auth_v2.models import ConnectorAuth
from connector_processor.connector_processor import ConnectorProcessor
from connector_processor.constants import ConnectorKeys
from cryptography.fernet import Fernet, InvalidToken
from django.conf import settings
from django.db import models
from utils.exceptions import InvalidEncryptionKey
from utils.fields import EncryptedBinaryField
from utils.models.base_model import BaseModel
from utils.models.organization_mixin import (
DefaultOrganizationManagerMixin,
@@ -21,6 +19,8 @@ from backend.constants import FieldLengthConstants as FLC
CONNECTOR_NAME_SIZE = 128
VERSION_NAME_SIZE = 64
logger = logging.getLogger(__name__)
class ConnectorInstanceModelManager(DefaultOrganizationManagerMixin, models.Manager):
pass
@@ -45,13 +45,13 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel):
"workflow_v2.Workflow",
on_delete=models.CASCADE,
related_name="connector_workflow",
null=False,
null=True,
blank=False,
)
connector_id = models.CharField(max_length=FLC.CONNECTOR_ID_LENGTH, default="")
connector_metadata = models.BinaryField(null=True)
connector_metadata = EncryptedBinaryField(null=True)
connector_version = models.CharField(max_length=VERSION_NAME_SIZE, default="")
connector_type = models.CharField(choices=ConnectorType.choices)
connector_type = models.CharField(choices=ConnectorType.choices, null=True)
# TODO: handle connector_auth cascade deletion
connector_auth = models.ForeignKey(
ConnectorAuth,
@@ -84,21 +84,6 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel):
# Manager
objects = ConnectorInstanceModelManager()
# TODO: Remove if unused
def get_connector_metadata(self) -> dict[str, str]:
"""Gets connector metadata and refreshes the tokens if needed in case
of OAuth.
"""
tokens_refreshed = False
if self.connector_auth:
(
self.connector_metadata,
tokens_refreshed,
) = self.connector_auth.get_and_refresh_tokens()
if tokens_refreshed:
self.save()
return self.connector_metadata
@staticmethod
def supportsOAuth(connector_id: str) -> bool:
return bool(
@@ -109,19 +94,9 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel):
def __str__(self) -> str:
return (
f"Connector({self.id}, type{self.connector_type}, workflow: {self.workflow})"
f"Connector({self.id}, ID={self.connector_id}, mode: {self.connector_mode})"
)
def get_connector_metadata_bytes(self):
"""Convert connector_metadata to bytes if it is a memoryview.
Returns:
bytes: The connector_metadata as bytes.
"""
if isinstance(self.connector_metadata, memoryview):
return self.connector_metadata.tobytes()
return self.connector_metadata
@property
def metadata(self) -> Any:
"""Decrypt and return the connector metadata as a dictionary.
@@ -132,21 +107,20 @@ class ConnectorInstance(DefaultOrganizationMixin, BaseModel):
Returns:
dict: The decrypted connector metadata.
.. deprecated::
This property is deprecated. Use `connector_metadata` field directly instead.
This property will be removed in a future version.
"""
try:
connector_metadata_bytes = self.get_connector_metadata_bytes()
import warnings
if connector_metadata_bytes is None:
return None
if isinstance(connector_metadata_bytes, (dict)):
return connector_metadata_bytes
encryption_secret: str = settings.ENCRYPTION_KEY
cipher_suite: Fernet = Fernet(encryption_secret.encode("utf-8"))
decrypted_value = cipher_suite.decrypt(connector_metadata_bytes)
except InvalidToken:
raise InvalidEncryptionKey(entity=InvalidEncryptionKey.Entity.CONNECTOR)
return json.loads(decrypted_value.decode("utf-8"))
warnings.warn(
"The 'metadata' property is deprecated. Use 'connector_metadata' field directly instead. "
"This property will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
return self.connector_metadata
class Meta:
db_table = "connector_instance"

View File

@@ -1,4 +1,3 @@
import json
import logging
from collections import OrderedDict
from typing import Any
@@ -8,8 +7,8 @@ from connector_auth_v2.pipeline.common import ConnectorAuthHelper
from connector_processor.connector_processor import ConnectorProcessor
from connector_processor.constants import ConnectorKeys
from connector_processor.exceptions import OAuthTimeOut
from cryptography.fernet import Fernet
from django.conf import settings
from rest_framework.serializers import SerializerMethodField
from utils.fields import EncryptedBinaryFieldSerializer
from utils.serializer_utils import SerializerUtils
from backend.serializers import AuditSerializer
@@ -22,22 +21,13 @@ logger = logging.getLogger(__name__)
class ConnectorInstanceSerializer(AuditSerializer):
connector_metadata = EncryptedBinaryFieldSerializer(required=False, allow_null=True)
icon = SerializerMethodField()
class Meta:
model = ConnectorInstance
fields = "__all__"
def validate_connector_metadata(self, value: dict[Any]) -> dict[Any]:
"""Validating Json metadata This custom validation is to avoid conflict
with user input and db binary data.
Args:
value (Any): dict of metadata
Returns:
dict[Any]: dict of metadata
"""
return value
def save(self, **kwargs): # type: ignore
user = self.context.get("request").user or None
connector_id: str = kwargs[CIKey.CONNECTOR_ID]
@@ -68,15 +58,19 @@ class ConnectorInstanceSerializer(AuditSerializer):
)
kwargs[CIKey.CONNECTOR_MODE] = connector_mode.value
encryption_secret: str = settings.ENCRYPTION_KEY
f: Fernet = Fernet(encryption_secret.encode("utf-8"))
json_string: str = json.dumps(kwargs.get(CIKey.CONNECTOR_METADATA))
kwargs[CIKey.CONNECTOR_METADATA] = f.encrypt(json_string.encode("utf-8"))
instance = super().save(**kwargs)
return instance
def get_icon(self, obj: ConnectorInstance) -> str:
"""Get connector icon from ConnectorProcessor."""
icon_path = ConnectorProcessor.get_connector_data_with_key(
obj.connector_id, ConnectorKeys.ICON
)
# Ensure icon path is properly formatted for frontend
if icon_path and not icon_path.startswith("/"):
return f"/{icon_path}"
return icon_path
def to_representation(self, instance: ConnectorInstance) -> dict[str, str]:
# to remove the sensitive fields being returned
rep: OrderedDict[str, Any] = super().to_representation(instance)
@@ -84,11 +78,6 @@ class ConnectorInstanceSerializer(AuditSerializer):
rep[CIKey.CONNECTOR_METADATA] = {}
if SerializerUtils.check_context_for_GET_or_POST(context=self.context):
rep.pop(CIKey.CONNECTOR_AUTH)
# set icon fields for UI
rep[ConnectorKeys.ICON] = ConnectorProcessor.get_connector_data_with_key(
instance.connector_id, ConnectorKeys.ICON
)
rep[ConnectorKeys.ICON] = self.get_icon(instance)
if instance.connector_metadata:
rep[CIKey.CONNECTOR_METADATA] = instance.metadata
return rep

View File

@@ -35,7 +35,7 @@ class FileManagerHelper:
@staticmethod
def get_file_system(connector: ConnectorInstance) -> UnstractFileSystem:
"""Creates the `UnstractFileSystem` for the corresponding connector."""
metadata = connector.metadata
metadata = connector.connector_metadata
if connector.connector_id in fs_connectors:
connector = fs_connectors[connector.connector_id]["metadata"]["connector"]
connector_class: UnstractFileSystem = connector(metadata)

View File

@@ -224,44 +224,42 @@ class PipelineSerializer(IntegrityErrorMixin, AuditSerializer):
def _add_connector_data(
self,
repr: OrderedDict[str, Any],
connector_instance_list: list[Any],
workflow_endpoints: list[Any],
connectors: list[Any],
) -> OrderedDict[str, Any]:
"""Adds connector Input/Output data.
Args:
sef (_type_): _description_
repr (OrderedDict[str, Any]): _description_
repr (OrderedDict[str, Any]): The representation dictionary
workflow_endpoints (list[Any]): List of WorkflowEndpoint objects
connectors (list[Any]): List of available connectors
Returns:
OrderedDict[str, Any]: _description_
OrderedDict[str, Any]: Updated representation with connector data
"""
repr[PC.SOURCE_NAME] = PC.NOT_CONFIGURED
repr[PC.DESTINATION_NAME] = PC.NOT_CONFIGURED
for instance in connector_instance_list:
if instance.connector_type == "INPUT":
repr[PC.SOURCE_NAME], repr[PC.SOURCE_ICON] = self._get_name_and_icon(
connectors=connectors,
connector_id=instance.connector_id,
)
if instance.connector_type == "OUTPUT":
repr[PC.DESTINATION_NAME], repr[PC.DESTINATION_ICON] = (
self._get_name_and_icon(
for endpoint in workflow_endpoints:
if endpoint.endpoint_type == WorkflowEndpoint.EndpointType.SOURCE:
if endpoint.connector_instance:
repr[PC.SOURCE_NAME], repr[PC.SOURCE_ICON] = self._get_name_and_icon(
connectors=connectors,
connector_id=instance.connector_id,
connector_id=endpoint.connector_instance.connector_id,
)
)
if repr[PC.DESTINATION_NAME] == PC.NOT_CONFIGURED:
try:
check_manual_review = WorkflowEndpoint.objects.get(
workflow=instance.workflow,
endpoint_type=WorkflowEndpoint.EndpointType.DESTINATION,
connection_type=WorkflowEndpoint.ConnectionType.MANUALREVIEW,
elif endpoint.endpoint_type == WorkflowEndpoint.EndpointType.DESTINATION:
if (
endpoint.connection_type
== WorkflowEndpoint.ConnectionType.MANUALREVIEW
):
repr[PC.DESTINATION_NAME] = "Manual Review"
elif endpoint.connector_instance:
repr[PC.DESTINATION_NAME], repr[PC.DESTINATION_ICON] = (
self._get_name_and_icon(
connectors=connectors,
connector_id=endpoint.connector_instance.connector_id,
)
)
if check_manual_review:
repr[PC.DESTINATION_NAME] = "Manual Review"
except Exception as ex:
logger.debug(f"Not a Manual review destination: {ex}")
return repr
def to_representation(self, instance: Pipeline) -> OrderedDict[str, Any]:
@@ -273,15 +271,17 @@ class PipelineSerializer(IntegrityErrorMixin, AuditSerializer):
if SerializerUtils.check_context_for_GET_or_POST(context=self.context):
workflow = instance.workflow
connector_instance_list = ConnectorInstance.objects.filter(
workflow=workflow.id
).all()
workflow_endpoints = (
WorkflowEndpoint.objects.filter(workflow=workflow.id)
.select_related("connector_instance")
.all()
)
repr[PK.WORKFLOW_ID] = workflow.id
repr[PK.WORKFLOW_NAME] = workflow.workflow_name
repr[PK.CRON_STRING] = repr.pop(PK.CRON_STRING)
repr = self._add_connector_data(
repr=repr,
connector_instance_list=connector_instance_list,
workflow_endpoints=workflow_endpoints,
connectors=connectors,
)

View File

@@ -169,25 +169,27 @@ TOOL_REGISTRY_STORAGE_CREDENTIALS='{"provider":"local"}'
ENABLE_HIGHLIGHT_API_DEPLOYMENT=False
# Execution result and cache expire time
# For API results cached per workflow execution
EXECUTION_RESULT_TTL_SECONDS=86400 # 24 hours
# For execution metadata cached per workflow execution
EXECUTION_CACHE_TTL_SECONDS=86400 # 24 hours
# Instant workflow polling timeout in seconds
INSTANT_WF_POLLING_TIMEOUT=300 # 5 minutes
# For API results cached per workflow execution (24 hours)
EXECUTION_RESULT_TTL_SECONDS=86400
# For execution metadata cached per workflow execution (24 hours)
EXECUTION_CACHE_TTL_SECONDS=86400
# Instant workflow polling timeout in seconds (5 minutes)
INSTANT_WF_POLLING_TIMEOUT=300
# Maximum number of batches (i.e., parallel tasks) created for a single workflow execution
MAX_PARALLEL_FILE_BATCHES=1 # 1 file at a time
# Maximum number of batches (i.e., parallel tasks) created for a single workflow execution (1 file at a time)
MAX_PARALLEL_FILE_BATCHES=1
# Maximum allowed value for MAX_PARALLEL_FILE_BATCHES (upper limit for validation)
MAX_PARALLEL_FILE_BATCHES_MAX_VALUE=100
# File execution tracker ttl in seconds
FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000 # 5 hours
FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600 # 10 minutes
# File execution tracker TTL in seconds (5 hours)
FILE_EXECUTION_TRACKER_TTL_IN_SECOND=18000
# File execution tracker completed TTL in seconds (10 minutes)
FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND=600
# Runner polling timeout
MAX_RUNNER_POLLING_WAIT_SECONDS=10800 # 3 hours
RUNNER_POLLING_INTERVAL_SECONDS=2 # 2 seconds
# Runner polling timeout (3 hours)
MAX_RUNNER_POLLING_WAIT_SECONDS=10800
# Runner polling interval (2 seconds)
RUNNER_POLLING_INTERVAL_SECONDS=2
# ETL Pipeline minimum schedule interval (in seconds)
# Default: 1800 seconds (30 minutes)

199
backend/utils/fields.py Normal file
View File

@@ -0,0 +1,199 @@
"""Custom Django model fields and serializer fields for common use cases."""
import json
import logging
from typing import Any
from cryptography.fernet import Fernet, InvalidToken
from django.conf import settings
from django.db import models
from rest_framework import serializers
from utils.exceptions import InvalidEncryptionKey
logger = logging.getLogger(__name__)
class EncryptedBinaryField(models.BinaryField):
"""A BinaryField that automatically encrypts/decrypts JSON data.
This field transparently handles encryption when storing data to the database
and decryption when retrieving data from the database. It expects dictionary
data as input and returns dictionary data as output.
Features:
- Automatic encryption/decryption using Fernet symmetric encryption
- JSON serialization/deserialization
- Proper error handling for encryption failures
- Null value support
- Compatible with Django ORM and DRF serializers
Usage:
class MyModel(models.Model):
encrypted_data = EncryptedBinaryField(null=True)
# Usage is transparent
instance = MyModel.objects.create(encrypted_data={'key': 'value'})
print(instance.encrypted_data) # {'key': 'value'}
"""
description = "A binary field that automatically encrypts/decrypts JSON data"
def __init__(self, *args, **kwargs):
# Remove any JSON-specific kwargs that might be passed
kwargs.pop("encoder", None)
kwargs.pop("decoder", None)
super().__init__(*args, **kwargs)
def _get_encryption_key(self) -> bytes:
"""Get the encryption key from Django settings."""
encryption_key = getattr(settings, "ENCRYPTION_KEY", None)
if not encryption_key:
raise ValueError("ENCRYPTION_KEY not found in Django settings")
return encryption_key.encode("utf-8")
def _encrypt_value(self, value: Any) -> bytes | None:
"""Encrypt a Python value (typically dict) to bytes."""
if value is None:
return None
try:
# Serialize to JSON string
json_string = json.dumps(value)
# Encrypt the JSON string
cipher_suite = Fernet(self._get_encryption_key())
encrypted_bytes = cipher_suite.encrypt(json_string.encode("utf-8"))
return encrypted_bytes
except Exception as e:
logger.error(f"Failed to encrypt value: {e}")
raise
def _decrypt_value(self, value: bytes) -> dict | None:
"""Decrypt bytes to a Python value (typically dict)."""
if value is None:
return None
try:
# Handle memoryview objects from database
if isinstance(value, memoryview):
value = value.tobytes()
# If it's already a dict, return as-is (for backward compatibility)
if isinstance(value, dict):
logger.warning(
"Detected unencrypted dict in EncryptedBinaryField "
"legacy data will remain unencrypted. "
"Run a migration to secure all records."
)
return value
# Decrypt the bytes
cipher_suite = Fernet(self._get_encryption_key())
decrypted_bytes = cipher_suite.decrypt(value)
# Parse JSON string back to Python object
json_string = decrypted_bytes.decode("utf-8")
return json.loads(json_string)
except InvalidToken:
logger.error("Invalid encryption token - possibly wrong encryption key")
raise InvalidEncryptionKey(entity=InvalidEncryptionKey.Entity.CONNECTOR)
except Exception as e:
logger.error(f"Failed to decrypt value: {e}")
raise
def from_db_value(self, value, expression, connection):
"""Convert database value to Python value.
Called when data is loaded from the database.
"""
return self._decrypt_value(value)
def to_python(self, value):
"""Convert input value to Python value.
Called during model validation and form processing.
"""
if value is None:
return value
# If it's already a Python object (dict), return as-is
if isinstance(value, dict):
return value
# If it's bytes (encrypted), decrypt it
if isinstance(value, (bytes, memoryview)):
return self._decrypt_value(value)
# If it's a string, try to parse as JSON
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
pass
return value
def get_prep_value(self, value):
"""Convert Python value to database value.
Called before saving to the database.
"""
if value is None:
return None
# If it's already bytes (encrypted), return as-is
if isinstance(value, (bytes, memoryview)):
return value
# Encrypt the value
return self._encrypt_value(value)
def get_db_prep_value(self, value, connection, prepared=False):
"""Convert Python value to database-specific value."""
if not prepared:
value = self.get_prep_value(value)
return super().get_db_prep_value(value, connection, prepared)
def value_to_string(self, obj):
"""Convert field value to string for serialization."""
value = self.value_from_object(obj)
if value is None:
return None
return json.dumps(value)
def formfield(self, **kwargs):
"""Return a form field for this model field."""
# Use a CharField for forms since we want JSON input
from django import forms
defaults = {"widget": forms.Textarea}
defaults.update(kwargs)
return forms.CharField(**defaults)
class EncryptedBinaryFieldSerializer(serializers.Field):
"""Custom serializer field for EncryptedBinaryField.
This field handles serialization/deserialization of encrypted binary data
as JSON dictionaries, making it compatible with DRF's serialization process.
"""
def to_representation(self, value):
"""Convert encrypted binary data to JSON dictionary for API responses."""
if value is None:
return None
# The EncryptedBinaryField already decrypts the value when accessed
# so we just need to return it as-is
return value
def to_internal_value(self, data):
"""Convert JSON dictionary to internal value for database storage."""
if data is None:
return None
# Validate that the data is a dictionary
if not isinstance(data, dict):
raise serializers.ValidationError("Expected a dictionary.")
# Return the data as-is; the EncryptedBinaryField will handle encryption
return data

View File

@@ -100,10 +100,6 @@ class DestinationConnector(BaseConnector):
workflow=workflow,
endpoint_type=WorkflowEndpoint.EndpointType.DESTINATION,
)
if endpoint.connector_instance:
endpoint.connector_instance.connector_metadata = (
endpoint.connector_instance.metadata
)
return endpoint
def _get_source_endpoint_for_workflow(
@@ -123,10 +119,6 @@ class DestinationConnector(BaseConnector):
workflow=workflow,
endpoint_type=WorkflowEndpoint.EndpointType.SOURCE,
)
if endpoint.connector_instance:
endpoint.connector_instance.connector_metadata = (
endpoint.connector_instance.metadata
)
return endpoint
def validate(self) -> None:
@@ -149,7 +141,7 @@ class DestinationConnector(BaseConnector):
# Get database class and test connection
db_class = DatabaseUtils.get_db_class(
connector_id=connector.connector_id,
connector_settings=connector.metadata,
connector_settings=connector.connector_metadata,
)
engine = db_class.get_engine()
if hasattr(engine, "close"):
@@ -319,7 +311,7 @@ class DestinationConnector(BaseConnector):
def insert_into_db(self, input_file_path: str) -> None:
"""Insert data into the database."""
connector_instance: ConnectorInstance = self.endpoint.connector_instance
connector_settings: dict[str, Any] = connector_instance.metadata
connector_settings: dict[str, Any] = connector_instance.connector_metadata
destination_configurations: dict[str, Any] = self.endpoint.configuration
table_name: str = str(destination_configurations.get(DestinationKey.TABLE))
include_agent: bool = bool(

View File

@@ -1,12 +1,36 @@
import logging
from rest_framework.serializers import ModelSerializer
from connector_v2.models import ConnectorInstance
from connector_v2.serializers import ConnectorInstanceSerializer
from rest_framework import serializers
from rest_framework.serializers import (
ModelSerializer,
)
from workflow_manager.endpoint_v2.models import WorkflowEndpoint
logger = logging.getLogger(__name__)
class WorkflowEndpointSerializer(ModelSerializer):
connector_instance = ConnectorInstanceSerializer(read_only=True)
connector_instance_id = serializers.PrimaryKeyRelatedField(
queryset=ConnectorInstance.objects.all(), # To not make DRF shout
source="connector_instance",
write_only=True,
allow_null=True,
)
workflow_name = serializers.CharField(source="workflow.workflow_name", read_only=True)
class Meta:
model = WorkflowEndpoint
fields = "__all__"
def get_fields(self):
"""Override get_fields to dynamically set the connector_instance_id queryset.
This is needed to ensure that the queryset is set after the organization
context is available.
"""
fields = super().get_fields()
fields["connector_instance_id"].queryset = ConnectorInstance.objects.all()
return fields

View File

@@ -118,10 +118,6 @@ class SourceConnector(BaseConnector):
workflow=workflow,
endpoint_type=WorkflowEndpoint.EndpointType.SOURCE,
)
if endpoint.connector_instance:
endpoint.connector_instance.connector_metadata = (
endpoint.connector_instance.metadata
)
return endpoint
def validate(self) -> None:

View File

@@ -6,7 +6,7 @@ workflow_endpoint_list = WorkflowEndpointViewSet.as_view(
)
endpoint_list = WorkflowEndpointViewSet.as_view({"get": "list"})
workflow_endpoint_detail = WorkflowEndpointViewSet.as_view(
{"get": "retrieve", "put": "update"}
{"get": "retrieve", "put": "update", "patch": "partial_update"}
)
endpoint_settings_detail = WorkflowEndpointViewSet.as_view(
{"get": WorkflowEndpointViewSet.get_settings.__name__}

View File

@@ -6,8 +6,8 @@ from rest_framework.response import Response
from workflow_manager.endpoint_v2.destination import DestinationConnector
from workflow_manager.endpoint_v2.endpoint_utils import WorkflowEndpointUtils
from workflow_manager.endpoint_v2.models import WorkflowEndpoint
from workflow_manager.endpoint_v2.serializers import WorkflowEndpointSerializer
from workflow_manager.endpoint_v2.source import SourceConnector
from workflow_manager.workflow_v2.serializers import WorkflowEndpointSerializer
class WorkflowEndpointViewSet(viewsets.ModelViewSet):
@@ -19,10 +19,15 @@ class WorkflowEndpointViewSet(viewsets.ModelViewSet):
.select_related("workflow")
.filter(workflow__created_by=self.request.user)
)
workflow_filter = self.request.query_params.get("workflow", None)
if workflow_filter:
queryset = queryset.filter(workflow_id=workflow_filter)
endpoint_type_filter = self.request.query_params.get("endpoint_type", None)
connection_type_filter = self.request.query_params.get("connection_type", None)
if endpoint_type_filter:
queryset = queryset.filter(endpoint_type=endpoint_type_filter)
connection_type_filter = self.request.query_params.get("connection_type", None)
if connection_type_filter:
queryset = queryset.filter(connection_type=connection_type_filter)
return queryset

View File

@@ -16,7 +16,6 @@ from utils.serializer.integrity_error_mixin import IntegrityErrorMixin
from backend.constants import RequestKey
from backend.serializers import AuditSerializer
from workflow_manager.endpoint_v2.models import WorkflowEndpoint
from workflow_manager.workflow_v2.constants import WorkflowExecutionKey, WorkflowKey
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
from workflow_manager.workflow_v2.models.execution_log import ExecutionLog
@@ -104,14 +103,6 @@ class ExecuteWorkflowResponseSerializer(Serializer):
result = JSONField()
class WorkflowEndpointSerializer(ModelSerializer):
workflow_name = CharField(source="workflow.workflow_name", read_only=True)
class Meta:
model = WorkflowEndpoint
fields = "__all__"
class WorkflowExecutionSerializer(ModelSerializer):
class Meta:
model = WorkflowExecution

View File

@@ -100,10 +100,11 @@ class WorkflowViewSet(viewsets.ModelViewSet):
is_active=True,
)
try:
# Create empty WorkflowEndpoints for UI compatibility
# ConnectorInstances will be created when users actually configure connectors
WorkflowEndpointUtils.create_endpoints_for_workflow(workflow)
# NOTE: Add default connector here if needed
except Exception as e:
logger.error(f"Error saving workflow to DB: {e}")
logger.error(f"Error creating workflow endpoints: {e}")
raise WorkflowGenerationError
return workflow

View File

@@ -23,6 +23,7 @@ import { useAxiosPrivate } from "../../../hooks/useAxiosPrivate";
import { useAlertStore } from "../../../store/alert-store";
import { useSessionStore } from "../../../store/session-store";
import { useWorkflowStore } from "../../../store/workflow-store";
import useRequestUrl from "../../../hooks/useRequestUrl";
import SpaceWrapper from "../../widgets/space-wrapper/SpaceWrapper";
import { ConfigureConnectorModal } from "../configure-connector-modal/ConfigureConnectorModal";
import { useExceptionHandler } from "../../../hooks/useExceptionHandler";
@@ -82,6 +83,7 @@ function DsSettingsCard({ type, endpointDetails, message }) {
const axiosPrivate = useAxiosPrivate();
const handleException = useExceptionHandler();
const { flags } = sessionDetails;
const { getUrl } = useRequestUrl();
const icons = {
input: <ImportOutlined className="ds-set-icon-size" />,
@@ -144,16 +146,30 @@ function DsSettingsCard({ type, endpointDetails, message }) {
setConnType(endpointDetails?.connection_type);
}
if (!endpointDetails?.connector_instance?.length) {
if (!endpointDetails?.connector_instance) {
setConnDetails({});
return;
}
if (connDetails?.id === endpointDetails?.connector_instance) {
// Use connector_instance data directly from endpointDetails if it's an object
if (typeof endpointDetails?.connector_instance === "object") {
const connectorData = endpointDetails.connector_instance;
connectorData.connector_metadata = connectorData.connector_metadata || {};
connectorData.connector_metadata.connectorName =
connectorData?.connector_name || "";
setConnDetails(connectorData);
setSelectedId(connectorData?.connector_id);
return;
}
getSourceDetails();
// Fallback for legacy connector_instance ID format (string)
if (typeof endpointDetails?.connector_instance === "string") {
// Only call getSourceDetails if we haven't already loaded this connector
if (connDetails?.id !== endpointDetails?.connector_instance) {
getSourceDetails();
}
return;
}
}, [endpointDetails]);
useEffect(() => {
@@ -187,7 +203,7 @@ function DsSettingsCard({ type, endpointDetails, message }) {
setFormDataConfig(endpointDetails.configuration || {});
const requestOptions = {
method: "GET",
url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${endpointDetails?.id}/settings/`,
url: getUrl(`workflow/endpoint/${endpointDetails?.id}/settings/`),
};
setIsSpecConfigLoading(true);
@@ -211,9 +227,7 @@ function DsSettingsCard({ type, endpointDetails, message }) {
const requestOptions = {
method: "GET",
url: `/api/v1/unstract/${
sessionDetails?.orgId
}/supported_connectors/?type=${type.toUpperCase()}`,
url: getUrl(`supported_connectors/?type=${type.toUpperCase()}`),
};
axiosPrivate(requestOptions)
@@ -250,16 +264,14 @@ function DsSettingsCard({ type, endpointDetails, message }) {
};
const clearDestination = (updatedData) => {
const body = { ...destination, ...updatedData };
const requestOptions = {
method: "PUT",
url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${destination?.id}/`,
method: "PATCH",
url: getUrl(`workflow/endpoint/${destination?.id}/`),
headers: {
"X-CSRFToken": sessionDetails?.csrfToken,
"Content-Type": "application/json",
},
data: body,
data: updatedData,
};
axiosPrivate(requestOptions)
@@ -279,22 +291,20 @@ function DsSettingsCard({ type, endpointDetails, message }) {
if (type === "input") {
clearDestination({
connection_type: "",
connector_instance: null,
connector_instance_id: null,
});
}
};
const handleUpdate = (updatedData, showSuccess) => {
const body = { ...endpointDetails, ...updatedData };
const requestOptions = {
method: "PUT",
url: `/api/v1/unstract/${sessionDetails?.orgId}/workflow/endpoint/${endpointDetails?.id}/`,
method: "PATCH",
url: getUrl(`workflow/endpoint/${endpointDetails?.id}/`),
headers: {
"X-CSRFToken": sessionDetails?.csrfToken,
"Content-Type": "application/json",
},
data: body,
data: updatedData,
};
axiosPrivate(requestOptions)
.then((res) => {
@@ -321,7 +331,7 @@ function DsSettingsCard({ type, endpointDetails, message }) {
const getSourceDetails = () => {
const requestOptions = {
method: "GET",
url: `/api/v1/unstract/${sessionDetails?.orgId}/connector/${endpointDetails?.connector_instance}/`,
url: getUrl(`connector/${endpointDetails?.connector_instance}/`),
};
axiosPrivate(requestOptions)
@@ -362,7 +372,7 @@ function DsSettingsCard({ type, endpointDetails, message }) {
onChange={(value) => {
handleUpdate({
connection_type: value,
connector_instance: null,
connector_instance_id: null,
});
updateDestination();
}}

View File

@@ -1,4 +1,5 @@
import { useCallback } from "react";
import useRequestUrl from "../../hooks/useRequestUrl";
import { useAxiosPrivate } from "../../hooks/useAxiosPrivate";

View File

@@ -460,14 +460,13 @@ const CreateApiDeploymentFromPromptStudio = ({
// Update each endpoint to set connection_type to API
for (const endpoint of endpoints) {
await axiosPrivate({
method: "PUT",
method: "PATCH",
url: getUrl(`workflow/endpoint/${endpoint.id}/`),
headers: {
"X-CSRFToken": sessionDetails?.csrfToken,
"Content-Type": "application/json",
},
data: {
...endpoint,
connection_type: "API",
configuration: endpoint.configuration || {},
},

View File

@@ -184,14 +184,16 @@ function ConfigureDs({
const connectorMetadata = { ...formData };
const connectorName = connectorMetadata?.connectorName;
delete connectorMetadata.connectorName;
body = {
workflow: id,
created_by: sessionDetails?.id,
connector_id: selectedSourceId,
connector_metadata: connectorMetadata,
connector_type: type.toUpperCase(),
connector_name: connectorName,
created_by: sessionDetails?.id,
workflow: id,
connector_type: type.toUpperCase(),
};
url += "connector/";
try {
@@ -254,7 +256,10 @@ function ConfigureDs({
const data = res?.data;
if (sourceTypes.connectors.includes(type)) {
handleUpdate(
{ connector_instance: data?.id, configuration: formDataConfig },
{
connector_instance_id: data?.id,
configuration: formDataConfig,
},
true
);
setIsTcSuccessful(false);

View File

@@ -6,6 +6,7 @@ import { CONNECTOR_TYPE_MAP } from "../../../helpers/GetStaticData.js";
import { useAxiosPrivate } from "../../../hooks/useAxiosPrivate";
import { useAlertStore } from "../../../store/alert-store";
import { useSessionStore } from "../../../store/session-store";
import useRequestUrl from "../../../hooks/useRequestUrl";
import { AddSourceModal } from "../add-source-modal/AddSourceModal.jsx";
import { ManageFiles } from "../manage-files/ManageFiles.jsx";
import { Sidebar } from "../sidebar/Sidebar.jsx";
@@ -26,6 +27,7 @@ function InputOutput() {
const { id } = useParams();
const axiosPrivate = useAxiosPrivate();
const handleException = useExceptionHandler();
const { getUrl } = useRequestUrl();
const location = useLocation();
const currentPath = location.pathname;
@@ -45,25 +47,31 @@ function InputOutput() {
setConnectorType(type);
const endpointType = CONNECTOR_TYPE_MAP[type]?.toUpperCase();
const requestOptions = {
method: "GET",
url: `/api/v1/unstract/${
sessionDetails?.orgId
}/connector/?workflow=${id}&connector_type=${type.toUpperCase()}`,
url: getUrl(
`workflow/endpoint/?workflow=${id}&endpoint_type=${endpointType}`
),
};
axiosPrivate(requestOptions)
.then((res) => {
const sources = res?.data;
if (sources?.length === 0) {
const endpoints = res?.data;
if (endpoints?.length === 0) {
setSelectedItem("");
setListOfItems([]);
return;
}
const menuItems = sources.map((item) =>
getItem(item?.connector_name, item?.id, sourceIcon(item?.icon))
const menuItems = endpoints.map((item) =>
getItem(
item?.connector_instance?.connector_name,
item?.connector_instance?.id,
sourceIcon(item?.connector_instance?.connector_icon)
)
);
const firstId = sources[0].id.toString();
const firstId = endpoints[0]?.connector_instance?.id?.toString();
setSelectedItem(firstId);
setListOfItems(menuItems);
})
@@ -92,7 +100,7 @@ function InputOutput() {
const handleDelete = () => {
const requestOptions = {
method: "DELETE",
url: `/api/v1/unstract/${sessionDetails?.orgId}/connector/${selectedItem}/`,
url: getUrl(`connector/${selectedItem}/`),
headers: {
"X-CSRFToken": sessionDetails?.csrfToken,
},

View File

@@ -1,4 +1,5 @@
import { usePostHog } from "posthog-js/react";
import { useSessionStore } from "../store/session-store";
const usePostHogEvents = () => {

View File

@@ -1,7 +1,8 @@
import { create } from "zustand";
import { isNonNegativeNumber } from "../helpers/GetStaticData";
import { uniqueId } from "lodash";
import { isNonNegativeNumber } from "../helpers/GetStaticData";
const DEFAULT_DURATION = 6;
const STORE_VARIABLES = {