Conversation
Reviewer's GuideIntroduces a strategy/loader abstraction for SQL load methods (append-only, overwrite, upsert), wires SQLConnector/SQLSink to use a per-sink LoadMethodStrategy instead of hardcoded logic, and adds extensive tests to validate behavior and configuration for each strategy and loader type. Sequence diagram for SQLSink setup and batch loading using strategiessequenceDiagram
actor Target
participant SQLTarget
participant SQLSink
participant SQLConnector
participant LoadMethodStrategy
participant Loader
participant Database as DB
Target->>SQLTarget: run_sync()
SQLTarget->>SQLSink: setup()
activate SQLSink
SQLSink->>SQLConnector: prepare_schema(schema_name)
deactivate SQLSink
note over SQLSink,SQLConnector: Initialize and validate load strategy
activate SQLSink
SQLSink->>SQLSink: load_strategy (property access)
alt strategy not yet created
SQLSink->>SQLConnector: _create_load_strategy(sink)
activate SQLConnector
SQLConnector->>SQLConnector: read config.load_method
SQLConnector-->>SQLSink: new AppendOnlyStrategy | OverwriteStrategy | UpsertStrategy
deactivate SQLConnector
SQLSink->>LoadMethodStrategy: validate_config()
SQLSink->>SQLSink: cache _load_strategy
else strategy already cached
SQLSink-->>SQLSink: reuse existing _load_strategy
end
deactivate SQLSink
note over SQLSink,LoadMethodStrategy: Table preparation delegated to strategy
activate SQLSink
SQLSink->>LoadMethodStrategy: prepare_table(full_table_name, schema, key_properties)
activate LoadMethodStrategy
alt AppendOnlyStrategy
LoadMethodStrategy->>SQLConnector: table_exists(full_table_name)
alt table missing
SQLConnector->>SQLConnector: create_empty_table(...)
else table exists
loop each property
LoadMethodStrategy->>SQLConnector: prepare_column(...)
end
LoadMethodStrategy->>SQLConnector: prepare_primary_key(...)
end
else OverwriteStrategy
LoadMethodStrategy->>SQLConnector: table_exists(full_table_name)
alt first time or table missing
LoadMethodStrategy->>SQLConnector: parse_full_table_name(...)
LoadMethodStrategy->>SQLConnector: drop existing table via SQLAlchemy
LoadMethodStrategy->>SQLConnector: create_empty_table(...)
else already prepared
loop each property
LoadMethodStrategy->>SQLConnector: prepare_column(...)
end
LoadMethodStrategy->>SQLConnector: prepare_primary_key(...)
end
else UpsertStrategy
LoadMethodStrategy->>SQLConnector: table_exists(full_table_name)
alt table missing
SQLConnector->>SQLConnector: create_empty_table(...)
else table exists
loop each property
LoadMethodStrategy->>SQLConnector: prepare_column(...)
end
LoadMethodStrategy->>SQLConnector: prepare_primary_key(...)
end
end
deactivate LoadMethodStrategy
deactivate SQLSink
note over SQLSink,Loader: Batch processing via loader abstraction
SQLTarget->>SQLSink: process_batch(context)
activate SQLSink
SQLSink->>LoadMethodStrategy: load_batch(full_table_name, schema, context.records)
activate LoadMethodStrategy
LoadMethodStrategy->>Loader: load_records(full_table_name, schema, records)
activate Loader
Loader->>DB: execute INSERT / DELETE+INSERT / MERGE
DB-->>Loader: rowcount
Loader-->>LoadMethodStrategy: records_loaded
deactivate Loader
LoadMethodStrategy-->>SQLSink: records_loaded
deactivate LoadMethodStrategy
SQLSink-->>SQLTarget: tally and continue
deactivate SQLSink
Class diagram for SQL load strategies and loadersclassDiagram
class SQLConnector {
- dict config
- dict _tables_prepared
- LoadMethodStrategy _load_strategy
+ jsonschema_to_sql() JSONSchemaToSQL
+ _create_load_strategy(sink: SQLSink) LoadMethodStrategy
+ prepare_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~, partition_keys: list~str~, as_temp_table: bool) void
+ _prepare_table_legacy(full_table_name: str | FullyQualifiedName, schema: dict, primary_keys: Sequence~str~, partition_keys: list~str~ | None, as_temp_table: bool) void
+ table_exists(full_table_name: str) bool
+ create_empty_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~, as_temp_table: bool) void
+ prepare_column(full_table_name: str, property_name: str, sql_type: str) void
+ prepare_primary_key(full_table_name: str, primary_keys: Sequence~str~) void
+ parse_full_table_name(full_table_name: str) tuple
+ allow_overwrite bool
+ allow_temp_tables bool
+ allow_merge_upsert bool
}
class SQLSink {
- SQLConnector _connector
- LoadMethodStrategy _load_strategy
+ connector() SQLConnector
+ load_strategy() LoadMethodStrategy
+ setup() void
+ process_batch(context: dict) void
+ schema dict
+ key_properties Sequence~str~
+ conform_name(name: str, object_type: str) str
+ merge_upsert_from_table(target_table_name: str, from_table_name: str, join_keys: list~str~) int
}
class LoadMethodStrategy {
<<abstract>>
- SQLConnector connector
- SQLSink sink
- Logger logger
- Loader loader
+ _create_loader() Loader
+ prepare_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~) void
+ load_batch(full_table_name: str, schema: dict, records: Iterable~dict~) int | None
+ validate_config() void
}
class AppendOnlyStrategy {
+ _create_loader() Loader
+ prepare_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~) void
+ validate_config() void
}
class OverwriteStrategy {
+ _create_loader() Loader
+ prepare_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~) void
+ validate_config() void
}
class UpsertStrategy {
+ _create_loader() Loader
+ prepare_table(full_table_name: str, schema: dict, primary_keys: Sequence~str~) void
+ validate_config() void
}
class Loader {
<<abstract>>
- Engine engine
- dict schema
- Sequence~str~ key_properties
- Callable conform_name
- Logger logger
+ load_records(full_table_name: str, schema: dict, records: Iterable~dict~) int | None
}
class SimpleInsertLoader {
+ load_records(full_table_name: str, schema: dict, records: Iterable~dict~) int | None
}
class TempTableUpsertLoader {
- Callable temp_table_creator
+ load_records(full_table_name: str, schema: dict, records: Iterable~dict~) int | None
+ _create_temp_table_default(temp_table_name: str, schema: dict, engine: Engine) void
}
class MergeUpsertLoader {
- Callable temp_table_creator
- Callable merge_function
+ load_records(full_table_name: str, schema: dict, records: Iterable~dict~) int | None
+ _create_temp_table_default(temp_table_name: str, schema: dict, engine: Engine) void
}
SQLConnector o-- LoadMethodStrategy : _load_strategy
SQLSink o-- LoadMethodStrategy : _load_strategy
LoadMethodStrategy o-- Loader : loader
LoadMethodStrategy --> SQLConnector : uses
LoadMethodStrategy --> SQLSink : uses
LoadMethodStrategy <|-- AppendOnlyStrategy
LoadMethodStrategy <|-- OverwriteStrategy
LoadMethodStrategy <|-- UpsertStrategy
Loader <|-- SimpleInsertLoader
Loader <|-- TempTableUpsertLoader
Loader <|-- MergeUpsertLoader
SQLSink --> SQLConnector : connector
SQLConnector --> SQLSink : _create_load_strategy(sink)
Flow diagram for selecting load method strategy and upsert loaderflowchart TD
A[Start: resolve load strategy] --> B[Read connector.config.load_method]
B --> C{load_method value}
C --> D[Use AppendOnlyStrategy]:::strategy_label
C --> E[Use OverwriteStrategy]:::strategy_label
C --> F[Use UpsertStrategy]:::strategy_label
classDef strategy_label fill:#eef,stroke:#333,stroke-width:1px
subgraph Strategy_factory_in_SQLConnector
D --> G[Instantiate AppendOnlyStrategy with connector and sink]
E --> H[Instantiate OverwriteStrategy with connector and sink]
F --> I[Instantiate UpsertStrategy with connector and sink]
G --> J[Return strategy to SQLSink]
H --> J
I --> J
end
J --> K[SQLSink.load_strategy caches strategy and calls validate_config]
subgraph UpsertStrategy__create_loader
I --> L{Custom merge_upsert_from_table implemented on sink?}
L --> M[Yes: create temp_table_creator wrapper calling connector.create_empty_table with as_temp_table True]
L --> N[No: create temp_table_creator wrapper calling connector.create_empty_table with as_temp_table True]
M --> O[Create merge_function wrapper calling sink.merge_upsert_from_table]
O --> P[Instantiate MergeUpsertLoader with engine, schema, key_properties, conform_name, logger, temp_table_creator, merge_function]
N --> Q[Instantiate TempTableUpsertLoader with engine, schema, key_properties, conform_name, logger, temp_table_creator]
end
P --> R[Assign loader on UpsertStrategy]
Q --> R
R --> S[Strategy ready: load_batch delegates to loader.load_records]
K --> S
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Documentation build overview
Show files changed (2 files in total): 📝 2 modified | ➕ 0 added | ➖ 0 deleted
|
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (87.73%) is below the target coverage (100.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #3402 +/- ##
==========================================
- Coverage 93.90% 93.42% -0.48%
==========================================
Files 69 71 +2
Lines 5774 6041 +267
Branches 716 737 +21
==========================================
+ Hits 5422 5644 +222
- Misses 248 291 +43
- Partials 104 106 +2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
CodSpeed Performance ReportMerging #3402 will not alter performanceComparing Summary
Footnotes |
e13bf00 to
9c66c36
Compare
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
4a5ae2c to
3c09e48
Compare
Summary by Sourcery
Introduce pluggable SQL load strategies and loaders to support append-only, overwrite, and upsert load methods with backward-compatible behavior.
New Features:
Enhancements:
Tests: