This repository was archived by the owner on Sep 2, 2025. It is now read-only.
Add spark session connection #279

Merged: jtcohen6 merged 60 commits into dbt-labs:main from JCZuurmond:add-spark-session-connection on Mar 26, 2022.
Changes from 59 of the 60 commits:
- 23a114e Add session module (JCZuurmond)
- 1a19d96 Add session connection method (JCZuurmond)
- cd707b1 Add session extras to setup.py (JCZuurmond)
- dbee595 Add check for session method (JCZuurmond)
- 195e925 Add session connection wrapper (JCZuurmond)
- ba1b5d9 Add sessioin to connection manager (JCZuurmond)
- 7f8e88d Remove unused imports (JCZuurmond)
- b3db680 Add spark session dbtspec (JCZuurmond)
- 1a040d7 Add tox spark session environment (JCZuurmond)
- 97b562a Add missing settings to dbt spec (JCZuurmond)
- eec242c Install session requirements (JCZuurmond)
- 7162ee6 Add tox spark session to circle ci (JCZuurmond)
- ff421db Add pytest spark as test requirement (JCZuurmond)
- 10267b1 Add fixutre to force use spark session (JCZuurmond)
- 0c39bf1 Add pytest ini (JCZuurmond)
- 0bd8499 Update passenv in tox (JCZuurmond)
- 1926a54 Set catalog implementation to hive (JCZuurmond)
- 8969056 Make separate session connection wrapper (JCZuurmond)
- 4163251 Format parameters (JCZuurmond)
- 425a002 Run spark session before thrift (JCZuurmond)
- 4573733 Add spark to dev requirements (JCZuurmond)
- 61119c6 Fix session module (JCZuurmond)
- 0b88252 Bump Spark session python version (JCZuurmond)
- 4a3008d Change docker image for spark session (JCZuurmond)
- 65aafcc Install python3 (JCZuurmond)
- 546d509 Update ci (JCZuurmond)
- 4fb252c Remove spark fixture (JCZuurmond)
- e962364 Move session connection wrapper to session module (JCZuurmond)
- 15aaa7c Disable tests that require hive support (JCZuurmond)
- 94fb929 Format (JCZuurmond)
- 1d69d63 Change python 3 to python 3.8 (JCZuurmond)
- f1abb75 Install non-python dependencies (JCZuurmond)
- 48ac80d Remove dev-requirements (JCZuurmond)
- 5791dc9 Remove pytest ini (JCZuurmond)
- 99f44e3 Update the install (JCZuurmond)
- 237bc56 Add session method to change log (JCZuurmond)
- 5d26e1b Do not pin sasl version (JCZuurmond)
- 3643f50 Delete spark session test profile (JCZuurmond)
- 0d9ef41 Add postgres container for hive support (JCZuurmond)
- edfa4d0 Enable all session tests (JCZuurmond)
- c2ea0b2 Enable hive support (JCZuurmond)
- 311928a Add delta as file format (JCZuurmond)
- cd0ed28 Use equals in spark defaults (JCZuurmond)
- f2d4baa Change reference to find spark home (JCZuurmond)
- b53bdc3 Copy configs in one go (JCZuurmond)
- 9a41af7 List spark conf (JCZuurmond)
- 7196cd7 Let session test be the same as thrift (JCZuurmond)
- fda5b01 Update spark defaults (JCZuurmond)
- 1164a81 Enable error logging on postgres (JCZuurmond)
- d5e6bd6 Remove ls (JCZuurmond)
- afce326 Add port to connection url (JCZuurmond)
- 3ff142a Do not copy spark config (JCZuurmond)
- 1efabb3 Print postgres (JCZuurmond)
- 63ecb64 Remove postgres logging from thrift (JCZuurmond)
- 85d23b8 Remove postgres from spark session tests (JCZuurmond)
- 902b993 Change connection url back to dbt-hive-metastore (JCZuurmond)
- ef3c35b Revert Spark defaults changes (JCZuurmond)
- 40504cf Disable tests and explain why (JCZuurmond)
- 37ebe30 Move change log to top of file (JCZuurmond)
- 5d745ff Move contributor note up in changelog (jtcohen6)
New file added (@@ -0,0 +1,221 @@):

```python
"""Spark session integration."""

from __future__ import annotations

import datetime as dt
from types import TracebackType
from typing import Any

from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from pyspark.sql import DataFrame, Row, SparkSession

logger = AdapterLogger("Spark")
NUMBERS = DECIMALS + (int, float)


class Cursor:
    """
    Mock a pyodbc cursor.

    Source
    ------
    https://github.com/mkleehammer/pyodbc/wiki/Cursor
    """

    def __init__(self) -> None:
        self._df: DataFrame | None = None
        self._rows: list[Row] | None = None

    def __enter__(self) -> Cursor:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: Exception | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        self.close()
        return True

    @property
    def description(
        self,
    ) -> list[tuple[str, str, None, None, None, None, bool]]:
        """
        Get the description.

        Returns
        -------
        out : list[tuple[str, str, None, None, None, None, bool]]
            The description.

        Source
        ------
        https://github.com/mkleehammer/pyodbc/wiki/Cursor#description
        """
        if self._df is None:
            description = list()
        else:
            description = [
                (
                    field.name,
                    field.dataType.simpleString(),
                    None,
                    None,
                    None,
                    None,
                    field.nullable,
                )
                for field in self._df.schema.fields
            ]
        return description

    def close(self) -> None:
        """
        Close the connection.

        Source
        ------
        https://github.com/mkleehammer/pyodbc/wiki/Cursor#close
        """
        self._df = None
        self._rows = None

    def execute(self, sql: str, *parameters: Any) -> None:
        """
        Execute a sql statement.

        Parameters
        ----------
        sql : str
            The sql statement to execute.
        *parameters : Any
            The parameters, interpolated into the statement with the
            ``%`` operator.

        Source
        ------
        https://github.com/mkleehammer/pyodbc/wiki/Cursor#executesql-parameters
        """
        if len(parameters) > 0:
            sql = sql % parameters
        spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
        self._df = spark_session.sql(sql)

    def fetchall(self) -> list[Row] | None:
        """
        Fetch all data.

        Returns
        -------
        out : list[Row] | None
            The rows.

        Source
        ------
        https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchall
        """
        if self._rows is None and self._df is not None:
            self._rows = self._df.collect()
        return self._rows

    def fetchone(self) -> Row | None:
        """
        Fetch the first output.

        Returns
        -------
        out : Row | None
            The first row.

        Source
        ------
        https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchone
        """
        if self._rows is None and self._df is not None:
            self._rows = self._df.collect()

        if self._rows is not None and len(self._rows) > 0:
            row = self._rows.pop(0)
        else:
            row = None

        return row


class Connection:
    """
    Mock a pyodbc connection.

    Source
    ------
    https://github.com/mkleehammer/pyodbc/wiki/Connection
    """

    def cursor(self) -> Cursor:
        """
        Get a cursor.

        Returns
        -------
        out : Cursor
            The cursor.
        """
        return Cursor()


class SessionConnectionWrapper(object):
    """Connection wrapper for the session connection method."""

    def __init__(self, handle):
        self.handle = handle
        self._cursor = None

    def cursor(self):
        self._cursor = self.handle.cursor()
        return self

    def cancel(self):
        logger.debug("NotImplemented: cancel")

    def close(self):
        if self._cursor:
            self._cursor.close()

    def rollback(self, *args, **kwargs):
        logger.debug("NotImplemented: rollback")

    def fetchall(self):
        return self._cursor.fetchall()

    def execute(self, sql, bindings=None):
        if sql.strip().endswith(";"):
            sql = sql.strip()[:-1]

        if bindings is None:
            self._cursor.execute(sql)
        else:
            bindings = [self._fix_binding(binding) for binding in bindings]
            self._cursor.execute(sql, *bindings)

    @property
    def description(self):
        return self._cursor.description

    @classmethod
    def _fix_binding(cls, value):
        """Convert complex datatypes to primitives that can be loaded by
        the Spark driver."""
        if isinstance(value, NUMBERS):
            return float(value)
        elif isinstance(value, dt.datetime):
            return f"'{value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}'"
        else:
            return f"'{value}'"
```
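The `_fix_binding` conversion can be illustrated in isolation. The sketch below reproduces its logic with the standard library's `decimal.Decimal` standing in for dbt's `DECIMALS` tuple (an assumption: `dbt.utils.DECIMALS` is treated here as simply a tuple of decimal types):

```python
import datetime as dt
from decimal import Decimal

# Stand-in for dbt.utils.DECIMALS (assumed to be a tuple of decimal types).
NUMBERS = (Decimal, int, float)


def fix_binding(value):
    """Convert complex datatypes to primitives the Spark driver can load."""
    if isinstance(value, NUMBERS):
        # Numeric values pass through as floats, unquoted.
        return float(value)
    elif isinstance(value, dt.datetime):
        # Datetimes become a quoted string with millisecond precision.
        return f"'{value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}'"
    else:
        # Everything else is stringified and quoted for inlining into SQL.
        return f"'{value}'"


print(fix_binding(Decimal("1.5")))                   # 1.5
print(fix_binding(dt.datetime(2022, 3, 26, 12, 0)))  # '2022-03-26 12:00:00.000'
print(fix_binding("analytics"))                      # 'analytics'
```

Note the design choice this mirrors: bindings are rendered into the SQL text (via `%` formatting in `Cursor.execute`) rather than passed to the driver, since `SparkSession.sql` takes a single statement string.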
Changed file (test requirements):

```diff
@@ -16,5 +16,5 @@ pytest-csv

 # Test requirements
 pytest-dbt-adapter==0.6.0
-sasl==0.2.1
+sasl>=0.2.1
 thrift_sasl==0.4.1
```
Changed file (pytest configuration):

```diff
@@ -1,10 +1,4 @@
 def pytest_configure(config):
-    config.addinivalue_line(
-        "markers", "profile_databricks_cluster"
-    )
-    config.addinivalue_line(
-        "markers", "profile_databricks_sql_endpoint"
-    )
-    config.addinivalue_line(
-        "markers", "profile_apache_spark"
-    )
+    config.addinivalue_line("markers", "profile_databricks_cluster")
+    config.addinivalue_line("markers", "profile_databricks_sql_endpoint")
+    config.addinivalue_line("markers", "profile_apache_spark")
```
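The markers registered above let test modules gate tests to a particular connection profile. A minimal sketch of how such a marker is attached (the test name and body here are hypothetical, not from this PR):

```python
import pytest


@pytest.mark.profile_apache_spark
def test_against_apache_spark_profile():
    # Hypothetical body; real tests exercise dbt against the selected profile.
    assert True
```

Registering markers in `pytest_configure` (or `pytest.ini`) keeps pytest from warning about unknown marks when tests are filtered with `-m profile_apache_spark`.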
New file added (@@ -0,0 +1,17 @@):

```yaml
target:
  type: spark
  method: session
  host: localhost
  schema: "analytics_{{ var('_dbt_random_suffix') }}"
sequences:
  test_dbt_empty: empty
  # requires a metastore for persisting over dbt runs
  # test_dbt_base: base
  # test_dbt_ephemeral: ephemeral
  # test_dbt_incremental: incremental
  # snapshots require delta format
  # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
  # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
  test_dbt_data_test: data_test
  test_dbt_schema_test: schema_test
  test_dbt_ephemeral_data_tests: data_test_ephemeral_models
```
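Outside the test spec, a user-facing `profiles.yml` entry for the new session method might look like the following sketch. The profile and schema names are placeholders; the fields mirror the spec target above, where the session method spins up a `SparkSession` in the dbt process itself rather than connecting to a remote endpoint:

```yaml
my_spark_project:          # placeholder profile name
  target: dev
  outputs:
    dev:
      type: spark
      method: session      # the connection method added in this PR
      host: localhost
      schema: analytics    # placeholder schema
```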
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.