-
Notifications
You must be signed in to change notification settings - Fork 623
Expand file tree
/
Copy pathserializers.py
More file actions
522 lines (428 loc) · 19.1 KB
/
serializers.py
File metadata and controls
522 lines (428 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
import re
import uuid
from collections import OrderedDict
from typing import Any
from urllib.parse import urlparse
from django.apps import apps
from django.core.validators import RegexValidator
from pipeline_v2.models import Pipeline
from prompt_studio.prompt_profile_manager_v2.models import ProfileManager
from rest_framework.serializers import (
BooleanField,
CharField,
FileField,
IntegerField,
JSONField,
ListField,
ModelSerializer,
Serializer,
SerializerMethodField,
URLField,
ValidationError,
)
from tags.serializers import TagParamsSerializer
from utils.serializer.integrity_error_mixin import IntegrityErrorMixin
from workflow_manager.endpoint_v2.models import WorkflowEndpoint
from workflow_manager.workflow_v2.exceptions import ExecutionDoesNotExistError
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
from api_v2.constants import ApiExecution
from api_v2.models import APIDeployment, APIKey
from backend.serializers import AuditSerializer
class APIDeploymentSerializer(IntegrityErrorMixin, AuditSerializer):
    """Serializer for creating and updating API deployments.

    Validates the API name format, ensures the backing workflow has usable
    source and destination endpoints, and enforces at most one active API
    deployment per workflow (for new deployments only).
    """

    class Meta:
        model = APIDeployment
        fields = "__all__"

    # Maps DB unique-constraint names to user-facing field errors;
    # consumed by IntegrityErrorMixin on IntegrityError.
    unique_error_message_map: dict[str, dict[str, str]] = {
        "unique_api_name": {
            "field": "api_name",
            "message": (
                "This API name is already in use. Please select a different name."
            ),
        },
        "api_deployment_api_endpoint_key": {
            "field": "api_name",
            "message": (
                "This API name is already in use. Please select a different name."
            ),
        },
    }

    def validate_api_name(self, value: str) -> str:
        """Allow only letters, numbers, hyphens and underscores in the API name.

        Raises:
            ValidationError: If the name contains any other character.
        """
        # Fixed: the message previously used a backslash line-continuation,
        # which embedded stray indentation whitespace into the user-facing text.
        api_name_validator = RegexValidator(
            regex=r"^[a-zA-Z0-9_-]+$",
            message="Only letters, numbers, hyphen and underscores are allowed.",
            code="invalid_api_name",
        )
        api_name_validator(value)
        return value

    def validate_workflow(self, workflow):
        """Validate that the workflow has properly configured source and
        destination endpoints.

        Raises:
            ValidationError: If either endpoint is missing, lacks a connection
                type, or (for connection types that require one) lacks a
                connector instance.
        """
        # Fetch endpoints with related connector data in a single query.
        endpoints = WorkflowEndpoint.objects.filter(workflow=workflow).select_related(
            "connector_instance"
        )

        # Source endpoint checks. Use first() directly instead of
        # exists() + first(), which issued two queries.
        source_endpoint = endpoints.filter(
            endpoint_type=WorkflowEndpoint.EndpointType.SOURCE
        ).first()
        if source_endpoint is None:
            raise ValidationError(
                "Workflow must have a source endpoint configured before creating an API deployment."
            )
        if not source_endpoint.connection_type:
            raise ValidationError(
                "Source endpoint must have a connection type configured before creating an API deployment."
            )
        # API connections don't need connector instances; everything else does.
        if (
            source_endpoint.connection_type != WorkflowEndpoint.ConnectionType.API
            and not source_endpoint.connector_instance
        ):
            raise ValidationError(
                "Source endpoint must have a connector configured for non-API connections before creating an API deployment."
            )

        # Destination endpoint checks (same pattern as source).
        destination_endpoint = endpoints.filter(
            endpoint_type=WorkflowEndpoint.EndpointType.DESTINATION
        ).first()
        if destination_endpoint is None:
            raise ValidationError(
                "Workflow must have a destination endpoint configured before creating an API deployment."
            )
        if not destination_endpoint.connection_type:
            raise ValidationError(
                "Destination endpoint must have a connection type configured before creating an API deployment."
            )
        # API and MANUALREVIEW connections don't need connector instances.
        if destination_endpoint.connection_type not in (
            WorkflowEndpoint.ConnectionType.API,
            WorkflowEndpoint.ConnectionType.MANUALREVIEW,
        ) and not destination_endpoint.connector_instance:
            raise ValidationError(
                "Destination endpoint must have a connector configured for non-API and non-manual review connections before creating an API deployment."
            )
        return workflow

    def validate(self, data):
        """Validate that only one API deployment per workflow is allowed for
        new deployments.

        Updates (``self.instance`` set) are exempt so an existing deployment
        can still be edited.
        """
        workflow = data.get("workflow")
        if workflow and not self.instance:
            # exists() instead of count() > 0: cheaper query, same semantics.
            has_active = APIDeployment.objects.filter(
                workflow=workflow, is_active=True
            ).exists()
            if has_active:
                raise ValidationError(
                    {
                        "workflow": "This workflow already has an active API deployment. Only one API deployment per workflow is allowed."
                    }
                )
        return data
class APIKeySerializer(AuditSerializer):
    """Serializer for API keys attached to either an API deployment or a
    pipeline (exactly one of the two)."""

    class Meta:
        model = APIKey
        fields = "__all__"

    def validate(self, data):
        """Require exactly one of `api` or `pipeline` to be set."""
        api_ref = data.get("api")
        pipeline_ref = data.get("pipeline")
        if api_ref and pipeline_ref:
            raise ValidationError(
                "Only one of `api` or `pipeline` should be set, not both."
            )
        if not api_ref and not pipeline_ref:
            raise ValidationError("At least one of `api` or `pipeline` must be set.")
        return data

    def to_representation(self, instance: APIKey) -> OrderedDict[str, Any]:
        """Override the to_representation method to include additional
        context.

        When a `deployment` object is supplied via serializer context, the
        output is augmented with the owning api/pipeline id, a description,
        and `is_active` forced to True.
        """
        deployment: APIDeployment | Pipeline = self.context.get("deployment")
        rep: OrderedDict[str, Any] = super().to_representation(instance)
        if not deployment:
            return rep
        if isinstance(deployment, APIDeployment):
            rep["api"] = deployment.id
            rep["pipeline"] = None
            rep["description"] = f"API Key for {deployment.api_name}"
        elif isinstance(deployment, Pipeline):
            rep["api"] = None
            rep["pipeline"] = deployment.id
            rep["description"] = f"API Key for {deployment.pipeline_name}"
        else:
            raise ValueError(
                "Context must be an instance of APIDeployment or Pipeline"
            )
        rep["is_active"] = True
        return rep
class ExecutionRequestSerializer(TagParamsSerializer):
    """Execution request serializer.

    Attributes:
        timeout (int): Timeout for the API deployment, maximum value can be 300s.
            If -1 it corresponds to async execution. Defaults to -1
        include_metadata (bool): Flag to include metadata in API response
        include_metrics (bool): Flag to include metrics in API response
        use_file_history (bool): Flag to use FileHistory to save and retrieve
            responses quickly. This is undocumented to the user and can be
            helpful for demos.
        tags (str): Comma-separated List of tags to associate with the execution.
            e.g:'tag1,tag2-name,tag3_name'
        llm_profile_id (str): UUID of the LLM profile to override the default profile.
            If not provided, the default profile will be used.
        hitl_queue_name (str, optional): Document class name for manual review queue.
            If not provided, uses API name as document class.
        presigned_urls (list): List of presigned URLs to fetch files from.
            URLs are validated for HTTPS and S3 endpoint requirements.
        custom_data (dict, optional): User-provided data for variable replacement in prompts.
            Can be accessed in prompts using {{custom_data.key}} syntax for dot notation traversal.
        files (list): Uploaded files to process.
    """

    # Combined cap across uploaded files and presigned URLs.
    MAX_FILES_ALLOWED = 32
    # Packet IDs are typically 8-character hex; 16 is a generous upper bound.
    MAX_PACKET_ID_LENGTH = 16

    timeout = IntegerField(
        min_value=-1, max_value=ApiExecution.MAXIMUM_TIMEOUT_IN_SEC, default=-1
    )
    include_metadata = BooleanField(default=False)
    include_metrics = BooleanField(default=False)
    use_file_history = BooleanField(default=False)
    presigned_urls = ListField(child=URLField(), required=False)
    llm_profile_id = CharField(required=False, allow_null=True, allow_blank=True)
    hitl_queue_name = CharField(required=False, allow_null=True, allow_blank=True)
    hitl_packet_id = CharField(required=False, allow_null=True, allow_blank=True)
    custom_data = JSONField(required=False, allow_null=True)
    # Moved up with the other field declarations (was previously defined
    # between method bodies, which obscured the serializer's field set).
    files = ListField(
        child=FileField(),
        required=False,
        allow_empty=True,
    )

    def validate_hitl_queue_name(self, value: str | None) -> str | None:
        """Validate queue name format using enterprise validation if available.

        Raises:
            ValidationError: If a queue name is supplied but the enterprise
                HITL module is not installed.
        """
        # The field allows null/blank, so empty values pass through instead of
        # tripping the enterprise-required error.
        if not value:
            return value
        # Try to use enterprise validation from pluggable_apps
        try:
            from pluggable_apps.manual_review_v2.serializers import (
                validate_hitl_queue_name_format,
            )

            return validate_hitl_queue_name_format(value)
        except ModuleNotFoundError:
            # HITL queue management is an enterprise-only feature.
            raise ValidationError(
                "Human-in-the-Loop (HITL) queue management requires Unstract Enterprise. "
                "This advanced workflow feature is available in our enterprise version. "
                "Learn more at https://docs.unstract.com/unstract/unstract_platform/features/workflows/hqr_deployment_workflows/ or "
                "contact our sales team at https://unstract.com/contact/"
            )
        # NOTE: a trailing `return value` here was unreachable (both paths
        # above exit) and has been removed.

    def validate_hitl_packet_id(self, value: str | None) -> str | None:
        """Validate packet ID format using enterprise validation if available.

        Raises:
            ValidationError: If the enterprise module is missing or the value
                fails format/length checks.
        """
        if not value:
            return value
        # Packet-based HITL is enterprise-only.
        if not apps.is_installed("pluggable_apps.manual_review_v2"):
            raise ValidationError(
                "Packet-based HITL processing requires Unstract Enterprise. "
                "This advanced workflow feature is available in our enterprise version. "
                "Learn more at https://docs.unstract.com/unstract/unstract_platform/features/workflows/hqr_deployment_workflows/ or "
                "contact our sales team at https://unstract.com/contact/"
            )
        # Validate packet ID format (alphanumeric string, typically 8-character hex)
        value = value.strip()
        if not value:
            raise ValidationError("Packet ID cannot be empty or whitespace only.")
        if not re.match(r"^[a-zA-Z0-9_-]+$", value):
            raise ValidationError(
                "Invalid packet ID format. Packet ID must contain only letters, numbers, hyphens, or underscores."
            )
        # Fixed: the message previously claimed a 100-character limit while the
        # check enforced 16; both now use MAX_PACKET_ID_LENGTH.
        if len(value) > self.MAX_PACKET_ID_LENGTH:
            raise ValidationError(
                f"Packet ID is too long (maximum {self.MAX_PACKET_ID_LENGTH} characters)."
            )
        return value

    def validate_custom_data(self, value):
        """Validate custom_data is a valid JSON object."""
        if value is None:
            return value
        if not isinstance(value, dict):
            raise ValidationError("custom_data must be a JSON object")
        return value

    def _validate_presigned_url(self, url: str) -> bool:
        """Validate presigned URL for security and compatibility.

        Args:
            url (str): The presigned URL to validate

        Returns:
            bool: True if URL is valid

        Raises:
            ValidationError: If the URL is invalid or not secure
        """
        parsed_url = urlparse(url)
        scheme = parsed_url.scheme.lower()
        host = (parsed_url.hostname or "").lower()
        # Require HTTPS for security
        if scheme != "https":
            raise ValidationError(
                {
                    "presigned_urls": f"Only HTTPS presigned URLs are allowed. URL scheme found: {scheme}"
                }
            )
        # Only allow S3 endpoints (global, path-style, virtual-hosted and
        # regional host formats).
        is_aws = host.endswith(".amazonaws.com")
        looks_like_s3 = (
            host == "s3.amazonaws.com"
            or host.endswith(".s3.amazonaws.com")
            or re.match(r"(^|.*\.)s3[.-]([a-z0-9-]+)\.amazonaws\.com$", host) is not None
        )
        if not (is_aws and looks_like_s3):
            raise ValidationError(
                {
                    "presigned_urls": f"URL host '{host}' is not a valid S3 endpoint. Only S3 pre-signed URLs are supported currently."
                }
            )
        return True

    def validate_presigned_urls(self, urls):
        """Validate presigned URLs for proper format and endpoint requirements."""
        if not urls:
            return urls
        for url in urls:
            self._validate_presigned_url(url)
        return urls

    def validate(self, data):
        """Validate all parameters including the combined file/URL count.

        Raises:
            ValidationError: If no input is supplied or the total number of
                files plus presigned URLs exceeds MAX_FILES_ALLOWED.
        """
        data = super().validate(data)
        files = data.get("files", [])
        urls = data.get("presigned_urls", [])
        total = len(files) + len(urls)
        if total == 0:
            raise ValidationError("You must provide at least one file or presigned URL.")
        if total > self.MAX_FILES_ALLOWED:
            raise ValidationError(
                f"You can upload a maximum of {self.MAX_FILES_ALLOWED} files in total (uploaded or via presigned URLs)."
            )
        return data

    def validate_llm_profile_id(self, value):
        """Validate that the llm_profile_id belongs to the API key owner.

        Requires `api` and `api_key` in the serializer context to resolve the
        caller's identity.
        """
        if not value:
            return value
        api = self.context.get("api")
        api_key = self.context.get("api_key")
        if not api or not api_key:
            raise ValidationError("Unable to validate LLM profile ownership")
        # Check if profile exists
        try:
            profile = ProfileManager.objects.get(profile_id=value)
        except ProfileManager.DoesNotExist:
            raise ValidationError("Profile not found")
        # Get the specific API key being used
        try:
            active_api_key = api.api_keys.get(api_key=api_key, is_active=True)
        except api.api_keys.model.DoesNotExist:
            raise ValidationError("API key not found or not active for this deployment")
        # Check if the profile owner matches the API key owner
        if profile.created_by != active_api_key.created_by:
            raise ValidationError("You can only use profiles that you own")
        return value
class ExecutionQuerySerializer(Serializer):
    """Query parameters for fetching a single execution's status/result."""

    execution_id = CharField(required=True)
    include_metadata = BooleanField(default=False)
    include_metrics = BooleanField(default=False)

    def validate_execution_id(self, value):
        """Trim spaces, validate UUID format, and check if execution_id exists."""
        candidate = value.strip()
        try:
            parsed = uuid.UUID(candidate)
        except ValueError:
            raise ValidationError(
                f"Invalid execution_id '{candidate}'. Must be a valid UUID."
            )
        # The execution must exist before we accept the id.
        if not WorkflowExecution.objects.filter(id=parsed).exists():
            raise ExecutionDoesNotExistError(
                f"Execution with ID '{candidate}' does not exist."
            )
        return str(parsed)
class APIDeploymentListSerializer(ModelSerializer):
    """Listing serializer for API deployments, enriched with execution stats."""

    workflow_name = CharField(source="workflow.workflow_name", read_only=True)
    created_by_email = SerializerMethodField()
    last_5_run_statuses = SerializerMethodField()
    run_count = SerializerMethodField()
    last_run_time = SerializerMethodField()

    class Meta:
        model = APIDeployment
        fields = [
            "id",
            "workflow",
            "workflow_name",
            "display_name",
            "description",
            "is_active",
            "api_endpoint",
            "api_name",
            "created_by",
            "created_by_email",
            "last_5_run_statuses",
            "run_count",
            "last_run_time",
        ]

    def get_created_by_email(self, obj):
        """Email of the deployment's creator, or None when unset."""
        creator = obj.created_by
        if not creator:
            return None
        return creator.email

    def get_run_count(self, instance) -> int:
        """Total number of workflow executions recorded for this deployment."""
        executions = WorkflowExecution.objects.filter(pipeline_id=instance.id)
        return executions.count()

    def get_last_run_time(self, instance) -> str | None:
        """ISO-8601 timestamp of the most recent execution, or None if never run."""
        latest = (
            WorkflowExecution.objects.filter(pipeline_id=instance.id)
            .order_by("-created_at")
            .first()
        )
        if latest is None:
            return None
        return latest.created_at.isoformat()

    def get_last_5_run_statuses(self, instance) -> list[dict]:
        """Statuses (with timestamps) of the five most recent executions."""
        return WorkflowExecution.get_last_run_statuses(instance.id, limit=5)
class APIKeyListSerializer(ModelSerializer):
    """Listing serializer for API keys: exposes key metadata and its owning
    api/pipeline references."""

    class Meta:
        model = APIKey
        fields = [
            "id",
            "created_at",
            "modified_at",
            "api_key",
            "is_active",
            "description",
            "api",
            "pipeline",
        ]
class DeploymentResponseSerializer(Serializer):
    """Response payload describing a deployment together with its API key.

    NOTE(review): `is_active` and `id` are declared as CharField although they
    presumably carry boolean/UUID values — confirm against callers before
    tightening the field types.
    """

    is_active = CharField()
    id = CharField()
    api_key = CharField()
    api_endpoint = CharField()
    display_name = CharField()
    description = CharField()
    api_name = CharField()
class APIExecutionResponseSerializer(Serializer):
    """Response payload for an API execution: status, a status-polling URL,
    any error message, and the JSON result."""

    execution_status = CharField()
    status_api = CharField()
    error = CharField()
    result = JSONField()
class SharedUserListSerializer(ModelSerializer):
    """Serializer for returning API deployment with shared user details."""

    shared_users = SerializerMethodField()
    created_by = SerializerMethodField()

    class Meta:
        model = APIDeployment
        fields = ["id", "display_name", "shared_users", "shared_to_org", "created_by"]

    def get_shared_users(self, obj):
        """Return list of shared users with id and email."""
        entries = []
        for user in obj.shared_users.all():
            entries.append({"id": user.id, "email": user.email})
        return entries

    def get_created_by(self, obj):
        """Return creator details."""
        creator = obj.created_by
        if not creator:
            return None
        return {"id": creator.id, "email": creator.email}