Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions benchmarks/tpch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ This section describes how to reproduce benchmark results for both single node a
<!-- TODO: Daft -->
<!-- TODO: Modin -->

You can install the required libraries with the correct versions using the provided `env.yml` file:
You can install the required libraries with the correct versions using the provided `requirements.txt` file:

``` shell
conda env create -n tpch_bodo --file env.yml
conda activate tpch_bodo
pip install -r requirements.txt
```

### Single Node
Expand Down
28 changes: 14 additions & 14 deletions benchmarks/tpch/dask/dask_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from collections.abc import Callable
from datetime import datetime, timedelta

import dask
import dask.dataframe as dd
from dask.distributed import Client

# Cloud provider config
env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2024.10.0"}
Expand All @@ -15,9 +15,9 @@
# NOTE: Setting security = False to avoid large config size
# https://github.com/dask/dask-cloudprovider/issues/249
"security": False,
"n_workers": 3,
"n_workers": 4,
"scheduler_instance_type": "c6i.xlarge",
"worker_instance_type": "r6i.8xlarge",
"worker_instance_type": "c6i.16xlarge",
"docker_image": "daskdev/dask:latest",
# Profile with AmazonS3FullAccess
"iam_instance_profile": {"Name": "dask-benchmark"},
Expand Down Expand Up @@ -1320,21 +1320,25 @@ def get_query_func(q_num: int) -> Callable:

def run_single_query(query_func, dataset_path, scale_factor) -> float:
    """Run a single Dask TPC-H query and return the execution time in seconds.

    The query is executed twice: once as an untimed warm-up run (so one-off
    costs such as task-graph construction and worker spin-up are excluded
    from the measurement) and once more under the timer.

    Args:
        query_func: Callable that builds and returns the lazy Dask query;
            invoked as ``query_func(dataset_path, scale_factor, ext=".pq")``.
        dataset_path: Base path of the TPC-H dataset files.
        scale_factor: TPC-H scale factor, forwarded to ``query_func``.

    Returns:
        Wall-clock duration of the timed run, in seconds.
    """
    # Warm-up run (deliberately not timed).
    query_func(dataset_path, scale_factor, ext=".pq").compute()

    start = time.time()
    query_func(dataset_path, scale_factor, ext=".pq").compute()
    return time.time() - start


def run_queries(query_nums, dataset_path, scale_factor) -> None:
total_start = time.time()
with Client(): # Use default LocalCluster settings
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the distributed scheduler in local mode (better performance, and generally recommended):
https://docs.dask.org/en/latest/setup/single-distributed.html

total_start = time.time()

for i in query_nums:
query_func = get_query_func(i)
query_time = run_single_query(query_func, dataset_path, scale_factor)
print(f"Query {i} execution time: {query_time:.2f} seconds")
for i in query_nums:
query_func = get_query_func(i)
query_time = run_single_query(query_func, dataset_path, scale_factor)
print(f"Query {i} execution time: {query_time:.2f} seconds")

total_time = time.time() - total_start
print(f"Total execution time: {total_time:.4f} seconds")
total_time = time.time() - total_start
print(f"Total execution time: {total_time:.4f} seconds")


def main():
Expand Down Expand Up @@ -1370,10 +1374,6 @@ def main():
scale_factor = args.scale_factor
use_cloudprovider = args.use_cloudprovider

# Configure Scheduler
if not use_cloudprovider:
dask.config.set(scheduler="threads")

queries = list(range(1, 23))
if args.queries is not None:
queries = args.queries
Expand Down
198 changes: 0 additions & 198 deletions benchmarks/tpch/env.yml

This file was deleted.

4 changes: 2 additions & 2 deletions benchmarks/tpch/pds-benchmark/queries/dask/q12.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import datetime
import warnings
from datetime import datetime

import pandas as pd
from queries.dask import utils
Expand All @@ -17,7 +17,7 @@ def query() -> pd.DataFrame:
orders = utils.get_orders_ds()
lineitem = utils.get_line_item_ds()

receiptdate_from = datetime.strptime("1994-01-01", "%Y-%m-%d")
receiptdate_from = datetime.datetime.strptime("1994-01-01", "%Y-%m-%d")
receiptdate_to = receiptdate_from + datetime.timedelta(days=365)

table = orders.merge(lineitem, left_on="o_orderkey", right_on="l_orderkey")
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/pds-benchmark/queries/dask/q15.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)

Q_NUM = 16
Q_NUM = 15


def q() -> None:
Expand Down
8 changes: 6 additions & 2 deletions benchmarks/tpch/pds-benchmark/queries/dask/q20.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def query() -> pd.DataFrame:
supplier = utils.get_supplier_ds()
nation = utils.get_nation_ds()
part = utils.get_part_ds()
partsupp = utils.get_partsupp_ds()
partsupp = utils.get_part_supp_ds()
shipdate_from = datetime.strptime("1994-01-01", "%Y-%m-%d")
shipdate_to = datetime.strptime("1995-01-01", "%Y-%m-%d")

Expand Down Expand Up @@ -49,7 +49,11 @@ def query() -> pd.DataFrame:
q_final, how="leftsemi", left_on="s_suppkey", right_on="ps_suppkey"
)
q_final["s_address"] = q_final["s_address"].str.strip()
return q_final[["s_name", "s_address"]].sort_values("s_name", ascending=True)
return (
q_final[["s_name", "s_address"]]
.sort_values("s_name", ascending=True)
.compute()
)

utils.run_query(Q_NUM, query)

Expand Down
9 changes: 5 additions & 4 deletions benchmarks/tpch/pds-benchmark/queries/dask/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from typing import TYPE_CHECKING, Any

import dask
import dask.dataframe as dd
from dask.distributed import Client
from queries.common_utils import (
check_query_result_pd,
get_table_path,
Expand All @@ -18,8 +18,6 @@

settings = Settings()

dask.config.set(scheduler="threads")


def read_ds(table_name: str) -> DataFrame:
if settings.run.io_type == "skip":
Expand Down Expand Up @@ -81,4 +79,7 @@ def get_part_supp_ds() -> DataFrame:


def run_query(query_number: int, query: Callable[..., Any]) -> None:
run_query_generic(query, query_number, "dask", query_checker=check_query_result_pd)
with Client(): # Use default LocalCluster settings
run_query_generic(
query, query_number, "dask", query_checker=check_query_result_pd
)
2 changes: 1 addition & 1 deletion benchmarks/tpch/pds-benchmark/queries/pyspark/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_or_create_spark() -> SparkSession:
"s3://"
) or settings.dataset_base_dir.startswith("s3a://"):
builder = builder.config(
"spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.4.1"
"spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4"
)
if settings.dataset_base_dir.startswith("s3a://"):
builder = builder.config(
Expand Down
Loading
Loading