Skip to content

Commit 1cbed3d

Browse files
authored
Use name passed to @asset decorator when fetching the asset. (#56434)
1 parent 04d3c44 commit 1cbed3d

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

task-sdk/src/airflow/sdk/definitions/asset/decorators.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def from_definition(cls, definition: AssetDefinition | MultiAssetDefinition) ->
6666
],
6767
outlets=[v for _, v in definition.iter_assets()],
6868
python_callable=definition._function,
69-
definition_name=definition._function.__name__,
69+
definition_name=definition.name,
7070
)
7171

7272
def _iter_kwargs(self, context: Mapping[str, Any]) -> Iterator[tuple[str, Any]]:
@@ -137,6 +137,7 @@ class MultiAssetDefinition(BaseAsset):
137137
:meta private:
138138
"""
139139

140+
name: str
140141
_function: Callable
141142
_source: asset.multi
142143

@@ -231,7 +232,7 @@ def __call__(self, f: Callable) -> MultiAssetDefinition:
231232
raise ValueError("nested function not supported")
232233
if not self.outlets:
233234
raise ValueError("no outlets provided")
234-
return MultiAssetDefinition(function=f, source=self)
235+
return MultiAssetDefinition(function=f, source=self, name=f.__name__)
235236

236237
def __call__(self, f: Callable) -> AssetDefinition:
237238
if f.__name__ != f.__qualname__:

task-sdk/tests/task_sdk/definitions/test_asset_decorators.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,3 +437,26 @@ def test_determine_kwargs_defaults(
437437
assert mock_supervisor_comms.mock_calls == [
438438
mock.call.send(GetAssetByName(name="inlet_asset_1")),
439439
]
440+
441+
def test_from_definition_custom_name(self, mock_supervisor_comms, func_fixer):
442+
@func_fixer
443+
def example_asset_func(self):
444+
pass
445+
446+
definition = asset(schedule=None, name="custom_name")(example_asset_func)
447+
op = _AssetMainOperator.from_definition(definition)
448+
assert op.task_id == "example_asset_func"
449+
assert op.python_callable == example_asset_func
450+
assert op._definition_name == "custom_name"
451+
452+
mock_supervisor_comms.send.side_effect = [
453+
AssetResult(name="custom_name", uri="s3://bucket/object1", group="Asset")
454+
]
455+
456+
assert op.determine_kwargs(context={}) == {
457+
"self": Asset(name="custom_name", uri="s3://bucket/object1", group="Asset")
458+
}
459+
460+
assert mock_supervisor_comms.mock_calls == [
461+
mock.call.send(GetAssetByName(name="custom_name", uri="s3://bucket/object1", group="Asset"))
462+
]

0 commit comments

Comments
 (0)