Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions backend/api_v2/api_deployment_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.db.models import QuerySet
from django.http import HttpResponse
from permissions.permission import IsOwner
from prompt_studio.prompt_profile_manager_v2.models import ProfileManager
from rest_framework import serializers, status, views, viewsets
from rest_framework.decorators import action
from rest_framework.request import Request
Expand Down Expand Up @@ -55,6 +56,12 @@ def post(
include_metrics = serializer.validated_data.get(ApiExecution.INCLUDE_METRICS)
use_file_history = serializer.validated_data.get(ApiExecution.USE_FILE_HISTORY)
tag_names = serializer.validated_data.get(ApiExecution.TAGS)
llm_profile_id = serializer.validated_data.get(ApiExecution.LLM_PROFILE_ID)

# Validate llm_profile_id ownership if provided
if llm_profile_id:
self._validate_llm_profile_ownership(llm_profile_id, api)

response = DeploymentHelper.execute_workflow(
organization_name=org_name,
api=api,
Expand All @@ -64,6 +71,7 @@ def post(
include_metrics=include_metrics,
use_file_history=use_file_history,
tag_names=tag_names,
llm_profile_id=llm_profile_id,
)
if "error" in response and response["error"]:
return Response(
Expand All @@ -72,6 +80,36 @@ def post(
)
return Response({"message": response}, status=status.HTTP_200_OK)

def _validate_llm_profile_ownership(
    self, llm_profile_id: str, api: APIDeployment
) -> None:
    """Validate that the llm_profile_id belongs to the API key owner.

    Looks up the requested ProfileManager row and compares its creator
    against the creator of the deployment's active API key.

    Args:
        llm_profile_id (str): The profile ID to validate
        api (APIDeployment): The API deployment instance

    Raises:
        ValidationError: If profile doesn't exist or user doesn't own it
    """
    try:
        profile = ProfileManager.objects.get(profile_id=llm_profile_id)
    except ProfileManager.DoesNotExist:
        raise serializers.ValidationError({"llm_profile_id": "Profile not found"})

    # Get the API key owner from the active API key.
    # NOTE(review): .first() returns an arbitrary active key; if a deployment
    # can hold multiple active keys created by different users, this may not
    # be the key actually used for this request -- confirm.
    active_api_key = api.api_keys.filter(is_active=True).first()
    if not active_api_key:
        raise serializers.ValidationError(
            {"api": "No active API key found for this deployment"}
        )

    # Check if the profile owner matches the API key owner
    if profile.created_by != active_api_key.created_by:
        raise serializers.ValidationError(
            {"llm_profile_id": "You can only use profiles that you own"}
        )

@DeploymentHelper.validate_api_key
def get(
self, request: Request, org_name: str, api_name: str, api: APIDeployment
Expand Down
1 change: 1 addition & 0 deletions backend/api_v2/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ class ApiExecution:
USE_FILE_HISTORY: str = "use_file_history" # Undocumented parameter
EXECUTION_ID: str = "execution_id"
TAGS: str = "tags"
LLM_PROFILE_ID: str = "llm_profile_id"
3 changes: 3 additions & 0 deletions backend/api_v2/deployment_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def execute_workflow(
include_metrics: bool = False,
use_file_history: bool = False,
tag_names: list[str] = [],
llm_profile_id: str | None = None,
) -> ReturnDict:
"""Execute workflow by api.

Expand All @@ -153,6 +154,7 @@ def execute_workflow(
use_file_history (bool): Use FileHistory table to return results on already
processed files. Defaults to False
tag_names (list(str)): list of tag names
llm_profile_id (str, optional): LLM profile ID for overriding tool settings

Returns:
ReturnDict: execution status/ result
Expand Down Expand Up @@ -185,6 +187,7 @@ def execute_workflow(
execution_id=execution_id,
queue=CeleryQueue.CELERY_API_DEPLOYMENTS,
use_file_history=use_file_history,
llm_profile_id=llm_profile_id,
)
result.status_api = DeploymentHelper.construct_status_endpoint(
api_endpoint=api.api_endpoint, execution_id=execution_id
Expand Down
3 changes: 3 additions & 0 deletions backend/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ExecutionRequestSerializer(TagParamsSerializer):
helpful for demos.
tags (str): Comma-separated List of tags to associate with the execution.
e.g:'tag1,tag2-name,tag3_name'
llm_profile_id (str): UUID of the LLM profile to override the default profile.
If not provided, the default profile will be used.
"""

MAX_FILES_ALLOWED = 32
Expand All @@ -124,6 +126,7 @@ class ExecutionRequestSerializer(TagParamsSerializer):
include_metadata = BooleanField(default=False)
include_metrics = BooleanField(default=False)
use_file_history = BooleanField(default=False)
llm_profile_id = CharField(required=False, allow_null=True, allow_blank=True)
files = ListField(
child=FileField(),
required=True,
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/endpoint_v2/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,13 +810,15 @@ def add_file_to_volume(
workflow_file_execution: WorkflowFileExecution,
tags: list[str],
file_hash: FileHash,
llm_profile_id: str | None = None,
) -> str:
"""Add input file to execution directory.

Args:
workflow_file_execution: WorkflowFileExecution model
tags (list[str]): Tag names associated with the workflow execution.
file_hash: FileHash model
llm_profile_id (str, optional): LLM profile ID for overriding tool settings.

Raises:
InvalidSource: _description_
Expand Down Expand Up @@ -849,6 +851,7 @@ def add_file_to_volume(
file_execution_id=workflow_file_execution.id,
source_hash=file_content_hash,
tags=tags,
llm_profile_id=llm_profile_id,
)
return file_content_hash

Expand Down
134 changes: 134 additions & 0 deletions backend/workflow_manager/execution/check_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import operator
from typing import Any


class ResultKeys:
    """Dictionary keys used to read a workflow execution result payload."""

    # Top-level key holding execution metadata (e.g. confidence scores).
    METADATA = "metadata"
    # Metadata sub-key holding per-field confidence entries.
    CONFIDENCE_DATA = "confidence_data"
    # Top-level key holding the structured extraction output.
    OUTPUT = "output"


def _check_confidence(field_entries, threshold):
"""Check if any confidence value meets or exceeds the threshold."""
return any(
float(entry.get("confidence", 0)) * 100 >= threshold
for entries in field_entries
if entries
for entry in entries
if "confidence" in entry
)


def _get_field_value(context: dict[str, Any], keys: list[str]) -> Any:
    """Walk *keys* (dot-notation path segments) into the output payload.

    Missing keys resolve to an empty dict at each step; descending into a
    non-dict value returns None.
    """
    current = context.get(ResultKeys.OUTPUT, {})
    for segment in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(segment, {})
    return current


def _evaluate_rule(rule: dict[str, Any], context: dict[str, Any]) -> bool:
"""Evaluate a single rule against the context."""
operator_map = {
"less": operator.lt,
"greater": operator.gt,
"equal": operator.eq,
"not_equal": operator.ne,
"less_or_equal": operator.le,
"greater_or_equal": operator.ge,
"starts_with": lambda field, value: str(field).startswith(value),
"ends_with": lambda field, value: str(field).endswith(value),
"like": lambda field, value: value in str(field),
"not_like": lambda field, value: value not in str(field),
}

field = rule["properties"]["field"]
operator_name = rule["properties"]["operator"]
value = rule["properties"]["value"][0]
keys = field.split(".")

if keys[0] == "jsonField":
field_value = _get_field_value(context, keys[1:])
elif keys[0] == "confidence":
confidence_data = context.get(ResultKeys.METADATA, {}).get(
ResultKeys.CONFIDENCE_DATA, {}
)
field_entries = confidence_data.get(keys[1], [])
return _check_confidence(field_entries, value)
else:
return False

try:
if isinstance(field_value, bool):
field_value = str(field_value).lower()
value = str(value).lower()
return operator_map[operator_name](field_value, value)
except Exception as e:
print(f"Error evaluating rule: {e}")
return False


def _evaluate_group(group: dict[str, Any], context: dict[str, Any]) -> bool:
"""Evaluate a group of rules or nested groups."""
conjunction = group["properties"].get("conjunction", "and").lower()
negate = group["properties"].get("not", False)

results = []
for child in group.get("children1", []):
if child["type"] == "rule":
results.append(_evaluate_rule(child, context))
elif child["type"] in ["group", "rule_group"]:
results.append(_evaluate_group(child, context))

result = all(results) if conjunction == "and" else any(results)
return not result if negate else result


# --- Demo fixtures -----------------------------------------------------------
# Sample query-builder rule tree and result payload for manually exercising
# the evaluator.
rules_json = {
    "type": "group",
    "id": "b88b8b8b-89ab-4cde-b012-31958e37dae3",
    "children1": [
        {
            "type": "rule_group",
            "id": "989ab9b8-0123-4456-b89a-b1958e37e4a8",
            "properties": {
                "conjunction": "AND",
                "not": False,
                "field": "jsonField.newrule_2",
                "fieldSrc": "field",
            },
            "children1": [
                {
                    "type": "rule",
                    "id": "9a89a8b9-89ab-4cde-b012-31958e7d3ee2",
                    "properties": {
                        "fieldSrc": "field",
                        "field": "jsonField.newrule_2.city",
                        "operator": "equal",
                        "value": ["Cannanore"],
                        "valueSrc": ["value"],
                        "valueType": ["text"],
                        "valueError": [None],
                    },
                }
            ],
        }
    ],
    "properties": {"conjunction": "AND", "not": False},
}

result = {
    "output": {
        "newrule_1": "Mr. Vishnu Sathyanesan",
        "newrule_2": {
            "address": "165/26, Panjanyan, West Thayinari Kara Road",
            "city": "Cannanore",
            "district": "Kannur",
        },
    }
}

if __name__ == "__main__":
    # Run the demo only when executed directly: importing this module must not
    # evaluate the sample rules or print to stdout (previously this ran at
    # import time as a side effect).
    print(_evaluate_group(rules_json, result))
1 change: 1 addition & 0 deletions backend/workflow_manager/workflow_v2/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class FileData:
execution_mode: str
use_file_history: bool
q_file_no_list: list[int]
llm_profile_id: str | None = None

@classmethod
def from_dict(cls, data: dict[str, Any]) -> FileData:
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/workflow_v2/file_execution_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ def _process_file(
workflow_file_execution,
file_hash,
workflow_execution,
file_data,
)

# Update file execution tracker with updated file hash
Expand Down Expand Up @@ -671,6 +672,7 @@ def _prepare_file_for_processing(
workflow_file_exec: WorkflowFileExecution,
file_hash: FileHash,
workflow_execution: WorkflowExecution,
file_data: FileData,
) -> str:
"""Handle file preparation and volume storage."""
workflow_file_exec.update_status(ExecutionStatus.EXECUTING)
Expand All @@ -687,6 +689,7 @@ def _prepare_file_for_processing(
workflow_file_execution=workflow_file_exec,
tags=workflow_execution.tag_names,
file_hash=file_hash,
llm_profile_id=file_data.llm_profile_id,
)
file_hash.file_hash = content_hash
workflow_file_exec.update(file_hash=content_hash)
Expand Down
12 changes: 11 additions & 1 deletion backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def process_input_files(
scheduled: bool,
execution_mode: tuple[str, str],
use_file_history: bool,
llm_profile_id: str | None,
) -> str | None:
total_files = len(input_files)
workflow_log.publish_initial_workflow_logs(total_files=total_files)
Expand Down Expand Up @@ -196,6 +197,7 @@ def process_input_files(
execution_mode=mode,
use_file_history=use_file_history,
q_file_no_list=list(q_file_no_list) if q_file_no_list else [],
llm_profile_id=llm_profile_id,
)
batch_data = FileBatchData(files=batch, file_data=file_data)

Expand Down Expand Up @@ -255,6 +257,7 @@ def run_workflow(
single_step: bool = False,
execution_mode: tuple[str, str] | None = None,
use_file_history: bool = True,
llm_profile_id: str | None = None,
) -> ExecutionResponse:
tool_instances: list[ToolInstance] = (
ToolInstanceHelper.get_tool_instances_by_workflow(
Expand Down Expand Up @@ -304,6 +307,7 @@ def run_workflow(
scheduled=scheduled,
use_file_history=use_file_history,
execution_mode=execution_mode,
llm_profile_id=llm_profile_id,
)
api_results = []
return ExecutionResponse(
Expand Down Expand Up @@ -424,6 +428,7 @@ def execute_workflow_async(
pipeline_id: str | None = None,
queue: str | None = None,
use_file_history: bool = True,
llm_profile_id: str | None = None,
) -> ExecutionResponse:
"""Adding a workflow to the queue for execution.

Expand All @@ -435,6 +440,7 @@ def execute_workflow_async(
queue (Optional[str]): Name of the celery queue to push into
use_file_history (bool): Use FileHistory table to return results on already
processed files. Defaults to True
llm_profile_id (str, optional): LLM profile ID for overriding tool settings

Returns:
ExecutionResponse: Existing status of execution
Expand All @@ -459,6 +465,7 @@ def execute_workflow_async(
"pipeline_id": pipeline_id,
"log_events_id": log_events_id,
"use_file_history": use_file_history,
"llm_profile_id": llm_profile_id,
},
queue=queue,
)
Expand Down Expand Up @@ -625,14 +632,16 @@ def execute_workflow(
workflow = Workflow.objects.get(id=workflow_id)
# TODO: Make use of WorkflowExecution.get_or_create()
try:
# Filter out llm_profile_id from kwargs as create_workflow_execution doesn't accept it
filtered_kwargs = {k: v for k, v in kwargs.items() if k != "llm_profile_id"}
workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow_id,
single_step=False,
pipeline_id=pipeline_id,
mode=WorkflowExecution.Mode.QUEUE,
execution_id=execution_id,
total_files=len(hash_values),
**kwargs, # type: ignore
**filtered_kwargs, # type: ignore
)
except IntegrityError:
# Use existing instance on retry attempt
Expand All @@ -650,6 +659,7 @@ def execute_workflow(
execution_mode=execution_mode,
hash_values_of_files=hash_values,
use_file_history=use_file_history,
llm_profile_id=kwargs.get("llm_profile_id"),
)
except Exception as error:
error_message = traceback.format_exc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@
.manage-llm-pro-icon {
font-size: 10px;
}

/* Horizontal row of action buttons, vertically centered, 8px apart. */
.action-buttons-container {
  display: flex;
  align-items: center;
  gap: 8px;
}
Loading