Skip to content

Commit 856b2d4

Browse files
authored
feat(dbt sync): Sync calculated columns from dbt (#332)
* feat(dbt sync): Sync calculated columns from dbt
* Fixing test docstring
* Addressing PR feedback
1 parent 120d377 commit 856b2d4

File tree

2 files changed

+473
-10
lines changed

2 files changed

+473
-10
lines changed

src/preset_cli/cli/superset/sync/dbt/datasets.py

Lines changed: 39 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -238,12 +238,13 @@ def compute_columns(
238238
return final_dataset_columns
239239

240240

241-
def compute_columns_metadata(
241+
def compute_columns_metadata( # pylint: disable=too-many-branches, too-many-arguments # noqa: C901
242242
dbt_columns: List[Any],
243243
dataset_columns: List[Any],
244244
reload_columns: bool,
245245
merge_metadata: bool,
246-
column_defaults: Dict[str, Any] | None = None,
246+
column_defaults: Dict[str, Any],
247+
dbt_calc_columns: List[Dict[str, Any]],
247248
) -> List[Any]:
248249
"""
249250
Adds dbt metadata to dataset columns.
@@ -267,12 +268,34 @@ def compute_columns_metadata(
267268
dict_merge(final_column, dbt_metadata[column])
268269
dbt_metadata[column] = final_column
269270

271+
dbt_calc_columns_by_name = {c["column_name"]: c for c in dbt_calc_columns}
272+
273+
if column_defaults:
274+
for column, definition in dbt_calc_columns_by_name.items():
275+
final_column = copy.deepcopy(column_defaults)
276+
dict_merge(final_column, definition)
277+
dbt_calc_columns_by_name[column] = final_column
278+
if reload_columns and dbt_calc_columns_by_name:
279+
dataset_columns = [
280+
column
281+
for column in dataset_columns
282+
if not column.get("expression")
283+
or column["column_name"] in dbt_calc_columns_by_name
284+
]
285+
270286
for column in dataset_columns:
271287
name = column["column_name"]
288+
# regular column
272289
if name in dbt_metadata:
273290
for key, value in dbt_metadata[name].items():
274291
if reload_columns or merge_metadata or not column.get(key):
275292
column[key] = value
293+
# calculated column
294+
elif name in dbt_calc_columns_by_name:
295+
for key, value in dbt_calc_columns_by_name[name].items():
296+
if reload_columns or merge_metadata or not column.get(key):
297+
column[key] = value
298+
del dbt_calc_columns_by_name[name]
276299
elif column_defaults and (reload_columns or merge_metadata):
277300
for key, value in column_defaults.items():
278301
column[key] = value
@@ -285,6 +308,11 @@ def compute_columns_metadata(
285308
if "is_active" in column and column["is_active"] is None:
286309
del column["is_active"]
287310

311+
# Add new calc columns to list
312+
if dbt_calc_columns_by_name:
313+
for definition in dbt_calc_columns_by_name.values():
314+
dataset_columns.append(definition)
315+
288316
return dataset_columns
289317

290318

@@ -384,6 +412,11 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-arguments
384412
failed_datasets.append(model["unique_id"])
385413
continue
386414

415+
# get calculated columns from model
416+
calculated_columns = (
417+
model.get("meta", {}).get("superset", {}).pop("calculated_columns", [])
418+
)
419+
387420
# compute update payload
388421
update = compute_dataset_metadata(
389422
model,
@@ -403,14 +436,16 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-arguments
403436
continue
404437

405438
# update column metadata
406-
if dbt_columns := model.get("columns"):
439+
dbt_columns = model.get("columns")
440+
if dbt_columns or calculated_columns:
407441
current_dataset_columns = client.get_dataset(dataset["id"])["columns"]
408442
dataset_columns = compute_columns_metadata(
409443
dbt_columns,
410444
current_dataset_columns,
411445
reload_columns,
412446
merge_metadata,
413-
column_defaults=default_configs.get("columns", {}),
447+
default_configs.get("columns", {}),
448+
calculated_columns,
414449
)
415450
try:
416451
client.update_dataset(

0 commit comments

Comments
 (0)