UN-2774 [FEAT] Add sharing functionality for ETL/TASK pipelines (#1545)

* UN-2774 [FEAT] Add sharing functionality for ETL pipelines

- Added shared_users and shared_to_org fields to Pipeline model
- Implemented IsOwnerOrSharedUser permission using common permission class
- Added API endpoints for managing pipeline sharing
- Updated frontend to support sharing modal and user selection
- Fixed issue with empty user list in sharing modal

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* [REFACTOR] Move Django Q import to top of file

Move the Django Q import from inside the method to the top-level imports
for better code organization and consistency.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* [FEAT] Add granular permissions for pipeline operations

- Restrict destroy, update operations to owners only
- Allow shared users access to other operations
- Add typing import for type annotations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* [FEAT] Enhance sharing notification system

- Add conditional import for notification plugin with fallback
- Map pipeline types to proper resource types (ETL/TASK)
- Improve error handling and logging for notification failures
- Prevent update operation failure when notifications fail

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Addressing review comments

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Rahul Johny
2025-10-01 12:45:35 +05:30
committed by GitHub
parent 8480fad97e
commit e37029b814
8 changed files with 317 additions and 5 deletions

View File

@@ -0,0 +1,32 @@
# Generated by Django 4.2.1 on 2025-09-17 13:18
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("pipeline_v2", "0002_remove_pipeline_unique_pipeline_and_more"),
]
operations = [
migrations.AddField(
model_name="pipeline",
name="shared_to_org",
field=models.BooleanField(
db_comment="Whether this pipeline is shared with the entire organization",
default=False,
),
),
migrations.AddField(
model_name="pipeline",
name="shared_users",
field=models.ManyToManyField(
blank=True,
db_comment="Users with whom this pipeline is shared",
related_name="shared_pipelines",
to=settings.AUTH_USER_MODEL,
),
),
]

View File

@@ -3,6 +3,7 @@ import uuid
from account_v2.models import User
from django.conf import settings
from django.db import models
from django.db.models import Q
from utils.models.base_model import BaseModel
from utils.models.organization_mixin import (
DefaultOrganizationManagerMixin,
@@ -18,7 +19,16 @@ PIPELINE_NAME_LENGTH = 32
class PipelineModelManager(DefaultOrganizationManagerMixin, models.Manager):
pass
def for_user(self, user):
"""Filter pipelines that the user can access:
- Pipelines created by the user
- Pipelines shared with the user
"""
return self.filter(
Q(created_by=user) # Owned by user
| Q(shared_users=user) # Shared with user
# Q(shared_to_org=True) # Org-wide sharing (optional)
).distinct()
class Pipeline(DefaultOrganizationMixin, BaseModel):
@@ -91,6 +101,17 @@ class Pipeline(DefaultOrganizationMixin, BaseModel):
null=True,
blank=True,
)
# Sharing fields
shared_users = models.ManyToManyField(
User,
related_name="shared_pipelines",
blank=True,
db_comment="Users with whom this pipeline is shared",
)
shared_to_org = models.BooleanField(
default=False,
db_comment="Whether this pipeline is shared with the entire organization",
)
# Manager
objects = PipelineModelManager()

View File

@@ -24,6 +24,7 @@ DEPLOYMENT_ENDPOINT = settings.API_DEPLOYMENT_PATH_PREFIX + "/pipeline"
class PipelineSerializer(IntegrityErrorMixin, AuditSerializer):
api_endpoint = SerializerMethodField()
created_by_email = SerializerMethodField()
class Meta:
model = Pipeline
@@ -195,6 +196,10 @@ class PipelineSerializer(IntegrityErrorMixin, AuditSerializer):
"""
return instance.api_endpoint
def get_created_by_email(self, obj):
"""Get the creator's email address."""
return obj.created_by.email if obj.created_by else None
def create(self, validated_data: dict[str, Any]) -> Any:
# TODO: Deduce pipeline type based on WF?
validated_data[PK.ACTIVE] = True

View File

@@ -0,0 +1,37 @@
"""Serializers for pipeline sharing functionality."""
from account_v2.serializer import UserSerializer
from pipeline_v2.models import Pipeline
from rest_framework import serializers
from rest_framework.serializers import SerializerMethodField
class SharedUserListSerializer(serializers.ModelSerializer):
"""Serializer for returning pipeline with shared user details."""
shared_users = SerializerMethodField()
created_by = SerializerMethodField()
created_by_email = SerializerMethodField()
class Meta:
model = Pipeline
fields = [
"id",
"pipeline_name",
"shared_users",
"shared_to_org",
"created_by",
"created_by_email",
]
def get_shared_users(self, obj):
"""Get list of shared users with their details."""
return UserSerializer(obj.shared_users.all(), many=True).data
def get_created_by(self, obj):
"""Get the creator's username."""
return obj.created_by.username if obj.created_by else None
def get_created_by_email(self, obj):
"""Get the creator's email."""
return obj.created_by.email if obj.created_by else None

View File

@@ -31,6 +31,12 @@ download_postman_collection = PipelineViewSet.as_view(
}
)
list_shared_users = PipelineViewSet.as_view(
{
"get": PipelineViewSet.list_of_shared_users.__name__,
}
)
pipeline_execute = PipelineViewSet.as_view({"post": "execute"})
@@ -44,6 +50,11 @@ urlpatterns = format_suffix_patterns(
name=PipelineURL.EXECUTIONS,
),
path("pipeline/execute/", pipeline_execute, name=PipelineURL.EXECUTE),
path(
"pipeline/<uuid:pk>/users/",
list_shared_users,
name="pipeline-shared-users",
),
path(
"pipeline/api/postman_collection/<uuid:pk>/",
download_postman_collection,

View File

@@ -1,5 +1,6 @@
import json
import logging
from typing import Any
from account_v2.custom_exceptions import DuplicateData
from api_v2.exceptions import NoActiveAPIKeyError
@@ -8,7 +9,7 @@ from api_v2.postman_collection.dto import PostmanCollection
from django.db import IntegrityError
from django.db.models import QuerySet
from django.http import HttpResponse
from permissions.permission import IsOwner
from permissions.permission import IsOwner, IsOwnerOrSharedUser
from rest_framework import serializers, status, viewsets
from rest_framework.decorators import action
from rest_framework.request import Request
@@ -29,6 +30,17 @@ from pipeline_v2.serializers.crud import PipelineSerializer
from pipeline_v2.serializers.execute import (
PipelineExecuteSerializer as ExecuteSerializer,
)
from pipeline_v2.serializers.sharing import SharedUserListSerializer
try:
from plugins.notification.constants import ResourceType
from plugins.notification.sharing_notification import SharingNotificationService
NOTIFICATION_PLUGIN_AVAILABLE = True
sharing_notification_service = SharingNotificationService()
except ImportError:
NOTIFICATION_PLUGIN_AVAILABLE = False
sharing_notification_service = None
logger = logging.getLogger(__name__)
@@ -36,11 +48,17 @@ logger = logging.getLogger(__name__)
class PipelineViewSet(viewsets.ModelViewSet):
versioning_class = URLPathVersioning
queryset = Pipeline.objects.all()
permission_classes = [IsOwner]
def get_permissions(self) -> list[Any]:
if self.action in ["destroy", "partial_update", "update"]:
return [IsOwner()]
return [IsOwnerOrSharedUser()]
serializer_class = PipelineSerializer
def get_queryset(self) -> QuerySet:
queryset = Pipeline.objects.filter(created_by=self.request.user)
# Use for_user manager method to include shared pipelines
queryset = Pipeline.objects.for_user(self.request.user)
# Apply type filter if specified
pipeline_type = self.request.query_params.get(PipelineConstants.TYPE)
@@ -101,6 +119,59 @@ class PipelineViewSet(viewsets.ModelViewSet):
super().perform_destroy(instance)
return SchedulerHelper.remove_job(pipeline_to_remove)
@action(detail=True, methods=["get"], url_path="users", permission_classes=[IsOwner])
def list_of_shared_users(self, request: Request, pk: str | None = None) -> Response:
"""Returns the list of users the pipeline is shared with."""
pipeline = self.get_object()
serializer = SharedUserListSerializer(pipeline)
return Response(serializer.data, status=status.HTTP_200_OK)
def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""Override to handle sharing notifications."""
instance = self.get_object()
current_shared_users = set(instance.shared_users.all())
response = super().partial_update(request, *args, **kwargs)
if (
response.status_code == 200
and "shared_users" in request.data
and NOTIFICATION_PLUGIN_AVAILABLE
):
try:
instance.refresh_from_db()
new_shared_users = set(instance.shared_users.all())
newly_shared_users = new_shared_users - current_shared_users
if ResourceType.ETL.value == instance.pipeline_type:
resource_type = ResourceType.ETL.value
elif ResourceType.TASK.value == instance.pipeline_type:
resource_type = ResourceType.TASK.value
if newly_shared_users:
# Only send notifications if there are newly shared users
sharing_notification_service.send_sharing_notification(
resource_type=resource_type,
resource_name=instance.pipeline_name,
resource_id=str(instance.id),
shared_by=request.user,
shared_to=list(newly_shared_users),
resource_instance=instance,
)
logger.info(
f"Sent sharing notifications for {instance.pipeline_type} "
f"to {len(newly_shared_users)} users"
)
except Exception as e:
# Log error but don't fail the update operation
logger.exception(
f"Failed to send sharing notification, continuing update though: {str(e)}"
)
return response
@action(detail=True, methods=["get"])
def download_postman_collection(
self, request: Request, pk: str | None = None

View File

@@ -94,6 +94,32 @@ function pipelineService() {
};
return axiosPrivate(requestOptions);
},
getSharedUsers: (pipelineId) => {
const requestOptions = {
method: "GET",
url: `${path}/pipeline/${pipelineId}/users/`,
};
return axiosPrivate(requestOptions);
},
updateSharing: (pipelineId, sharedUsers, shareWithEveryone = false) => {
const requestOptions = {
method: "PATCH",
url: `${path}/pipeline/${pipelineId}/`,
headers: requestHeaders,
data: {
shared_users: sharedUsers,
shared_to_org: shareWithEveryone,
},
};
return axiosPrivate(requestOptions);
},
getAllUsers: () => {
const requestOptions = {
method: "GET",
url: `${path}/users/`,
};
return axiosPrivate(requestOptions);
},
};
}

View File

@@ -11,6 +11,7 @@ import {
CloudDownloadOutlined,
CopyOutlined,
LoadingOutlined,
ShareAltOutlined,
} from "@ant-design/icons";
import {
Button,
@@ -52,6 +53,7 @@ import {
useInitialFetchCount,
usePromptStudioModal,
} from "../../../hooks/usePromptStudioFetchCount";
import { SharePermission } from "../../widgets/share-permission/SharePermission";
function Pipelines({ type }) {
const [tableData, setTableData] = useState([]);
@@ -78,6 +80,10 @@ function Pipelines({ type }) {
const [openNotificationModal, setOpenNotificationModal] = useState(false);
const { count, isLoading, fetchCount } = usePromptStudioStore();
const { getPromptStudioCount } = usePromptStudioService();
// Sharing state
const [openShareModal, setOpenShareModal] = useState(false);
const [allUsers, setAllUsers] = useState([]);
const [isLoadingShare, setIsLoadingShare] = useState(false);
const initialFetchComplete = useInitialFetchCount(
fetchCount,
@@ -284,6 +290,68 @@ function Pipelines({ type }) {
}
};
const handleShare = async () => {
setIsLoadingShare(true);
// Fetch all users and shared users first, then open modal
try {
const [usersResponse, sharedUsersResponse] = await Promise.all([
pipelineApiService.getAllUsers(),
pipelineApiService.getSharedUsers(selectedPorD.id),
]);
// Extract members array from the response and map to the required format
const userList =
usersResponse?.data?.members?.map((member) => ({
id: member.id,
email: member.email,
})) || [];
const sharedUsersList = sharedUsersResponse.data?.shared_users || [];
// Set shared_users property on selectedPorD for SharePermission component
setSelectedPorD({
...selectedPorD,
shared_users: Array.isArray(sharedUsersList) ? sharedUsersList : [],
});
setAllUsers(userList);
// Only open modal after data is loaded
setOpenShareModal(true);
} catch (error) {
setAlertDetails(handleException(error));
// Ensure allUsers is always an array even on error
setAllUsers([]);
} finally {
setIsLoadingShare(false);
}
};
const onShare = (sharedUsers, _, shareWithEveryone) => {
setIsLoadingShare(true);
// sharedUsers is already an array of user IDs from SharePermission component
pipelineApiService
.updateSharing(selectedPorD.id, sharedUsers, shareWithEveryone)
.then(() => {
setAlertDetails({
type: "success",
content: "Sharing permissions updated successfully",
});
setOpenShareModal(false);
// Refresh pipeline list to show updated ownership
getPipelineList();
})
.catch((error) => {
setAlertDetails(
handleException(error, "Failed to update sharing settings")
);
})
.finally(() => {
setIsLoadingShare(false);
});
};
const actionItems = [
// Configuration Section
{
@@ -336,6 +404,19 @@ function Pipelines({ type }) {
</Space>
),
},
{
key: "share",
label: (
<Space
direction="horizontal"
className="action-items"
onClick={handleShare}
>
<ShareAltOutlined />
<Typography.Text>Share</Typography.Text>
</Space>
),
},
{
key: "divider-config",
type: "divider",
@@ -564,6 +645,22 @@ function Pipelines({ type }) {
</div>
),
},
{
title: "Owner",
dataIndex: "created_by_email",
key: "created_by_email",
align: "center",
render: (email, record) => {
const isOwner = record.created_by === sessionDetails?.userId;
return (
<Tooltip title={email}>
<Typography.Text className="p-or-d-typography">
{isOwner ? "You" : email?.split("@")[0] || "Unknown"}
</Typography.Text>
</Tooltip>
);
},
},
{
title: "Enabled",
key: "active",
@@ -572,7 +669,7 @@ function Pipelines({ type }) {
render: (_, record) => (
<Switch
checked={record.active}
onChange={(e) => {
onChange={() => {
handleEnablePipeline(!record.active, record.id);
}}
/>
@@ -653,6 +750,18 @@ function Pipelines({ type }) {
type={deploymentApiTypes.pipeline}
id={selectedPorD?.id}
/>
{openShareModal && (
<SharePermission
open={openShareModal}
setOpen={setOpenShareModal}
adapter={selectedPorD}
permissionEdit={true}
loading={isLoadingShare}
allUsers={Array.isArray(allUsers) ? allUsers : []}
onApply={onShare}
isSharableToOrg={false}
/>
)}
</div>
);
}