Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
22f4e75
Update processing to handle mounted_volumes w/i component params stanza
kiersten-stokes Jun 24, 2022
3f1125e
Update properties template to reflect data volumes for custom components
kiersten-stokes Jun 24, 2022
ab3921c
Update KFP processing to handle volumes for custom components
kiersten-stokes Jun 24, 2022
be15f74
Refactor airflow DAG template for readability; add volume processing
kiersten-stokes Jun 24, 2022
d5d1ddf
Update server tests
kiersten-stokes Jun 24, 2022
9607670
Fix Codescan warning
kiersten-stokes Jun 24, 2022
05393f7
Ensure volumes is not None in Operation constructor
kiersten-stokes Jun 27, 2022
1fa45b0
Add server test for property collision
kiersten-stokes Jun 28, 2022
a859e53
Fix import statements
kiersten-stokes Jun 29, 2022
825490b
Remove accidental print statement
kiersten-stokes Jun 29, 2022
4cd5b57
Revert unintended change to docs/requirements.txt
kiersten-stokes Jun 29, 2022
6ae989e
Move new catalog instance to conftest
kiersten-stokes Jun 29, 2022
5a8f6c7
Remove last instance of AIRFLOW_COMPONENT_CACHE_INSTANCE
kiersten-stokes Jul 1, 2022
aec8eb4
Merge branch 'main' into custom-component-volumes-2
kiersten-stokes Jul 1, 2022
f8b34d3
Merge branch 'test-fix' into custom-component-volumes-2
kiersten-stokes Jul 1, 2022
433e57a
Add volumes to custom component 'describe'
kiersten-stokes Jul 1, 2022
3ed7b89
Add validation to custom component volumes
kiersten-stokes Jul 1, 2022
70cdd52
Adjust documentation to reflect volumes for custom nodes
kiersten-stokes Jul 1, 2022
95443dc
Remove 'system-owned' from documentation
kiersten-stokes Jul 1, 2022
64883bb
Create Secret object in DAG instead of KubernetesSecret
kiersten-stokes Jul 1, 2022
9c54ab2
Change var names in airflow DAG
kiersten-stokes Jul 1, 2022
883a9cd
Add disclaimer re:Airflow operator volumes
kiersten-stokes Jul 1, 2022
6ac07dd
Update DAG to add executor_config
kiersten-stokes Jul 5, 2022
808f0c4
Fix lint
kiersten-stokes Jul 5, 2022
9be0edd
Change executor_config format
kiersten-stokes Jul 6, 2022
eae03f7
Add pvc name
kiersten-stokes Jul 6, 2022
f674cff
Fix sentence re: volumes that references generic nodes only
kiersten-stokes Jul 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@
"schema_name": "elyra-kfp-examples-catalog",
}

AIRFLOW_COMPONENT_CACHE_INSTANCE = {
"display_name": "Airflow Example Components",
"metadata": {
"runtime_type": "APACHE_AIRFLOW",
"categories": ["examples"],
"paths": [
"https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components/bash_operator.py" # noqa
],
},
"schema_name": "url-catalog",
}
AIRFLOW_TEST_OPERATOR_CATALOG = {
"display_name": "Airflow Test Operator",
"metadata": {
Expand Down
Binary file modified docs/source/images/user_guide/pipelines/pipeline-properties.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 13 additions & 10 deletions docs/source/user_guide/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste

Pipeline properties include:
- An optional description, summarizing the pipeline purpose.
- Properties that apply to every pipeline node (both generic and custom)
- **Data volumes**
- A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the component. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export.
- The referenced PVCs must exist in the Kubernetes namespace where the generic pipeline nodes are executed.
- Data volumes are not mounted when the pipeline is executed locally.
- Properties that apply to every generic pipeline node. In this release the following properties are supported:
- **Object storage path prefix**. Elyra stores pipeline input and output artifacts in a cloud object storage bucket. By default these artifacts are located in the `/<pipeline-instance-name>` path. The example below depicts the artifact location for several pipelines in the `pipeline-examples` bucket:
![artifacts default storage layout on object storage](../images/user_guide/pipelines/node-artifacts-on-object-storage.png)
Expand All @@ -87,10 +92,6 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste
- The value is ignored when the pipeline is executed locally.
- **Environment variables**
- A list of environment variables to be set in the container that executes the Jupyter notebook or script. Format: `ENV_VAR_NAME=value`. Entries that are empty (`ENV_VAR_NAME=`) or malformed are ignored.
- **Data volumes**
- A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the Jupyter notebook or script. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export.
- The referenced PVCs must exist in the Kubernetes namespace where the generic pipeline nodes are executed.
- Data volumes are not mounted when the pipeline is executed locally.
- **Kubernetes secrets**
- A list of [Kubernetes Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) to be accessed as environment variables during Jupyter notebook or script execution. Format: `ENV_VAR=secret-name:secret-key`. Entries that are empty (`ENV_VAR=`) or malformed are ignored. Entries with a secret name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) or with [an invalid secret key](https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data) will raise a validation error after pipeline submission or export.
- The referenced secrets must exist in the Kubernetes namespace where the generic pipeline nodes are executed.
Expand Down Expand Up @@ -140,15 +141,17 @@ The [tutorials](/getting_started/tutorials.md) provide comprehensive step-by-ste
- Optional. A list of files generated by the notebook inside the image to be passed as inputs to the next step of the pipeline. Specify one file, directory, or expression per line. Supported patterns are `*` and `?`.
- Example: `data/*.csv`

**Data Volumes**
- Optional. A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the Jupyter notebook or script. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. The referenced PVCs must exist in the Kubernetes namespace where the node is executed.
- Data volumes are not mounted when the pipeline is executed locally.
- Example: `/mnt/vol1=data-pvc`

**Kubernetes Secrets**
- Optional. A list of [Kubernetes Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) to be accessed as environment variables during Jupyter notebook or script execution. Format: `ENV_VAR=secret-name:secret-key`. Entries that are empty (`ENV_VAR=`) or malformed are ignored. Entries with a secret name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) or with [an invalid secret key](https://kubernetes.io/docs/concepts/configuration/secret/#restriction-names-data) will raise a validation error after pipeline submission or export. The referenced secrets must exist in the Kubernetes namespace where the generic pipeline nodes are executed.
- Secrets are ignored when the pipeline is executed locally. For remote execution, if an environment variable was assigned both a static value (via the 'Environment Variables' property) and a Kubernetes secret value, the secret's value is used.
- Example: `ENV_VAR=secret-name:secret-key`

Both generic and [custom components](pipeline-components.html#custom-components) include the following property:

**Data Volumes**
- Optional. A list of [Persistent Volume Claims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) (PVC) to be mounted into the container that executes the component and allows for data exchange between components. Format: `/mnt/path=existing-pvc-name`. Entries that are empty (`/mnt/path=`) or malformed are ignored. Entries with a PVC name considered to be an [invalid Kubernetes resource name](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names) will raise a validation error after pipeline submission or export. The referenced PVCs must exist in the Kubernetes namespace where the node is executed.
- Data volumes are not mounted when the pipeline is executed locally.
- Example: `/mnt/vol1=data-pvc`

5. Associate each node with a comment to document its purpose.

Expand Down Expand Up @@ -262,7 +265,7 @@ The pipeline dependencies output includes:
- Python or R scripts
- Local files that the notebooks or scripts require
- Custom components
- Data volumes that generic nodes are mounting
- Data volumes that custom and generic nodes are mounting
- Container images that generic nodes are using to run notebooks or scripts
- Kubernetes secrets that generic nodes are exposing as environment variables to notebooks or scripts

Expand Down
7 changes: 3 additions & 4 deletions elyra/cli/pipeline_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ def describe(json_option, pipeline_path):
for node in primary_pipeline.nodes:
# update describe_dict stats that take into account every operation
# (... there are none today)
# volumes
for vm in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []):
describe_dict["volume_dependencies"]["value"].add(vm.pvc_name)

if Operation.is_generic_operation(node.op):
# update stats that are specific to generic components
Expand All @@ -611,10 +614,6 @@ def describe(json_option, pipeline_path):
node.get_component_parameter(pipeline_constants.RUNTIME_IMAGE)
)

# volumes
for vm in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []):
describe_dict["volume_dependencies"]["value"].add(vm.pvc_name)

# Kubernetes secrets
for ks in node.get_component_parameter(pipeline_constants.KUBERNETES_SECRETS, []):
describe_dict["kubernetes_secret_dependencies"]["value"].add(ks.name)
Expand Down
131 changes: 118 additions & 13 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import re
import string
import tempfile
from textwrap import dedent
import time
from typing import Dict
from typing import List
Expand All @@ -41,8 +42,6 @@
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
Expand Down Expand Up @@ -330,11 +329,11 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"cpu_request": operation.cpu,
"mem_request": operation.memory,
"gpu_limit": operation.gpu,
"operator_source": operation.component_params["filename"],
"is_generic_operator": True,
"operator_source": operation.filename,
"is_generic_operator": operation.is_generic,
"doc": operation.doc,
"volume_mounts": operation.component_params.get(MOUNTED_VOLUMES, []),
"kubernetes_secrets": operation.component_params.get(KUBERNETES_SECRETS, []),
"volumes": operation.mounted_volumes,
"secrets": operation.kubernetes_secrets,
}

if runtime_image_pull_secret is not None:
Expand Down Expand Up @@ -445,8 +444,9 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"parent_operation_ids": operation.parent_operation_ids,
"component_params": operation.component_params_as_dict,
"operator_source": component.component_source,
"is_generic_operator": False,
"is_generic_operator": operation.is_generic,
"doc": operation.doc,
"volumes": operation.mounted_volumes,
}

target_ops.append(target_op)
Expand Down Expand Up @@ -490,11 +490,18 @@ def create_pipeline_file(
loader = PackageLoader("elyra", "templates/airflow")
template_env = Environment(loader=loader)

template_env.filters["regex_replace"] = lambda string: self._scrub_invalid_characters(string)

template_env.filters["regex_replace"] = lambda x: AirflowPipelineProcessor.scrub_invalid_characters(x)
template = template_env.get_template("airflow_template.jinja2")

target_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
# Pass functions used to render data volumes and secrets to the template env
rendering_functions = {
"render_volumes_for_op": AirflowPipelineProcessor.render_volumes_for_op,
"render_secrets_for_op": AirflowPipelineProcessor.render_secrets_for_op,
"render_secrets_for_cos": AirflowPipelineProcessor.render_secrets_for_cos,
}
template.globals.update(rendering_functions)

ordered_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
runtime_configuration = self._get_metadata_configuration(
schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
)
Expand All @@ -506,7 +513,7 @@ def create_pipeline_file(
pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

python_output = template.render(
operations_list=target_ops,
operations_list=ordered_ops,
pipeline_name=pipeline_instance_id,
user_namespace=user_namespace,
cos_secret=cos_secret,
Expand Down Expand Up @@ -544,11 +551,12 @@ def _create_unique_node_names(self, operation_list: List[Operation]) -> List[Ope

def _scrub_invalid_characters_from_list(self, operation_list: List[Operation]) -> List[Operation]:
for operation in operation_list:
operation.name = self._scrub_invalid_characters(operation.name)
operation.name = AirflowPipelineProcessor.scrub_invalid_characters(operation.name)

return operation_list

def _scrub_invalid_characters(self, name: str) -> str:
@staticmethod
def scrub_invalid_characters(name: str) -> str:
chars = re.escape(string.punctuation)
clean_name = re.sub(r"[" + chars + "\\s]", "_", name) # noqa E226
return clean_name
Expand Down Expand Up @@ -581,6 +589,103 @@ def _get_node_name(self, operations_list: list, node_id: str) -> Optional[str]:
return operation["notebook"]
return None

@staticmethod
def render_volumes_for_op(op: Dict) -> str:
    """
    Render any data volumes defined for the specified op for use in
    the Airflow DAG template.

    Side effects: records the generated variable names in the op's dict under
    ``op["volume_vars"]`` and ``op["volume_mount_vars"]`` so the Jinja template
    can reference them when building the operator's arguments.

    :param op: the op's dict built by ``_cc_pipeline`` (must contain ``id``;
        may contain ``volumes``, a list of objects with ``pvc_name`` and ``path``)
    :returns: a string literal containing the python code to be rendered in the DAG
    """
    if not op.get("volumes"):
        # Nothing to render for ops without mounted volumes
        return ""
    op["volume_vars"] = []  # store variable names in op's dict for template to access

    # Include import statements and comment
    # NOTE(review): these contrib import paths are Airflow 1.x — confirm target Airflow version
    str_to_render = f"""
    from airflow.contrib.kubernetes.volume import Volume
    from airflow.contrib.kubernetes.volume_mount import VolumeMount
    # Volumes for operation '{op['id']}'"""
    for idx, volume in enumerate(op.get("volumes", [])):
        # Variable names must be valid python identifiers in the generated DAG,
        # so punctuation/whitespace from the node id is scrubbed to underscores
        var_name = AirflowPipelineProcessor.scrub_invalid_characters(f"volume_{op['id']}_{idx}")

        # Define VolumeMount and Volume objects
        str_to_render += f"""
    mount_{var_name} = VolumeMount(
        name='{volume.pvc_name}',
        mount_path='{volume.path}',
        sub_path=None,
        read_only=False
    )
    {var_name} = Volume(
        name='{volume.pvc_name}', configs={{"persistentVolumeClaim": {{"claimName": "{volume.pvc_name}"}}}}
    )
    """

        op["volume_vars"].append(var_name)

    # Mount variable names mirror the volume variable names one-for-one
    op["volume_mount_vars"] = [f"mount_{volume_var}" for volume_var in op["volume_vars"]]
    # dedent strips the common leading indentation so the emitted DAG code is flush-left
    return dedent(str_to_render)

@staticmethod
def render_secrets_for_cos(cos_secret: str):
"""
Render the Kubernetes secrets required for COS

:returns: a string literal of the python code to be rendered in the DAG
"""
if not cos_secret:
return ""

return dedent(
f"""
from airflow.kubernetes.secret import Secret
## Ensure that the secret '{cos_secret}' is defined in the Kubernetes namespace where the pipeline is run
env_var_secret_id = KubernetesSecret(
env_var="AWS_ACCESS_KEY_ID",
name='{cos_secret}',
key="AWS_ACCESS_KEY_ID",
)
env_var_secret_key = KubernetesSecret(
env_var="AWS_SECRET_ACCESS_KEY",
name='{cos_secret}',
key="AWS_SECRET_ACCESS_KEY",
)
"""
)

@staticmethod
def render_secrets_for_op(op: Dict) -> str:
    """
    Render any Kubernetes secrets defined for the specified op for use in
    the Airflow DAG template.

    Side effects: records the generated variable names in ``op["secret_vars"]``
    so the Jinja template can reference them in the operator's arguments.

    :param op: the op's dict built by ``_cc_pipeline`` (must contain ``id``;
        may contain ``secrets``, a list of objects with ``env_var``/``name``/``key``)
    :returns: a string literal containing the python code to be rendered in the DAG
    """
    if not op.get("secrets"):
        return ""

    # Variable names are stored on the op's dict for the template to access
    op["secret_vars"] = []

    # First fragment: the import statement plus an identifying comment
    fragments = [
        f"""
        from airflow.kubernetes.secret import Secret
        # Secrets for operation '{op['id']}'"""
    ]

    for index, secret in enumerate(op.get("secrets", [])):
        # Scrub the node id so the generated name is a valid python identifier
        secret_var = AirflowPipelineProcessor.scrub_invalid_characters(f"secret_{op['id']}_{index}")

        # One Secret object definition per configured secret
        fragments.append(
            f"""
        {secret_var} = Secret(
            deploy_type='env',
            deploy_target="{secret.env_var}",
            secret="{secret.name}",
            key="{secret.key}",
        )
        """
        )
        op["secret_vars"].append(secret_var)

    # Strip the common leading indentation so the emitted DAG code is flush-left
    return dedent("".join(fragments))


class AirflowPipelineProcessorResponse(PipelineProcessorResponse):

Expand Down
12 changes: 11 additions & 1 deletion elyra/pipeline/component_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from elyra.pipeline.component import Component
from elyra.pipeline.component import ComponentParser
from elyra.pipeline.component_metadata import ComponentCatalogMetadata
from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES
from elyra.pipeline.runtime_type import RuntimeProcessorType

BLOCKING_TIMEOUT = 0.5
Expand Down Expand Up @@ -667,12 +668,21 @@ def to_canvas_properties(component: Component) -> Dict:
If component_id is one of the generic set, generic template is rendered,
otherwise, the runtime-specific property template is rendered
"""
kwargs = {}
if ComponentCache.get_generic_component(component.id) is not None:
template = ComponentCache.load_jinja_template("generic_properties_template.jinja2")
else:
# Determine which component properties parsed from the definition
# collide with Elyra-defined properties (in the case of a collision,
# only the parsed property will be displayed)
kwargs = {"elyra_property_collisions_list": []}
for param in component.properties:
if param.ref in ELYRA_COMPONENT_PROPERTIES:
kwargs["elyra_property_collisions_list"].append(param.ref)

template = ComponentCache.load_jinja_template("canvas_properties_template.jinja2")

canvas_properties = template.render(component=component)
canvas_properties = template.render(component=component, **kwargs)
return json.loads(canvas_properties)


Expand Down
Loading