4 changes: 2 additions & 2 deletions elyra/kfp/operator.py
@@ -32,8 +32,8 @@
from kubernetes.client.models import V1VolumeMount

from elyra._version import __version__
from elyra.pipeline.pipeline import KubernetesSecret
from elyra.pipeline.pipeline import VolumeMount
from elyra.pipeline.pipeline_utilities import KubernetesSecret
from elyra.pipeline.pipeline_utilities import VolumeMount

"""
The ExecuteFileOp uses a python script to bootstrap the user supplied image with the required dependencies.
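Note: both imports resolve to simple data holders. A minimal sketch of what VolumeMount and KubernetesSecret presumably look like after the move to pipeline_utilities (field names are inferred from their usage later in this diff; the actual definitions may differ):

from dataclasses import dataclass

@dataclass
class VolumeMount:
    path: str       # container mount path, e.g. "/mnt/data"
    pvc_name: str   # name of an existing PersistentVolumeClaim

@dataclass
class KubernetesSecret:
    env_var: str    # environment variable to populate in the container
    name: str       # Kubernetes Secret resource name
    key: str        # key within that Secret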
73 changes: 64 additions & 9 deletions elyra/pipeline/airflow/processor_airflow.py
@@ -42,7 +42,8 @@
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.pipeline_utilities import KubernetesSecret
from elyra.pipeline.pipeline_utilities import VolumeMount
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
@@ -330,11 +331,11 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"cpu_request": operation.cpu,
"mem_request": operation.memory,
"gpu_limit": operation.gpu,
"operator_source": operation.component_params["filename"],
"is_generic_operator": True,
"operator_source": operation.filename,
"is_generic_operator": operation.is_generic,
"doc": operation.doc,
"volume_mounts": operation.component_params.get(MOUNTED_VOLUMES, []),
"kubernetes_secrets": operation.component_params.get(KUBERNETES_SECRETS, []),
"volumes": operation.mounted_volumes,
"secrets": operation.kubernetes_secrets,
}

if runtime_image_pull_secret is not None:
@@ -445,8 +446,9 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"parent_operation_ids": operation.parent_operation_ids,
"component_params": operation.component_params_as_dict,
"operator_source": component.component_source,
"is_generic_operator": False,
"is_generic_operator": operation.is_generic,
"doc": operation.doc,
"volumes": operation.mounted_volumes,
}

target_ops.append(target_op)
@@ -491,10 +493,9 @@ def create_pipeline_file(
template_env = Environment(loader=loader)

template_env.filters["regex_replace"] = lambda string: self._scrub_invalid_characters(string)

template = template_env.get_template("airflow_template.jinja2")

target_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
ordered_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
runtime_configuration = self._get_metadata_configuration(
schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
)
@@ -505,15 +506,20 @@
if pipeline_description is None:
pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

pipeline_volumes = self._collect_mounted_volumes(ordered_ops)
pipeline_secrets = self._collect_kubernetes_secrets(ordered_ops, cos_secret)

python_output = template.render(
operations_list=target_ops,
operations_list=ordered_ops,
pipeline_name=pipeline_instance_id,
user_namespace=user_namespace,
cos_secret=cos_secret,
kube_config_path=None,
is_paused_upon_creation="False",
in_cluster="True",
pipeline_description=pipeline_description,
secrets=pipeline_secrets,
volumes=pipeline_volumes,
)

# Write to python file and fix formatting
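The new secrets/volumes arguments to template.render() imply that airflow_template.jinja2 now emits module-level declarations keyed by the generated names. A hypothetical, runnable illustration of that pattern (this fragment is not the real template, and Volume stands in for whatever import the generated DAG actually uses):

from jinja2 import Template

fragment = Template(
    "{% for name, vol in volumes.items() %}"
    "{{ name }} = Volume(name='{{ vol.pvc_name }}')\n"
    "{% endfor %}"
)
print(fragment.render(volumes={"volume_op1_0": {"pvc_name": "data-pvc"}}))
# prints: volume_op1_0 = Volume(name='data-pvc')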
@@ -581,6 +587,55 @@ def _get_node_name(self, operations_list: list, node_id: str) -> Optional[str]:
return operation["notebook"]
return None

def _collect_mounted_volumes(self, ordered_ops: OrderedDict) -> Dict[str, VolumeMount]:
"""
Collect all volumes necessary to run a pipeline
"""
pipeline_volumes = {}

for op_id, operation in ordered_ops.items():
operation_volumes = {}
operation_volume_mounts = []
for idx, volume in enumerate(operation.get("volumes", [])):
volume_name = f"volume_{op_id}_{idx}"
operation_volumes[volume_name] = volume
operation_volume_mounts.append(f"mount_{volume_name}")

operation["volumes"] = list(operation_volumes.keys())
operation["volume_mounts"] = operation_volume_mounts
pipeline_volumes.update(operation_volumes)

return pipeline_volumes

def _collect_kubernetes_secrets(
self, ordered_ops: OrderedDict, cos_secret: Optional[str] = None
) -> Dict[str, KubernetesSecret]:
"""
Collect all Kubernetes secrets necessary to run a pipeline
"""
pipeline_secrets = {}
if cos_secret:
pipeline_secrets["env_var_secret_id"] = KubernetesSecret(
env_var="AWS_ACCESS_KEY_ID",
name=cos_secret,
key="AWS_ACCESS_KEY_ID",
)
pipeline_secrets["env_var_secret_key"] = KubernetesSecret(
env_var="AWS_SECRET_ACCESS_KEY",
name=cos_secret,
key="AWS_SECRET_ACCESS_KEY",
)

for op_id, operation in ordered_ops.items():
operation_secrets = {}
for idx, secret in enumerate(operation.get("secrets", [])):
operation_secrets[f"secret_{op_id}_{idx}"] = secret

operation[KUBERNETES_SECRETS] = list(operation_secrets.keys())
pipeline_secrets.update(operation_secrets)

return pipeline_secrets


class AirflowPipelineProcessorResponse(PipelineProcessorResponse):

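Taken together, the two collectors rewrite each operation in place (replacing its volume/secret objects with generated names) and return pipeline-wide dictionaries for the template. A usage sketch, assuming processor is an AirflowPipelineProcessor instance and reusing the VolumeMount sketch above:

from collections import OrderedDict

ordered_ops = OrderedDict(
    op1={"volumes": [VolumeMount(path="/mnt/data", pvc_name="data-pvc")], "secrets": []}
)

volumes = processor._collect_mounted_volumes(ordered_ops)
# volumes == {"volume_op1_0": VolumeMount(path="/mnt/data", pvc_name="data-pvc")}
# ordered_ops["op1"]["volumes"] == ["volume_op1_0"]
# ordered_ops["op1"]["volume_mounts"] == ["mount_volume_op1_0"]

secrets = processor._collect_kubernetes_secrets(ordered_ops, cos_secret="cos-creds")
# Seeds env_var_secret_id / env_var_secret_key entries carrying AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY from the COS secret, then adds one "secret_<op_id>_<idx>"
# entry per user-declared secret.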
29 changes: 25 additions & 4 deletions elyra/pipeline/kfp/processor_kfp.py
@@ -27,6 +27,9 @@
from kfp.dsl import PipelineConf
from kfp.aws import use_aws_secret # noqa H306
from kubernetes import client as k8s_client
from kubernetes.client import V1PersistentVolumeClaimVolumeSource
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount

try:
from kfp_tekton import compiler as kfp_tekton_compiler
@@ -47,8 +50,6 @@
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
@@ -546,8 +547,8 @@ def _cc_pipeline(
"mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa
"mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa
},
volume_mounts=operation.component_params.get(MOUNTED_VOLUMES, []),
kubernetes_secrets=operation.component_params.get(KUBERNETES_SECRETS, []),
volume_mounts=operation.mounted_volumes,
kubernetes_secrets=operation.kubernetes_secrets,
)

if operation.doc:
@@ -653,9 +654,29 @@ def _cc_pipeline(
container_op = factory_function(**sanitized_component_params)
container_op.set_display_name(operation.name)

# Attach node comment
if operation.doc:
container_op.add_pod_annotation("elyra/node-user-doc", operation.doc)

# Add user-specified volume mounts: the referenced PVCs must exist
# or this operation will fail
if operation.mounted_volumes:
unique_pvcs = []
for volume_mount in operation.mounted_volumes:
if volume_mount.pvc_name not in unique_pvcs:
container_op.add_volume(
V1Volume(
name=volume_mount.pvc_name,
persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
claim_name=volume_mount.pvc_name
),
)
)
unique_pvcs.append(volume_mount.pvc_name)
container_op.add_volume_mount(
V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name)
)

target_ops[operation.id] = container_op
except Exception as e:
# TODO Fix error messaging and break exceptions down into categories
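The PVC de-duplication above matters because add_volume must be called once per distinct claim even when several mounts reference it. An isolated, runnable sketch of the same logic using plain kubernetes-client objects (the mounts data is hypothetical):

from kubernetes.client import V1PersistentVolumeClaimVolumeSource
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount

mounts = [("data-pvc", "/mnt/data"), ("data-pvc", "/mnt/cache"), ("models-pvc", "/mnt/models")]

volumes, volume_mounts, seen = [], [], set()
for pvc_name, path in mounts:
    if pvc_name not in seen:  # one V1Volume per distinct claim
        volumes.append(
            V1Volume(
                name=pvc_name,
                persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name),
            )
        )
        seen.add(pvc_name)
    volume_mounts.append(V1VolumeMount(mount_path=path, name=pvc_name))

assert len(volumes) == 2 and len(volume_mounts) == 3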
70 changes: 2 additions & 68 deletions elyra/pipeline/parser.py
@@ -101,9 +101,9 @@ def _nodes_to_operations(
raise ValueError(f"Node type '{node.type}' is invalid!")
# parse each node as a pipeline operation

operation = self._create_pipeline_operation(node, super_node)
operation = Operation.create_instance(node, super_node)

# assoicate user comment as docs to operations
# associate user comment as docs to operations
comment = pipeline_definition.get_node_comments(node.id)
if comment:
operation.doc = comment
@@ -121,69 +121,3 @@ def _super_node_to_operations(
pipeline = pipeline_definition.get_pipeline_definition(pipeline_id)
# recurse to process nodes of super-node
return self._nodes_to_operations(pipeline_definition, pipeline_object, pipeline.nodes, super_node)

def _create_pipeline_operation(self, node: Node, super_node: Node = None) -> Operation:
"""
Creates a pipeline operation instance from the given node.
The node and super_node are used to build the list of parent_operation_ids (links) to
the node (operation dependencies).
"""
parent_operations = PipelineParser._get_parent_operation_links(node.to_dict()) # parse links as dependencies
if super_node: # gather parent-links tied to embedded nodes inputs
parent_operations.extend(PipelineParser._get_parent_operation_links(super_node.to_dict(), node.id))

return Operation.create_instance(
id=node.id,
type=node.type,
classifier=node.op,
name=node.label,
parent_operation_ids=parent_operations,
component_params=node.get("component_parameters", {}),
)

@staticmethod
def _get_port_node_id(link: Dict) -> [None, str]:
"""
Gets the id of the node corresponding to the linked out port.
If the link is on a super_node the appropriate node_id is actually
embedded in the port_id_ref value.
"""
node_id = None
if "port_id_ref" in link:
if link["port_id_ref"] == "outPort": # Regular execution node
if "node_id_ref" in link:
node_id = link["node_id_ref"]
elif link["port_id_ref"].endswith("_outPort"): # Super node
# node_id_ref is the super-node, but the prefix of port_id_ref, is the actual node-id
node_id = link["port_id_ref"].split("_")[0]
return node_id

@staticmethod
def _get_input_node_ids(node_input: Dict) -> List[str]:
"""
Gets a list of node_ids corresponding to the linked out ports on the input node.
"""
input_node_ids = []
if "links" in node_input:
for link in node_input["links"]:
node_id = PipelineParser._get_port_node_id(link)
if node_id:
input_node_ids.append(node_id)
return input_node_ids

@staticmethod
def _get_parent_operation_links(node: Dict, embedded_node_id: Optional[str] = None) -> List[str]:
"""
Gets a list nodes_ids corresponding to parent nodes (outputs directed to this node).
For super_nodes, the node to use has an id of the embedded_node_id suffixed with '_inPort'.
"""
links = []
if "inputs" in node:
for node_input in node["inputs"]:
if embedded_node_id: # node is a super_node, handle matches to {embedded_node_id}_inPort
input_id = node_input.get("id")
if input_id == embedded_node_id + "_inPort":
links.extend(PipelineParser._get_input_node_ids(node_input))
else:
links.extend(PipelineParser._get_input_node_ids(node_input))
return links
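For reference, the link-resolution behavior that Operation.create_instance must now reproduce can be condensed from the helpers removed above into a single function (a sketch of the semantics, not Elyra's actual code):

from typing import Dict, List, Optional

def parent_links(node: Dict, embedded_node_id: Optional[str] = None) -> List[str]:
    # Condensed from the removed PipelineParser helpers: resolve each input
    # link's out-port to a node id, honoring super-node "<id>_inPort" inputs
    # and "<node-id>_outPort" ports.
    links: List[str] = []
    for node_input in node.get("inputs", []):
        if embedded_node_id and node_input.get("id") != f"{embedded_node_id}_inPort":
            continue
        for link in node_input.get("links", []):
            port = link.get("port_id_ref", "")
            if port == "outPort" and "node_id_ref" in link:
                links.append(link["node_id_ref"])  # regular execution node
            elif port.endswith("_outPort"):
                links.append(port.split("_")[0])  # embedded node inside a super-node
    return links

print(parent_links({"inputs": [{"links": [{"port_id_ref": "outPort", "node_id_ref": "abc"}]}]}))
# -> ['abc']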