From d7bb25b4858f7e90859209906c9fff951ec68e88 Mon Sep 17 00:00:00 2001 From: Scott Routledge Date: Tue, 20 Jan 2026 13:02:36 -0500 Subject: [PATCH 1/4] minor config changes required for SF1000 --- benchmarks/tpch/README.md | 6 +- benchmarks/tpch/bodo/dataframe_queries.py | 67 ++++++++--------------- benchmarks/tpch/dask/dask_queries.py | 55 ++++++++++--------- benchmarks/tpch/dask/env.yml | 18 ++++-- benchmarks/tpch/pyspark/emr.tf | 2 +- 5 files changed, 68 insertions(+), 80 deletions(-) diff --git a/benchmarks/tpch/README.md b/benchmarks/tpch/README.md index 919bebb8b1..bffc0d19b1 100644 --- a/benchmarks/tpch/README.md +++ b/benchmarks/tpch/README.md @@ -55,12 +55,12 @@ This section describes how to reproduce benchmark results for both single node a | Package | Version | |----------------|----------------| -| bodo | 2025.12 | +| bodo | 2025.12.2 | | bodosdk | 2.3.2 | | polars | 1.35.2 | | pandas | 2.3.3 | | duckdb | 1.4.2 | -| dask | 2025.11.0 | +| dask | 2025.12.0 | | dask-cloudprovider | 2025.9.0 | | PySpark | 3.5.5 | @@ -160,7 +160,7 @@ terraform apply \ This will run the script and write logs to an S3 bucket. You can either view the logs in the AWS console or copy them directly using the following scripts: ``` shell -./wait_for_steps.sh +./wait_for_step.sh aws s3 cp s3://"$(terraform output --json | jq -r '.s3_bucket_id.value')"/logs/"$(terraform output --json | jq -r '.emr_cluster_id.value')" ./emr-logs --recursive --region "$(terraform output --json | jq -r '.emr_cluster_region.value')" diff --git a/benchmarks/tpch/bodo/dataframe_queries.py b/benchmarks/tpch/bodo/dataframe_queries.py index 685b46d170..70bb0980d6 100644 --- a/benchmarks/tpch/bodo/dataframe_queries.py +++ b/benchmarks/tpch/bodo/dataframe_queries.py @@ -12,88 +12,64 @@ import bodo.spawn.spawner as spawner -@functools.lru_cache def load_lineitem(data_folder: str, pd=bodo.pandas): - print("Loading lineitem") data_path = data_folder + "/lineitem.pq" df = pd.read_parquet(data_path) df["L_SHIPDATE"] = pd.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d") df["L_RECEIPTDATE"] = pd.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d") df["L_COMMITDATE"] = pd.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d") - print("Done loading lineitem") return df -@functools.lru_cache def load_part(data_folder: str, pd=bodo.pandas): - print("Loading part") data_path = data_folder + "/part.pq" df = pd.read_parquet(data_path) - print("Done loading part") return df -@functools.lru_cache def load_orders(data_folder: str, pd=bodo.pandas): - print("Loading orders") data_path = data_folder + "/orders.pq" df = pd.read_parquet(data_path) df["O_ORDERDATE"] = pd.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d") - print("Done loading orders") return df -@functools.lru_cache def load_customer(data_folder: str, pd=bodo.pandas): - print("Loading customer") data_path = data_folder + "/customer.pq" df = pd.read_parquet(data_path) - print("Done loading customer") return df -@functools.lru_cache def load_nation(data_folder: str, pd=bodo.pandas): - print("Loading nation") data_path = data_folder + "/nation.pq" df = pd.read_parquet(data_path) - print("Done loading nation") return df -@functools.lru_cache def load_region(data_folder: str, pd=bodo.pandas): - print("Loading region") data_path = data_folder + "/region.pq" df = pd.read_parquet(data_path) - print("Done loading region") return df -@functools.lru_cache def load_supplier(data_folder: str, pd=bodo.pandas): - print("Loading supplier") data_path = data_folder + "/supplier.pq" df = pd.read_parquet(data_path) - print("Done loading supplier") return df -@functools.lru_cache def load_partsupp(data_folder: str, pd=bodo.pandas): - print("Loading partsupp") data_path = data_folder + "/partsupp.pq" df = pd.read_parquet(data_path) - print("Done loading partsupp") return df -def timethis(q: Callable): +def timethis(q: Callable, name: str | None = None): @functools.wraps(q) def wrapped(*args, **kwargs): t = time.time() q(*args, **kwargs) - print(f"{q.__name__.upper()} Execution time (s): {time.time() - t:f}") + print(f"{name or q.__name__.upper()} Execution time (s): {time.time() - t:f}") return wrapped @@ -935,33 +911,38 @@ def q22(customer, orders, pd): exec_func(tpch_q22(customer, orders, pd)) +def _load_args(query: int, root: str, scale_factor: float, backend): + args = [] + for arg in _query_to_args[query]: + if arg == "scale_factor": + args.append(scale_factor) + elif arg == "pd": + args.append(backend) + else: + args.append(globals()[f"load_{arg}"](root, pd=backend)) + return args + + def run_queries(root: str, queries: list[int], scale_factor: float, backend): if backend is bodo.pandas and bodo.dataframe_library_run_parallel: spawner.submit_func_to_workers(lambda: warnings.filterwarnings("ignore"), []) total_start = time.time() - print("Start data loading") - queries_to_args = {} for query in queries: - args = [] - for arg in _query_to_args[query]: - if arg == "scale_factor": - args.append(scale_factor) - elif arg == "pd": - args.append(backend) - else: - args.append(globals()[f"load_{arg}"](root, pd=backend)) - queries_to_args[query] = args - print(f"Data loading time (s): {time.time() - total_start}") + q = globals()[f"q{query:02}"] + + def query_func(): + q(*_load_args(query, root, scale_factor, backend)) + + query_func = timethis( + query_func, name=f"Q{query:02} Execution time (including read_parquet) (s):" + ) - total_start = time.time() - for query in queries: - query_func = globals()[f"q{query:02}"] # Warm up run: - query_func(*queries_to_args[query]) + query_func() # Second run for timing: - query_func(*queries_to_args[query]) + query_func() print(f"Total query execution time (s): {time.time() - total_start}") diff --git a/benchmarks/tpch/dask/dask_queries.py b/benchmarks/tpch/dask/dask_queries.py index 243afb05ba..f913879253 100644 --- a/benchmarks/tpch/dask/dask_queries.py +++ b/benchmarks/tpch/dask/dask_queries.py @@ -8,25 +8,6 @@ import dask.dataframe as dd from dask.distributed import Client -# Cloud provider config -env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2024.10.0"} - -ec2_config = { - # NOTE: Setting security = False to avoid large config size - # https://github.com/dask/dask-cloudprovider/issues/249 - "security": False, - "n_workers": 4, - "scheduler_instance_type": "c6i.xlarge", - "worker_instance_type": "r6i.16xlarge", - "docker_image": "daskdev/dask:latest", - # Profile with AmazonS3FullAccess - "iam_instance_profile": {"Name": "dask-benchmark"}, - # Region for accessing bodo-example-data - "region": "us-east-2", - "env_vars": env_vars, - "debug": True, -} - # Bodo Change: make all column names lower case, add extension parameter def _load_dataset(dataset_path, table_name, ext=".parquet"): @@ -1320,9 +1301,6 @@ def get_query_func(q_num: int) -> Callable: def run_single_query(query_func, dataset_path, scale_factor) -> float: """Run a single Dask TPC-H query and return the exectution time in seconds.""" - # Warm up run - query_func(dataset_path, scale_factor, ext=".pq").compute() - start = time.time() query_func(dataset_path, scale_factor, ext=".pq").compute() return time.time() - start @@ -1381,19 +1359,42 @@ def main(): if use_cloudprovider: from dask_cloudprovider.aws import EC2Cluster + env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2025.10.0"} + ec2_config = { + # NOTE: Setting security = False to avoid large config size + # https://github.com/dask/dask-cloudprovider/issues/249 + "security": False, + "n_workers": 4, + "scheduler_instance_type": "c6i.xlarge", + "worker_instance_type": "r6i.16xlarge", + "docker_image": "daskdev/dask:2025.12.0-py3.10", + # Region for accessing bodo-example-data + "region": "us-east-2", + "filesystem_size": 1000, + "env_vars": env_vars, + "debug": True, + } + with EC2Cluster(**ec2_config) as cluster: with cluster.get_client() as client: print("DASHBOARD LINK: ", client.dashboard_link) + # Running dummy job to warm-up cluster + client.submit(lambda: 1).result() + start = time.time() for query in queries: query_func = get_query_func(query) - query_time = client.submit( - run_single_query, query_func, dataset_path, scale_factor - ).result(timeout=3600) - - print(f"Query {query} execution time: {query_time:.2f} seconds") + try: + print(f"Submitting query {query} at {datetime.now()}") + query_time = client.submit( + run_single_query, query_func, dataset_path, scale_factor + ).result(timeout=3600) + print(f"Query {query} execution time: {query_time:.2f} seconds") + except Exception as e: + print(f"Query {query} failed with an exception: {e}") + client.restart() total_time = time.time() - start print(f"Total execution time: {total_time:.4f} seconds") diff --git a/benchmarks/tpch/dask/env.yml b/benchmarks/tpch/dask/env.yml index 1ea8936a81..df329f24c5 100644 --- a/benchmarks/tpch/dask/env.yml +++ b/benchmarks/tpch/dask/env.yml @@ -34,16 +34,16 @@ dependencies: - brotli-python=1.2.0=py310h6123dab_1 - bzip2=1.0.8=hd037594_8 - c-ares=1.34.5=h5505292_0 - - ca-certificates=2025.11.12=hbd8a1cb_0 + - ca-certificates=2026.1.4=hbd8a1cb_0 - cffi=2.0.0=py310hf5b66c1_1 - click=8.3.1=pyh8f84b5b_1 - cloudpickle=3.1.2=pyhd8ed1ab_0 - contourpy=1.3.2=py310h7f4e7e6_0 - cytoolz=1.1.0=py310hfe3a0ae_1 - - dask=2025.11.0=pyhcf101f3_0 + - dask=2025.12.0=pyhcf101f3_0 - dask-cloudprovider=2025.9.0=pyhd8ed1ab_0 - - dask-core=2025.11.0=pyhcf101f3_0 - - distributed=2025.11.0=pyhcf101f3_0 + - dask-core=2025.12.0=pyhcf101f3_1 + - distributed=2025.12.0=pyhcf101f3_1 - frozenlist=1.7.0=py310ha18c8e3_0 - fsspec=2025.10.0=pyhd8ed1ab_0 - gflags=2.2.2=hf9b8971_1005 @@ -90,6 +90,7 @@ dependencies: - libjpeg-turbo=3.1.2=hc919400_0 - liblapack=3.11.0=2_hd9741b5_openblas - liblzma=5.8.1=h39f12f2_2 + - liblzma-devel=5.8.1=h39f12f2_2 - libnghttp2=1.67.0=hc438710_0 - libopenblas=0.3.30=openmp_ha158390_3 - libopentelemetry-cpp=1.21.0=he15edb5_1 @@ -112,6 +113,7 @@ dependencies: - locket=1.0.0=pyhd8ed1ab_0 - lz4-c=1.10.0=h286801f_1 - markupsafe=3.0.3=py310hf4fd40f_0 + - msgpack-python=1.1.2=py310h0e897d2_1 - multidict=6.6.3=py310hdf261b0_0 - narwhals=2.13.0=pyhcf101f3_0 - ncurses=6.5=h5e97a16_3 @@ -133,7 +135,7 @@ dependencies: - pyarrow-core=22.0.0=py310h6cc04f2_0_cpu - pycparser=2.22=pyh29332c3_1 - pysocks=1.7.1=pyha55dd90_7 - - python=3.10.19=hcd7f573_2_cpython + - python=3.10.12=h01493a6_0_cpython - python-dateutil=2.9.0.post0=pyhe01879c_2 - python-tzdata=2025.2=pyhd8ed1ab_0 - python_abi=3.10=8_cp310 @@ -148,7 +150,7 @@ dependencies: - sortedcontainers=2.4.0=pyhd8ed1ab_1 - tblib=3.2.2=pyhcf101f3_0 - tk=8.6.13=h892fb3f_3 - - tornado=6.5.2=py310hfe3a0ae_2 + - tornado=6.5.3=py310hfe3a0ae_0 - typing-extensions=4.15.0=h396c80c_0 - typing_extensions=4.15.0=pyhcf101f3_0 - tzdata=2025b=h78e105d_0 @@ -158,6 +160,9 @@ dependencies: - xorg-libxau=1.0.12=hc919400_1 - xorg-libxdmcp=1.1.5=hc919400_1 - xyzservices=2025.11.0=pyhd8ed1ab_0 + - xz=5.8.1=h9a6d368_2 + - xz-gpl-tools=5.8.1=h9a6d368_2 + - xz-tools=5.8.1=h39f12f2_2 - yaml=0.2.5=h925e9cb_3 - yarl=1.22.0=py310hf4fd40f_0 - zict=3.0.0=pyhd8ed1ab_1 @@ -191,6 +196,7 @@ dependencies: - pure-eval==0.2.3 - pygments==2.19.2 - pyzmq==27.1.0 + - s3fs==2025.10.0 - stack-data==0.6.3 - toolz==0.12.0 - traitlets==5.14.3 diff --git a/benchmarks/tpch/pyspark/emr.tf b/benchmarks/tpch/pyspark/emr.tf index 26504b7071..a340ab8bec 100644 --- a/benchmarks/tpch/pyspark/emr.tf +++ b/benchmarks/tpch/pyspark/emr.tf @@ -69,7 +69,7 @@ resource "aws_emr_cluster" "emr_cluster" { instance_count = 3 ebs_config { - size = "40" + size = "300" type = "gp3" volumes_per_instance = 1 } From 691b40633a24387a336cff8e9e9211df967da91a Mon Sep 17 00:00:00 2001 From: Scott Routledge Date: Tue, 20 Jan 2026 13:53:43 -0500 Subject: [PATCH 2/4] remove warmup run for spark --- benchmarks/tpch/pyspark/pandas_on_spark_queries.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/benchmarks/tpch/pyspark/pandas_on_spark_queries.py b/benchmarks/tpch/pyspark/pandas_on_spark_queries.py index 12e740f653..35b72e53eb 100644 --- a/benchmarks/tpch/pyspark/pandas_on_spark_queries.py +++ b/benchmarks/tpch/pyspark/pandas_on_spark_queries.py @@ -650,9 +650,6 @@ def run_queries(data_folder: str, queries: list[int], scale_factor: float = 1.0) print(f"Query {query:02} not implemented yet.") continue - # Warm up run - run_query_single(query_func, data_folder, scale_factor) - t2 = time.time() run_query_single(query_func, data_folder, scale_factor) print(f"Query {query:02} took {time.time() - t2:.2f} seconds") From fb3b6c775f2a60c7675880bf45eaabce63e2dce7 Mon Sep 17 00:00:00 2001 From: Scott Routledge Date: Tue, 20 Jan 2026 14:08:13 -0500 Subject: [PATCH 3/4] [run ci] From 96d83aaf2f584c686def65a7e6d0a7b15684baa6 Mon Sep 17 00:00:00 2001 From: Scott Routledge Date: Wed, 21 Jan 2026 09:17:31 -0500 Subject: [PATCH 4/4] fix print msg in bodo script --- benchmarks/tpch/bodo/dataframe_queries.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/benchmarks/tpch/bodo/dataframe_queries.py b/benchmarks/tpch/bodo/dataframe_queries.py index 70bb0980d6..aaf5a2f3d6 100644 --- a/benchmarks/tpch/bodo/dataframe_queries.py +++ b/benchmarks/tpch/bodo/dataframe_queries.py @@ -69,7 +69,8 @@ def timethis(q: Callable, name: str | None = None): def wrapped(*args, **kwargs): t = time.time() q(*args, **kwargs) - print(f"{name or q.__name__.upper()} Execution time (s): {time.time() - t:f}") + msg = name or f"{q.__name__.upper()} Execution time (s):" + print(f"{msg} {time.time() - t:f}") return wrapped