From e065e0f51591d818bbc8f8920c78a1c9769d3f91 Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Fri, 14 Nov 2025 21:34:15 -1000 Subject: [PATCH] Fixed bug of some recipes not working by adding sanitization to AirflowTranslator. (Specifically it's use of task_ids as variable names in the generated DAG.) --- wfcommons/wfbench/translator/airflow.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index f6983cc8..e8c9d9db 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -36,6 +36,8 @@ def __init__(self, """Create an object of the translator.""" super().__init__(workflow, logger) + self.sanitized_names = {} + self.seq_num = 0 self.script = f""" from __future__ import annotations @@ -73,7 +75,7 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) -> for task in self.tasks.values(): self.script += f""" - {task.task_id} = BashOperator( + {self._sanitize_varname(task.task_id)} = BashOperator( task_id="{task.task_id}", depends_on_past=False, bash_command='{self.task_commands[task.task_id]}', @@ -82,10 +84,11 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) -> ) """ for task in self.tasks.values(): - parents = ", ".join(self.task_parents[task.task_id]) + # Comma-separated list of the task's parents + parents = ", ".join(map(self._sanitize_varname, self.task_parents[task.task_id])) if parents: self.script += f""" - [{parents}] >> {task.task_id} + [{parents}] >> {self._sanitize_varname(task.task_id)} """ # write benchmark files output_folder.mkdir(parents=True) @@ -99,6 +102,20 @@ def translate(self, output_folder: pathlib.Path, name: Optional[str] = None) -> # Create the README file self._write_readme_file(output_folder) + def _sanitize_varname(self, name: str) -> str: + """ + Sanitizes string into a valid variable name. + + :param name: The name to sanitize. + :type name: str + """ + if name not in self.sanitized_names: + sanitized_name = '_' + re.sub(r'[^\w]', '_', name) + str(self.seq_num) + self.seq_num += 1 + self.sanitized_names[name] = sanitized_name + + return self.sanitized_names[name] + def _prep_commands(self, output_folder: pathlib.Path) -> None: """ Prepares the bash_command strings for the BashOperators.