diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 283da2518..3c066c02f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -90,13 +90,19 @@ jobs:
- name: Install sqlalchemy and docker pkg for postgres test
shell: bash -l {0}
run: |
- # explicitly install docker, fugue and sqlalchemy package
- # Also install ciso8601 (needed by docker) via conda, as the pip installation fails.
- mamba install sqlalchemy psycopg2 ciso8601 -c conda-forge
- pip install docker "fugue[sql]>=0.5.3"
- pip install mlflow
- pip install tpot
- pip install dask-ml
+ # explicitly install docker, fugue and other packages
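+          # ciso8601 (needed by docker) is installed via conda, as the pip installation fails
+          # version specifiers are quoted so the shell does not treat ">" as a redirect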
+ mamba install \
+            "sqlalchemy>=1.4.23" \
+            "pyhive>=0.6.4" \
+            "psycopg2>=2.9.1" \
+            "ciso8601>=2.2.0" \
+            "tpot>=0.11.7" \
+            "mlflow>=1.19.0" \
+            "docker-py>=5.0.0" \
+ -c conda-forge
+ pip install "fugue[sql]>=0.5.3"
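+          # pre-pull the hive docker images used in the tests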
+ docker pull bde2020/hive:2.3.2-postgresql-metastore
+ docker pull bde2020/hive-metastore-postgresql:2.3.0
if: matrix.os == 'ubuntu-latest'
- name: Install Java (again) and test with pytest
shell: bash -l {0}
@@ -118,7 +124,10 @@ jobs:
# Use always() to always run this step to publish test results when there are test failures
if: ${{ always() }}
- name: Upload coverage to Codecov
- uses: codecov/codecov-action@v1
+ uses: codecov/codecov-action@v2
+ with:
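+          # fail the workflow if the coverage upload errors, instead of passing silently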
+ fail_ci_if_error: true
+ token: ${{ secrets.CODECOV_TOKEN }}
test_independent:
name: "Test in a dask cluster"
needs: build
diff --git a/dask_sql/input_utils/hive.py b/dask_sql/input_utils/hive.py
index 9e2173bdc..1feacce1d 100644
--- a/dask_sql/input_utils/hive.py
+++ b/dask_sql/input_utils/hive.py
@@ -13,7 +13,7 @@
try:
import sqlalchemy
-except ImportError:
+except ImportError: # pragma: no cover
sqlalchemy = None
from dask_sql.input_utils.base import BaseInputPlugin
@@ -35,9 +35,7 @@ def is_correct_input(
return is_sqlalchemy_hive or is_hive_cursor or format == "hive"
- def to_dc(
- self, input_item: Any, table_name: str, format: str = None, **kwargs
- ): # pragma: no cover
+ def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs):
table_name = kwargs.pop("hive_table_name", table_name)
schema = kwargs.pop("hive_schema_name", "default")
@@ -65,14 +63,16 @@ def to_dc(
if "InputFormat" in storage_information:
format = storage_information["InputFormat"].split(".")[-1]
# databricks format is different, see https://github.com/dask-contrib/dask-sql/issues/83
- elif "InputFormat" in table_information:
+ elif "InputFormat" in table_information: # pragma: no cover
format = table_information["InputFormat"].split(".")[-1]
- else:
+ else: # pragma: no cover
raise RuntimeError(
"Do not understand the output of 'DESCRIBE FORMATTED
'"
)
- if format == "TextInputFormat" or format == "SequenceFileInputFormat":
+ if (
+ format == "TextInputFormat" or format == "SequenceFileInputFormat"
+ ): # pragma: no cover
storage_description = storage_information.get("Storage Desc Params", {})
read_function = partial(
dd.read_csv,
@@ -81,15 +81,17 @@ def to_dc(
)
elif format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
read_function = dd.read_parquet
- elif format == "OrcInputFormat":
+ elif format == "OrcInputFormat": # pragma: no cover
read_function = dd.read_orc
- elif format == "JsonInputFormat":
+ elif format == "JsonInputFormat": # pragma: no cover
read_function = dd.read_json
- else:
+ else: # pragma: no cover
raise AttributeError(f"Do not understand hive's table format {format}")
def _normalize(loc):
- if loc.startswith("dbfs:/") and not loc.startswith("dbfs://"):
+ if loc.startswith("dbfs:/") and not loc.startswith(
+ "dbfs://"
+ ): # pragma: no cover
# dask (or better: fsspec) needs to have the URL in a specific form
# starting with two // after the protocol
loc = f"dbfs://{loc.lstrip('dbfs:')}"
@@ -102,6 +104,19 @@ def _normalize(loc):
def wrapped_read_function(location, column_information, **kwargs):
location = _normalize(location)
logger.debug(f"Reading in hive data from {location}")
+ if format == "ParquetInputFormat" or format == "MapredParquetInputFormat":
+ # Hack needed for parquet files.
+ # If the folder structure is like .../col=3/...
+ # parquet wants to read in the partition information.
+                # However, we add the partition information ourselves,
+                # which will lead to problems afterwards.
+                # Therefore, tell parquet to only read in the columns
+                # we actually care about right now.
+ kwargs.setdefault("columns", list(column_information.keys()))
+ else: # pragma: no cover
+                # prevent Python from optimizing this branch away, which would
+                # make coverage not respect the pragma
+ dummy = 0
df = read_function(location, **kwargs)
logger.debug(f"Applying column information: {column_information}")
@@ -165,7 +180,7 @@ def _parse_hive_table_description(
schema: str,
table_name: str,
partition: str = None,
- ): # pragma: no cover
+ ):
"""
Extract all information from the output
of the DESCRIBE FORMATTED call, which is unfortunately
@@ -207,7 +222,7 @@ def _parse_hive_table_description(
elif key == "# Partition Information":
mode = "partition"
elif key.startswith("#"):
- mode = None
+ mode = None # pragma: no cover
elif key:
if not value:
value = dict()
@@ -223,6 +238,10 @@ def _parse_hive_table_description(
elif mode == "partition":
partition_information[key] = value
last_field = partition_information[key]
+ else: # pragma: no cover
+                # prevent Python from optimizing this branch away, which would
+                # make coverage not respect the pragma
+ dummy = 0
elif value and last_field is not None:
last_field[value] = value2
@@ -238,7 +257,7 @@ def _parse_hive_partition_description(
cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
schema: str,
table_name: str,
- ): # pragma: no cover
+ ):
"""
        Extract all partition information for a given table
"""
@@ -251,7 +270,7 @@ def _fetch_all_results(
self,
cursor: Union["sqlalchemy.engine.base.Connection", "hive.Cursor"],
sql: str,
- ): # pragma: no cover
+ ):
"""
        The pyhive.Cursor and the sqlalchemy connection behave slightly differently.
The former has the fetchall method on the cursor,
@@ -261,5 +280,5 @@ def _fetch_all_results(
try:
return result.fetchall()
- except AttributeError:
+ except AttributeError: # pragma: no cover
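+            # pyhive's cursor exposes fetchall() itself, while sqlalchemy's
+            # execute() returns a result object that has fetchall()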
return cursor.fetchall()
diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py
index 7e37a45ff..41cf16951 100644
--- a/dask_sql/physical/rel/logical/filter.py
+++ b/dask_sql/physical/rel/logical/filter.py
@@ -18,7 +18,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri
See https://github.com/dask-contrib/dask-sql/issues/87.
"""
if np.isscalar(filter_condition):
- if not filter_condition:
+ if not filter_condition: # pragma: no cover
# empty dataset
logger.warning("Join condition is always false - returning empty dataset")
return df.head(0, compute=False)
diff --git a/dask_sql/physical/rel/logical/window.py b/dask_sql/physical/rel/logical/window.py
index 2ecd10b24..ca23acfb1 100644
--- a/dask_sql/physical/rel/logical/window.py
+++ b/dask_sql/physical/rel/logical/window.py
@@ -93,6 +93,10 @@ def to_bound_description(
# Here, we do the de-referencing.
index = offset.getIndex() - constant_count_offset
offset = constants[index]
+ else: # pragma: no cover
+            # prevent Python from optimizing this branch away, which would
+            # make coverage not respect the pragma
+ dummy = 0
offset = int(RexLiteralPlugin().convert(offset, None, None))
else:
offset = None