Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pathlib import Path

import pytest

from elyra.metadata.metadata import Metadata
Expand Down Expand Up @@ -41,6 +43,15 @@
},
"schema_name": "url-catalog",
}
AIRFLOW_TEST_OPERATOR_CATALOG = {
"display_name": "Airflow Test Operator",
"metadata": {
"runtime_type": "APACHE_AIRFLOW",
"base_path": str(Path(__file__).parent / "elyra" / "tests" / "pipeline" / "resources" / "components"),
"paths": ["airflow_test_operator.py"],
},
"schema_name": "local-file-catalog",
}


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,6 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
)
processed_value = "\"{{ ti.xcom_pull(task_ids='" + parent_node_name + "') }}\""
operation.component_params[component_property.ref] = processed_value
elif component_property.data_type == "boolean":
operation.component_params[component_property.ref] = property_value
elif component_property.data_type == "string":
# Add surrounding quotation marks to string value for correct rendering
# in jinja DAG template
Expand All @@ -408,6 +406,8 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
elif component_property.data_type == "list":
processed_value = self._process_list_value(property_value)
operation.component_params[component_property.ref] = processed_value
else: # booleans and numbers can be rendered as-is
operation.component_params[component_property.ref] = property_value

# Remove inputs and outputs from params dict until support for data exchange is provided
operation.component_params_as_dict.pop("inputs")
Expand Down
13 changes: 7 additions & 6 deletions elyra/tests/pipeline/airflow/test_component_parser_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async def test_directory_based_component_catalog(component_cache, metadata_manag

# Get new set of components from all active registries, including added test registry
components_after_create = component_cache.get_all_components(RUNTIME_PROCESSOR)
assert len(components_after_create) == len(initial_components) + 6
assert len(components_after_create) == len(initial_components) + 5

# Check that all relevant components from the new registry have been added
added_component_names = [component.name for component in components_after_create]
Expand Down Expand Up @@ -371,7 +371,7 @@ def test_parse_airflow_component_url():
reader = UrlComponentCatalogConnector(airflow_supported_file_types)

# Read contents of given path
url = "https://raw.githubusercontent.com/apache/airflow/1.10.15/airflow/operators/bash_operator.py" # noqa: E501
url = "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components/airflow_test_operator.py" # noqa: E501
catalog_entry_data = {"url": url}

# Construct a catalog instance
Expand Down Expand Up @@ -399,10 +399,11 @@ def get_parameter(param_name):

component_source = json.dumps({"catalog_type": catalog_type, "component_ref": catalog_entry.entry_reference})
assert properties_json["current_parameters"]["component_source"] == component_source
assert get_parameter("elyra_bash_command") == ""
assert get_parameter("elyra_xcom_push") is True
assert get_parameter("elyra_env") == "{}" # {}
assert get_parameter("elyra_output_encoding") == "utf-8"
assert get_parameter("elyra_str_no_default") == ""
assert get_parameter("elyra_bool_default_true") is True
assert get_parameter("elyra_int_default_non_zero") == 2
assert get_parameter("elyra_unusual_type_dict") == "{}" # {}
assert get_parameter("elyra_unusual_type_list") == "[]"


def test_parse_airflow_component_file_no_inputs():
Expand Down
43 changes: 32 additions & 11 deletions elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from types import SimpleNamespace
from unittest import mock

from conftest import AIRFLOW_COMPONENT_CACHE_INSTANCE
from conftest import AIRFLOW_TEST_OPERATOR_CATALOG
import github
import pytest

Expand All @@ -38,8 +38,12 @@


@pytest.fixture
def processor(setup_factory_data):
def processor(monkeypatch, setup_factory_data):
processor = AirflowPipelineProcessor(os.getcwd())

# Add spoofed TestOperator to class import map
class_import_map = {"TestOperator": "from airflow.operators.test_operator import TestOperator"}
monkeypatch.setattr(processor, "class_import_map", class_import_map)
return processor


Expand Down Expand Up @@ -240,9 +244,14 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_CUSTOM_COMPONENTS], indirect=True)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
def test_create_file_custom_components(
monkeypatch, processor, catalog_instance, component_cache, parsed_pipeline, parsed_ordered_dict, sample_metadata
monkeypatch,
processor,
catalog_instance,
parsed_pipeline,
parsed_ordered_dict,
sample_metadata,
):
pipeline_json = _read_pipeline_resource(PIPELINE_FILE_CUSTOM_COMPONENTS)

Expand Down Expand Up @@ -306,6 +315,18 @@ def test_create_file_custom_components(
parameter_clause = i + 1
assert len(list(filter(r.match, file_as_lines[parameter_clause:]))) > 0

# Test that parameter value processing proceeded as expected for each data type
op_id = "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
op_params = parsed_ordered_dict[op_id].get("component_params", {})
expected_params = {
"str_no_default": "\"echo 'test one'\"",
"bool_no_default": True,
"unusual_type_list": [1, 2],
"unusual_type_dict": {},
"int_default_non_zero": 2,
}
assert op_params == expected_params


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_export_overwrite(monkeypatch, processor, parsed_pipeline):
Expand Down Expand Up @@ -604,7 +625,7 @@ def test_process_dictionary_value_function(processor):
@pytest.mark.parametrize(
"parsed_pipeline", ["resources/validation_pipelines/aa_operator_same_name.json"], indirect=True
)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
def test_same_name_operator_in_pipeline(monkeypatch, processor, catalog_instance, parsed_pipeline, sample_metadata):
task_id = "e3922a29-f4c0-43d9-8d8b-4509aab80032"
upstream_task_id = "0eb57369-99d1-4cd0-a205-8d8d96af3ad4"
Expand All @@ -618,19 +639,19 @@ def test_same_name_operator_in_pipeline(monkeypatch, processor, catalog_instance

pipeline_def_operation = parsed_pipeline.operations[task_id]
pipeline_def_operation_parameters = pipeline_def_operation.component_params_as_dict
pipeline_def_operation_bash_param = pipeline_def_operation_parameters["bash_command"]
pipeline_def_operation_str_param = pipeline_def_operation_parameters["str_no_default"]

assert pipeline_def_operation_bash_param["activeControl"] == "NestedEnumControl"
assert set(pipeline_def_operation_bash_param["NestedEnumControl"].keys()) == {"value", "option"}
assert pipeline_def_operation_bash_param["NestedEnumControl"]["value"] == upstream_task_id
assert pipeline_def_operation_str_param["activeControl"] == "NestedEnumControl"
assert set(pipeline_def_operation_str_param["NestedEnumControl"].keys()) == {"value", "option"}
assert pipeline_def_operation_str_param["NestedEnumControl"]["value"] == upstream_task_id

ordered_operations = processor._cc_pipeline(
parsed_pipeline, pipeline_name="some-name", pipeline_instance_id="some-instance-name"
)
operation_parameters = ordered_operations[task_id]["component_params"]
operation_parameter_bash_command = operation_parameters["bash_command"]
operation_parameter_str_command = operation_parameters["str_no_default"]

assert operation_parameter_bash_command == "\"{{ ti.xcom_pull(task_ids='BashOperator_1') }}\""
assert operation_parameter_str_command == "\"{{ ti.xcom_pull(task_ids='TestOperator_1') }}\""


def test_scrub_invalid_characters(processor):
Expand Down
164 changes: 0 additions & 164 deletions elyra/tests/pipeline/resources/components/bash_operator.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,29 @@
{
"id": "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3",
"type": "execution_node",
"op": "url-catalog:6571bc331575",
"op": "local-file-catalog:8371f5970c7b:TestOperator",
"app_data": {
"label": "b",
"component_parameters": {
"bash_command": {
"str_no_default": {
"activeControl": "StringControl",
"StringControl": "echo hello"
"StringControl": "echo 'test one'"
},
"xcom_push": {
"bool_no_default": {
"activeControl": "BooleanControl",
"BooleanControl": true
},
"env": {
"unusual_type_dict": {
"activeControl": "StringControl",
"StringControl": ""
"StringControl": "{}"
},
"output_encoding": {
"unusual_type_list": {
"activeControl": "StringControl",
"StringControl": "utf-8"
"StringControl": "[1, 2]"
},
"int_default_non_zero": {
"activeControl": "NumberControl",
"NumberControl": 2
}
},
"ui_data": {
Expand Down

Large diffs are not rendered by default.

Loading