@treff7es treff7es commented Jun 17, 2025

Summary

Adds comprehensive support for Apache Airflow 3.x with a clean, version-specific architecture that eliminates complex compatibility shims and provides robust, maintainable code for both Airflow 2.x and 3.x.

🏗️ Architecture: Separate Implementations for Clean Type Safety

The plugin now has separate implementations for Airflow 2.x and 3.x (instead of version conditionals scattered throughout code):

  • Airflow 2.x: plugin_v2/datahub_listener.py - Uses openlineage-airflow package with operator-specific extractors
  • Airflow 3.x: plugin_v2_airflow3/datahub_listener.py - Uses native OpenLineage provider with unified SQLParser patch

Benefits:

  • ✅ Type Safety - Each implementation is properly typed against its specific Airflow API
  • ✅ Maintainability - No complex version conditionals scattered throughout
  • ✅ Clarity - Clear separation of version-specific logic
  • ✅ Better Lineage - Unified SQLParser patch provides consistent column-level lineage across ALL SQL operators in Airflow 3.x

🚀 Key Technical Improvements

  1. SDK Connection API (Airflow 3.x)
  • Uses airflow.sdk.Connection.get() instead of the deprecated airflow.models.Connection.get_connection_from_secrets()
  • Removes deprecation warnings in Airflow 3.x
  • Works in all contexts (listener hooks, task execution, DAG parsing) without a SUPERVISOR_COMMS dependency
  2. Robust URL Parsing
  • Uses Python's urllib.parse for host/port extraction instead of manual string manipulation
  • Handles edge cases (IPv6 literals, paths, query strings) automatically
  • More reliable and less error-prone
  3. Better Log Levels
  • Transient emitter-creation failures log at DEBUG (not WARNING)
  • Reduces log noise while keeping actionable warnings at call sites
  • Cleaner production logs
  4. Unified SQL Lineage (Airflow 3.x)
  • A single SQLParser patch replaces multiple operator-specific extractors
  • Consistent column-level lineage across ALL SQL operators (Snowflake, BigQuery, Postgres, etc.)
  • Easier maintenance: one integration point instead of many extractors
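As an illustration of item 2, host/port extraction with Python's standard urllib.parse might look like the following sketch (the plugin's actual helper may differ; the function name here is made up for illustration):

```python
from urllib.parse import urlsplit

def extract_host_port(uri):
    """Extract (host, port) from a connection URI using urllib.parse.

    Handles IPv6 literals, paths, and query strings automatically,
    unlike manual string splitting. Illustrative sketch only.
    """
    # urlsplit only populates hostname/port when a "//" netloc marker exists.
    parts = urlsplit(uri if "//" in uri else f"//{uri}")
    return parts.hostname, parts.port
```

For example, `extract_host_port("[::1]:5432")` returns `("::1", 5432)`, which a naive split on `":"` would mangle.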

📦 Installation (IMPORTANT)

Users MUST specify the appropriate extra when installing:

For Airflow 2.x (2.7+)

pip install 'acryl-datahub-airflow-plugin[plugin-v2]'

For Airflow 3.x (3.0+)

pip install 'acryl-datahub-airflow-plugin[plugin-v2-airflow3]'

For Airflow 3.0.x specifically (pydantic issue)

pip install 'acryl-datahub-airflow-plugin[plugin-v2-airflow3]' 'pydantic>=2.11.8'

Why different extras? Airflow 2.x and 3.x have different OpenLineage dependencies:

  • Airflow 2.x → openlineage-airflow>=1.2.0 (standalone package)
  • Airflow 3.x → apache-airflow-providers-openlineage>=1.0.0 (native provider)

Installing without the appropriate extra will leave the OpenLineage dependencies missing, and lineage extraction will not work.
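The version-to-extra mapping above can be summarized in a small helper; this is an illustrative sketch only (the plugin does not ship a `required_extra` function):

```python
def required_extra(airflow_version):
    """Map an Airflow version string to the install extra described above.

    Illustrative helper, not part of the plugin's public API.
    """
    major = int(airflow_version.split(".")[0])
    return "plugin-v2-airflow3" if major >= 3 else "plugin-v2"
```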

🔧 Airflow 3.x Specific Changes

API & Configuration

  • JWT Authentication: Token-based auth instead of HTTP Basic Auth
  • Config Keys: Automatic detection of version-appropriate keys (webserver.base_url → api.base_url)
  • Log URLs: Updated format to match Airflow 3.x's simplified structure

Database Access Restrictions

  • Kill Switch: Uses environment variable AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER=true (vs Airflow Variables in 2.x)
  • Complies with Airflow 3.x HA lock restrictions (no database commits in listener hooks)
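A kill-switch check along these lines reads only the environment, so it complies with the no-database rule for listener hooks. This is a minimal sketch; the plugin's real check may accept additional truthy spellings:

```python
import os

def listener_disabled():
    """Check the Airflow 3.x kill switch from the environment.

    Reading os.environ avoids any metadata-database access, which
    Airflow 3.x forbids inside listener hooks. Sketch only.
    """
    value = os.environ.get(
        "AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER", ""
    )
    return value.strip().lower() == "true"
```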

Hook & Parameter Changes

  • Listener Signatures: Handles removed session parameter and new error parameter using **kwargs
  • DAG Parameters: schedule_interval → schedule, removed default_view
  • Template Rendering: Skip deepcopy for RuntimeTaskInstance (already rendered)
  • SubDAG Removal: Graceful handling (users should migrate to TaskGroups)
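The **kwargs pattern for the listener-signature changes can be sketched as follows; the hook name matches Airflow's listener convention, but the body and return value are purely illustrative:

```python
def on_task_instance_running(previous_state, task_instance, **kwargs):
    """Listener hook tolerating signature drift across Airflow versions.

    Airflow 2.x passes session=...; Airflow 3.x removes it, and some
    hooks gain error=... . Accepting **kwargs lets one signature serve
    both. Illustrative sketch, not the plugin's exact implementation.
    """
    session = kwargs.get("session")  # present on 2.x, absent on 3.x
    error = kwargs.get("error")      # present on some 3.x hooks
    return {"ti": task_instance, "session": session, "error": error}
```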

✅ Testing & Compatibility

Test Coverage:

  • ✅ All integration tests passing on Airflow 2.7, 2.8, 2.9, 2.10, 3.0, and 3.1
  • ✅ Golden files verified for both Airflow 2.x and 3.x
  • ✅ Column-level lineage working across all SQL operators
  • ✅ Threading support confirmed working in both versions
  • ✅ DAG metadata extraction, execution tracking, and lineage capture all functional

Compatibility Matrix:

| Airflow Version | Status | Notes |
| --- | --- | --- |
| 2.5 - 2.6 | ✅ Supported | Use [plugin-v2] extra |
| 2.7 - 2.10 | ✅ Fully Supported | Use [plugin-v2] extra |
| 3.0.x | ⚠️ Supported with pydantic bump | Use [plugin-v2-airflow3] + pydantic>=2.11.8 |
| 3.1+ | ✅ Fully Supported | Use [plugin-v2-airflow3] extra |

📚 Documentation

Updated Files

  • ✅ README.md: Added comprehensive installation section with version-specific instructions
  • ✅ docs/how/updating-datahub.md: Added breaking change entry explaining installation requirements

Migration Guide Covers

  • Architecture differences between Airflow 2.x and 3.x implementations
  • Step-by-step migration instructions
  • Configuration changes and troubleshooting
  • Known limitations and workarounds
  • Testing and verification procedures

⚠️ Breaking Changes

Installation Method Change:

Users must now specify the appropriate extra when installing the plugin. Installing with just pip install acryl-datahub-airflow-plugin (without extras) will NOT work: OpenLineage dependencies will be missing.

For Airflow 2.x users:

Before (may have worked):

pip install acryl-datahub-airflow-plugin

Now (required):

pip install 'acryl-datahub-airflow-plugin[plugin-v2]'

For Airflow 3.x users:

Required:

pip install 'acryl-datahub-airflow-plugin[plugin-v2-airflow3]'

Other Changes for Airflow 3.x:

  • Kill switch now uses environment variable instead of Airflow Variable
  • SubDAGs are not supported (removed by Airflow 3.x itself, not by this plugin; migrate to TaskGroups)
  • Configuration keys automatically detected (no action needed)

🎯 Code Quality Improvements

  • Cleaner imports: Version-specific modules with clear separation
  • Type safety: Proper typing for each Airflow version without conflicts
  • Better error handling: Graceful fallbacks and clear error messages
  • Reduced complexity: Eliminated scattered version conditionals
  • Improved logging: Appropriate log levels reduce noise

📝 Additional Notes

  • The version dispatcher automatically selects the correct implementation at runtime (no user action needed)
  • Both implementations are fully tested and production-ready
  • The unified SQLParser patch in Airflow 3.x provides better consistency than operator-specific extractors
  • Threading remains enabled by default and works correctly in both versions
  • No changes required for existing Airflow 2.x deployments (except installation extra)
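The runtime selection mentioned in the first note could look like this sketch; the module names match the layout described earlier in this PR, but the dispatch function itself is hypothetical:

```python
def select_listener_module(airflow_version):
    """Pick the version-appropriate listener implementation.

    Illustrative sketch of the runtime dispatch; the plugin's real
    dispatcher may inspect the installed Airflow package directly.
    """
    major = int(airflow_version.split(".")[0])
    if major >= 3:
        return "plugin_v2_airflow3.datahub_listener"
    return "plugin_v2.datahub_listener"
```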

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Jun 17, 2025
@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Jun 17, 2025

from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED

assert MARKUPSAFE_PATCHED

@aikido-pr-checks aikido-pr-checks bot Oct 8, 2025


Dangerous use of assert - low severity
When running Python in production in optimized mode, assert calls are not executed. This mode is enabled by setting the PYTHONOPTIMIZE command line flag. Optimized mode is usually ON in production. Any safety check done using assert will not be executed.

Remediation: Raise an exception instead of using assert.
View details in Aikido Security
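The remediation the bot suggests is to raise explicitly, since `python -O` (PYTHONOPTIMIZE) strips assert statements. A minimal sketch, with a hypothetical helper name:

```python
# Safety checks written as `assert MARKUPSAFE_PATCHED` silently vanish
# under PYTHONOPTIMIZE. Raising an exception keeps the check in every mode.
def require(condition, message):
    if not condition:
        raise RuntimeError(message)

# e.g. require(MARKUPSAFE_PATCHED, "markupsafe compat patch not applied")
```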


dagrun: "DagRun" = _get_dagrun_from_task_instance(task_instance)
task = task_instance.task
assert task is not None

@aikido-pr-checks aikido-pr-checks bot Nov 5, 2025


Dangerous use of assert - low severity

logger.debug(f"Completed emitting all DataFlow MCPs for {dataflow.urn}")

if dag.dag_id == _DATAHUB_CLEANUP_DAG:
assert self.graph

@aikido-pr-checks aikido-pr-checks bot Nov 5, 2025


Dangerous use of assert - low severity

f"DataHub listener got notification about dag run start for {dag_run.dag_id}"
)

assert dag_run.dag_id

@aikido-pr-checks aikido-pr-checks bot Nov 5, 2025


Dangerous use of assert - low severity
