Skip to content

Commit 9c66c36

Browse files
WIP: Load method strategies
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
1 parent 26b30fa commit 9c66c36

12 files changed

Lines changed: 2431 additions & 17 deletions

File tree

singer_sdk/sql/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,38 @@
33
from __future__ import annotations
44

55
from singer_sdk.sql.connector import SQLConnector
6+
from singer_sdk.sql.load_strategies import (
7+
AppendOnlyStrategy,
8+
LoadMethodStrategy,
9+
OverwriteStrategy,
10+
UpsertStrategy,
11+
)
12+
from singer_sdk.sql.loaders import (
13+
Loader,
14+
MergeUpsertLoader,
15+
SimpleInsertLoader,
16+
TempTableUpsertLoader,
17+
)
618
from singer_sdk.sql.sink import SQLSink
719
from singer_sdk.sql.stream import SQLStream
820
from singer_sdk.sql.tap import SQLTap
921
from singer_sdk.sql.target import SQLTarget
1022

1123
__all__ = [
24+
# Load method strategies
25+
"AppendOnlyStrategy",
26+
"LoadMethodStrategy",
27+
# Loaders (for customization)
28+
"Loader",
29+
"MergeUpsertLoader",
30+
"OverwriteStrategy",
31+
# SQL components
1232
"SQLConnector",
1333
"SQLSink",
1434
"SQLStream",
1535
"SQLTap",
1636
"SQLTarget",
37+
"SimpleInsertLoader",
38+
"TempTableUpsertLoader",
39+
"UpsertStrategy",
1740
]

singer_sdk/sql/connector.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
ReflectedPrimaryKeyConstraint,
4040
)
4141

42+
from singer_sdk.sql.load_strategies import LoadMethodStrategy
43+
from singer_sdk.sql.sink import SQLSink
44+
4245

4346
class FullyQualifiedName(UserString):
4447
"""A fully qualified table name.
@@ -605,6 +608,7 @@ def __init__(
605608
self._config: dict[str, t.Any] = config or {}
606609
self._sqlalchemy_url: str | None = sqlalchemy_url or None
607610
self._tables_prepared: dict[str, bool] = {}
611+
self._load_strategy: LoadMethodStrategy | None = None
608612

609613
@property
610614
def config(self) -> dict:
@@ -647,6 +651,51 @@ def jsonschema_to_sql(self) -> JSONSchemaToSQL:
647651
max_varchar_length=self.max_varchar_length,
648652
)
649653

654+
def _create_load_strategy(
655+
self,
656+
sink: SQLSink, # SQLSink, but avoid circular import
657+
) -> LoadMethodStrategy: # LoadMethodStrategy
658+
"""Create appropriate load strategy based on config.
659+
660+
This factory method instantiates the correct strategy class based on the
661+
load_method configuration. Developers can override this to provide custom
662+
strategies for their database.
663+
664+
Args:
665+
sink: The SQLSink instance that will use this strategy.
666+
667+
Returns:
668+
The appropriate LoadMethodStrategy instance.
669+
670+
Raises:
671+
ValueError: If load_method is unknown or unsupported.
672+
"""
673+
# Import here to avoid circular dependency
674+
from singer_sdk.sql.load_strategies import ( # noqa: PLC0415
675+
AppendOnlyStrategy,
676+
OverwriteStrategy,
677+
UpsertStrategy,
678+
)
679+
680+
load_method = self.config.get("load_method", TargetLoadMethods.APPEND_ONLY)
681+
682+
# Map load methods to strategy classes
683+
strategies: dict[str, type[LoadMethodStrategy]] = {
684+
TargetLoadMethods.APPEND_ONLY: AppendOnlyStrategy,
685+
TargetLoadMethods.UPSERT: UpsertStrategy,
686+
TargetLoadMethods.OVERWRITE: OverwriteStrategy,
687+
}
688+
689+
strategy_class = strategies.get(load_method)
690+
if not strategy_class:
691+
msg = (
692+
f"Unknown load_method: {load_method}. "
693+
f"Supported methods: {list(strategies.keys())}"
694+
)
695+
raise ValueError(msg)
696+
697+
return strategy_class(connector=self, sink=sink)
698+
650699
@contextmanager
651700
def _connect(self) -> t.Iterator[sa.Connection]:
652701
with self._engine.connect().execution_options(stream_results=True) as conn:
@@ -1388,6 +1437,56 @@ def prepare_table(
13881437
) -> None:
13891438
"""Adapt target table to provided schema if possible.
13901439
1440+
Args:
1441+
full_table_name: the target table name.
1442+
schema: the JSON Schema for the table.
1443+
primary_keys: list of key properties.
1444+
partition_keys: list of partition keys.
1445+
as_temp_table: True to create a temp table.
1446+
"""
1447+
# If temp table, always use legacy path (strategies don't handle temp tables)
1448+
if as_temp_table:
1449+
self._prepare_table_legacy(
1450+
full_table_name=full_table_name,
1451+
schema=schema,
1452+
primary_keys=primary_keys,
1453+
partition_keys=partition_keys,
1454+
as_temp_table=as_temp_table,
1455+
)
1456+
return
1457+
1458+
# Check if we have a load strategy (new path)
1459+
if self._load_strategy:
1460+
# Delegate to load strategy
1461+
self._load_strategy.prepare_table(
1462+
full_table_name=str(full_table_name),
1463+
schema=schema,
1464+
primary_keys=primary_keys,
1465+
)
1466+
return
1467+
1468+
# Backward compatibility: use legacy path if no strategy
1469+
self._prepare_table_legacy(
1470+
full_table_name=full_table_name,
1471+
schema=schema,
1472+
primary_keys=primary_keys,
1473+
partition_keys=partition_keys,
1474+
as_temp_table=as_temp_table,
1475+
)
1476+
1477+
def _prepare_table_legacy(
1478+
self,
1479+
full_table_name: str | FullyQualifiedName,
1480+
schema: dict,
1481+
primary_keys: t.Sequence[str],
1482+
partition_keys: list[str] | None = None,
1483+
as_temp_table: bool = False, # noqa: FBT002, FBT001
1484+
) -> None:
1485+
"""Legacy table preparation logic (backward compatibility).
1486+
1487+
This method contains the original prepare_table() implementation,
1488+
kept for backward compatibility when load strategies are not used.
1489+
13911490
Args:
13921491
full_table_name: the target table name.
13931492
schema: the JSON Schema for the table.

0 commit comments

Comments
 (0)