Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/tpch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ This section describes how to reproduce benchmark results for both single node a

| Package | Version |
|----------------|----------------|
| bodo | 2025.12 |
| bodo | 2025.12.2 |
| bodosdk | 2.3.2 |
| polars | 1.35.2 |
| pandas | 2.3.3 |
| duckdb | 1.4.2 |
| dask | 2025.11.0 |
| dask | 2025.12.0 |
| dask-cloudprovider | 2025.9.0 |
| PySpark | 3.5.5 |
<!-- TODO: Daft -->
Expand Down Expand Up @@ -160,7 +160,7 @@ terraform apply \
This will run the script and write logs to an S3 bucket. You can either view the logs in the AWS console or copy them directly using the following scripts:

``` shell
./wait_for_steps.sh
./wait_for_step.sh

aws s3 cp s3://"$(terraform output --json | jq -r '.s3_bucket_id.value')"/logs/"$(terraform output --json | jq -r '.emr_cluster_id.value')" ./emr-logs --recursive --region "$(terraform output --json | jq -r '.emr_cluster_region.value')"

Expand Down
68 changes: 25 additions & 43 deletions benchmarks/tpch/bodo/dataframe_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,88 +12,65 @@
import bodo.spawn.spawner as spawner


@functools.lru_cache
def load_lineitem(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H lineitem table from ``<data_folder>/lineitem.pq``.

    The three date columns are parsed from their "YYYY-MM-DD" string form.
    Results are memoized per (data_folder, pd) pair via ``lru_cache`` so
    repeated queries reuse the same frame.
    """
    print("Loading lineitem")
    frame = pd.read_parquet(f"{data_folder}/lineitem.pq")
    for date_col in ("L_SHIPDATE", "L_RECEIPTDATE", "L_COMMITDATE"):
        frame[date_col] = pd.to_datetime(frame[date_col], format="%Y-%m-%d")
    print("Done loading lineitem")
    return frame


@functools.lru_cache
def load_part(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H part table from ``<data_folder>/part.pq`` (memoized)."""
    print("Loading part")
    frame = pd.read_parquet(f"{data_folder}/part.pq")
    print("Done loading part")
    return frame


@functools.lru_cache
def load_orders(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H orders table from ``<data_folder>/orders.pq`` (memoized).

    O_ORDERDATE is parsed from its "YYYY-MM-DD" string form into datetimes.
    """
    print("Loading orders")
    frame = pd.read_parquet(f"{data_folder}/orders.pq")
    frame["O_ORDERDATE"] = pd.to_datetime(frame["O_ORDERDATE"], format="%Y-%m-%d")
    print("Done loading orders")
    return frame


@functools.lru_cache
def load_customer(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H customer table from ``<data_folder>/customer.pq`` (memoized)."""
    print("Loading customer")
    frame = pd.read_parquet(f"{data_folder}/customer.pq")
    print("Done loading customer")
    return frame


@functools.lru_cache
def load_nation(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H nation table from ``<data_folder>/nation.pq`` (memoized)."""
    print("Loading nation")
    frame = pd.read_parquet(f"{data_folder}/nation.pq")
    print("Done loading nation")
    return frame


@functools.lru_cache
def load_region(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H region table from ``<data_folder>/region.pq`` (memoized)."""
    print("Loading region")
    frame = pd.read_parquet(f"{data_folder}/region.pq")
    print("Done loading region")
    return frame


@functools.lru_cache
def load_supplier(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H supplier table from ``<data_folder>/supplier.pq`` (memoized)."""
    print("Loading supplier")
    frame = pd.read_parquet(f"{data_folder}/supplier.pq")
    print("Done loading supplier")
    return frame


@functools.lru_cache
def load_partsupp(data_folder: str, pd=bodo.pandas):
    """Read the TPC-H partsupp table from ``<data_folder>/partsupp.pq`` (memoized)."""
    print("Loading partsupp")
    frame = pd.read_parquet(f"{data_folder}/partsupp.pq")
    print("Done loading partsupp")
    return frame


def timethis(q: Callable):
def timethis(q: Callable, name: str | None = None):
@functools.wraps(q)
def wrapped(*args, **kwargs):
t = time.time()
q(*args, **kwargs)
print(f"{q.__name__.upper()} Execution time (s): {time.time() - t:f}")
msg = name or f"{q.__name__.upper()} Execution time (s):"
print(f"{msg} {time.time() - t:f}")

return wrapped

Expand Down Expand Up @@ -935,33 +912,38 @@ def q22(customer, orders, pd):
exec_func(tpch_q22(customer, orders, pd))


def _load_args(query: int, root: str, scale_factor: float, backend):
    """Resolve the positional argument list declared for TPC-H query *query*.

    Each name in ``_query_to_args[query]`` maps to a value:
    ``"scale_factor"`` -> the given scale factor, ``"pd"`` -> the backend
    module, and any other name -> the (cached) ``load_<name>(root, pd=backend)``
    table loader defined in this module.
    """

    def _resolve(arg_name):
        if arg_name == "scale_factor":
            return scale_factor
        if arg_name == "pd":
            return backend
        return globals()[f"load_{arg_name}"](root, pd=backend)

    return [_resolve(arg_name) for arg_name in _query_to_args[query]]


# NOTE(review): this span comes from a scraped PR diff with the +/- markers
# lost, so it interleaves the PRE-change implementation (eager up-front
# loading into queries_to_args, then timing only query execution) with the
# POST-change one (lazy per-query loading via _load_args, wrapped by
# timethis so the printed time includes read_parquet). It is NOT runnable
# as-is; disentangle against the upstream PR before editing.
def run_queries(root: str, queries: list[int], scale_factor: float, backend):
# Silence warnings on the spawned workers when running the parallel
# bodo.pandas dataframe library.
if backend is bodo.pandas and bodo.dataframe_library_run_parallel:
spawner.submit_func_to_workers(lambda: warnings.filterwarnings("ignore"), [])

# --- appears to belong to the removed (pre-change) variant: load every
# table for every query up front and report the loading time separately.
total_start = time.time()
print("Start data loading")
queries_to_args = {}
for query in queries:
args = []
for arg in _query_to_args[query]:
if arg == "scale_factor":
args.append(scale_factor)
elif arg == "pd":
args.append(backend)
else:
args.append(globals()[f"load_{arg}"](root, pd=backend))
queries_to_args[query] = args
print(f"Data loading time (s): {time.time() - total_start}")
# --- appears to belong to the added (post-change) variant: resolve the
# query function lazily and time it (including parquet reads) via timethis.
q = globals()[f"q{query:02}"]

def query_func():
q(*_load_args(query, root, scale_factor, backend))

query_func = timethis(
query_func, name=f"Q{query:02} Execution time (including read_parquet) (s):"
)

total_start = time.time()
for query in queries:
query_func = globals()[f"q{query:02}"]
# Warm up run:
query_func(*queries_to_args[query])
query_func()

# Second run for timing:
query_func(*queries_to_args[query])
query_func()
print(f"Total query execution time (s): {time.time() - total_start}")


Expand Down
55 changes: 28 additions & 27 deletions benchmarks/tpch/dask/dask_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,6 @@
import dask.dataframe as dd
from dask.distributed import Client

# Cloud provider config
# Environment variables forwarded to the cluster containers; presumably the
# daskdev/dask image's entrypoint conda-installs EXTRA_CONDA_PACKAGES on
# startup so workers can read the S3-hosted dataset — TODO confirm against
# the image docs.
env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2024.10.0"}

# Keyword arguments for dask_cloudprovider.aws.EC2Cluster.
ec2_config = {
    # NOTE: Setting security = False to avoid large config size
    # https://github.com/dask/dask-cloudprovider/issues/249
    "security": False,
    "n_workers": 4,
    # Small scheduler instance; memory-optimized 64-vCPU workers.
    "scheduler_instance_type": "c6i.xlarge",
    "worker_instance_type": "r6i.16xlarge",
    "docker_image": "daskdev/dask:latest",
    # Profile with AmazonS3FullAccess
    "iam_instance_profile": {"Name": "dask-benchmark"},
    # Region for accessing bodo-example-data
    "region": "us-east-2",
    "env_vars": env_vars,
    # Keep verbose cluster logs for troubleshooting.
    "debug": True,
}


# Bodo Change: make all column names lower case, add extension parameter
def _load_dataset(dataset_path, table_name, ext=".parquet"):
Expand Down Expand Up @@ -1320,9 +1301,6 @@ def get_query_func(q_num: int) -> Callable:

def run_single_query(query_func, dataset_path, scale_factor) -> float:
    """Execute one Dask TPC-H query and return its runtime in seconds.

    The query is run once untimed as a warm-up, then a second time under
    the clock; only the second run's wall-clock duration is returned.
    """
    # Warm up run (untimed).
    query_func(dataset_path, scale_factor, ext=".pq").compute()

    begin = time.time()
    query_func(dataset_path, scale_factor, ext=".pq").compute()
    return time.time() - begin
Expand Down Expand Up @@ -1381,19 +1359,42 @@ def main():
if use_cloudprovider:
from dask_cloudprovider.aws import EC2Cluster

env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2025.10.0"}
ec2_config = {
# NOTE: Setting security = False to avoid large config size
# https://github.com/dask/dask-cloudprovider/issues/249
"security": False,
"n_workers": 4,
"scheduler_instance_type": "c6i.xlarge",
"worker_instance_type": "r6i.16xlarge",
"docker_image": "daskdev/dask:2025.12.0-py3.10",
# Region for accessing bodo-example-data
"region": "us-east-2",
"filesystem_size": 1000,
"env_vars": env_vars,
"debug": True,
}

with EC2Cluster(**ec2_config) as cluster:
with cluster.get_client() as client:
print("DASHBOARD LINK: ", client.dashboard_link)

# Running dummy job to warm-up cluster
client.submit(lambda: 1).result()

start = time.time()
for query in queries:
query_func = get_query_func(query)

query_time = client.submit(
run_single_query, query_func, dataset_path, scale_factor
).result(timeout=3600)

print(f"Query {query} execution time: {query_time:.2f} seconds")
try:
print(f"Submitting query {query} at {datetime.now()}")
query_time = client.submit(
run_single_query, query_func, dataset_path, scale_factor
).result(timeout=3600)
print(f"Query {query} execution time: {query_time:.2f} seconds")
except Exception as e:
print(f"Query {query} failed with an exception: {e}")
client.restart()

total_time = time.time() - start
print(f"Total execution time: {total_time:.4f} seconds")
Expand Down
18 changes: 12 additions & 6 deletions benchmarks/tpch/dask/env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ dependencies:
- brotli-python=1.2.0=py310h6123dab_1
- bzip2=1.0.8=hd037594_8
- c-ares=1.34.5=h5505292_0
- ca-certificates=2025.11.12=hbd8a1cb_0
- ca-certificates=2026.1.4=hbd8a1cb_0
- cffi=2.0.0=py310hf5b66c1_1
- click=8.3.1=pyh8f84b5b_1
- cloudpickle=3.1.2=pyhd8ed1ab_0
- contourpy=1.3.2=py310h7f4e7e6_0
- cytoolz=1.1.0=py310hfe3a0ae_1
- dask=2025.11.0=pyhcf101f3_0
- dask=2025.12.0=pyhcf101f3_0
- dask-cloudprovider=2025.9.0=pyhd8ed1ab_0
- dask-core=2025.11.0=pyhcf101f3_0
- distributed=2025.11.0=pyhcf101f3_0
- dask-core=2025.12.0=pyhcf101f3_1
- distributed=2025.12.0=pyhcf101f3_1
- frozenlist=1.7.0=py310ha18c8e3_0
- fsspec=2025.10.0=pyhd8ed1ab_0
- gflags=2.2.2=hf9b8971_1005
Expand Down Expand Up @@ -90,6 +90,7 @@ dependencies:
- libjpeg-turbo=3.1.2=hc919400_0
- liblapack=3.11.0=2_hd9741b5_openblas
- liblzma=5.8.1=h39f12f2_2
- liblzma-devel=5.8.1=h39f12f2_2
- libnghttp2=1.67.0=hc438710_0
- libopenblas=0.3.30=openmp_ha158390_3
- libopentelemetry-cpp=1.21.0=he15edb5_1
Expand All @@ -112,6 +113,7 @@ dependencies:
- locket=1.0.0=pyhd8ed1ab_0
- lz4-c=1.10.0=h286801f_1
- markupsafe=3.0.3=py310hf4fd40f_0
- msgpack-python=1.1.2=py310h0e897d2_1
- multidict=6.6.3=py310hdf261b0_0
- narwhals=2.13.0=pyhcf101f3_0
- ncurses=6.5=h5e97a16_3
Expand All @@ -133,7 +135,7 @@ dependencies:
- pyarrow-core=22.0.0=py310h6cc04f2_0_cpu
- pycparser=2.22=pyh29332c3_1
- pysocks=1.7.1=pyha55dd90_7
- python=3.10.19=hcd7f573_2_cpython
- python=3.10.12=h01493a6_0_cpython
- python-dateutil=2.9.0.post0=pyhe01879c_2
- python-tzdata=2025.2=pyhd8ed1ab_0
- python_abi=3.10=8_cp310
Expand All @@ -148,7 +150,7 @@ dependencies:
- sortedcontainers=2.4.0=pyhd8ed1ab_1
- tblib=3.2.2=pyhcf101f3_0
- tk=8.6.13=h892fb3f_3
- tornado=6.5.2=py310hfe3a0ae_2
- tornado=6.5.3=py310hfe3a0ae_0
- typing-extensions=4.15.0=h396c80c_0
- typing_extensions=4.15.0=pyhcf101f3_0
- tzdata=2025b=h78e105d_0
Expand All @@ -158,6 +160,9 @@ dependencies:
- xorg-libxau=1.0.12=hc919400_1
- xorg-libxdmcp=1.1.5=hc919400_1
- xyzservices=2025.11.0=pyhd8ed1ab_0
- xz=5.8.1=h9a6d368_2
- xz-gpl-tools=5.8.1=h9a6d368_2
- xz-tools=5.8.1=h39f12f2_2
- yaml=0.2.5=h925e9cb_3
- yarl=1.22.0=py310hf4fd40f_0
- zict=3.0.0=pyhd8ed1ab_1
Expand Down Expand Up @@ -191,6 +196,7 @@ dependencies:
- pure-eval==0.2.3
- pygments==2.19.2
- pyzmq==27.1.0
- s3fs==2025.10.0
- stack-data==0.6.3
- toolz==0.12.0
- traitlets==5.14.3
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/pyspark/emr.tf
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ resource "aws_emr_cluster" "emr_cluster" {
instance_count = 3

ebs_config {
size = "40"
size = "300"
type = "gp3"
volumes_per_instance = 1
}
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/tpch/pyspark/pandas_on_spark_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,6 @@ def run_queries(data_folder: str, queries: list[int], scale_factor: float = 1.0)
print(f"Query {query:02} not implemented yet.")
continue

# Warm up run
run_query_single(query_func, data_folder, scale_factor)

t2 = time.time()
run_query_single(query_func, data_folder, scale_factor)
print(f"Query {query:02} took {time.time() - t2:.2f} seconds")
Expand Down
Loading