Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions benchmarks/tpch/bodo_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,64 @@
import argparse
import time

import numpy as np
import pandas as pd

import bodo


@bodo.jit(cache=True)
def run_queries(data_folder):
def run_queries(data_folder, queries):
t1 = time.time()
q01(data_folder)
q02(data_folder)
q03(data_folder)
q04(data_folder)
q05(data_folder)
q06(data_folder)
q07(data_folder)
q08(data_folder)
q09(data_folder)
q10(data_folder)
q11(data_folder)
q12(data_folder)
q13(data_folder)
q14(data_folder)
q15(data_folder)
q16(data_folder)
q17(data_folder)
q18(data_folder)
q19(data_folder)
q20(data_folder)
q21(data_folder)
q22(data_folder)
for i in range(len(queries)):
if queries[i] == 1:
q01(data_folder)
elif queries[i] == 2:
q02(data_folder)
elif queries[i] == 3:
q03(data_folder)
elif queries[i] == 4:
q04(data_folder)
elif queries[i] == 5:
q05(data_folder)
elif queries[i] == 6:
q06(data_folder)
elif queries[i] == 7:
q07(data_folder)
elif queries[i] == 8:
q08(data_folder)
elif queries[i] == 9:
q09(data_folder)
elif queries[i] == 10:
q10(data_folder)
elif queries[i] == 11:
q11(data_folder)
elif queries[i] == 12:
q12(data_folder)
elif queries[i] == 13:
q13(data_folder)
elif queries[i] == 14:
q14(data_folder)
elif queries[i] == 15:
q15(data_folder)
elif queries[i] == 16:
q16(data_folder)
elif queries[i] == 17:
q17(data_folder)
elif queries[i] == 18:
q18(data_folder)
elif queries[i] == 19:
q19(data_folder)
elif queries[i] == 20:
q20(data_folder)
elif queries[i] == 21:
q21(data_folder)
elif queries[i] == 22:
q22(data_folder)
print("Total Query time (s): ", time.time() - t1)


@bodo.jit
@bodo.jit(cache=True)
def load_lineitem(data_folder):
t0 = time.time()
data_path = data_folder + "/lineitem.pq"
Expand All @@ -53,7 +77,7 @@ def load_lineitem(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_part(data_folder):
t0 = time.time()
data_path = data_folder + "/part.pq"
Expand All @@ -64,7 +88,7 @@ def load_part(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_orders(data_folder):
t0 = time.time()
data_path = data_folder + "/orders.pq"
Expand All @@ -75,7 +99,7 @@ def load_orders(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_customer(data_folder):
t0 = time.time()
data_path = data_folder + "/customer.pq"
Expand Down Expand Up @@ -110,7 +134,7 @@ def load_region(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_supplier(data_folder):
t0 = time.time()
data_path = data_folder + "/supplier.pq"
Expand All @@ -121,7 +145,7 @@ def load_supplier(data_folder):
return df


@bodo.jit
@bodo.jit(cache=True)
def load_partsupp(data_folder):
t0 = time.time()
data_path = data_folder + "/partsupp.pq"
Expand Down Expand Up @@ -1099,9 +1123,21 @@ def main():
default="s3://bodo-example-data/tpch/SF1",
help="The folder containing TPCH data",
)
parser.add_argument(
"--queries",
type=int,
nargs="+",
required=False,
help="Space separated TPC-H queries to run.",
)
args = parser.parse_args()
folder = args.folder
run_queries(folder)
queries = list(range(1, 23))
if args.queries is not None:
queries = args.queries
print(f"Queries to run: {queries}")
qarr = np.array(queries)
run_queries(folder, qarr)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/dataframe_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ def main():
type=int,
nargs="+",
required=False,
help="Comma separated TPC-H queries to run.",
help="Space separated TPC-H queries to run.",
)
parser.add_argument(
"--scale_factor",
Expand Down
37 changes: 29 additions & 8 deletions bodo/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,35 @@ def to_datetime(
)

# 2. Series Case
return _get_series_func_plan(
arg._plan,
new_metadata,
"pandas.to_datetime",
(),
in_kwargs,
is_method=False,
)
if (
errors == "raise"
and dayfirst == False
and yearfirst == False
and utc == False
and unit == None
and origin == "unix"
and cache == True
):
# If only options supported by Bodo JIT are used, run as a cfunc over map.
import bodo.decorators # isort:skip # noqa
from bodo.utils.utils import bodo_spawn_exec

# Declare function to be compiled to run to_datetime over series.
func = "def bodo_to_datetime(x):\n"
# Embed format string as constant in function.
func += f" return pd.to_datetime(x, format='{in_kwargs['format']}')\n"
# Create the function from string.
to_datetime_func = bodo_spawn_exec(func, {"pd": pd}, {}, __name__)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this needs bodo.jit(func, cache=True) here?

return arg.map(to_datetime_func)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to use a C++ kernel for this, perhaps using Arrow instead of the compiler? I'm not saying it needs to be done now, but maybe add a comment for a follow-up.

else:
return _get_series_func_plan(
arg._plan,
new_metadata,
"pandas.to_datetime",
(),
in_kwargs,
is_method=False,
)


@check_args_fallback(unsupported="all")
Expand Down
2 changes: 1 addition & 1 deletion bodo/pandas/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def bfs_duplicate(self):
else:
# Remember we encountered this node.
visited.add(id(node))
if isinstance(node, LogicalComparisonJoin):
if isinstance(node, (LogicalComparisonJoin, LogicalCrossProduct)):
# For comparison joins and cross products, the first two args contain source plans.
for arg in node.args[0:2]:
if isinstance(arg, LazyPlan):
Expand Down
3 changes: 3 additions & 0 deletions bodo/tests/test_df_lib/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def test_read_parquet_series_len_shape(datapath):
assert bodo_out.shape == py_out.shape


@pytest.mark.jit_dependency
def test_read_parquet_filter_projection(datapath):
"""Test TPC-H Q6 bug where filter and projection pushed down to read parquet
and filter column isn't used anywhere in the query.
Expand Down Expand Up @@ -559,6 +560,7 @@ def test_filter_string(datapath):
)


@pytest.mark.jit_dependency
@pytest.mark.parametrize(
"op", [operator.eq, operator.ne, operator.gt, operator.lt, operator.ge, operator.le]
)
Expand Down Expand Up @@ -590,6 +592,7 @@ def test_filter_datetime_pushdown(datapath, op):
)


@pytest.mark.jit_dependency
@pytest.mark.parametrize(
"op", [operator.eq, operator.ne, operator.gt, operator.lt, operator.ge, operator.le]
)
Expand Down
6 changes: 3 additions & 3 deletions bodo/tests/test_df_lib/test_tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_tpch_q10():


def test_tpch_q11():
run_tpch_query_test(tpch.tpch_q11)
run_tpch_query_test(tpch.tpch_q11, ctes_created=1)


def test_tpch_q12():
Expand All @@ -119,7 +119,7 @@ def test_tpch_q14():


def test_tpch_q15():
run_tpch_query_test(tpch.tpch_q15)
run_tpch_query_test(tpch.tpch_q15, ctes_created=1)


def test_tpch_q16():
Expand Down Expand Up @@ -147,4 +147,4 @@ def test_tpch_q21():


def test_tpch_q22():
run_tpch_query_test(tpch.tpch_q22)
run_tpch_query_test(tpch.tpch_q22, ctes_created=1)
Loading