
Commit 3ca63ef

Scd2 incremental key column (#1712)
* Add PostgreSQL and Snowflake unit tests for scd2_by_column with incremental_key
* Add unit tests for scd2_by_column with incremental_key across all platforms
* Update documentation for scd2_by_column incremental_key feature
* Fix dupword lint warnings in test files
* Add integration tests for scd2_by_column with incremental_key using DuckDB
* Allow incremental_key with scd2_by_column strategy in linter validation
* Add scd2-by-column-incremental-key connection to default environment
* Fix timestamp format in integration test expectations
* Update expected connections JSON to include scd2-by-column-incremental-key connection

Co-authored-by: Sabri Karagönen <[email protected]>
Co-authored-by: Cursor Agent <[email protected]>
1 parent 879a7bc commit 3ca63ef

28 files changed

Lines changed: 1235 additions & 72 deletions

docs/assets/materialization.md

Lines changed: 60 additions & 5 deletions
@@ -435,10 +435,28 @@ When changes are detected in non-primary key columns:

 **Automatically added columns:**

-- `_valid_from`: TIMESTAMP when the record version became active (set to `CURRENT_TIMESTAMP()`)
-- `_valid_until`: TIMESTAMP when the record version became inactive (set to `TIMESTAMP('9999-12-31')` for current records)
+- `_valid_from`: TIMESTAMP when the record version became active (defaults to `CURRENT_TIMESTAMP()`, or uses `incremental_key` value if specified)
+- `_valid_until`: TIMESTAMP when the record version became inactive (set to `TIMESTAMP('9999-12-31')` for current records, or uses `incremental_key` value when a record is expired due to changes)
 - `_is_current`: BOOLEAN indicating if this is the current version of the record
+
+**Optional: Using `incremental_key` for timestamps:**
+
+By default, `_valid_from` and `_valid_until` are set using `CURRENT_TIMESTAMP()`. However, if your source data has a column that indicates when changes actually occurred (e.g., an `updated_at` timestamp), you can specify it using the `incremental_key` option:
+
+```yaml
+materialization:
+  type: table
+  strategy: scd2_by_column
+  incremental_key: updated_at
+```
+
+When `incremental_key` is specified:
+- `_valid_from` for new/updated records will be set to the value of the `incremental_key` column
+- `_valid_until` for records being expired (due to changes) will be set to the value of the `incremental_key` column from the new record
+- Records expiring because they're no longer in the source data will still use `CURRENT_TIMESTAMP()` for `_valid_until`
+
+This is useful when you want the SCD2 timeline to reflect the actual business timestamps from your source data rather than the processing time.

 **NOTE:**

 - Unless otherwise specified by `partition_by`, the SCD2 table will be partitioned by `_valid_from` for platforms which support partitioning (BigQuery, Athena, Snowflake).
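
The expiry rules documented above can be sketched as a tiny in-memory pass. This is illustrative Python only — Bruin implements the strategy in SQL per platform, and the row layout and function name here are invented for the example:

```python
from datetime import datetime

FAR_FUTURE = datetime(9999, 12, 31)  # stand-in for TIMESTAMP('9999-12-31')

def scd2_by_column_pass(history, source, incremental_key=None, now=None):
    """One merge pass over an in-memory SCD2 table.

    history: list of version dicts with keys pk, attrs, _valid_from,
             _valid_until, _is_current.
    source:  dict mapping pk -> attrs for the incoming snapshot.
    """
    now = now or datetime.now()

    def event_time(attrs):
        # Business timestamp when incremental_key is configured, else processing time.
        return attrs[incremental_key] if incremental_key else now

    out, seen = [], set()
    for row in history:
        if not row["_is_current"]:
            out.append(row)  # closed versions are immutable
            continue
        seen.add(row["pk"])
        incoming = source.get(row["pk"])
        if incoming is None:
            # Disappeared from the source: always expire at processing time.
            out.append({**row, "_is_current": False, "_valid_until": now})
        elif incoming != row["attrs"]:
            # Any non-PK column changed: expire the old version at the NEW
            # record's event time and open a new current version at that instant.
            t = event_time(incoming)
            out.append({**row, "_is_current": False, "_valid_until": t})
            out.append({"pk": row["pk"], "attrs": incoming, "_valid_from": t,
                        "_valid_until": FAR_FUTURE, "_is_current": True})
        else:
            out.append(row)
    for pk, attrs in source.items():
        if pk not in seen:  # brand-new primary key
            out.append({"pk": pk, "attrs": attrs, "_valid_from": event_time(attrs),
                        "_valid_until": FAR_FUTURE, "_is_current": True})
    return out
```

Note the asymmetry the documentation calls out: a changed row is closed at the new record's business timestamp (so the old and new intervals abut with no gap), while a row that simply vanishes from the source is closed at processing time.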
@@ -475,6 +493,43 @@ UNION ALL
 SELECT 3 AS ID, 'Keyboard' AS Name, 89.99 AS Price
 ```

+**Example with `incremental_key`:**
+
+When you want `_valid_from` and `_valid_until` to reflect actual business timestamps instead of processing time:
+
+```bruin-sql
+/* @bruin
+name: test.product_catalog
+type: bq.sql
+
+materialization:
+  type: table
+  strategy: scd2_by_column
+  incremental_key: updated_at
+
+columns:
+  - name: ID
+    type: INTEGER
+    description: "Unique identifier for Product"
+    primary_key: true
+  - name: Name
+    type: VARCHAR
+    description: "Name of the Product"
+  - name: Price
+    type: FLOAT
+    description: "Price of the Product"
+  - name: updated_at
+    type: TIMESTAMP
+    description: "When the product was last modified in the source system"
+@bruin */
+
+SELECT 1 AS ID, 'Wireless Mouse' AS Name, 29.99 AS Price, TIMESTAMP '2024-01-15 10:30:00' AS updated_at
+UNION ALL
+SELECT 2 AS ID, 'USB Cable' AS Name, 12.99 AS Price, TIMESTAMP '2024-01-14 14:00:00' AS updated_at
+```
+
+In this case, `_valid_from` will be set to the `updated_at` value from each record, preserving the actual business timeline of when changes occurred.

 **Example behavior:**

 Let's say you want to create a new table to track product catalog with SCD2. If the table doesn't exist yet, you'll need an initial run with the `--full-refresh` flag:
@@ -628,9 +683,9 @@ Notice how:
 | Aspect | scd2_by_column | scd2_by_time |
 |--------|----------------|--------------|
 | **Change Detection** | Automatically detects changes in any non-primary key column | Based on time values in the incremental_key column |
-| **_valid_from Value** | Set to `CURRENT_TIMESTAMP()` when change is processed | Derived from the incremental_key column value |
-| **Use Case** | When you want to track any column changes regardless of when they occurred | When your source data has reliable timestamps indicating when changes happened |
-| **Configuration** | Only requires primary_key columns | Requires both primary_key columns and incremental_key |
+| **_valid_from Value** | Set to `CURRENT_TIMESTAMP()` by default, or uses `incremental_key` value if specified | Always derived from the incremental_key column value |
+| **Use Case** | When you want to track any column changes; optionally use `incremental_key` for business timestamps | When your source data has reliable timestamps indicating when changes happened |
+| **Configuration** | Only requires primary_key columns; `incremental_key` is optional | Requires both primary_key columns and incremental_key |

 > [!WARNING]
 > SCD2 materializations are currently only supported for BigQuery, Snowflake, Postgres, Amazon Redshift, MySQL, DuckDB, and Databricks.
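
For contrast with the comparison table above, the `scd2_by_time` column implies a configuration like the following. This is a sketch inferred from the table — it only states that both `primary_key` columns and `incremental_key` are required, so check that strategy's own section for the exact syntax:

```yaml
materialization:
  type: table
  strategy: scd2_by_time
  # required for scd2_by_time (optional for scd2_by_column)
  incremental_key: updated_at
```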

integration-tests/.bruin.yml

Lines changed: 8 additions & 0 deletions
@@ -11,6 +11,8 @@ environments:
         path: "duckdb-files/scd2-by-column.db"
       - name: "duckdb-scd2-by-time"
         path: "duckdb-files/scd2-by-time.db"
+      - name: "duckdb-scd2-by-column-incremental-key"
+        path: "duckdb-files/scd2-by-column-incremental-key.db"
       - name: "duckdb-start-date-flags"
         path: "duckdb-files/start-date-flags.db"
       - name: "duckdb-mat-test"
@@ -244,6 +246,12 @@ environments:
       - name: "duckdb-scd2-by-time"
         path: "duckdb-files/scd2-by-time.db"

+  env-scd2-by-column-incremental-key:
+    connections:
+      duckdb:
+        - name: "duckdb-scd2-by-column-incremental-key"
+          path: "duckdb-files/scd2-by-column-incremental-key.db"
+
   env-duckdb-decimal:
     connections:
       duckdb:

integration-tests/expectations/expected_connections.json

Lines changed: 19 additions & 0 deletions
@@ -26,6 +26,10 @@
         "name": "duckdb-scd2-by-time",
         "path": "duckdb-files/scd2-by-time.db"
       },
+      {
+        "name": "duckdb-scd2-by-column-incremental-key",
+        "path": "duckdb-files/scd2-by-column-incremental-key.db"
+      },
       {
         "name": "duckdb-start-date-flags",
         "path": "duckdb-files/start-date-flags.db"
@@ -77,6 +81,10 @@
         "name": "duckdb-scd2-by-time",
         "path": "duckdb-files/scd2-by-time.db"
       },
+      {
+        "name": "duckdb-scd2-by-column-incremental-key",
+        "path": "duckdb-files/scd2-by-column-incremental-key.db"
+      },
       {
         "name": "duckdb-start-date-flags",
         "path": "duckdb-files/start-date-flags.db"
@@ -494,6 +502,17 @@
       },
       "schema_prefix": ""
     },
+    "env-scd2-by-column-incremental-key": {
+      "connections": {
+        "duckdb": [
+          {
+            "name": "duckdb-scd2-by-column-incremental-key",
+            "path": "duckdb-files/scd2-by-column-incremental-key.db"
+          }
+        ]
+      },
+      "schema_prefix": ""
+    },
     "env-scd2-by-time": {
       "connections": {
         "duckdb": [

integration-tests/integration_test.go

Lines changed: 141 additions & 0 deletions
@@ -2168,6 +2168,147 @@ func TestWorkflowTasks(t *testing.T) {
 			},
 		},
 	},
+	{
+		name: "run_pipeline_with_scd2_by_column_incremental_key",
+		workflow: e2e.Workflow{
+			Name: "run_pipeline_with_scd2_by_column_incremental_key",
+			Steps: []e2e.Task{
+				{
+					Name:    "scd2-col-ik-01a: create test directory",
+					Command: "mkdir",
+					Args:    []string{"-p", filepath.Join(tempdir, "test-scd2-by-column-incremental-key")},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:       "scd2-col-ik-01b: initialize git repository",
+					Command:    "git",
+					Args:       []string{"init"},
+					WorkingDir: filepath.Join(tempdir, "test-scd2-by-column-incremental-key"),
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:       "scd2-col-ik-01c: copy pipeline files",
+					Command:    "cp",
+					Args:       []string{"-a", filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/scd2-by-column-incremental-key-pipeline"), "."},
+					WorkingDir: filepath.Join(tempdir, "test-scd2-by-column-incremental-key"),
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-02: run pipeline with full refresh",
+					Command: binary,
+					Args:    []string{"run", "--full-refresh", "--config-file", filepath.Join(currentFolder, ".bruin.yml"), "--env", "env-scd2-by-column-incremental-key", filepath.Join(tempdir, "test-scd2-by-column-incremental-key/scd2-by-column-incremental-key-pipeline")},
+					Env:     []string{},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-03: query the initial table",
+					Command: binary,
+					Args:    []string{"query", "--connection", "duckdb-scd2-by-column-incremental-key", "--query", "SELECT product_id, product_name, price, _is_current, _valid_from FROM test.products ORDER BY product_id, _valid_from;", "--output", "csv"},
+					Env:     []string{},
+					Expected: e2e.Output{
+						ExitCode: 0,
+						CSVFile:  filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/scd2-by-column-incremental-key-pipeline/expectations/scd2_by_col_ik_expected_initial.csv"),
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+						e2e.AssertByCSV,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-04a: copy products_incremental_key_updated_01.sql",
+					Command: "cp",
+					Args:    []string{filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/resources/products_incremental_key_updated_01.sql"), filepath.Join(tempdir, "test-scd2-by-column-incremental-key/scd2-by-column-incremental-key-pipeline/assets/products.sql")},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-04b: run pipeline with updated products",
+					Command: binary,
+					Args:    []string{"run", "--config-file", filepath.Join(currentFolder, ".bruin.yml"), "--env", "env-scd2-by-column-incremental-key", filepath.Join(tempdir, "test-scd2-by-column-incremental-key/scd2-by-column-incremental-key-pipeline")},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-05: query the updated table 01",
+					Command: binary,
+					Args:    []string{"query", "--connection", "duckdb-scd2-by-column-incremental-key", "--query", "SELECT product_id, product_name, price, _is_current, _valid_from FROM test.products ORDER BY product_id, _valid_from;", "--output", "csv"},
+					Env:     []string{},
+					Expected: e2e.Output{
+						ExitCode: 0,
+						CSVFile:  filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/scd2-by-column-incremental-key-pipeline/expectations/scd2_by_col_ik_expected_updated_01.csv"),
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+						e2e.AssertByCSV,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-06a: copy products_incremental_key_updated_02.sql",
+					Command: "cp",
+					Args:    []string{filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/resources/products_incremental_key_updated_02.sql"), filepath.Join(tempdir, "test-scd2-by-column-incremental-key/scd2-by-column-incremental-key-pipeline/assets/products.sql")},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-06b: run pipeline with updated products 02",
+					Command: binary,
+					Args:    []string{"run", "--config-file", filepath.Join(currentFolder, ".bruin.yml"), "--env", "env-scd2-by-column-incremental-key", filepath.Join(tempdir, "test-scd2-by-column-incremental-key/scd2-by-column-incremental-key-pipeline")},
+					Expected: e2e.Output{
+						ExitCode: 0,
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+					},
+				},
+				{
+					Name:    "scd2-col-ik-07: query the updated table 02",
+					Command: binary,
+					Args:    []string{"query", "--connection", "duckdb-scd2-by-column-incremental-key", "--query", "SELECT product_id, product_name, price, _is_current, _valid_from FROM test.products ORDER BY product_id, _valid_from;", "--output", "csv"},
+					Env:     []string{},
+					Expected: e2e.Output{
+						ExitCode: 0,
+						CSVFile:  filepath.Join(currentFolder, "test-pipelines/duckdb-scd2-tests/scd2-by-column-incremental-key-pipeline/expectations/scd2_by_col_ik_expected_updated_02.csv"),
+					},
+					Asserts: []func(*e2e.Task) error{
+						e2e.AssertByExitCode,
+						e2e.AssertByCSV,
+					},
+				},
+			},
+		},
+	},
 	{
 		name: "start_date_flags_workflow",
 		workflow: e2e.Workflow{

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/* @bruin
+name: test.products
+type: duckdb.sql
+materialization:
+  type: table
+  strategy: scd2_by_column
+  incremental_key: updated_at
+
+columns:
+  - name: product_id
+    type: INTEGER
+    description: "Unique identifier for Product"
+    primary_key: true
+  - name: product_name
+    type: VARCHAR
+    description: "Name of the Product"
+  - name: price
+    type: FLOAT
+    description: "Price of the Product"
+  - name: updated_at
+    type: TIMESTAMP
+    description: "When the product was last updated"
+@bruin */
+
+SELECT 1 AS product_id, 'Laptop' AS product_name, 999.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
+UNION ALL
+SELECT 2 AS product_id, 'Mouse' AS product_name, 29.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
+UNION ALL
+SELECT 3 AS product_id, 'Keyboard' AS product_name, 79.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/* @bruin
+name: test.products
+type: duckdb.sql
+materialization:
+  type: table
+  strategy: scd2_by_column
+  incremental_key: updated_at
+
+columns:
+  - name: product_id
+    type: INTEGER
+    description: "Unique identifier for Product"
+    primary_key: true
+  - name: product_name
+    type: VARCHAR
+    description: "Name of the Product"
+  - name: price
+    type: FLOAT
+    description: "Price of the Product"
+  - name: updated_at
+    type: TIMESTAMP
+    description: "When the product was last updated"
+@bruin */
+
+-- Update 1: Laptop price changed from 999.99 to 1099.99 at 2024-02-01
+SELECT 1 AS product_id, 'Laptop' AS product_name, 1099.99 AS price, TIMESTAMP '2024-02-01 14:30:00' AS updated_at
+UNION ALL
+SELECT 2 AS product_id, 'Mouse' AS product_name, 29.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
+UNION ALL
+SELECT 3 AS product_id, 'Keyboard' AS product_name, 79.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/* @bruin
+name: test.products
+type: duckdb.sql
+materialization:
+  type: table
+  strategy: scd2_by_column
+  incremental_key: updated_at
+
+columns:
+  - name: product_id
+    type: INTEGER
+    description: "Unique identifier for Product"
+    primary_key: true
+  - name: product_name
+    type: VARCHAR
+    description: "Name of the Product"
+  - name: price
+    type: FLOAT
+    description: "Price of the Product"
+  - name: updated_at
+    type: TIMESTAMP
+    description: "When the product was last updated"
+@bruin */
+
+-- Update 2: Keyboard removed, Monitor added at 2024-03-01
+SELECT 1 AS product_id, 'Laptop' AS product_name, 1099.99 AS price, TIMESTAMP '2024-02-01 14:30:00' AS updated_at
+UNION ALL
+SELECT 2 AS product_id, 'Mouse' AS product_name, 29.99 AS price, TIMESTAMP '2024-01-15 10:00:00' AS updated_at
+UNION ALL
+SELECT 4 AS product_id, 'Monitor' AS product_name, 299.99 AS price, TIMESTAMP '2024-03-01 09:00:00' AS updated_at
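
Across the three source snapshots above, the incremental_key behaviour implies a deterministic `_valid_from` timeline, which is presumably why the integration test queries select `_valid_from` but not `_valid_until` (the removed Keyboard's `_valid_until` would come from a nondeterministic processing timestamp). A standalone sketch of that timeline — illustrative only, not the actual expectation CSVs:

```python
from datetime import datetime

# (price, updated_at) per product_id, one dict per pipeline run above.
snapshots = [
    {1: (999.99, datetime(2024, 1, 15, 10, 0)),
     2: (29.99, datetime(2024, 1, 15, 10, 0)),
     3: (79.99, datetime(2024, 1, 15, 10, 0))},
    # Update 1: Laptop price changed at 2024-02-01
    {1: (1099.99, datetime(2024, 2, 1, 14, 30)),
     2: (29.99, datetime(2024, 1, 15, 10, 0)),
     3: (79.99, datetime(2024, 1, 15, 10, 0))},
    # Update 2: Keyboard removed, Monitor added at 2024-03-01
    {1: (1099.99, datetime(2024, 2, 1, 14, 30)),
     2: (29.99, datetime(2024, 1, 15, 10, 0)),
     4: (299.99, datetime(2024, 3, 1, 9, 0))},
]

versions = {}  # product_id -> [(price, _valid_from), ...]
for snap in snapshots:
    for pid, (price, updated_at) in snap.items():
        if pid not in versions or versions[pid][-1][0] != price:
            # New key, or a changed row: _valid_from is taken from updated_at.
            versions.setdefault(pid, []).append((price, updated_at))
```

Laptop ends with two versions split exactly at its business timestamp `2024-02-01 14:30:00`, and Monitor's single version starts at `2024-03-01 09:00:00`.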
