Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/cuml/cuml/dask/ensemble/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,18 @@ def apply_reduction(self, reduce, partial_infs, datatype, delayed):
else:
return delayed_res.persist()

def _handle_deprecated_predict_model(self, predict_model):
    """
    Warn when the deprecated ``predict_model`` argument is supplied.

    Parameters
    ----------
    predict_model : object
        Value received by the caller's ``predict_model`` parameter. The
        sentinel string ``"deprecated"`` means the argument was left at its
        default, in which case nothing happens. Any other value triggers a
        ``FutureWarning``; the value itself is not used otherwise.

    Returns
    -------
    None
    """
    if predict_model == "deprecated":
        return

    warnings.warn(
        (
            "`predict_model` is deprecated (and ignored) and will be removed "
            "in 25.12. The default of `predict_model='GPU'` should suffice in "
            "all situations. When inferring on small datasets you may also "
            "want to try setting ``broadcast_data=True``."
        ),
        FutureWarning,
        # Frame 1 is this helper and frame 2 is the ``predict`` method that
        # calls it, so frame 3 is the user code that passed the argument.
        # Attributing the warning there keeps the reported location useful.
        stacklevel=3,
    )


def _func_fit(model, input_data, convert_dtype):
X = concatenate([item[0] for item in input_data])
Expand Down
157 changes: 32 additions & 125 deletions python/cuml/cuml/dask/ensemble/randomforestclassifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
#
import cupy as cp
import dask.array
import numpy as np
from dask.distributed import default_client

from cuml.dask.common.base import (
BaseEstimator,
Expand Down Expand Up @@ -275,7 +273,7 @@ def predict(
X,
threshold=0.5,
convert_dtype=True,
predict_model="GPU",
predict_model="deprecated",
layout="depth_first",
default_chunk_size=None,
align_bytes=None,
Expand All @@ -285,40 +283,25 @@ def predict(
"""
Predicts the labels for X.

GPU-based prediction in a multi-node, multi-GPU context works
by sending the sub-forest from each worker to the client,
concatenating these into one forest with the full
`n_estimators` set of trees, and sending this combined forest to
the workers, which will each infer on their local set of data.
Within the worker, this uses the cuML Forest Inference Library
(cuml.fil) for high-throughput prediction.

This allows inference to scale to large datasets, but the forest
transmission incurs overheads for very large trees. For inference
on small datasets, this overhead may dominate prediction time.

The 'CPU' fallback method works with sub-forests in-place, broadcasting
the datasets to all workers and combining predictions via a voting
method at the end. This method is slower on a per-row basis but may be
faster for problems with many trees and few rows.

Parameters
----------
X : Dask cuDF dataframe or CuPy backed Dask Array (n_rows, n_features)
Distributed dense matrix (floats or doubles) of shape
(n_samples, n_features).
threshold : float (default = 0.5)
Threshold used for classification. Optional and required only
while performing the predict operation on the GPU, that is for,
predict_model='GPU'.
Threshold used for classification.
convert_dtype : bool, optional (default = True)
When set to True, the predict method will, when necessary, convert
the input to the data type which was used to train the model. This
will increase memory used for the method.
predict_model : String (default = 'GPU')
'GPU' to predict using the GPU, 'CPU' otherwise. The GPU can only
be used if the model was trained on float32 data and `X` is float32
or convert_dtype is set to True.
predict_model : string (default = 'deprecated')

.. deprecated:: 25.10
`predict_model` is deprecated (and ignored) and will be removed
in 25.12. The default of `predict_model="GPU"` should suffice in
all situations. When inferring on small datasets you may also
want to try setting ``broadcast_data=True``.

layout : string (default = 'depth_first')
Specifies the in-memory layout of nodes in FIL forests. Options:
'depth_first', 'layered', 'breadth_first'.
Expand All @@ -330,11 +313,10 @@ def predict(
If specified, trees will be padded such that their in-memory size is
a multiple of this value. This can improve performance by guaranteeing
that memory reads from trees begin on a cache line boundary.
Typical values are 0 or 128 on GPU and 0 or 64 on CPU.
Typical values are 0 or 128.
delayed : bool (default = True)
Whether to do a lazy prediction (and return Delayed objects) or an
eagerly executed one. It is not required while using
predict_model='CPU'.
eagerly executed one.
broadcast_data : bool (default = False)
If False, the trees are merged in a single model before the workers
perform inference on their share of the prediction workload.
Expand All @@ -348,29 +330,26 @@ def predict(
y : Dask cuDF dataframe or CuPy backed Dask Array (n_rows, 1)
The predicted class labels.
"""
if predict_model == "CPU":
preds = self.predict_model_on_cpu(X=X, convert_dtype=convert_dtype)
else:
if broadcast_data:
preds = self.partial_inference(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
else:
preds = self._predict_using_fil(
X,
threshold=threshold,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
return preds
self._handle_deprecated_predict_model(predict_model)

if broadcast_data:
return self.partial_inference(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
return self._predict_using_fil(
X,
threshold=threshold,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)

def partial_inference(self, X, delayed, **kwargs):
partial_infs = self._partial_inference(
Expand All @@ -392,78 +371,6 @@ def partial_inference(self, X, delayed, **kwargs):
else:
return pred_class.persist()

def predict_using_fil(self, X, delayed, **kwargs):
    """
    Run FIL-based prediction, building the internal model first if needed.

    When ``self._get_internal_model()`` returns ``None``, the result of
    ``self._concat_treelite_models()`` is stored through
    ``self._set_internal_model`` before predicting. ``X``, ``delayed`` and
    any extra keyword arguments are forwarded unchanged to
    ``self._predict_using_fil``, whose return value is returned as-is.
    """
    needs_model = self._get_internal_model() is None
    if needs_model:
        combined = self._concat_treelite_models()
        self._set_internal_model(combined)

    prediction = self._predict_using_fil(X=X, delayed=delayed, **kwargs)
    return prediction

"""
TODO : Update function names used for CPU predict.
Cuml issue #1854 has been created to track this.
"""

def predict_model_on_cpu(self, X, convert_dtype=True):
    """
    Predicts the labels for X by voting over the per-worker sub-forests.

    ``X`` is scattered through ``self.client``, one task per worker is
    submitted against that worker's sub-forest (``self.rfs[worker]``), and
    the gathered outputs are tallied on the client.

    Parameters
    ----------
    X : Dask cuDF dataframe or CuPy backed Dask Array (n_rows, n_features)
        Distributed dense matrix (floats or doubles) of shape
        (n_samples, n_features).
    convert_dtype : bool, optional (default = True)
        When set to True, the predict method will, when necessary, convert
        the input to the data type which was used to train the model. This
        will increase memory used for the method.

    Returns
    -------
    y : list
        One entry per row of ``X`` holding the label that collected the most
        votes. On a tie, the label that reached the top count first wins.
        Rows that receive no votes are reported as ``-1``.
    """
    # Use the estimator's own client for every step so the futures are
    # created and gathered by the same client (as the regressor does).
    client = self.client
    worker_ids = self.workers

    scattered = client.scatter(X)
    futures = [
        client.submit(
            RandomForestClassifier._predict_model_on_cpu,
            self.rfs[worker],
            scattered,
            convert_dtype,
            workers=[worker],
        )
        for worker in worker_ids
    ]
    results = client.gather(futures, errors="raise")
    trees_per_worker = self.n_estimators_per_worker

    # Each worker's result is read as a flat sequence in which every row
    # owns ``trees_per_worker[d]`` consecutive entries; ``offsets[d]`` is
    # the start of the current row's entries for worker ``d``.
    offsets = np.zeros(len(futures), dtype=np.int32)
    predictions = []

    for _ in range(len(X)):
        votes = {}
        best_label = -1
        best_count = 0

        for d, worker_result in enumerate(results):
            for j in range(trees_per_worker[d]):
                label = worker_result[offsets[d] + j]
                votes[label] = votes.get(label, 0) + 1

                # Strict comparison: an equal count never displaces the
                # label that got there first.
                if votes[label] > best_count:
                    best_count = votes[label]
                    best_label = label

            offsets[d] += trees_per_worker[d]

        predictions.append(best_label)
    return predictions

def predict_proba(self, X, delayed=True, **kwargs):
"""
Predicts the probability of each class for X.
Expand Down
119 changes: 29 additions & 90 deletions python/cuml/cuml/dask/ensemble/randomforestregressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ def _construct_rf(n_estimators, random_state, **kwargs):
n_estimators=n_estimators, random_state=random_state, **kwargs
)

@staticmethod
def _predict_model_on_cpu(model, X, convert_dtype):
    """
    Call ``model._predict_model_on_cpu`` on ``X`` and return its result.

    ``convert_dtype`` is forwarded by keyword.
    """
    outcome = model._predict_model_on_cpu(X, convert_dtype=convert_dtype)
    return outcome

def get_summary_text(self):
"""
Obtain the text summary of the random forest model
Expand Down Expand Up @@ -254,7 +250,7 @@ def predict(
self,
X,
convert_dtype=True,
predict_model="GPU",
predict_model="deprecated",
layout="depth_first",
default_chunk_size=None,
align_bytes=None,
Expand All @@ -264,24 +260,6 @@ def predict(
"""
Predicts the regressor outputs for X.

GPU-based prediction in a multi-node, multi-GPU context works
by sending the sub-forest from each worker to the client,
concatenating these into one forest with the full
`n_estimators` set of trees, and sending this combined forest to
the workers, which will each infer on their local set of data.
Within the worker, this uses the cuML Forest Inference Library
(cuml.fil) for high-throughput prediction.

This allows inference to scale to large datasets, but the forest
transmission incurs overheads for very large trees. For inference
on small datasets, this overhead may dominate prediction time.

The 'CPU' fallback method works with sub-forests in-place,
broadcasting the datasets to all workers and combining predictions
via an averaging method at the end. This method is slower
on a per-row basis but may be faster for problems with many trees
and few rows.

Parameters
----------
X : Dask cuDF dataframe or CuPy backed Dask Array (n_rows, n_features)
Expand All @@ -291,10 +269,14 @@ def predict(
When set to True, the predict method will, when necessary, convert
the input to the data type which was used to train the model. This
will increase memory used for the method.
predict_model : String (default = 'GPU')
'GPU' to predict using the GPU, 'CPU' otherwise. The GPU can only
be used if the model was trained on float32 data and `X` is float32
or convert_dtype is set to True.
predict_model : string (default = 'deprecated')

.. deprecated:: 25.10
`predict_model` is deprecated (and ignored) and will be removed
in 25.12. The default of `predict_model="GPU"` should suffice in
all situations. When inferring on small datasets you may also
want to try setting ``broadcast_data=True``.

layout : string (default = 'depth_first')
Specifies the in-memory layout of nodes in FIL forests. Options:
'depth_first', 'layered', 'breadth_first'.
Expand All @@ -306,7 +288,7 @@ def predict(
If specified, trees will be padded such that their in-memory size is
a multiple of this value. This can improve performance by guaranteeing
that memory reads from trees begin on a cache line boundary.
Typical values are 0 or 128 on GPU and 0 or 64 on CPU.
Typical values are 0 or 128.
delayed : bool (default = True)
Whether to do a lazy prediction (and return Delayed objects) or an
eagerly executed one.
Expand All @@ -322,29 +304,25 @@ def predict(
-------
y : Dask cuDF dataframe or CuPy backed Dask Array (n_rows, 1)
"""
if predict_model == "CPU":
preds = self.predict_model_on_cpu(X, convert_dtype=convert_dtype)

else:
if broadcast_data:
preds = self.partial_inference(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
else:
preds = self._predict_using_fil(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
return preds
self._handle_deprecated_predict_model(predict_model)

if broadcast_data:
return self.partial_inference(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)
return self._predict_using_fil(
X,
convert_dtype=convert_dtype,
layout=layout,
default_chunk_size=default_chunk_size,
align_bytes=align_bytes,
delayed=delayed,
)

def partial_inference(self, X, delayed, **kwargs):
partial_infs = self._partial_inference(
Expand All @@ -360,45 +338,6 @@ def partial_inference(self, X, delayed, **kwargs):
else:
return merged_regressions.persist()

def predict_using_fil(self, X, delayed, **kwargs):
    """
    Run FIL-based prediction, building the internal model first if needed.

    When ``self._get_internal_model()`` returns ``None``, the result of
    ``self._concat_treelite_models()`` is stored through
    ``self._set_internal_model`` before predicting. ``X``, ``delayed`` and
    any extra keyword arguments are forwarded unchanged to
    ``self._predict_using_fil``, whose return value is returned as-is.
    """
    needs_model = self._get_internal_model() is None
    if needs_model:
        combined = self._concat_treelite_models()
        self._set_internal_model(combined)
    regression = self._predict_using_fil(X=X, delayed=delayed, **kwargs)
    return regression

"""
TODO : Update function names used for CPU predict.
Cuml issue #1854 has been created to track this.
"""

def predict_model_on_cpu(self, X, convert_dtype):
    """
    Predict on every worker's sub-forest and average the results per row.

    ``X`` is scattered through ``self.client`` and one task per entry of
    ``self.workers`` is submitted against ``self.rfs[worker]``, pinned to
    that worker. The gathered per-worker outputs are indexed by row and
    averaged with equal weight per worker.

    Parameters
    ----------
    X : object supporting ``len()``
        Input rows; passed to ``self.client.scatter``.
    convert_dtype : bool
        Forwarded to each worker's prediction task.

    Returns
    -------
    list
        One averaged prediction per row of ``X``.
    """
    worker_ids = self.workers
    scattered = self.client.scatter(X)

    futures = [
        self.client.submit(
            RandomForestRegressor._predict_model_on_cpu,
            self.rfs[worker],
            scattered,
            convert_dtype,
            workers=[worker],
        )
        for worker in worker_ids
    ]

    per_worker = self.client.gather(futures, errors="raise")
    n_results = len(per_worker)

    averaged = []
    for row in range(len(X)):
        # Accumulate left to right starting from 0.0.
        total = 0.0
        for worker_preds in per_worker:
            total = total + worker_preds[row]
        averaged.append(total / n_results)

    return averaged

def get_params(self, deep=True):
"""
Returns the value of all parameters
Expand Down
Loading