25 changes: 17 additions & 8 deletions .github/workflows/test.yml
@@ -90,13 +90,19 @@ jobs:
- name: Install sqlalchemy and docker pkg for postgres test
shell: bash -l {0}
run: |
# explicitly install docker, fugue and sqlalchemy package
# Also install ciso8601 (needed by docker) via conda, as the pip installation fails.
mamba install sqlalchemy psycopg2 ciso8601 -c conda-forge
pip install docker "fugue[sql]>=0.5.3"
pip install mlflow
pip install tpot
pip install dask-ml
# explicitly install docker, fugue and other packages
mamba install \
sqlalchemy>=1.4.23 \
pyhive>=0.6.4 \
psycopg2>=2.9.1 \
ciso8601>=2.2.0 \
tpot>=0.11.7 \
mlflow>=1.19.0 \
docker-py>=5.0.0 \
-c conda-forge
pip install "fugue[sql]>=0.5.3"
docker pull bde2020/hive:2.3.2-postgresql-metastore
docker pull bde2020/hive-metastore-postgresql:2.3.0
if: matrix.os == 'ubuntu-latest'
- name: Install Java (again) and test with pytest
shell: bash -l {0}
@@ -118,7 +124,10 @@ jobs:
# Use always() to always run this step to publish test results when there are test failures
if: ${{ always() }}
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
test_independent:
name: "Test in a dask cluster"
needs: build
51 changes: 35 additions & 16 deletions dask_sql/input_utils/hive.py
@@ -13,7 +13,7 @@

try:
import sqlalchemy
except ImportError:
except ImportError: # pragma: no cover
sqlalchemy = None

from dask_sql.input_utils.base import BaseInputPlugin
@@ -35,9 +35,7 @@ def is_correct_input(

return is_sqlalchemy_hive or is_hive_cursor or format == "hive"

def to_dc(
self, input_item: Any, table_name: str, format: str = None, **kwargs
): # pragma: no cover
def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs):
table_name = kwargs.pop("hive_table_name", table_name)
schema = kwargs.pop("hive_schema_name", "default")

@@ -65,14 +63,16 @@ def to_dc(
if "InputFormat" in storage_information:
format = storage_information["InputFormat"].split(".")[-1]
# databricks format is different, see https://github.com/dask-contrib/dask-sql/issues/83
elif "InputFormat" in table_information:
elif "InputFormat" in table_information: # pragma: no cover
format = table_information["InputFormat"].split(".")[-1]
else:
else: # pragma: no cover
raise RuntimeError(
"Do not understand the output of 'DESCRIBE FORMATTED <table>'"
)

if format == "TextInputFormat" or format == "SequenceFileInputFormat":
if (
format == "TextInputFormat" or format == "SequenceFileInputFormat"
): # pragma: no cover
storage_description = storage_information.get("Storage Desc Params", {})
read_function = partial(
dd.read_csv,
@@ -81,15 +81,17 @@
)
elif format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
read_function = dd.read_parquet
elif format == "OrcInputFormat":
elif format == "OrcInputFormat": # pragma: no cover
read_function = dd.read_orc
elif format == "JsonInputFormat":
elif format == "JsonInputFormat": # pragma: no cover
read_function = dd.read_json
else:
else: # pragma: no cover
raise AttributeError(f"Do not understand hive's table format {format}")

def _normalize(loc):
if loc.startswith("dbfs:/") and not loc.startswith("dbfs://"):
if loc.startswith("dbfs:/") and not loc.startswith(
"dbfs://"
): # pragma: no cover
# dask (or better: fsspec) needs to have the URL in a specific form
# starting with two // after the protocol
loc = f"dbfs://{loc.lstrip('dbfs:')}"
@@ -102,6 +104,19 @@ def _normalize(loc):
def wrapped_read_function(location, column_information, **kwargs):
location = _normalize(location)
logger.debug(f"Reading in hive data from {location}")
if format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
# Hack needed for parquet files:
# if the folder structure is like .../col=3/...,
# parquet wants to read in the partition information.
# However, we add the partition information ourselves,
# which would lead to problems afterwards.
# Therefore tell parquet to only read in the columns
# we actually care about right now.
kwargs.setdefault("columns", list(column_information.keys()))
else: # pragma: no cover
# prevent Python from optimizing this branch away, which would make
# coverage ignore the pragma
dummy = 0
df = read_function(location, **kwargs)

logger.debug(f"Applying column information: {column_information}")
@@ -165,7 +180,7 @@ def _parse_hive_table_description(
schema: str,
table_name: str,
partition: str = None,
): # pragma: no cover
):
"""
Extract all information from the output
of the DESCRIBE FORMATTED call, which is unfortunately
@@ -207,7 +222,7 @@ def _parse_hive_table_description(
elif key == "# Partition Information":
mode = "partition"
elif key.startswith("#"):
mode = None
mode = None # pragma: no cover
elif key:
if not value:
value = dict()
@@ -223,6 +238,10 @@ def _parse_hive_table_description(
elif mode == "partition":
partition_information[key] = value
last_field = partition_information[key]
else: # pragma: no cover
# prevent Python from optimizing this branch away, which would make
# coverage ignore the pragma
dummy = 0
elif value and last_field is not None:
last_field[value] = value2

@@ -238,7 +257,7 @@ def _parse_hive_partition_description(
cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
schema: str,
table_name: str,
): # pragma: no cover
):
"""
Extract all partition informaton for a given table
"""
@@ -251,7 +270,7 @@ def _fetch_all_results(
self,
cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
sql: str,
): # pragma: no cover
):
"""
The pyhive.Cursor and the sqlalchemy connection behave slightly different.
The former has the fetchall method on the cursor,
@@ -261,5 +280,5 @@ def _fetch_all_results(

try:
return result.fetchall()
except AttributeError:
except AttributeError: # pragma: no cover
return cursor.fetchall()
2 changes: 1 addition & 1 deletion dask_sql/physical/rel/logical/filter.py
@@ -18,7 +18,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri
See https://github.com/dask-contrib/dask-sql/issues/87.
"""
if np.isscalar(filter_condition):
if not filter_condition:
if not filter_condition: # pragma: no cover
# empty dataset
logger.warning("Join condition is always false - returning empty dataset")
return df.head(0, compute=False)
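For context, a hypothetical usage sketch of filter_or_scalar; the behaviour of the non-scalar branch (applying the condition as a row-wise mask) is assumed and not shown in this hunk:

import numpy as np
import pandas as pd
import dask.dataframe as dd

from dask_sql.physical.rel.logical.filter import filter_or_scalar

# Standalone sketch, not part of the diff.
df = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)

# A constant-false condition (e.g. a join predicate that folded to False)
# short-circuits to an empty frame with the same schema.
empty = filter_or_scalar(df, np.bool_(False))  # df.head(0, compute=False)

# A dask Series is treated as a row-wise filter (assumed behaviour).
masked = filter_or_scalar(df, df["a"] > 1)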
4 changes: 4 additions & 0 deletions dask_sql/physical/rel/logical/window.py
@@ -93,6 +93,10 @@ def to_bound_description(
# Here, we do the de-referencing.
index = offset.getIndex() - constant_count_offset
offset = constants[index]
else: # pragma: no cover
# prevent Python from optimizing this branch away, which would make
# coverage ignore the pragma
dummy = 0
offset = int(RexLiteralPlugin().convert(offset, None, None))
else:
offset = None
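A toy illustration of the de-referencing arithmetic above (index = offset.getIndex() - constant_count_offset), using made-up numbers; reading constant_count_offset as the index where the collected constants begin is an assumption based on the variable name:

# Standalone sketch, not part of the diff.
constants = [5, 10]        # literals collected for this window frame (hypothetical)
constant_count_offset = 3  # assumed: index at which constants start being numbered

ref_index = 4              # what offset.getIndex() might return (hypothetical)
offset = constants[ref_index - constant_count_offset]
assert offset == 10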