diff --git a/conftest.py b/conftest.py index ce65984c8..bd97b75bd 100644 --- a/conftest.py +++ b/conftest.py @@ -32,17 +32,6 @@ "schema_name": "elyra-kfp-examples-catalog", } -AIRFLOW_COMPONENT_CACHE_INSTANCE = { - "display_name": "Airflow Example Components", - "metadata": { - "runtime_type": "APACHE_AIRFLOW", - "categories": ["examples"], - "paths": [ - "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components/bash_operator.py" # noqa - ], - }, - "schema_name": "url-catalog", -} AIRFLOW_TEST_OPERATOR_CATALOG = { "display_name": "Airflow Test Operator", "metadata": { diff --git a/docs/source/images/user_guide/pipelines/pipeline-properties.png b/docs/source/images/user_guide/pipelines/pipeline-properties.png index 34dc36aeb..6e0c2e6e8 100644 Binary files a/docs/source/images/user_guide/pipelines/pipeline-properties.png and b/docs/source/images/user_guide/pipelines/pipeline-properties.png differ diff --git a/docs/source/user_guide/pipelines.md b/docs/source/user_guide/pipelines.md index ec4aa2315..897a5b12f 100644 --- a/docs/source/user_guide/pipelines.md +++ b/docs/source/user_guide/pipelines.md @@ -74,6 +74,11 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste Pipeline properties include: - An optional description, summarizing the pipeline purpose. + - Properties that apply to every pipeline node (both generic and custom) + - **Data volumes** + - A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the component. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. 
+ - The referenced PVCs must exist in the Kubernetes namespace where the pipeline nodes are executed. + - Data volumes are not mounted when the pipeline is executed locally. - Properties that apply to every generic pipeline node. In this release the following properties are supported: - **Object storage path prefix**. Elyra stores pipeline input and output artifacts in a cloud object storage bucket. By default these artifacts are located in the `/` path. The example below depicts the artifact location for several pipelines in the `pipeline-examples` bucket: ![artifacts default storage layout on object storage](../images/user_guide/pipelines/node-artifacts-on-object-storage.png) @@ -87,10 +92,6 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste - The value is ignored when the pipeline is executed locally. - **Environment variables** - A list of environment variables to be set in the container that executes the Jupyter notebook or script. Format: `ENV_VAR_NAME=value`. Entries that are empty (`ENV_VAR_NAME=`) or malformed are ignored. - - **Data volumes** - - A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the Jupyter notebook or script. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. - - The referenced PVCs must exist in the Kubernetes namespace where the generic pipeline nodes are executed. - - Data volumes are not mounted when the pipeline is executed locally. 
- **Kubernetes secrets** - A list of [Kubernetes Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) to be accessed as environment variables during Jupyter notebook or script execution. Format: `ENV_VAR=secret-name:secret-key`. Entries that are empty (`ENV_VAR=`) or malformed are ignored. Entries with a secret name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) or with [an invalid secret key](https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data) will raise a validation error after pipeline submission or export. - The referenced secrets must exist in the Kubernetes namespace where the generic pipeline nodes are executed. @@ -140,15 +141,17 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste - Optional. A list of files generated by the notebook inside the image to be passed as inputs to the next step of the pipeline. Specify one file, directory, or expression per line. Supported patterns are `*` and `?`. - Example: `data/*.csv` - **Data Volumes** - - Optional. A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the Jupyter notebook or script. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. The referenced PVCs must exist in the Kubernetes namespace where the node is executed. - - Data volumes are not mounted when the pipeline is executed locally. - - Example: `/mnt/vol1=data-pvc` - **Kubernetes Secrets** - Optional. 
A list of [Kubernetes Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) to be accessed as environment variables during Jupyter notebook or script execution. Format: `ENV_VAR=secret-name:secret-key`. Entries that are empty (`ENV_VAR=`) or malformed are ignored. Entries with a secret name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) or with [an invalid secret key](https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data) will raise a validation error after pipeline submission or export. The referenced secrets must exist in the Kubernetes namespace where the generic pipeline nodes are executed. - Secrets are ignored when the pipeline is executed locally. For remote execution, if an environment variable was assigned both a static value (via the 'Environment Variables' property) and a Kubernetes secret value, the secret's value is used. - Example: `ENV_VAR=secret-name:secret-key` + + Both generic and certain [custom components](pipeline-components.html#custom-components) support the following property: + + **Data Volumes** + - Optional. A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the component and allows for data exchange between components. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. The referenced PVCs must exist in the Kubernetes namespace where the node is executed. Note that only certain Apache Airflow operators are capable of supporting volumes in the manner explained here. + - Data volumes are not mounted when the pipeline is executed locally. 
+ - Example: `/mnt/vol1=data-pvc` 5. Associate each node with a comment to document its purpose. @@ -262,7 +265,7 @@ The pipeline dependencies output includes: - Python or R scripts - Local files that the notebooks or scripts require - Custom components - - Data volumes that generic nodes are mounting + - Data volumes that custom and generic nodes are mounting - Container images that generic nodes are using to run notebooks or scripts - Kubernetes secrets that generic nodes are exposing as environment variables to notebooks or scripts diff --git a/elyra/cli/pipeline_app.py b/elyra/cli/pipeline_app.py index ff80594c8..f60ee81ec 100644 --- a/elyra/cli/pipeline_app.py +++ b/elyra/cli/pipeline_app.py @@ -589,6 +589,9 @@ def describe(json_option, pipeline_path): for node in primary_pipeline.nodes: # update describe_dict stats that take into account every operation # (... there are none today) + # volumes + for vm in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []): + describe_dict["volume_dependencies"]["value"].add(vm.pvc_name) if Operation.is_generic_operation(node.op): # update stats that are specific to generic components @@ -611,10 +614,6 @@ def describe(json_option, pipeline_path): node.get_component_parameter(pipeline_constants.RUNTIME_IMAGE) ) - # volumes - for vm in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []): - describe_dict["volume_dependencies"]["value"].add(vm.pvc_name) - # Kubernetes secrets for ks in node.get_component_parameter(pipeline_constants.KUBERNETES_SECRETS, []): describe_dict["kubernetes_secret_dependencies"]["value"].add(ks.name) diff --git a/elyra/pipeline/airflow/processor_airflow.py b/elyra/pipeline/airflow/processor_airflow.py index ed1154262..3185be42e 100644 --- a/elyra/pipeline/airflow/processor_airflow.py +++ b/elyra/pipeline/airflow/processor_airflow.py @@ -20,6 +20,7 @@ import re import string import tempfile +from textwrap import dedent import time from typing import Dict from typing 
import List @@ -41,8 +42,6 @@ from elyra.pipeline.pipeline import Operation from elyra.pipeline.pipeline import Pipeline from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX -from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS -from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES from elyra.pipeline.processor import PipelineProcessor from elyra.pipeline.processor import PipelineProcessorResponse from elyra.pipeline.processor import RuntimePipelineProcessor @@ -330,11 +329,11 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance "cpu_request": operation.cpu, "mem_request": operation.memory, "gpu_limit": operation.gpu, - "operator_source": operation.component_params["filename"], - "is_generic_operator": True, + "operator_source": operation.filename, + "is_generic_operator": operation.is_generic, "doc": operation.doc, - "volume_mounts": operation.component_params.get(MOUNTED_VOLUMES, []), - "kubernetes_secrets": operation.component_params.get(KUBERNETES_SECRETS, []), + "volumes": operation.mounted_volumes, + "secrets": operation.kubernetes_secrets, } if runtime_image_pull_secret is not None: @@ -445,8 +444,9 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance "parent_operation_ids": operation.parent_operation_ids, "component_params": operation.component_params_as_dict, "operator_source": component.component_source, - "is_generic_operator": False, + "is_generic_operator": operation.is_generic, "doc": operation.doc, + "volumes": operation.mounted_volumes, } target_ops.append(target_op) @@ -490,11 +490,19 @@ def create_pipeline_file( loader = PackageLoader("elyra", "templates/airflow") template_env = Environment(loader=loader) - template_env.filters["regex_replace"] = lambda string: self._scrub_invalid_characters(string) - + template_env.filters["regex_replace"] = lambda x: AirflowPipelineProcessor.scrub_invalid_characters(x) template = 
template_env.get_template("airflow_template.jinja2") - target_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id) + # Pass functions used to render data volumes and secrets to the template env + rendering_functions = { + "render_volumes_for_generic_op": AirflowPipelineProcessor.render_volumes_for_generic_op, + "render_executor_config_for_custom_op": AirflowPipelineProcessor.render_executor_config_for_custom_op, + "render_secrets_for_generic_op": AirflowPipelineProcessor.render_secrets_for_generic_op, + "render_secrets_for_cos": AirflowPipelineProcessor.render_secrets_for_cos, + } + template.globals.update(rendering_functions) + + ordered_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id) runtime_configuration = self._get_metadata_configuration( schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config ) @@ -506,7 +514,7 @@ def create_pipeline_file( pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`." 
python_output = template.render( - operations_list=target_ops, + operations_list=ordered_ops, pipeline_name=pipeline_instance_id, user_namespace=user_namespace, cos_secret=cos_secret, @@ -544,11 +552,12 @@ def _create_unique_node_names(self, operation_list: List[Operation]) -> List[Ope def _scrub_invalid_characters_from_list(self, operation_list: List[Operation]) -> List[Operation]: for operation in operation_list: - operation.name = self._scrub_invalid_characters(operation.name) + operation.name = AirflowPipelineProcessor.scrub_invalid_characters(operation.name) return operation_list - def _scrub_invalid_characters(self, name: str) -> str: + @staticmethod + def scrub_invalid_characters(name: str) -> str: chars = re.escape(string.punctuation) clean_name = re.sub(r"[" + chars + "\\s]", "_", name) # noqa E226 return clean_name @@ -581,6 +590,128 @@ def _get_node_name(self, operations_list: list, node_id: str) -> Optional[str]: return operation["notebook"] return None + @staticmethod + def render_volumes_for_generic_op(op: Dict) -> str: + """ + Render any data volumes defined for the specified generic op for use in + the Airflow DAG template + + :returns: a string literal containing the python code to be rendered in the DAG + """ + if not op.get("volumes"): + return "" + op["volume_vars"] = [] # store variable names in op's dict for template to access + + # Include import statements and comment + str_to_render = f""" + from airflow.contrib.kubernetes.volume import Volume + from airflow.contrib.kubernetes.volume_mount import VolumeMount + # Volumes and mounts for operation '{op['id']}'""" + for idx, volume in enumerate(op.get("volumes", [])): + var_name = AirflowPipelineProcessor.scrub_invalid_characters(f"volume_{op['id']}_{idx}") + + # Define VolumeMount and Volume objects + str_to_render += f""" + mount_{var_name} = VolumeMount( + name='{volume.pvc_name}', + mount_path='{volume.path}', + sub_path=None, + read_only=False + ) + {var_name} = Volume( + 
name='{volume.pvc_name}', configs={{"persistentVolumeClaim": {{"claimName": "{volume.pvc_name}"}}}} + ) + """ + + op["volume_vars"].append(var_name) + + op["volume_mount_vars"] = [f"mount_{volume_var}" for volume_var in op["volume_vars"]] + return dedent(str_to_render) + + @staticmethod + def render_executor_config_for_custom_op(op: Dict) -> Dict[str, Dict[str, List]]: + """ + Render any data volumes defined for the specified custom op for use in + the Airflow DAG template + + :returns: a dict defining the volumes and mounts to be rendered in the DAG + """ + executor_config = {"KubernetesExecutor": {"volumes": [], "volume_mounts": []}} + for volume in op.get("volumes", []): + # Define volumes and volume mounts + executor_config["KubernetesExecutor"]["volumes"].append( + { + "name": volume.pvc_name, + "persistentVolumeClaim": {"claimName": volume.pvc_name}, + } + ) + executor_config["KubernetesExecutor"]["volume_mounts"].append( + {"mountPath": volume.path, "name": volume.pvc_name, "read_only": False} + ) + + return executor_config + + @staticmethod + def render_secrets_for_cos(cos_secret: str): + """ + Render the Kubernetes secrets required for COS + + :returns: a string literal of the python code to be rendered in the DAG + """ + if not cos_secret: + return "" + + return dedent( + f""" + from airflow.kubernetes.secret import Secret + ## Ensure that the secret '{cos_secret}' is defined in the Kubernetes namespace where the pipeline is run + env_var_secret_id = Secret( + deploy_type="env", + deploy_target="AWS_ACCESS_KEY_ID", + secret="{cos_secret}", + key="AWS_ACCESS_KEY_ID", + ) + env_var_secret_key = Secret( + deploy_type="env", + deploy_target="AWS_SECRET_ACCESS_KEY", + secret="{cos_secret}", + key="AWS_SECRET_ACCESS_KEY", + ) + """ + ) + + @staticmethod + def render_secrets_for_generic_op(op: Dict) -> str: + """ + Render any Kubernetes secrets defined for the specified op for use in + the Airflow DAG template + + :returns: a string literal containing the python 
code to be rendered in the DAG + """ + if not op.get("secrets"): + return "" + op["secret_vars"] = [] # store variable names in op's dict for template to access + + # Include import statement and comment + str_to_render = f""" + from airflow.kubernetes.secret import Secret + # Secrets for operation '{op['id']}'""" + for idx, secret in enumerate(op.get("secrets", [])): + var_name = AirflowPipelineProcessor.scrub_invalid_characters(f"secret_{op['id']}_{idx}") + + # Define Secret object + str_to_render += f""" + {var_name} = Secret( + deploy_type='env', + deploy_target="{secret.env_var}", + secret="{secret.name}", + key="{secret.key}", + ) + """ + + op["secret_vars"].append(var_name) + return dedent(str_to_render) + class AirflowPipelineProcessorResponse(PipelineProcessorResponse): diff --git a/elyra/pipeline/component_catalog.py b/elyra/pipeline/component_catalog.py index 61e213343..cdc857b6a 100644 --- a/elyra/pipeline/component_catalog.py +++ b/elyra/pipeline/component_catalog.py @@ -43,6 +43,7 @@ from elyra.pipeline.component import Component from elyra.pipeline.component import ComponentParser from elyra.pipeline.component_metadata import ComponentCatalogMetadata +from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES from elyra.pipeline.runtime_type import RuntimeProcessorType BLOCKING_TIMEOUT = 0.5 @@ -667,12 +668,21 @@ def to_canvas_properties(component: Component) -> Dict: If component_id is one of the generic set, generic template is rendered, otherwise, the runtime-specific property template is rendered """ + kwargs = {} if ComponentCache.get_generic_component(component.id) is not None: template = ComponentCache.load_jinja_template("generic_properties_template.jinja2") else: + # Determine which component properties parsed from the definition + # collide with Elyra-defined properties (in the case of a collision, + # only the parsed property will be displayed) + kwargs = {"elyra_property_collisions_list": []} + for param in 
component.properties: + if param.ref in ELYRA_COMPONENT_PROPERTIES: + kwargs["elyra_property_collisions_list"].append(param.ref) + template = ComponentCache.load_jinja_template("canvas_properties_template.jinja2") - canvas_properties = template.render(component=component) + canvas_properties = template.render(component=component, **kwargs) return json.loads(canvas_properties) diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py index ef0464422..79ee9aaee 100644 --- a/elyra/pipeline/kfp/processor_kfp.py +++ b/elyra/pipeline/kfp/processor_kfp.py @@ -27,6 +27,9 @@ from kfp.dsl import PipelineConf from kfp.aws import use_aws_secret # noqa H306 from kubernetes import client as k8s_client +from kubernetes.client import V1PersistentVolumeClaimVolumeSource +from kubernetes.client import V1Volume +from kubernetes.client import V1VolumeMount try: from kfp_tekton import compiler as kfp_tekton_compiler @@ -47,8 +50,6 @@ from elyra.pipeline.pipeline import Operation from elyra.pipeline.pipeline import Pipeline from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX -from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS -from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES from elyra.pipeline.processor import PipelineProcessor from elyra.pipeline.processor import PipelineProcessorResponse from elyra.pipeline.processor import RuntimePipelineProcessor @@ -546,8 +547,8 @@ def _cc_pipeline( "mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa "mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa }, - volume_mounts=operation.component_params.get(MOUNTED_VOLUMES, []), - kubernetes_secrets=operation.component_params.get(KUBERNETES_SECRETS, []), + volume_mounts=operation.mounted_volumes, + kubernetes_secrets=operation.kubernetes_secrets, ) if operation.doc: @@ -653,9 +654,29 @@ def _cc_pipeline( container_op = 
factory_function(**sanitized_component_params) container_op.set_display_name(operation.name) + # Attach node comment if operation.doc: container_op.add_pod_annotation("elyra/node-user-doc", operation.doc) + # Add user-specified volume mounts: the referenced PVCs must exist + # or this operation will fail + if operation.mounted_volumes: + unique_pvcs = [] + for volume_mount in operation.mounted_volumes: + if volume_mount.pvc_name not in unique_pvcs: + container_op.add_volume( + V1Volume( + name=volume_mount.pvc_name, + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource( + claim_name=volume_mount.pvc_name + ), + ) + ) + unique_pvcs.append(volume_mount.pvc_name) + container_op.add_volume_mount( + V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name) + ) + target_ops[operation.id] = container_op except Exception as e: # TODO Fix error messaging and break exceptions down into categories diff --git a/elyra/pipeline/pipeline.py b/elyra/pipeline/pipeline.py index 27ee868b8..7e99b07da 100644 --- a/elyra/pipeline/pipeline.py +++ b/elyra/pipeline/pipeline.py @@ -25,6 +25,8 @@ from typing import Optional from elyra.pipeline.pipeline_constants import ENV_VARIABLES +from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS +from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES # TODO: Make pipeline version available more widely # as today is only available on the pipeline editor @@ -97,6 +99,13 @@ def __init__( self._component_params = component_params or {} self._doc = None + self._mounted_volumes = [] + param_volumes = component_params.get(MOUNTED_VOLUMES) + if param_volumes and isinstance(param_volumes, list) and isinstance(param_volumes[0], VolumeMount): + # The mounted_volumes property is the Elyra system property (ie, not defined in the component + # spec) and must be removed from the component_params dict + self._mounted_volumes = self._component_params.pop(MOUNTED_VOLUMES, []) + # Scrub the inputs and outputs lists 
self._component_params["inputs"] = Operation._scrub_list(component_params.get("inputs", [])) self._component_params["outputs"] = Operation._scrub_list(component_params.get("outputs", [])) @@ -141,6 +150,10 @@ def component_params(self) -> Optional[Dict[str, Any]]: def component_params_as_dict(self) -> Dict[str, Any]: return self._component_params or {} + @property + def mounted_volumes(self) -> List["VolumeMount"]: + return self._mounted_volumes + @property def inputs(self) -> Optional[List[str]]: return self._component_params.get("inputs") @@ -153,6 +166,10 @@ def inputs(self, value: List[str]): def outputs(self) -> Optional[List[str]]: return self._component_params.get("outputs") + @property + def is_generic(self) -> bool: + return isinstance(self, GenericOperation) + @outputs.setter def outputs(self, value: List[str]): self._component_params["outputs"] = value @@ -308,6 +325,10 @@ def memory(self) -> Optional[str]: def gpu(self) -> Optional[str]: return self._component_params.get("gpu") + @property + def kubernetes_secrets(self) -> List["KubernetesSecret"]: + return self._component_params.get(KUBERNETES_SECRETS) + def __eq__(self, other: "GenericOperation") -> bool: if isinstance(self, other.__class__): return super().__eq__(other) diff --git a/elyra/pipeline/pipeline_constants.py b/elyra/pipeline/pipeline_constants.py index 0c2be96cb..cc729ca6d 100644 --- a/elyra/pipeline/pipeline_constants.py +++ b/elyra/pipeline/pipeline_constants.py @@ -22,3 +22,4 @@ PIPELINE_META_PROPERTIES = ["name", "description", "runtime"] # optional static prefix to be used when generating an object name for object storage COS_OBJECT_PREFIX = "cos_object_prefix" +ELYRA_COMPONENT_PROPERTIES = [MOUNTED_VOLUMES] diff --git a/elyra/pipeline/pipeline_definition.py b/elyra/pipeline/pipeline_definition.py index d846d66fd..0f5f748c2 100644 --- a/elyra/pipeline/pipeline_definition.py +++ b/elyra/pipeline/pipeline_definition.py @@ -19,19 +19,24 @@ from typing import Dict from typing import List 
from typing import Optional +from typing import Set -from jinja2 import Environment, Undefined +from jinja2 import Environment from jinja2 import PackageLoader +from jinja2 import Undefined +from elyra.pipeline.component_catalog import ComponentCache from elyra.pipeline.pipeline import KeyValueList from elyra.pipeline.pipeline import KubernetesSecret from elyra.pipeline.pipeline import Operation from elyra.pipeline.pipeline import VolumeMount +from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES from elyra.pipeline.pipeline_constants import ENV_VARIABLES from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS from elyra.pipeline.pipeline_constants import PIPELINE_META_PROPERTIES +from elyra.pipeline.runtime_type import RuntimeProcessorType class AppDataBase(object): # ABC @@ -216,7 +221,7 @@ def set_property(self, key: str, value: Any): self._node["app_data"]["properties"][key] = value - def convert_kv_properties(self, kv_properties: List[str]): + def convert_kv_properties(self, kv_properties: Set[str]): """ Convert pipeline defaults-level list properties that have been identified as sets of key-value pairs from a plain list type to the KeyValueList type. @@ -236,6 +241,7 @@ def convert_kv_properties(self, kv_properties: List[str]): class Node(AppDataBase): def __init__(self, node: Dict): super().__init__(node) + self._elyra_properties_to_skip = set() @property def type(self) -> str: @@ -297,6 +303,39 @@ def component_source(self) -> Optional[str]: return self._node["app_data"].get("component_source", None) return None + @property + def elyra_properties_to_skip(self) -> Set[str]: + """ + Elyra-defined node properties whose processing should be skipped + on the basis that their id collides with a property defined in + the component definition for this Node. 
+ """ + return self._elyra_properties_to_skip + + def set_elyra_properties_to_skip(self, runtime_type_name: Optional[str]) -> None: + """ + Determine which Elyra-defined node-level properties to skip + on the basis that their id collides with a property defined in + the component definition for this Node. Then, set the node + property accordingly. + """ + if Operation.is_generic_operation(self.op): + # Generic operations will never have any collisions as all their properties are Elyra-owned + return + + if not runtime_type_name: + return + + runtime_type = RuntimeProcessorType.get_instance_by_name(runtime_type_name) + component = ComponentCache.instance().get_component(runtime_type, self.op) + if not component: + return + + # Properties that have the same ref (id) as Elyra-owned node properties + # should be skipped during property propagation and conversion + properties_to_skip = [prop.ref for prop in component.properties if prop.ref in ELYRA_COMPONENT_PROPERTIES] + self._elyra_properties_to_skip = set(properties_to_skip) + def get_component_parameter(self, key: str, default_value=None) -> Any: """ Retrieve component parameter values. 
@@ -305,7 +344,7 @@ def get_component_parameter(self, key: str, default_value=None) -> Any: :param default_value: a default value in case the key is not found :return: the value or the default value if the key is not found """ - value = self._node["app_data"]["component_parameters"].get(key, default_value) + value = self._node["app_data"].get("component_parameters", {}).get(key, default_value) return None if value == "None" else value def set_component_parameter(self, key: str, value: Any): @@ -321,7 +360,10 @@ def set_component_parameter(self, key: str, value: Any): if value is None: raise ValueError("Value is required") - self._node["app_data"]["component_parameters"][key] = value + if key not in self.elyra_properties_to_skip: + # This parameter has been parsed from a custom component definition and + # its value should not be manually set + self._node["app_data"]["component_parameters"][key] = value def get_all_component_parameters(self) -> Dict[str, Any]: """ @@ -329,7 +371,7 @@ def get_all_component_parameters(self) -> Dict[str, Any]: """ return self._node["app_data"]["component_parameters"] - def convert_kv_properties(self, kv_properties: List[str]): + def convert_kv_properties(self, kv_properties: Set[str]): """ Convert node-level list properties that have been identified as sets of key-value pairs from a plain list type to the KeyValueList type. 
If any @@ -338,7 +380,7 @@ def convert_kv_properties(self, kv_properties: List[str]): """ for kv_property in kv_properties: value = self.get_component_parameter(kv_property) - if not value: + if not value or not isinstance(value, list): # not list or KeyValueList continue if isinstance(value, KeyValueList) or not isinstance(value[0], str): @@ -571,8 +613,8 @@ def propagate_pipeline_default_properties(self): pipeline_default_properties = self.primary_pipeline.get_property(PIPELINE_DEFAULTS, {}) for node in self.pipeline_nodes: - if not Operation.is_generic_operation(node.op): - continue + # Determine which Elyra-owned properties collide with parsed properties (and therefore must be skipped) + node.set_elyra_properties_to_skip(self.primary_pipeline.type) # Convert any key-value list node properties to the KeyValueList type if not done already node.convert_kv_properties(kv_properties) @@ -686,7 +728,7 @@ def get_canvas_properties_from_template(package_name: str, template_name: str) - return json.loads(output) @staticmethod - def get_kv_properties() -> List[str]: + def get_kv_properties() -> Set[str]: """ Get pipeline properties in its canvas form and loop through to find those that should consist of key/value pairs, as given in @@ -696,14 +738,14 @@ def get_kv_properties() -> List[str]: package_name="templates/pipeline", template_name="pipeline_properties_template.jinja2" ) - kv_properties = [] + kv_properties = set() parameter_info = canvas_pipeline_properties.get("uihints", {}).get("parameter_info", []) for parameter in parameter_info: if parameter.get("data", {}).get("keyValueEntries", False): parameter_ref = parameter.get("parameter_ref", "") if parameter_ref.startswith("elyra_"): parameter_ref = parameter_ref.replace("elyra_", "") - kv_properties.append(parameter_ref) + kv_properties.add(parameter_ref) return kv_properties diff --git a/elyra/pipeline/validation.py b/elyra/pipeline/validation.py index da275acd4..485aa0107 100644 --- a/elyra/pipeline/validation.py 
+++ b/elyra/pipeline/validation.py @@ -480,6 +480,10 @@ async def _validate_custom_component_node_properties( current_parameter_defaults_list.remove("component_source") current_parameter_defaults_list.remove("label") + volumes = node.get_component_parameter(MOUNTED_VOLUMES) + if volumes and MOUNTED_VOLUMES not in node.elyra_properties_to_skip: + self._validate_mounted_volumes(node.id, node.label, volumes, response=response) + for default_parameter in current_parameter_defaults_list: node_param = node.get_component_parameter(default_parameter) if self._is_required_property(component_property_dict, default_parameter): diff --git a/elyra/templates/airflow/airflow_template.jinja2 b/elyra/templates/airflow/airflow_template.jinja2 index 246ff6e14..8c38bccd8 100644 --- a/elyra/templates/airflow/airflow_template.jinja2 +++ b/elyra/templates/airflow/airflow_template.jinja2 @@ -1,9 +1,6 @@ from airflow import DAG from airflow.utils.dates import days_ago -{% if cos_secret %} -from airflow.kubernetes.secret import Secret -{% endif %} args = { 'project_id' : '{{ pipeline_name }}', @@ -20,75 +17,30 @@ dag = DAG( is_paused_upon_creation={{ is_paused_upon_creation }}, ) -{% if cos_secret %} -## Ensure that the secret named '{{ cos_secret }}' is defined in the Kubernetes namespace where this pipeline will be run -env_var_secret_id = Secret(deploy_type='env', - deploy_target='AWS_ACCESS_KEY_ID', - secret='{{ cos_secret }}', - key='AWS_ACCESS_KEY_ID', -) -env_var_secret_key = Secret(deploy_type='env', - deploy_target='AWS_SECRET_ACCESS_KEY', - secret='{{ cos_secret }}', - key='AWS_SECRET_ACCESS_KEY', -) -{% endif %} -{% set ns = namespace() %} +{{ render_secrets_for_cos(cos_secret) }} {% for key, operation in operations_list.items() %} -{% set ns.operation_kubernetes_secrets = "" %} - {% if not operation.is_generic_operator %} {% for import_statement in operation.imports %} {{import_statement}} {% endfor %} {% else %} -from airflow.contrib.operators.kubernetes_pod_operator import 
KubernetesPodOperator -{% endif %} +{{ render_secrets_for_generic_op(operation) }} +{{ render_volumes_for_generic_op(operation) }} -{% if operation.kubernetes_secrets %} -from airflow.kubernetes.secret import Secret -# Kubernetes secrets for operation '{{ operation.id|regex_replace }}' - {% for secret in operation.kubernetes_secrets %} -secret_{{ operation.id|regex_replace }}_{{ loop.index }} = Secret(deploy_type='env', - deploy_target='{{ secret.env_var }}', - secret='{{ secret.name }}', - key='{{ secret.key }}', -) - {% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ 'secret_' ~ operation.id|regex_replace ~ '_' ~ loop.index ~ ', ' %} - {% endfor %} +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator {% endif %} -{% if cos_secret %}{% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ "env_var_secret_id, env_var_secret_key" %}{% endif %} - {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}{% if not operation.is_generic_operator %} op_{{ operation.id|regex_replace }} = {{ operation.class_name }}( task_id='{{ operation.notebook|regex_replace }}', {% for param, value in operation.component_params.items() %} {{ param }}={{ value }}, {% endfor %} + {% if operation.volumes %} + executor_config={{ render_executor_config_for_custom_op(operation) }}, + {% endif %} {% else %} -{% if operation.volume_mounts %} -from airflow.contrib.kubernetes.volume import Volume -from airflow.contrib.kubernetes.volume_mount import VolumeMount -volumes_{{ operation.id|regex_replace }} = [] -volume_mounts_{{ operation.id|regex_replace }} = [] - {% for volume_mount in operation.volume_mounts %} -volume_mount_{{loop.index}} = VolumeMount(name='{{ volume_mount.pvc_name }}', - mount_path='{{ volume_mount.path }}', - sub_path=None, - read_only=False) -volume_config_{{ loop.index }}= { - 'persistentVolumeClaim': - { - 'claimName': '{{ volume_mount.pvc_name }}' - } - } 
-volume_{{ loop.index }} = Volume(name='{{ volume_mount.pvc_name }}', configs=volume_config_{{ loop.index }}) -volumes_{{ operation.id|regex_replace }}.append(volume_{{ loop.index }}) -volume_mounts_{{ operation.id|regex_replace }}.append(volume_mount_{{ loop.index }}) - {% endfor %} -{% endif %} op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.notebook|regex_replace }}', namespace='{{ user_namespace }}', image='{{ operation.runtime_image }}', @@ -99,10 +51,6 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n arguments=["{{ operation.argument_list }}"], task_id='{{ operation.notebook|regex_replace }}', env_vars={{ operation.pipeline_envs }}, - {% if operation.volume_mounts %} - volumes=volumes_{{ operation.id|regex_replace }}, - volume_mounts=volume_mounts_{{ operation.id|regex_replace }}, - {% endif %} {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %} resources = { {% if operation.cpu_request %} @@ -115,10 +63,13 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n 'limit_gpu': '{{ operation.gpu_limit }}', {% endif %} }, - {% endif %} - {% if ns.operation_kubernetes_secrets %} - secrets=[{{ ns.operation_kubernetes_secrets }}], + {% if operation.secrets or cos_secret %} + secrets=[{% if operation.secrets %}{% for secret_var in operation.secret_vars %}{{ secret_var }},{% endfor %}{% endif %}{% if cos_secret %}env_var_secret_id, env_var_secret_key{% endif %}], + {% endif %} + {% if operation.volumes %} + volumes=[{% for volume_var in operation.volume_vars %}{{ volume_var }},{% endfor %}], + volume_mounts=[{% for mount_var in operation.volume_mount_vars %}{{ mount_var }},{% endfor %}], {% endif %} in_cluster={{ in_cluster }}, config_file="{{ kube_config_path }}", diff --git a/elyra/templates/components/canvas_properties_template.jinja2 b/elyra/templates/components/canvas_properties_template.jinja2 index 6798d3974..ca5b0eb47 100644 --- 
a/elyra/templates/components/canvas_properties_template.jinja2 +++ b/elyra/templates/components/canvas_properties_template.jinja2 @@ -25,6 +25,9 @@ } {% endif %}, {% endfor %} +{% if "mounted_volumes" not in elyra_property_collisions_list %} + "elyra_mounted_volumes": [], +{% endif %} "component_source": {{ component.component_source|tojson|safe }} }, "parameters": [ @@ -39,6 +42,11 @@ "id": "elyra_{{ property.ref }}" }, {% endfor %} +{% if "mounted_volumes" not in elyra_property_collisions_list %} + { + "id": "elyra_mounted_volumes" + }, +{% endif %} { "id": "component_source" } @@ -126,6 +134,25 @@ } }, {% endfor %} +{% if "mounted_volumes" not in elyra_property_collisions_list %} + { + "parameter_ref": "elyra_mounted_volumes", + "control": "custom", + "custom_control_id": "StringArrayControl", + "label": { + "default": "Data Volumes" + }, + "description": { + "default": "Volumes to be mounted in this node. The specified Persistent Volume Claims must exist in the Kubernetes namespace where the node is executed or this node will not run.", + "placement": "on_panel" + }, + "data": { + "required": false, + "placeholder": "/mount/path=pvc-name", + "keyValueEntries": true + } + }, +{% endif %} { "parameter_ref": "component_source", "control": "readonly", @@ -187,6 +214,25 @@ "parameter_refs": ["elyra_{{ property.ref }}"] }, {% endfor %} +{% endif %} +{% if "mounted_volumes" not in elyra_property_collisions_list %} + { + "id": "elyra_other_propertiesCategoryHeader", + "type": "textPanel", + "class_name": "elyra_categoryHeader", + "label": { + "default": "Additional Properties" + }, + "description": { + "default": "Additional properties used by Elyra that are not given in the component definition.", + "placement": "on_panel" + } + }, + { + "id": "elyra_mounted_volumes", + "type": "controls", + "parameter_refs": ["elyra_mounted_volumes"] + }, {% endif %} { "id": "elyra_component_sourceCategoryHeader", diff --git 
a/elyra/templates/pipeline/pipeline_properties_template.jinja2 b/elyra/templates/pipeline/pipeline_properties_template.jinja2 index 7afa1c2e0..8a57b112e 100644 --- a/elyra/templates/pipeline/pipeline_properties_template.jinja2 +++ b/elyra/templates/pipeline/pipeline_properties_template.jinja2 @@ -162,11 +162,28 @@ "parameter_refs": ["cos_object_prefix"] }, { - "id": "elyra_outputCategoryHeader", + "id": "elyra_nodesCategoryHeader", "type": "textPanel", "class_name": "elyra_categoryHeader", "label": { - "default": "Generic nodes defaults" + "default": "Node Defaults" + }, + "description": { + "default": "Default values are applied to all nodes in this pipeline and can be customized in each node.", + "placement": "on_panel" + } + }, + { + "id": "elyra_mounted_volumes", + "type": "controls", + "parameter_refs": ["elyra_mounted_volumes"] + }, + { + "id": "elyra_generic_nodesCategoryHeader", + "type": "textPanel", + "class_name": "elyra_categoryHeader", + "label": { + "default": "Generic Node Defaults" }, "description": { "default": "Default values are applied to all generic nodes in this pipeline and can be customized in each node.", @@ -187,11 +204,6 @@ "id": "elyra_kubernetes_secrets", "type": "controls", "parameter_refs": ["elyra_kubernetes_secrets"] - }, - { - "id": "elyra_mounted_volumes", - "type": "controls", - "parameter_refs": ["elyra_mounted_volumes"] } ] } diff --git a/elyra/tests/pipeline/airflow/test_component_parser_airflow.py b/elyra/tests/pipeline/airflow/test_component_parser_airflow.py index 1198ab473..43e34fdcd 100644 --- a/elyra/tests/pipeline/airflow/test_component_parser_airflow.py +++ b/elyra/tests/pipeline/airflow/test_component_parser_airflow.py @@ -18,7 +18,7 @@ from subprocess import CompletedProcess from subprocess import run -from conftest import AIRFLOW_COMPONENT_CACHE_INSTANCE +from conftest import AIRFLOW_TEST_OPERATOR_CATALOG from conftest import TEST_CATALOG_NAME import jupyter_core.paths import pytest @@ -49,7 +49,7 @@ def 
_get_resource_path(filename): return resource_path -@pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True) +@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True) def test_component_catalog_can_load_components_from_registries(catalog_instance, component_cache): components = component_cache.get_all_components(RUNTIME_PROCESSOR) assert len(components) > 0 @@ -213,12 +213,14 @@ def test_parse_airflow_component_file(): # Helper method to retrieve the requested parameter value from the dictionary def get_parameter_value(param_name): + param_name = f"elyra_{param_name}" # add elyra_prefix to param name property_dict = properties_json["current_parameters"][param_name] return property_dict[property_dict["activeControl"]] # Helper method to retrieve the requested parameter info from the dictionary def get_parameter_format(param_name, control_id="StringControl"): param_info = None + param_name = f"elyra_{param_name}" # add elyra_prefix to param name for prop_info in properties_json["uihints"]["parameter_info"]: if prop_info.get("parameter_ref") == param_name: param_info = prop_info["data"]["controls"][control_id]["format"] @@ -229,6 +231,7 @@ def get_parameter_format(param_name, control_id="StringControl"): # Helper method to retrieve the requested parameter description from the dictionary def get_parameter_description(param_name): param_desc = None + param_name = f"elyra_{param_name}" # add elyra_prefix to param name for prop_info in properties_json["uihints"]["parameter_info"]: if prop_info.get("parameter_ref") == param_name: param_desc = prop_info["description"]["default"] @@ -239,6 +242,7 @@ def get_parameter_description(param_name): # Helper method to retrieve whether the requested parameter is required def get_parameter_required(param_name): param_info = None + param_name = f"elyra_{param_name}" # add elyra_prefix to param name for prop_info in properties_json["uihints"]["parameter_info"]: 
if prop_info.get("parameter_ref") == param_name: param_info = prop_info["data"]["required"] @@ -257,112 +261,119 @@ def get_parameter_required(param_name): assert properties_json["current_parameters"]["component_source"] == component_source # Ensure component parameters are prefixed with 'elyra_' and values are as expected - assert get_parameter_value("elyra_str_no_default") == "" - assert get_parameter_value("elyra_str_default") == "default" - assert get_parameter_value("elyra_str_empty") == "" - assert get_parameter_value("elyra_str_not_in_docstring") == "" + assert get_parameter_value("str_no_default") == "" + assert get_parameter_value("str_default") == "default" + assert get_parameter_value("str_empty") == "" + assert get_parameter_value("str_not_in_docstring") == "" - assert get_parameter_value("elyra_bool_no_default") is False - assert get_parameter_value("elyra_bool_default_false") is False - assert get_parameter_value("elyra_bool_default_true") is True - assert get_parameter_value("elyra_bool_not_in_docstring") is False + assert get_parameter_value("bool_no_default") is False + assert get_parameter_value("bool_default_false") is False + assert get_parameter_value("bool_default_true") is True + assert get_parameter_value("bool_not_in_docstring") is False - assert get_parameter_value("elyra_int_no_default") == 0 - assert get_parameter_value("elyra_int_default_zero") == 0 - assert get_parameter_value("elyra_int_default_non_zero") == 2 - assert get_parameter_value("elyra_int_not_in_docstring") == 3 + assert get_parameter_value("int_no_default") == 0 + assert get_parameter_value("int_default_zero") == 0 + assert get_parameter_value("int_default_non_zero") == 2 + assert get_parameter_value("int_not_in_docstring") == 3 - assert get_parameter_value("elyra_dict_default_is_none") == "{}" # {} - assert get_parameter_value("elyra_list_default_is_none") == "[]" # [] + assert get_parameter_value("dict_default_is_none") == "{}" # {} + assert 
get_parameter_value("list_default_is_none") == "[]" # [] # Ensure that type information is inferred correctly for properties that # define 'unusual' types, such as 'a dictionary of lists' - assert get_parameter_format("elyra_unusual_type_dict") == "dictionary" - assert get_parameter_format("elyra_unusual_type_list") == "list" + assert get_parameter_format("unusual_type_dict") == "dictionary" + assert get_parameter_format("unusual_type_list") == "list" + + # TestOperator has a property, 'mounted_volumes', whose id/ref collides with + # the system-defined property of the same id. In these cases, the parsed property + # should be preferred to the system-defined property, which should not appear. + # Here we ensure that the 'mounted_volumes' property is a string-type (as defined + # in the Operator class) rather than the system-defined list-type + assert get_parameter_format("mounted_volumes") == "string" # Ensure that type information falls back to string if no type hint present # and no ':type: ' phrase found in docstring - assert get_parameter_format("elyra_fallback_type") == "string" + assert get_parameter_format("fallback_type") == "string" # Ensure component parameters are marked as required in the correct circumstances # (parameter is required if there is no default value provided or if a type hint # does not include 'Optional[...]') - assert get_parameter_required("elyra_str_no_default") is True - assert get_parameter_required("elyra_str_default") is False - assert get_parameter_required("elyra_str_empty") is False + assert get_parameter_required("str_no_default") is True + assert get_parameter_required("str_default") is False + assert get_parameter_required("str_empty") is False # Ensure descriptions are rendered properly with type hint in parentheses assert ( - get_parameter_description("elyra_unusual_type_dict") == "a dictionary parameter with the " + get_parameter_description("unusual_type_dict") == "a dictionary parameter with the " "phrase 'list' in type 
description " "(type: a dictionary of arrays)" ) assert ( - get_parameter_description("elyra_unusual_type_list") == "a list parameter with the phrase " + get_parameter_description("unusual_type_list") == "a list parameter with the phrase " "'string' in type description " "(type: a list of strings)" ) - assert get_parameter_description("elyra_fallback_type") == "(type: str)" + assert get_parameter_description("fallback_type") == "(type: str)" # Ensure that a long description with line wrapping and a backslash escape has rendered # (and hence did not raise an error during json.loads in the properties API request) parsed_description = """a string parameter with a very long description that wraps lines and also has an escaped underscore in it, as shown here: (\_) # noqa W605""" modified_description = parsed_description.replace("\n", " ") + " (type: str)" # modify desc acc. to parser rules - assert get_parameter_description("elyra_long_description_property") == modified_description + assert get_parameter_description("long_description_property") == modified_description # Retrieve properties for DeriveFromTestOperator # DeriveFromTestOperator includes type hints for all init arguments properties_json = ComponentCache.to_canvas_properties(derive_test_op) # Ensure default values are parsed correct in the case where type hints are present - assert get_parameter_value("elyra_str_default") == "default" - assert get_parameter_value("elyra_bool_default") is True - assert get_parameter_value("elyra_int_default") == 2 + assert get_parameter_value("str_default") == "default" + assert get_parameter_value("bool_default") is True + assert get_parameter_value("int_default") == 2 # Ensure component parameters are prefixed with 'elyra_' and types are as expected # in the case when a type hint is provided (and regardless of whether or not the # parameter type is included in the docstring) - assert get_parameter_format("elyra_str_no_default") == "string" - assert 
get_parameter_format("elyra_str_default") == "string" - assert get_parameter_format("elyra_str_optional_default") == "string" - assert get_parameter_format("elyra_str_not_in_docstring") == "string" + assert get_parameter_format("str_no_default") == "string" + assert get_parameter_format("str_default") == "string" + assert get_parameter_format("str_optional_default") == "string" + assert get_parameter_format("str_not_in_docstring") == "string" - assert get_parameter_format("elyra_bool_no_default", "BooleanControl") == "boolean" - assert get_parameter_format("elyra_bool_default", "BooleanControl") == "boolean" - assert get_parameter_format("elyra_bool_not_in_docstring", "BooleanControl") == "boolean" + assert get_parameter_format("bool_no_default", "BooleanControl") == "boolean" + assert get_parameter_format("bool_default", "BooleanControl") == "boolean" + assert get_parameter_format("bool_not_in_docstring", "BooleanControl") == "boolean" - assert get_parameter_format("elyra_int_no_default", "NumberControl") == "number" - assert get_parameter_format("elyra_int_default", "NumberControl") == "number" - assert get_parameter_format("elyra_int_not_in_docstring", "NumberControl") == "number" + assert get_parameter_format("int_no_default", "NumberControl") == "number" + assert get_parameter_format("int_default", "NumberControl") == "number" + assert get_parameter_format("int_not_in_docstring", "NumberControl") == "number" - assert get_parameter_format("elyra_list_optional_default") == "list" + assert get_parameter_format("list_optional_default") == "list" # Ensure component parameters are marked as required in the correct circumstances - assert get_parameter_required("elyra_str_no_default") is True - assert get_parameter_required("elyra_str_default") is False - assert get_parameter_required("elyra_str_optional_default") is False - assert get_parameter_required("elyra_str_not_in_docstring") is True + assert get_parameter_required("str_no_default") is True + assert 
get_parameter_required("str_default") is False + assert get_parameter_required("str_optional_default") is False + assert get_parameter_required("str_not_in_docstring") is True # Retrieve properties for DeriveFromImportedOperator - # DeriveFromImportedOperator includes type hints for dictionary and + # DeriveFromImportedOperator includes type hints for dictionary and # list values to test the more complex parsing required in this case properties_json = ComponentCache.to_canvas_properties(import_test_op) - # Ensure component parameters are prefixed with 'elyra_' and types are as expected - assert get_parameter_format("elyra_dict_no_default") == "dictionary" - assert get_parameter_format("elyra_dict_optional_no_default") == "dictionary" - assert get_parameter_format("elyra_nested_dict_default") == "dictionary" - assert get_parameter_format("elyra_dict_not_in_docstring") == "dictionary" + # Ensure component parameters are prefixed with '' and types are as expected + assert get_parameter_format("dict_no_default") == "dictionary" + assert get_parameter_format("dict_optional_no_default") == "dictionary" + assert get_parameter_format("nested_dict_default") == "dictionary" + assert get_parameter_format("dict_not_in_docstring") == "dictionary" - assert get_parameter_format("elyra_list_no_default") == "list" - assert get_parameter_format("elyra_list_optional_no_default") == "list" - assert get_parameter_format("elyra_list_default") == "list" - assert get_parameter_format("elyra_list_optional_default") == "list" - assert get_parameter_format("elyra_list_not_in_docstring") == "list" + assert get_parameter_format("list_no_default") == "list" + assert get_parameter_format("list_optional_no_default") == "list" + assert get_parameter_format("list_default") == "list" + assert get_parameter_format("list_optional_default") == "list" + assert get_parameter_format("list_not_in_docstring") == "list" - assert get_parameter_value("elyra_dict_no_default") == "{}" - assert 
get_parameter_value("elyra_list_no_default") == "[]" + assert get_parameter_value("dict_no_default") == "{}" + assert get_parameter_value("list_no_default") == "[]" def test_parse_airflow_component_url(): @@ -431,15 +442,17 @@ def test_parse_airflow_component_file_no_inputs(): properties_json = ComponentCache.to_canvas_properties(no_input_op) # Properties JSON should only include the two parameters common to every - # component:'label' and 'component_source' - num_common_params = 2 + # component: ('label', 'component_source' and 'mounted_volumes') + num_common_params = 3 assert len(properties_json["current_parameters"].keys()) == num_common_params assert len(properties_json["parameters"]) == num_common_params assert len(properties_json["uihints"]["parameter_info"]) == num_common_params - # Total number of groups includes one for each parameter, plus 1 for the component_source header + # Total number of groups includes one for each parameter, + # plus 1 for the component_source header, + # plus 1 for the 'other properties' header (that includes, e.g., mounted_volumes) # (Airflow does not include an output header since there are no formally defined outputs) - num_groups = num_common_params + 1 + num_groups = num_common_params + 2 assert len(properties_json["uihints"]["group_info"][0]["group_info"]) == num_groups # Ensure that template still renders the two common parameters correctly diff --git a/elyra/tests/pipeline/airflow/test_processor_airflow.py b/elyra/tests/pipeline/airflow/test_processor_airflow.py index caee50659..179dc9e0f 100644 --- a/elyra/tests/pipeline/airflow/test_processor_airflow.py +++ b/elyra/tests/pipeline/airflow/test_processor_airflow.py @@ -654,11 +654,11 @@ def test_same_name_operator_in_pipeline(monkeypatch, processor, catalog_instance assert operation_parameter_str_command == "\"{{ ti.xcom_pull(task_ids='TestOperator_1') }}\"" -def test_scrub_invalid_characters(processor): +def test_scrub_invalid_characters(): invalid_character_list_string = 
"[-!@#$%^&*(){};:,/<>?|`~=+ ]" valid_character_list_string = list(string.ascii_lowercase + string.ascii_uppercase + string.digits) for character in invalid_character_list_string: - assert processor._scrub_invalid_characters(character) == "_" + assert AirflowPipelineProcessor.scrub_invalid_characters(character) == "_" for character in valid_character_list_string: - assert processor._scrub_invalid_characters(character) == character + assert AirflowPipelineProcessor.scrub_invalid_characters(character) == character diff --git a/elyra/tests/pipeline/kfp/test_component_parser_kfp.py b/elyra/tests/pipeline/kfp/test_component_parser_kfp.py index c15e6cd75..1d49fcdff 100644 --- a/elyra/tests/pipeline/kfp/test_component_parser_kfp.py +++ b/elyra/tests/pipeline/kfp/test_component_parser_kfp.py @@ -295,6 +295,11 @@ def test_parse_kfp_component_file(): "activeControl": "StringControl", } # [] + assert properties_json["current_parameters"]["elyra_mounted_volumes"] == { + "StringControl": "", + "activeControl": "StringControl", + } + # Ensure that the 'required' attribute was set correctly. KFP components default to required # unless explicitly marked otherwise in component YAML. 
required_property = next( @@ -340,13 +345,6 @@ def test_parse_kfp_component_file(): ) assert unusual_string_property["data"]["controls"]["StringControl"]["format"] == "string" - file_property = next( - prop - for prop in properties_json["uihints"]["parameter_info"] - if prop.get("parameter_ref") == "elyra_test_unusual_type_file" - ) - assert file_property["data"]["format"] == "inputpath" - no_type_property = next( prop for prop in properties_json["uihints"]["parameter_info"] @@ -361,9 +359,6 @@ def test_parse_kfp_component_file(): ) assert unusual_list_property["description"]["default"] == "The test command description (type: An array)" assert unusual_string_property["description"]["default"] == "The test command description (type: A string)" - assert ( - file_property["description"]["default"] == "The test command description" - ) # No data type info is included in parentheses for inputPath variables assert no_type_property["description"]["default"] == "The test command description (type: string)" @@ -435,19 +430,20 @@ def test_parse_kfp_component_file_no_inputs(): component = parser.parse(catalog_entry)[0] properties_json = ComponentCache.to_canvas_properties(component) - # Properties JSON should only include the two parameters common to every - # component:'label' and 'component_source', the component description if - # exists (which it does for this component), and the output parameter for - # this component - num_common_params = 4 + # Properties JSON should only include the three parameters common to every + # component ('label', 'component_source' and 'mounted_volumes'), the component + # description if it exists (which it does for this component), and the output + # parameter for this component + num_common_params = 5 assert len(properties_json["current_parameters"].keys()) == num_common_params assert len(properties_json["parameters"]) == num_common_params assert len(properties_json["uihints"]["parameter_info"]) == num_common_params # Total number of groups 
includes one for each parameter, - # plus one for the output group header, - # plus 1 for the component_source header - num_groups = num_common_params + 2 + # plus 1 for the output group header, + # plus 1 for the component_source header, + # plus 1 for the 'other properties' header (that includes, e.g., mounted_volumes) + num_groups = num_common_params + 3 assert len(properties_json["uihints"]["group_info"][0]["group_info"]) == num_groups # Ensure that template still renders the two common parameters correctly diff --git a/elyra/tests/pipeline/resources/components/airflow_test_operator.py b/elyra/tests/pipeline/resources/components/airflow_test_operator.py index c1d3c913f..9aa56201f 100644 --- a/elyra/tests/pipeline/resources/components/airflow_test_operator.py +++ b/elyra/tests/pipeline/resources/components/airflow_test_operator.py @@ -57,6 +57,8 @@ class TestOperator(BaseOperator): :param long_description_property: a string parameter with a very long description that wraps lines and also has an escaped underscore in it, as shown here: (\_) # noqa W605 :type long_description_property: str + :param: mounted_volumes: a property with the same name as an Elyra system property + :type: str """ def __init__( @@ -79,6 +81,7 @@ def __init__( unusual_type_list=None, fallback_type=None, long_description_property=None, + mounted_volumes=None, *args, **kwargs, ): diff --git a/elyra/tests/pipeline/resources/components/kfp_test_operator.yaml b/elyra/tests/pipeline/resources/components/kfp_test_operator.yaml index b9c8ea912..6cfae34d3 100644 --- a/elyra/tests/pipeline/resources/components/kfp_test_operator.yaml +++ b/elyra/tests/pipeline/resources/components/kfp_test_operator.yaml @@ -21,8 +21,8 @@ inputs: - {name: test_unusual_type_dict, description: 'The test command description', type: Dictionary of arrays} - {name: test_unusual_type_list, description: 'The test command description', type: An array} - {name: test_unusual_type_string, description: 'The test command 
description', type: A string} -- {name: test_unusual_type_file, description: 'The test command description', type: Notebook} - {name: test_unusual_type_notgiven, description: 'The test command description'} +- {name: mounted_volumes, description: 'A property with the same name as an Elyra system property', type: String} outputs: - {name: Filtered text} implementation: @@ -32,13 +32,8 @@ implementation: - sh - -ec - | - text_path=$0 - pattern=$1 - filtered_text_path=$2 - mkdir -p "$(dirname "$filtered_text_path")" + test_string_no_default=$0 + mkdir -p "$(dirname "test_string_no_default")" - grep "$pattern" < "$text_path" > "$filtered_text_path" - - {inputPath: Text} - - {inputValue: Pattern} - - {inputPath: test_unusual_type_file} + - {inputValue: test_string_no_default} - {outputPath: Filtered text} diff --git a/elyra/tests/pipeline/resources/sample_pipelines/pipeline_valid_with_pipeline_default.json b/elyra/tests/pipeline/resources/sample_pipelines/pipeline_valid_with_pipeline_default.json index 130e2d2bb..b3afb194f 100644 --- a/elyra/tests/pipeline/resources/sample_pipelines/pipeline_valid_with_pipeline_default.json +++ b/elyra/tests/pipeline/resources/sample_pipelines/pipeline_valid_with_pipeline_default.json @@ -28,11 +28,44 @@ "description": "Notebook file" } } + }, + { + "id": "{{uuid}}", + "type": "execution_node", + "op": "local-file-catalog:8371f5970c7b:TestOperator", + "app_data": { + "label": "{{label}}", + "component_parameters": { + "mounted_volumes": "a component-parsed property" + }, + "component_source": "{{component_source}}", + "ui_data": { + "label": "{{label}}", + "description": "Notebook file" + } + } + }, + { + "id": "{{uuid}}", + "type": "execution_node", + "op": "local-file-catalog:8371f5970c7b:DeriveFromTestOperator", + "app_data": { + "label": "{{label}}", + "component_parameters": { + "mounted_volumes": ["/mnt/vol1=pvc-claim-1"] + }, + "component_source": "{{component_source}}", + "ui_data": { + "label": "{{label}}", + "description": 
"Notebook file" + } + } } ], "app_data": { "runtime": "{{runtime}}", "runtime_config": "{{runtime-config}}", + "runtime_type": "APACHE_AIRFLOW", "ui_data": { "comments": [] }, @@ -45,7 +78,8 @@ "var1=var_one ", "var2=var2", "var3 =var_three" - ] + ], + "mounted_volumes": ["/mnt/vol2=pvc-claim-2"] }, "runtime": "{{runtime description}}" }, diff --git a/elyra/tests/pipeline/test_pipeline_definition.py b/elyra/tests/pipeline/test_pipeline_definition.py index 4dabd548a..626dee7b9 100644 --- a/elyra/tests/pipeline/test_pipeline_definition.py +++ b/elyra/tests/pipeline/test_pipeline_definition.py @@ -15,12 +15,16 @@ # from unittest import mock +from conftest import AIRFLOW_TEST_OPERATOR_CATALOG import pytest from elyra.pipeline import pipeline_constants from elyra.pipeline.pipeline import KeyValueList +from elyra.pipeline.pipeline import VolumeMount from elyra.pipeline.pipeline_constants import ENV_VARIABLES from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS +from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES +from elyra.pipeline.pipeline_definition import Node from elyra.pipeline.pipeline_definition import PipelineDefinition from elyra.tests.pipeline.util import _read_pipeline_resource @@ -122,9 +126,15 @@ def test_convert_kv_properties(monkeypatch): mock_kv_property_list = [pipeline_constants.ENV_VARIABLES, kv_test_property_name] monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=mock_kv_property_list)) + # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily + monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None)) + pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json) - node = pipeline_definition.primary_pipeline.nodes.pop() + node = None + for node in pipeline_definition.pipeline_nodes: + if node.op == "execute-notebook-node": # assign the generic node to the node variable + break pipeline_defaults = 
pipeline_definition.primary_pipeline.get_property(pipeline_constants.PIPELINE_DEFAULTS) for kv_property in mock_kv_property_list: @@ -149,16 +159,55 @@ def test_propagate_pipeline_default_properties(monkeypatch): mock_kv_property_list = [pipeline_constants.ENV_VARIABLES, kv_test_property_name] monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=mock_kv_property_list)) + # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily + monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None)) + pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json) - node = pipeline_definition.primary_pipeline.nodes.pop() + + node = None + for node in pipeline_definition.pipeline_nodes: + if node.op == "execute-notebook-node": # assign the generic node to the node variable + break assert node.get_component_parameter(pipeline_constants.ENV_VARIABLES) == kv_list_correct assert node.get_component_parameter(kv_test_property_name) == kv_list_correct -def test_remove_env_vars_with_matching_secrets(): +@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True) +def test_property_id_collision_with_system_property(monkeypatch, catalog_instance): pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json") pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json) - node = pipeline_definition.primary_pipeline.nodes.pop() + for node in pipeline_definition.pipeline_nodes: + if node.op.endswith(":DeriveFromTestOperator"): + # DeriveFromTestOperator does not define its own 'mounted_volumes' + # property and should not skip the Elyra 'mounted_volumes' property + assert MOUNTED_VOLUMES not in node.elyra_properties_to_skip + + # Property value should be a combination of the lists given on the + # pipeline node and in the pipeline default properties + assert 
node.get_component_parameter(MOUNTED_VOLUMES) == [ + VolumeMount(path="/mnt/vol2", pvc_name="pvc-claim-2"), + VolumeMount(path="/mnt/vol1", pvc_name="pvc-claim-1"), + ] + elif node.op.endswith(":TestOperator"): + # TestOperator defines its own 'mounted_volumes' property + # and should skip the Elyra system property of the same name + assert MOUNTED_VOLUMES in node.elyra_properties_to_skip + + # Property value should be as-assigned in pipeline file + assert node.get_component_parameter(MOUNTED_VOLUMES) == "a component-parsed property" + + +def test_remove_env_vars_with_matching_secrets(monkeypatch): + pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json") + + # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily + monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None)) + + pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json) + node = None + for node in pipeline_definition.pipeline_nodes: + if node.op == "execute-notebook-node": # assign the generic node to the node variable + break # Set kubernetes_secret property to have all the same keys as those in the env_vars property kubernetes_secrets = KeyValueList(["var1=name1:key1", "var2=name2:key2", "var3=name3:key3"])