Skip to content

Commit eba4211

Browse files
committed
Airflow 2.x generic pipeline mods. See also #3166.
- Changed processor airflow and jinja2 template airflow to comply with Airflow 2.x. - Added new location of KubernetesPodOperator library in Airflow 2.x to test Pipeline for processor airflow. - Added cpu and memory limits fields in airflow 2.x fashion as well. - Added documentation related to Airflow 2 support and Elyra 4 for readthedocs. - Also mentioning what does not yet work in Elyra 4, e.g. custom components. - tests_kubernetes.py fixed lint-server E226 missing whitespace around arithmetic operator Signed-off-by: Sven Thoms <[email protected]>
1 parent 1aeb32e commit eba4211

File tree

8 files changed

+69
-41
lines changed

8 files changed

+69
-41
lines changed

docs/source/getting_started/tutorials.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ Learn how to [run runtime-specific pipelines on Kubeflow Pipelines](https://gith
3838
#### Run generic pipelines on Apache Airflow
3939

4040
Learn how to [run generic pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-generic-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud.
41+
If you want to run generic pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed.
4142

4243
#### Run runtime-specific pipelines on Apache Airflow
4344

4445
Learn how to [run runtime-specific pipelines on Apache Airflow](https://github.com/elyra-ai/examples/tree/main/pipelines/run-pipelines-on-apache-airflow). This tutorial requires an Apache Airflow deployment in a local environment or on the cloud.
46+
If you want to run generic components (R, Python, ipynb Notebooks) in runtime-specific pipelines in Airflow 2.x, you have to use Elyra 4. In Elyra 4, generic pipeline support for Airflow 1 is removed and custom Airflow components are not yet supported.
4547

4648

4749
#### Examples

docs/source/recipes/configure-airflow-as-a-runtime.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ Pipelines in Elyra can be run locally in JupyterLab, or remotely on Kubeflow Pip
2424
**Note: Support for Apache Airflow is experimental.**
2525

2626
This document outlines how to set up a new Elyra-enabled Apache Airflow environment or add Elyra support to an existing deployment.
27+
You can submit pipelines with generic components to Airflow 2.x from Elyra 4 on.
28+
Generic components DAG code generation support for Airflow 1.x is removed in Elyra 4.
2729

2830
This guide assumes a general working knowledge of and administration of a Kubernetes cluster.
2931

@@ -42,8 +44,10 @@ AND
4244
OR
4345

4446
- An existing Apache Airflow cluster
45-
- Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. Other versions might work but have not been tested.
47+
- Ensure Apache Airflow is at least v1.10.8 and below v2.0.0. This applies to Elyra < 4.
48+
- Ensure Apache Airflow is at least v2.7.0. This applies to Elyra 4.
4649
- Apache Airflow is configured to use the Kubernetes Executor.
50+
- Apache Airflow must be configured to use git-sync, which is configurable both in [Airflow 1](https://airflow.apache.org/docs/apache-airflow/1.10.12/configurations-ref.html?highlight=git%20sync#git-repo) as well as in [Airflow 2](https://airflow.apache.org/docs/helm-chart/stable/parameters-ref.html#airflow)
4751
- Ensure the KubernetesPodOperator is installed and available in the Apache Airflow deployment
4852

4953
## Setting up a DAG repository on Git

docs/source/user_guide/best-practices-custom-pipeline-components.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ See the [Kubeflow Pipelines documentation](https://www.kubeflow.org/docs/compone
4040
### Apache Airflow components
4141

4242
#### Requirements
43+
Apache Airflow components are currently only supported for Airflow < 2 and in Elyra < 4.
44+
Elyra 4 starts with generic components support (R, Python, ipync Notebooks), not (yet) custom components, for Airflow 2.x.
4345

4446
##### Configure fully qualified package names for custom operator classes
4547

@@ -161,4 +163,4 @@ The missing component definition is stored in a [Machine Learning Exchange](http
161163

162164
#### Component catalogs not listed here
163165

164-
Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here.
166+
Check the [component catalog connector directory](https://github.com/elyra-ai/examples/blob/main/component-catalog-connectors/connector-directory.md) if the referenced catalog type is not listed here.

docs/source/user_guide/pipeline-components.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ The same pipeline could be implemented using a single component that performs al
2828

2929
#### Generic components
3030

31-
Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow. Components are exposed in the pipeline editor via the palette.
31+
Elyra includes three _generic components_ that allow for the processing of Jupyter notebooks, Python scripts, and R scripts. These components are called generic because they can be included in pipelines for any supported runtime type: local/JupyterLab, Kubeflow Pipelines, and Apache Airflow 2.x. Components are exposed in the pipeline editor via the palette.
3232

3333
![Generic components in the palette](../images/user_guide/pipeline-components/generic-components-in-palette.png)
3434

3535
Note: Refer to the [_Best practices_ topic in the _User Guide_](best-practices-file-based-nodes.md) to learn more about special considerations for generic components.
3636

3737
#### Custom components
3838

39-
_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow. (The local runtime type does not support custom components.)
39+
_Custom components_ are commonly only implemented for one runtime type, such as Kubeflow Pipelines or Apache Airflow < 2. (The local runtime type does not support custom components). Custom components, due to their being supported only for Airflow 1.x, are only supported on Elyra < 4.
4040

4141
![Kubeflow components in the palette](../images/user_guide/pipeline-components/custom-kubeflow-components-in-palette.png)
4242

@@ -63,9 +63,9 @@ Elyra includes connectors for the following component catalog types:
6363

6464
Example: A URL component catalog that is configured using the `http://myserver:myport/mypath/my_component.yaml` URL makes the `my_component.yaml` component file available to Elyra.
6565

66-
- [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions.
66+
- [_Apache Airflow package catalogs_](#apache-airflow-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow built distributions. This is currently only supported for Airflow < 2.
6767

68-
- [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages.
68+
- [_Apache Airflow provider package catalogs_](#apache-airflow-provider-package-catalog) provide access to Apache Airflow operators that are stored in Apache Airflow provider packages. This is currently only supported for Airflow < 2.
6969

7070
Refer to section [Built-in catalog connector reference](#built-in-catalog-connector-reference) for details about these connectors.
7171

@@ -438,6 +438,7 @@ Examples (CLI):
438438

439439
The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/package_catalog_connector) provides access to operators that are stored in Apache Airflow [built distributions](https://packaging.python.org/en/latest/glossary/#term-built-distribution):
440440
- Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
441+
- Only Airflow < 2 is supported. Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x.
441442
- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.
442443
- In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail).
443444

@@ -454,6 +455,7 @@ Examples:
454455
#### Apache Airflow provider package catalog
455456
The [Apache Airflow provider package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/provider_package_catalog_connector) provides access to operators that are stored in [Apache Airflow provider packages](https://airflow.apache.org/docs/apache-airflow-providers/):
456457
- Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
458+
- Only Airflow < 2 and operators for Airflow < 2 are supported. Use of that functionality is not working in Elyra >=4, which is no longer supporting Airflow 1.x.
457459
- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.
458460
- In secured environments where SSL server authenticity can only be validated using certificates based on private public key infrastructure (PKI) with root and optionally intermediate certificate authorities (CAs) that are not publicly trusted, you must define environment variable `TRUSTED_CA_BUNDLE_PATH` in the environment where JupyterLab/Elyra is running. The variable value must identify an existing [Privacy-Enhanced Mail (PEM) file](https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail).
459461

elyra/pipeline/airflow/processor_airflow.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
345345
"cpu_limit": operation.cpu_limit,
346346
"memory_limit": operation.memory_limit,
347347
"gpu_limit": operation.gpu,
348+
"gpu_vendor": operation.gpu_vendor,
348349
"operator_source": operation.filename,
349350
}
350351

@@ -598,13 +599,23 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
598599
str_to_render = ""
599600
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
600601
str_to_render += f"""
601-
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
602+
k8s.V1Volume(
603+
name="{v.pvc_name}",
604+
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
605+
claim_name="{v.pvc_name}",
606+
),
607+
),"""
602608
# set custom shared memory size
603609
shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
604610
if shm is not None and shm.size:
605-
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
606611
str_to_render += f"""
607-
Volume(name="shm", {config}),"""
612+
k8s.V1Volume(
613+
name="shm",
614+
empty_dir=k8s.V1EmptyDirVolumeSource(
615+
medium="Memory",
616+
size_limit="{shm.size}{shm.units}",
617+
),
618+
),"""
608619
return dedent(str_to_render)
609620

610621
def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
@@ -615,8 +626,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
615626
str_to_render = ""
616627
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
617628
str_to_render += f"""
618-
VolumeMount(name="{v.pvc_name}", mount_path="{v.path}",
619-
sub_path="{v.sub_path}", read_only={v.read_only}),"""
629+
k8s.V1VolumeMount(
630+
name="{v.pvc_name}",
631+
mount_path="{v.path}",
632+
sub_path="{v.sub_path}",
633+
read_only={v.read_only},
634+
),"""
620635
return dedent(str_to_render)
621636

622637
def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str:

elyra/templates/airflow/airflow_template.jinja2

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
from airflow import DAG
2-
from airflow.utils.dates import days_ago
2+
import pendulum
33

44
args = {
55
'project_id' : '{{ pipeline_name }}',
66
}
77

88
dag = DAG(
9-
'{{ pipeline_name }}',
9+
dag_id='{{ pipeline_name }}',
1010
default_args=args,
11-
schedule_interval='@once',
12-
start_date=days_ago(1),
11+
schedule='@once',
12+
start_date=pendulum.today('UTC').add(days=-1),
1313
description="""
1414
{{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
1515
""",
@@ -22,10 +22,9 @@ dag = DAG(
2222
{{import_statement}}
2323
{% endfor %}
2424
{% else %}
25-
from airflow.kubernetes.secret import Secret
26-
from airflow.contrib.kubernetes.volume import Volume
27-
from airflow.contrib.kubernetes.volume_mount import VolumeMount
28-
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
25+
from airflow.providers.cncf.kubernetes.secret import Secret
26+
from kubernetes.client import models as k8s
27+
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
2928
{% endif %}
3029

3130
{% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}
@@ -48,23 +47,27 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
4847
task_id='{{ operation.notebook|regex_replace }}',
4948
env_vars={{ operation.pipeline_envs }},
5049
{% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %}
51-
resources = {
52-
{% if operation.cpu_request %}
53-
'request_cpu': '{{ operation.cpu_request }}',
54-
{% endif %}
55-
{% if operation.mem_request %}
56-
'request_memory': '{{ operation.mem_request }}G',
57-
{% endif %}
58-
{% if operation.cpu_limit %}
59-
'limit_cpu': '{{ operation.cpu_limit }}',
60-
{% endif %}
61-
{% if operation.memory_limit %}
62-
'limit_memory': '{{ operation.memory_limit }}G',
63-
{% endif %}
64-
{% if operation.gpu_limit %}
65-
'limit_gpu': '{{ operation.gpu_limit }}',
66-
{% endif %}
67-
},
50+
container_resources=k8s.V1ResourceRequirements(
51+
requests={
52+
{% if operation.cpu_request %}
53+
'cpu': '{{ operation.cpu_request }}',
54+
{% endif %}
55+
{% if operation.mem_request %}
56+
'memory': '{{ operation.mem_request }}G',
57+
{% endif %}
58+
},
59+
limits={
60+
{% if operation.cpu_limit %}
61+
'cpu': '{{ operation.cpu_limit }}',
62+
{% endif %}
63+
{% if operation.memory_limit %}
64+
'memory': '{{ operation.memory_limit }}G',
65+
{% endif %}
66+
{% if operation.gpu_limit %}
67+
'{{ operation.gpu_vendor }}': '{{ operation.gpu_limit }}',
68+
{% endif %}
69+
}
70+
),
6871
{% endif %}
6972
volumes=[{{ processor.render_volumes(operation.elyra_props) }}],
7073
volume_mounts=[{{ processor.render_mounts(operation.elyra_props) }}],
@@ -73,7 +76,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
7376
labels={{ processor.render_labels(operation.elyra_props) }},
7477
tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}],
7578
in_cluster={{ in_cluster }},
76-
config_file="{{ kube_config_path }}",
79+
config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %},
7780
{% endif %}
7881
dag=dag)
7982
{% if operation.image_pull_policy %}

elyra/tests/pipeline/airflow/test_processor_airflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
195195
with open(response) as f:
196196
file_as_lines = f.read().splitlines()
197197

198-
assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines
198+
assert "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator" in file_as_lines
199199

200200
# Check DAG project name
201201
for i in range(len(file_as_lines)):

elyra/tests/util/test_kubernetes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def test_is_valid_label_key_valid_input():
116116
assert is_valid_label_key(key="p/n")
117117
assert is_valid_label_key(key="prefix/you.2")
118118
assert is_valid_label_key(key="how.sad/to-see")
119-
assert is_valid_label_key(key=f"{'d'*253}/{'n'*63}")
119+
assert is_valid_label_key(key=f"{'d' * 253}/{'n' * 63}")
120120

121121

122122
def test_is_valid_label_value_invalid_input():
@@ -175,7 +175,7 @@ def test_is_valid_annotation_key_invalid_input():
175175
assert not is_valid_annotation_key(key="/n") # prefix too short
176176
assert not is_valid_annotation_key(key="p/") # name too short
177177
assert not is_valid_annotation_key(key="a" * 254) # name too long
178-
assert not is_valid_annotation_key(key=f"d/{'b'*64}") # name too long
178+
assert not is_valid_annotation_key(key=f"d/{'b' * 64}") # name too long
179179
# test first character violations (not alphanum)
180180
assert not is_valid_annotation_key(key="-a")
181181
assert not is_valid_annotation_key(key=".b")
@@ -204,7 +204,7 @@ def test_is_valid_annotation_key_valid_input():
204204
assert is_valid_annotation_key(key="p/n")
205205
assert is_valid_annotation_key(key="prefix/you.2")
206206
assert is_valid_annotation_key(key="how.sad/to-see")
207-
assert is_valid_annotation_key(key=f"{'d'*253}/{'n'*63}")
207+
assert is_valid_annotation_key(key=f"{'d' * 253}/{'n' * 63}")
208208

209209

210210
def test_is_valid_annotation_value_invalid_input():

0 commit comments

Comments
 (0)