Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions benchmarks/tpch/bodo_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,64 @@
import argparse
import time

import numpy as np
import pandas as pd

import bodo


@bodo.jit(cache=True)
def run_queries(data_folder, queries):
    """Run the selected TPC-H queries against parquet data in ``data_folder``.

    Args:
        data_folder: Path (local or s3://) to the folder holding the TPC-H
            parquet datasets (lineitem.pq, orders.pq, ...).
        queries: Array of query numbers (1-22) to execute, in the given order.
            Unrecognized numbers are silently ignored.

    Prints the total wall-clock time for all selected queries.
    """
    t1 = time.time()
    # Iterate values directly instead of indexing with range(len(...)).
    # NOTE(review): the explicit if/elif dispatch (rather than a dict of
    # functions) is presumably required because this function is compiled
    # with bodo.jit — confirm before restructuring into a lookup table.
    for q in queries:
        if q == 1:
            q01(data_folder)
        elif q == 2:
            q02(data_folder)
        elif q == 3:
            q03(data_folder)
        elif q == 4:
            q04(data_folder)
        elif q == 5:
            q05(data_folder)
        elif q == 6:
            q06(data_folder)
        elif q == 7:
            q07(data_folder)
        elif q == 8:
            q08(data_folder)
        elif q == 9:
            q09(data_folder)
        elif q == 10:
            q10(data_folder)
        elif q == 11:
            q11(data_folder)
        elif q == 12:
            q12(data_folder)
        elif q == 13:
            q13(data_folder)
        elif q == 14:
            q14(data_folder)
        elif q == 15:
            q15(data_folder)
        elif q == 16:
            q16(data_folder)
        elif q == 17:
            q17(data_folder)
        elif q == 18:
            q18(data_folder)
        elif q == 19:
            q19(data_folder)
        elif q == 20:
            q20(data_folder)
        elif q == 21:
            q21(data_folder)
        elif q == 22:
            q22(data_folder)
    print("Total Query time (s): ", time.time() - t1)


@bodo.jit
@bodo.jit(cache=True)
def load_lineitem(data_folder):
t0 = time.time()
data_path = data_folder + "/lineitem.pq"
Expand All @@ -53,7 +77,7 @@ def load_lineitem(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_part(data_folder):
t0 = time.time()
data_path = data_folder + "/part.pq"
Expand All @@ -64,7 +88,7 @@ def load_part(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_orders(data_folder):
t0 = time.time()
data_path = data_folder + "/orders.pq"
Expand All @@ -75,7 +99,7 @@ def load_orders(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_customer(data_folder):
t0 = time.time()
data_path = data_folder + "/customer.pq"
Expand Down Expand Up @@ -110,7 +134,7 @@ def load_region(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_supplier(data_folder):
t0 = time.time()
data_path = data_folder + "/supplier.pq"
Expand All @@ -121,7 +145,7 @@ def load_supplier(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_partsupp(data_folder):
t0 = time.time()
data_path = data_folder + "/partsupp.pq"
Expand Down Expand Up @@ -1099,9 +1123,21 @@ def main():
default="s3://bodo-example-data/tpch/SF1",
help="The folder containing TPCH data",
)
parser.add_argument(
"--queries",
type=int,
nargs="+",
required=False,
help="Space separated TPC-H queries to run.",
)
args = parser.parse_args()
folder = args.folder
run_queries(folder)
queries = list(range(1, 23))
if args.queries is not None:
queries = args.queries
print(f"Queries to run: {queries}")
qarr = np.array(queries)
run_queries(folder, qarr)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/dataframe_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ def main():
type=int,
nargs="+",
required=False,
help="Comma separated TPC-H queries to run.",
help="Space separated TPC-H queries to run.",
)
parser.add_argument(
"--scale_factor",
Expand Down
41 changes: 33 additions & 8 deletions bodo/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,39 @@ def to_datetime(
)

# 2. Series Case
return _get_series_func_plan(
arg._plan,
new_metadata,
"pandas.to_datetime",
(),
in_kwargs,
is_method=False,
)
if (
errors == "raise"
and dayfirst is False
and yearfirst is False
and utc is False
and unit is None
and origin == "unix"
and cache is True
):
# If only options supported by Bodo JIT then run as cfunc over map.
import bodo.decorators # isort:skip # noqa

if format is None:

def bodo_df_lib_to_datetime(x):
return pd.to_datetime(x)

return arg.map(bodo_df_lib_to_datetime, na_action="ignore")
else:

def bodo_df_lib_to_datetime_format(x):
return pd.to_datetime(x, format=format)

return arg.map(bodo_df_lib_to_datetime_format, na_action="ignore")
else:
return _get_series_func_plan(
arg._plan,
new_metadata,
"pandas.to_datetime",
(),
in_kwargs,
is_method=False,
)


@check_args_fallback(unsupported="all")
Expand Down
2 changes: 1 addition & 1 deletion bodo/pandas/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def bfs_duplicate(self):
else:
# Remember we encountered this node.
visited.add(id(node))
if isinstance(node, LogicalComparisonJoin):
if isinstance(node, (LogicalComparisonJoin, LogicalCrossProduct)):
# For comparison join, the first two args contain source plans.
for arg in node.args[0:2]:
if isinstance(arg, LazyPlan):
Expand Down
9 changes: 7 additions & 2 deletions bodo/tests/test_df_lib/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def test_read_parquet_series_len_shape(datapath):
assert bodo_out.shape == py_out.shape


@pytest.mark.jit_dependency
def test_read_parquet_filter_projection(datapath):
"""Test TPC-H Q6 bug where filter and projection pushed down to read parquet
and filter column isn't used anywhere in the query.
Expand Down Expand Up @@ -559,6 +560,7 @@ def test_filter_string(datapath):
)


@pytest.mark.jit_dependency
@pytest.mark.parametrize(
"op", [operator.eq, operator.ne, operator.gt, operator.lt, operator.ge, operator.le]
)
Expand Down Expand Up @@ -590,6 +592,7 @@ def test_filter_datetime_pushdown(datapath, op):
)


@pytest.mark.jit_dependency
@pytest.mark.parametrize(
"op", [operator.eq, operator.ne, operator.gt, operator.lt, operator.ge, operator.le]
)
Expand Down Expand Up @@ -3203,7 +3206,8 @@ def test_series_reset_index_pipeline():
bds,
pds,
check_pandas_types=False,
reset_index=False,
reset_index=True,
sort_output=True,
)

long_array = [
Expand All @@ -3230,7 +3234,8 @@ def test_series_reset_index_pipeline():
bds,
pds,
check_pandas_types=False,
reset_index=False,
reset_index=True,
sort_output=True,
)


Expand Down
1 change: 1 addition & 0 deletions bodo/tests/test_df_lib/test_top_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def bodo_func():
_test_equal(out_bd, out_pd, check_pandas_types=False, check_names=False)


@pytest.mark.jit_dependency
def test_top_level_to_datetime():
with assert_executed_plan_count(0):
# Single column string case
Expand Down
6 changes: 3 additions & 3 deletions bodo/tests/test_df_lib/test_tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_tpch_q10():


def test_tpch_q11():
run_tpch_query_test(tpch.tpch_q11)
run_tpch_query_test(tpch.tpch_q11, ctes_created=1)


def test_tpch_q12():
Expand All @@ -119,7 +119,7 @@ def test_tpch_q14():


def test_tpch_q15():
run_tpch_query_test(tpch.tpch_q15)
run_tpch_query_test(tpch.tpch_q15, ctes_created=1)


def test_tpch_q16():
Expand Down Expand Up @@ -147,4 +147,4 @@ def test_tpch_q21():


def test_tpch_q22():
run_tpch_query_test(tpch.tpch_q22)
run_tpch_query_test(tpch.tpch_q22, ctes_created=1)
36 changes: 28 additions & 8 deletions bodo/utils/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,35 @@ def impl(data): # pragma: no cover
) # pragma: no cover
elif data.dtype == bodo.types.datetime64ns:
if data.layout != "C":
return lambda data: bodo.libs.pd_datetime_arr_ext.init_datetime_array(
np.ascontiguousarray(data),
np.full((len(data) + 7) >> 3, 255, np.uint8),
None,
) # pragma: no cover

def func(data):
new_bitmask = np.full((len(data) + 7) >> 3, 255, np.uint8)

for i in range(len(data)):
bodo.libs.int_arr_ext.set_bit_to_arr(
new_bitmask, i, 0 if np.isnat(data[i]) else 1
)

return bodo.libs.pd_datetime_arr_ext.init_datetime_array(
np.ascontiguousarray(data), new_bitmask, None
)

return func
else:
return lambda data: bodo.libs.pd_datetime_arr_ext.init_datetime_array(
data, np.full((len(data) + 7) >> 3, 255, np.uint8), None
) # pragma: no cover

def func(data):
new_bitmask = np.full((len(data) + 7) >> 3, 255, np.uint8)

for i in range(len(data)):
bodo.libs.int_arr_ext.set_bit_to_arr(
new_bitmask, i, 0 if np.isnat(data[i]) else 1
)

return bodo.libs.pd_datetime_arr_ext.init_datetime_array(
data, new_bitmask, None
)

return func

raise BodoError(
f"np_to_nullable_array: invalid dtype {data.dtype}, integer, bool or float dtype expected"
Expand Down