diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index d87bfcc76..a0e238727 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -244,6 +244,11 @@ impl ContextProvider for DaskSQLContext { DataType::Int64, DataType::Timestamp(TimeUnit::Nanosecond, None), ]), + TypeSignature::Exact(vec![ + DataType::Utf8, + DataType::Int64, + DataType::Int64, + ]), ], Volatility::Immutable, ); @@ -251,11 +256,23 @@ impl ContextProvider for DaskSQLContext { return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun))); } "timestampdiff" => { - let sig = Signature::exact( + let sig = Signature::one_of( vec![ - DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), - DataType::Timestamp(TimeUnit::Nanosecond, None), + TypeSignature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Nanosecond, None), + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), + TypeSignature::Exact(vec![ + DataType::Utf8, + DataType::Date64, + DataType::Date64, + ]), + TypeSignature::Exact(vec![ + DataType::Utf8, + DataType::Int64, + DataType::Int64, + ]), ], Volatility::Immutable, ); diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py index 712173704..85d083d78 100644 --- a/dask_sql/physical/rex/core/call.py +++ b/dask_sql/physical/rex/core/call.py @@ -251,6 +251,9 @@ def cast(self, operand, rex=None) -> SeriesOrScalar: if output_type == "DECIMAL": sql_type_args = rex.getPrecisionScale() + if output_type == "TIMESTAMP" and pd.api.types.is_integer_dtype(operand): + operand = operand * 10**9 + if not is_frame(operand): # pragma: no cover return sql_to_python_value(sql_type, operand) @@ -613,17 +616,8 @@ def to_timestamp(self, df, format): format = format.replace('"', "") format = format.replace("'", "") - # TODO: format timestamps for GPU tests - if is_cudf_type(df): - if format != default_format: - raise RuntimeError("Non-default timestamp formats not supported on GPU") - if df.dtype == "object": - return df - else: - nanoseconds_to_seconds = 10**9 - return df * nanoseconds_to_seconds # String cases - elif type(df) == str: + if type(df) == str: return np.datetime64(datetime.strptime(df, format)) elif df.dtype == "object": return dd.to_datetime(df, format=format) @@ -656,7 +650,11 @@ def timestampadd(self, unit, interval, df: SeriesOrScalar): interval = int(interval) if interval < 0: raise RuntimeError(f"Negative time interval {interval} is not supported.") - df = df.astype("datetime64[ns]") + df = ( + df.astype("datetime64[s]") + if pd.api.types.is_integer_dtype(df) + else df.astype("datetime64[ns]") + ) if is_cudf_type(df): from cudf import DateOffset @@ -700,11 +698,23 @@ def __init__(self): super().__init__(self.datetime_sub) def datetime_sub(self, unit, df1, df2): + if pd.api.types.is_integer_dtype(df1): + df1 = df1 * 10**9 + if pd.api.types.is_integer_dtype(df2): + df2 = df2 * 10**9 + if "datetime64[s]" == str(getattr(df1, "dtype", "")): + df1 = df1.astype("datetime64[ns]") + if "datetime64[s]" == str(getattr(df2, "dtype", "")): + df2 = df2.astype("datetime64[ns]") + subtraction_op = ReduceOperation( operation=operator.sub, unary_operation=lambda x: -x ) result = subtraction_op(df2, df1) + if is_cudf_type(df1): + result = result.astype("int") + if unit in {"NANOSECOND", "NANOSECONDS"}: return result elif unit in {"MICROSECOND", "MICROSECONDS"}: diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py index 93d77dca5..b49a687d2 100644 --- a/tests/integration/test_rex.py +++ b/tests/integration/test_rex.py @@ -940,21 +940,7 @@ def test_timestampdiff(c): assert_eq(ddf, expected_df, check_dtype=False) -@pytest.mark.parametrize( - "gpu", - [ - False, - pytest.param( - True, - marks=( - pytest.mark.gpu, - pytest.mark.xfail( - reason="Failing due to dask-cudf bug https://github.com/rapidsai/cudf/issues/12062" - ), - ), - ), - ], -) +@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)]) def test_totimestamp(c, gpu): df = pd.DataFrame( { @@ -1121,3 +1107,134 @@ def test_extract_date(c, gpu): {"i": [datetime(2021, 1, 3), datetime(2022, 2, 4), datetime(2023, 3, 5)]} ) assert_eq(result, expected_df) + + +@pytest.mark.parametrize( + "gpu", + [ + False, + pytest.param( + True, + marks=( + pytest.mark.gpu, + pytest.mark.xfail( + not DASK_CUDF_TODATETIME_SUPPORT, + reason="Requires https://github.com/dask/dask/pull/9881", + raises=RuntimeError, + ), + ), + ), + ], +) +def test_scalar_timestamps(c, gpu): + df = pd.DataFrame({"d": [1203073300, 1503073700]}) + c.create_table("df", df, gpu=gpu) + + expected_df = pd.DataFrame( + { + "dt": [datetime(2008, 2, 20, 11, 1, 40), datetime(2017, 8, 23, 16, 28, 20)], + } + ) + + df1 = c.sql("SELECT to_timestamp(d) + INTERVAL '5 days' AS dt FROM df") + assert_eq(df1, expected_df) + df2 = c.sql("SELECT CAST(d AS TIMESTAMP) + INTERVAL '5 days' AS dt FROM df") + assert_eq(df2, expected_df) + + df1 = c.sql("SELECT TIMESTAMPADD(DAY, 5, to_timestamp(d)) AS dt FROM df") + assert_eq(df1, expected_df) + df2 = c.sql("SELECT TIMESTAMPADD(DAY, 5, d) AS dt FROM df") + assert_eq(df2, expected_df) + df3 = c.sql("SELECT TIMESTAMPADD(DAY, 5, CAST(d AS TIMESTAMP)) AS dt FROM df") + assert_eq(df3, expected_df) + + expected_df = pd.DataFrame({"day": [15, 18]}) + df1 = c.sql("SELECT EXTRACT(DAY FROM to_timestamp(d)) AS day FROM df") + assert_eq(df1, expected_df, check_dtype=False) + df2 = c.sql("SELECT EXTRACT(DAY FROM CAST(d AS TIMESTAMP)) AS day FROM df") + assert_eq(df2, expected_df, check_dtype=False) + + expected_df = pd.DataFrame( + { + "ceil_to_day": [datetime(2008, 2, 16), datetime(2017, 8, 19)], + } + ) + df1 = c.sql("SELECT CEIL(to_timestamp(d) TO DAY) AS ceil_to_day FROM df") + assert_eq(df1, expected_df) + df2 = c.sql("SELECT CEIL(CAST(d AS TIMESTAMP) TO DAY) AS ceil_to_day FROM df") + assert_eq(df2, expected_df) + + expected_df = pd.DataFrame( + { + "floor_to_day": [datetime(2008, 2, 15), datetime(2017, 8, 18)], + } + ) + df1 = c.sql("SELECT FLOOR(to_timestamp(d) TO DAY) AS floor_to_day FROM df") + assert_eq(df1, expected_df) + df2 = c.sql("SELECT FLOOR(CAST(d AS TIMESTAMP) TO DAY) AS floor_to_day FROM df") + assert_eq(df2, expected_df) + + df = pd.DataFrame({"d1": [1203073300], "d2": [1503073700]}) + c.create_table("df", df, gpu=gpu) + expected_df = pd.DataFrame({"dt": [3472]}) + df1 = c.sql( + "SELECT TIMESTAMPDIFF(DAY, to_timestamp(d1), to_timestamp(d2)) AS dt FROM df" + ) + # TODO: The GPU case returns an incorrect value here + if not gpu: + assert_eq(df1, expected_df) + df2 = c.sql("SELECT TIMESTAMPDIFF(DAY, d1, d2) AS dt FROM df") + assert_eq(df2, expected_df, check_dtype=False) + df3 = c.sql( + "SELECT TIMESTAMPDIFF(DAY, CAST(d1 AS TIMESTAMP), CAST(d2 AS TIMESTAMP)) AS dt FROM df" + ) + assert_eq(df3, expected_df) + + scalar1 = 1203073300 + scalar2 = 1503073700 + + expected_df = pd.DataFrame({"dt": [datetime(2008, 2, 20, 11, 1, 40)]}) + + df1 = c.sql(f"SELECT to_timestamp({scalar1}) + INTERVAL '5 days' AS dt") + assert_eq(df1, expected_df) + # TODO: Fix seconds/nanoseconds conversion + # df2 = c.sql(f"SELECT CAST({scalar1} AS TIMESTAMP) + INTERVAL '5 days' AS dt") + # assert_eq(df2, expected_df) + + df1 = c.sql(f"SELECT TIMESTAMPADD(DAY, 5, to_timestamp({scalar1})) AS dt") + assert_eq(df1, expected_df) + df2 = c.sql(f"SELECT TIMESTAMPADD(DAY, 5, {scalar1}) AS dt") + assert_eq(df2, expected_df) + df3 = c.sql(f"SELECT TIMESTAMPADD(DAY, 5, CAST({scalar1} AS TIMESTAMP)) AS dt") + assert_eq(df3, expected_df) + + expected_df = pd.DataFrame({"day": [15]}) + df1 = c.sql(f"SELECT EXTRACT(DAY FROM to_timestamp({scalar1})) AS day") + assert_eq(df1, expected_df, check_dtype=False) + # TODO: Fix seconds/nanoseconds conversion + # df2 = c.sql(f"SELECT EXTRACT(DAY FROM CAST({scalar1} AS TIMESTAMP)) AS day") + # assert_eq(df2, expected_df, check_dtype=False) + + expected_df = pd.DataFrame({"ceil_to_day": [datetime(2008, 2, 16)]}) + df1 = c.sql(f"SELECT CEIL(to_timestamp({scalar1}) TO DAY) AS ceil_to_day") + assert_eq(df1, expected_df) + df2 = c.sql(f"SELECT CEIL(CAST({scalar1} AS TIMESTAMP) TO DAY) AS ceil_to_day") + assert_eq(df2, expected_df) + + expected_df = pd.DataFrame({"floor_to_day": [datetime(2008, 2, 15)]}) + df1 = c.sql(f"SELECT FLOOR(to_timestamp({scalar1}) TO DAY) AS floor_to_day") + assert_eq(df1, expected_df) + df2 = c.sql(f"SELECT FLOOR(CAST({scalar1} AS TIMESTAMP) TO DAY) AS floor_to_day") + assert_eq(df2, expected_df) + + expected_df = pd.DataFrame({"dt": [3472]}) + df1 = c.sql( + f"SELECT TIMESTAMPDIFF(DAY, to_timestamp({scalar1}), to_timestamp({scalar2})) AS dt" + ) + assert_eq(df1, expected_df) + df2 = c.sql(f"SELECT TIMESTAMPDIFF(DAY, {scalar1}, {scalar2}) AS dt") + assert_eq(df2, expected_df, check_dtype=False) + df3 = c.sql( + f"SELECT TIMESTAMPDIFF(DAY, CAST({scalar1} AS TIMESTAMP), CAST({scalar2} AS TIMESTAMP)) AS dt" + ) + assert_eq(df3, expected_df)