Merged
8 changes: 5 additions & 3 deletions python/paddle/base/device_worker.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Definition of device workers."""

import sys

__all__ = []
@@ -626,9 +627,10 @@ def _gen_worker_desc(self, trainer_desc):
# then runs Backward phase for all microbatches.
# 1F1B scheduler, which runs forward phase and backward phase alternatively
# after startup phase.
assert schedule_mode_str in ["F-then-B", "1F1B"], (
"The schedule mode " "for pipeline must be one of F-then-B or 1F1B"
)
assert schedule_mode_str in [
"F-then-B",
"1F1B",
], "The schedule mode for pipeline must be one of F-then-B or 1F1B"
schedule_mode = 0 if schedule_mode_str == "F-then-B" else 1
section_param.schedule_mode = schedule_mode
cfg = section_param.section_config
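The comments in the hunk above describe the two pipeline schedule modes only in words; as an illustrative aside (not code from this PR), the sketch below prints the per-stage phase order the two modes imply, with num_warmup as an assumed stand-in for the stage-dependent warmup depth:

# Illustration only: phase order seen by one pipeline stage under the two
# schedule modes named in the diff above ("F-then-B" and "1F1B").
def schedule_order(schedule_mode_str, num_microbatches, num_warmup=1):
    assert schedule_mode_str in ["F-then-B", "1F1B"]
    if schedule_mode_str == "F-then-B":
        # Forward phase for all microbatches, then backward phase for all.
        return [f"F{i}" for i in range(num_microbatches)] + [
            f"B{i}" for i in range(num_microbatches)
        ]
    # 1F1B: a startup phase of forwards, then alternate one forward / one
    # backward, then drain the remaining backwards.
    num_warmup = min(num_warmup, num_microbatches)
    order = [f"F{i}" for i in range(num_warmup)]
    for i in range(num_warmup, num_microbatches):
        order += [f"F{i}", f"B{i - num_warmup}"]
    order += [f"B{i}" for i in range(num_microbatches - num_warmup, num_microbatches)]
    return order

print(schedule_order("F-then-B", 4))            # ['F0', 'F1', 'F2', 'F3', 'B0', 'B1', 'B2', 'B3']
print(schedule_order("1F1B", 4, num_warmup=2))  # ['F0', 'F1', 'F2', 'B0', 'F3', 'B1', 'B2', 'B3']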
7 changes: 5 additions & 2 deletions python/paddle/cinn/runtime/cinn_jit.py
@@ -50,13 +50,16 @@ def _make_launcher(self):
jit_input_args = ', '.join(arg_name for arg_name in self.arg_names)
lazy_compile = f"""
import paddle.cinn as cinn
def {self.fn.__name__}({jit_input_args}, target=cinn.common.DefaultHostTarget()):
def {self.fn.__name__}({
jit_input_args
}, target=cinn.common.DefaultHostTarget()):
from paddle.cinn.compiler import compile
jit_inputs = {', '.join([f'{arg}' for arg in self.arg_names])}
jit_inputs_signature = {{ i: self._convert_arg_type(arg) \
for i, arg in enumerate(jit_inputs)}}
module = compile(self, jit_inputs_signature=jit_inputs_signature, arg_names={
self.arg_names}, target=target)
self.arg_names
}, target=target)
module({jit_input_args})

return module
12 changes: 11 additions & 1 deletion python/paddle/dataset/conll05.py
@@ -190,7 +190,17 @@ def reader():
pred_idx = [predicate_dict.get(predicate)] * sen_len
label_idx = [label_dict.get(w) for w in labels]

yield word_idx, ctx_n2_idx, ctx_n1_idx, ctx_0_idx, ctx_p1_idx, ctx_p2_idx, pred_idx, mark, label_idx
yield (
word_idx,
ctx_n2_idx,
ctx_n1_idx,
ctx_0_idx,
ctx_p1_idx,
ctx_p2_idx,
pred_idx,
mark,
label_idx,
)

return reader

11 changes: 8 additions & 3 deletions python/paddle/dataset/imdb.py
@@ -49,9 +49,14 @@ def tokenize(pattern):
while tf is not None:
if bool(pattern.match(tf.name)):
# newline and punctuations removal and ad-hoc tokenization.
yield tarf.extractfile(tf).read().rstrip(b'\n\r').translate(
None, string.punctuation.encode('latin-1')
).lower().split()
yield (
tarf.extractfile(tf)
.read()
.rstrip(b'\n\r')
.translate(None, string.punctuation.encode('latin-1'))
.lower()
.split()
)
tf = tarf.next()


3 changes: 1 addition & 2 deletions python/paddle/dataset/wmt14.py
@@ -28,8 +28,7 @@
__all__ = []

URL_DEV_TEST = (
'http://www-lium.univ-lemans.fr/~schwenk/'
'cslm_joint_paper/data/dev+test.tgz'
'http://www-lium.univ-lemans.fr/~schwenk/cslm_joint_paper/data/dev+test.tgz'
)
MD5_DEV_TEST = '7d7897317ddd8ba0ae5c5fa7248d3ff5'
# this is a small set of data for test. The original data is too large and
Original file line number Diff line number Diff line change
@@ -120,7 +120,8 @@ def extract_tensors_with_grads(
# Deactivate auto mixed precision context in the backward phase
with paddle.amp.auto_cast(enable=False):
paddle.autograd.backward(
stage_output_tensors, grad_tensors=output_grad_tensors # type: ignore[arg-type]
stage_output_tensors,
grad_tensors=output_grad_tensors,
)

# Extract gradients wrt the input values
12 changes: 9 additions & 3 deletions python/paddle/distributed/auto_parallel/pipelining/schedules.py
@@ -470,7 +470,9 @@ def _step_microbatches(
for work in works.values():
work.wait()

output = self._stage.forward_one_chunk(i, arg_mbs[i], kwarg_mbs[i]) # type: ignore[index]
output = self._stage.forward_one_chunk(
i, arg_mbs[i], kwarg_mbs[i]
)

ops = self._stage.get_fwd_send_ops(i)
works = _sorted_batch_p2p(ops, desc="fwd_send")
@@ -577,7 +579,9 @@ def _step_microbatches(
recv_work.wait()

# Compute
output = self._stage.forward_one_chunk(fwd_mb_index, arg_mbs[fwd_mb_index], kwarg_mbs[fwd_mb_index]) # type: ignore[index]
output = self._stage.forward_one_chunk(
fwd_mb_index, arg_mbs[fwd_mb_index], kwarg_mbs[fwd_mb_index]
)

# Clear previous chunk's forward sends (hopefully they have well
# finished, otherwise, we are heavily communication bound, in which
@@ -639,7 +643,9 @@ def _step_microbatches(
fuse_work.wait()

# Now do the fwd
output = self._stage.forward_one_chunk(fwd_mb_index, arg_mbs[fwd_mb_index], kwarg_mbs[fwd_mb_index]) # type: ignore[index]
output = self._stage.forward_one_chunk(
fwd_mb_index, arg_mbs[fwd_mb_index], kwarg_mbs[fwd_mb_index]
)

# Compute loss
self._maybe_compute_loss(
5 changes: 3 additions & 2 deletions python/paddle/distributed/auto_parallel/ring_attention.py
@@ -161,8 +161,9 @@ def update_out_and_lse(
old_lse[:, old_lse.shape[1] // 2 :, :, :] = second_chunk_lse
return old_out, old_lse
else:
block_out, block_lse = paddle.cast(block_out, "float32"), paddle.cast(
block_lse, "float32"
block_out, block_lse = (
paddle.cast(block_out, "float32"),
paddle.cast(block_lse, "float32"),
)
with paddle.amp.auto_cast(enable=False):
return old_out - (old_out - block_out) * F.sigmoid(
6 changes: 2 additions & 4 deletions python/paddle/distributed/auto_parallel/static/converter.py
@@ -61,8 +61,7 @@ def _check_tensor_dict(self, tensors_dict):
def _check_pre_strategy(self, pre_strategy):
if not pre_strategy:
raise ValueError(
"'pre_strategy' is None, "
"there are not tensors in pre process."
"'pre_strategy' is None, there are not tensors in pre process."
)
if not isinstance(pre_strategy, dict):
raise TypeError(
@@ -74,8 +73,7 @@ def _check_pre_strategy(self, pre_strategy):
def _check_cur_strategy(self, cur_strategy):
if not cur_strategy:
warnings.warn(
"'cur_strategy' is None, "
"there are not tensors in cur process"
"'cur_strategy' is None, there are not tensors in cur process"
)
if not isinstance(cur_strategy, dict):
raise TypeError(
Original file line number Diff line number Diff line change
@@ -91,9 +91,10 @@ def _filter_vars_with_zero_in_degree_and_ignore_feed_fetch_vars():
# ignore communication op from graph, because sometimes we want to profile a sub-graph
# and these dangling operators will not work (no graph to communicate to/from)
continue
input_var_names, output_var_names = _collect_op_input_var_names(
op
), _collect_op_output_var_names(op)
input_var_names, output_var_names = (
_collect_op_input_var_names(op),
_collect_op_output_var_names(op),
)
for var_name in input_var_names + output_var_names:
if var_name not in var_in_degree:
var_in_degree[var_name] = 0
@@ -280,10 +281,9 @@ def measure_program_real_op_cost(
isinstance(place, supported_place)
for supported_place in supported_places
), f'Current place ({place}) does not support runtime profiling. "place" should be one of the following: {supported_places}.'
assert isinstance(run_iters, int) and run_iters >= 1, (
'Invalid parameter run_iters set. run_iters '
'should be an integer >= 1.'
)
assert (
isinstance(run_iters, int) and run_iters >= 1
), 'Invalid parameter run_iters set. run_iters should be an integer >= 1.'
if run_iters == 1:
warnings.warn(
'run_iters was set to 1, profiling results might be inaccurate due to outliers.'
6 changes: 3 additions & 3 deletions python/paddle/distributed/auto_parallel/static/reshard.py
@@ -1096,9 +1096,9 @@ def __init__(
"The type of auto_parallel_startup_prog should be Program or None, "
f"but got {type(auto_parallel_startup_prog)}."
)
assert isinstance(rank_id, int), (
"The type of rank_id should be int, " f"but got {type(rank_id)}."
)
assert isinstance(
rank_id, int
), f"The type of rank_id should be int, but got {type(rank_id)}."
assert isinstance(dist_context, DistributedContext), (
"The type of dist_context should be DistributedContext, "
f"but got {type(dist_context)}."
4 changes: 2 additions & 2 deletions python/paddle/distributed/auto_tuner/prune.py
@@ -295,7 +295,7 @@ def prune_by_vpp_history(tuner_cfg, cur_cfg, history_cfgs=[], pruned_cfgs=[]):
cfg["vpp_degree"] > vpp_degree
and cfg.get("max_mem_usage") == "OOM"
):
pruned_reason = f"vpp_degree {vpp_degree} may cause oom because { cfg['vpp_degree']} already oom."
pruned_reason = f"vpp_degree {vpp_degree} may cause oom because {cfg['vpp_degree']} already oom."
log_pruned_info(cur_cfg, pruned_reason, tuner_cfg)
cur_cfg["max_mem_usage"] = "OOM"
return True
@@ -464,7 +464,7 @@ def prune_by_sharding_history(
cfg["sharding_stage"] < sharding_stage
and cfg.get("time", -1) > 0
):
pruned_reason = f"sharding_stage {sharding_stage} may be slower because {cfg['sharding_stage'] } has been already runnable."
pruned_reason = f"sharding_stage {sharding_stage} may be slower because {cfg['sharding_stage']} has been already runnable."
log_pruned_info(cur_cfg, pruned_reason, tuner_cfg)
cur_cfg["time"] = cfg["time"]
return True
2 changes: 1 addition & 1 deletion python/paddle/distributed/communication/deep_ep/buffer.py
@@ -121,7 +121,7 @@ def __init__(
# Make sure QP depth is always larger than the number of on-flight WRs, so that we can skip WQ slot check
os.environ['NVSHMEM_QP_DEPTH'] = '1024'
# NOTES: NVSHMEM initialization requires at least 256 MiB
os.environ['NVSHMEM_CUMEM_GRANULARITY'] = f'{2 ** 29}'
os.environ['NVSHMEM_CUMEM_GRANULARITY'] = f'{2**29}'

nvshmem_unique_ids = []
if (low_latency_mode and self.rank == 0) or (
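As a quick sanity check on the arithmetic in the NVSHMEM comments above (an illustrative aside, not code from this PR): 2**29 bytes is 512 MiB, comfortably above the 256 MiB minimum noted in the comment.

granularity_bytes = 2**29
print(granularity_bytes // (1024 * 1024))  # 512 -> granularity in MiB, above the 256 MiB minimum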
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@ def _all_to_all_in_static_mode(
if isinstance(out_tensor_or_tensor_list, list):
if len(out_tensor_or_tensor_list) != 0:
raise ValueError(
"The 'out_tensor_list' for all_to_all " "must be an empty list."
"The 'out_tensor_list' for all_to_all must be an empty list."
)
out_tensor = helper.create_variable_for_type_inference(
dtype=in_tensor.dtype
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/base/graphviz.py
@@ -237,7 +237,7 @@ def add_param(self, name, data_type, highlight=False):
' <tr>',
' <td>',
str(data_type),
' </td>' ' </tr>',
' </td> </tr>',
'</table>>',
]
)
19 changes: 9 additions & 10 deletions python/paddle/distributed/fleet/layers/mpu/mp_ops.py
@@ -899,16 +899,15 @@ def split(
... num_partitions=2)

"""
assert isinstance(size, (list, tuple)), (
"The type of size for "
"paddle.distributed.split must be list or tuple."
)
assert len(size) == 2, (
"Number of elements in size of " "paddle.distributed.split must be two."
)
assert isinstance(operation, str), (
"The type of operation for " "paddle.distributed.split must be str."
)
assert isinstance(
size, (list, tuple)
), "The type of size for paddle.distributed.split must be list or tuple."
assert (
len(size) == 2
), "Number of elements in size of paddle.distributed.split must be two."
assert isinstance(
operation, str
), "The type of operation for paddle.distributed.split must be str."
supported_operations = [
'linear',
'embedding',
Original file line number Diff line number Diff line change
@@ -161,9 +161,9 @@ def _get_hybrid_degree(self):
assert strategy.pipeline is True

if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
assert pp_degree == 2, (
"For manually set pipeline, only " "pp_degree = 2 is supported."
)
assert (
pp_degree == 2
), "For manually set pipeline, only pp_degree = 2 is supported."
assert (
global_world_size == mp_degree * sharding_degree * dp_degree
), f"global work size [{global_world_size}], mp_degree [{mp_degree}], sharding_degree [{sharding_degree}], dp_degree [{dp_degree}]."
@@ -1565,9 +1565,9 @@ def _build_groups(self):
# sharding-hybrid-dp as one scenario of outer-pure-dp
local_pp_degree = self.pp_degree
if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
assert self.pp_degree == 2, (
"For manually set pipeline, only " "pp_degree = 2 is supported."
)
assert (
Review comment (Member):

If ruff is run on this again, it will change it back; in other words, ruff format's style is

assert xxx, (
    "xxxxxxxx"
)

rather than

assert (
    xxx
), "xxxxxxxx"

self.pp_degree == 2
), "For manually set pipeline, only pp_degree = 2 is supported."
assert (
self.global_word_size
== self.mp_degree * self.sharding_degree * self.dp_degree
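Following up on the review comment above, here is a hypothetical sketch (not part of this PR's diff) of what the assert added in this hunk would be expected to look like after another ruff format pass, assuming the assert-message style the reviewer describes:

import os

# Hypothetical re-format: the condition stays on the assert line and only the
# message is wrapped in parentheses; pp_degree stands in for self.pp_degree
# from the surrounding method.
pp_degree = 2
if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
    assert pp_degree == 2, (
        "For manually set pipeline, only pp_degree = 2 is supported."
    )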
Original file line number Diff line number Diff line change
@@ -832,9 +832,10 @@ def forward_backward_pipeline(
if not self.is_pipeline_last_stage():
self._release_output(output_tensor_tuple)

input_tensor, output_tensor = input_buffers.pop(
0
), output_buffers.pop(0)
input_tensor, output_tensor = (
input_buffers.pop(0),
output_buffers.pop(0),
)

self._record_stamp("B", i, '"B"', self._backward_color)
input_tensor_grad = self._backward_step(
Original file line number Diff line number Diff line change
@@ -349,9 +349,10 @@ def _segment_params(self):
Divide all optimizer parameters equally into rank.
"""
if len(self.__segment_params) == 0:
self.__segment_params, param_lists = [
[] for _ in range(self.world_size)
], [[] for _ in range(self.world_size)]
self.__segment_params, param_lists = (
[[] for _ in range(self.world_size)],
[[] for _ in range(self.world_size)],
)
sizes = [0] * self.world_size
for param in self._local_params:
# Add this param to rank with smallest size.
4 changes: 3 additions & 1 deletion python/paddle/distributed/fleet/utils/log_util.py
@@ -95,7 +95,9 @@ def get_rotate_file_logger(log_level, name='root'):

path = os.path.join(log_dir, f"worker_{device_id}.log")
handler = RotatingFileHandler(
path, maxBytes=2 * 1024 * 1024 * 1024, backupCount=3 # 2GB
path,
maxBytes=2 * 1024 * 1024 * 1024,
backupCount=3, # 2GB
)

log_format = logging.Formatter(
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
group_size += np.prod(parameters[index].shape)
dtype = parameters[indices[0]].dtype
bytes = group_size * core.size_of_dtype(dtype)
msg = f"group_{group_idx}: {bytes / 1024 ** 2:.4f} MB, dtype: {dtype!s}"
msg = f"group_{group_idx}: {bytes / 1024**2:.4f} MB, dtype: {dtype!s}"
group_msg.append(msg)

logger.info(f"Tensor Fusion Group Info:\n{group_msg}\n")
@@ -416,7 +416,6 @@ def get_grad_address(param, use_main_grad):


class FusedCommBuffer:

class Status(enum.Enum):
"""Status of this bucket, Only useful when param allgather overlap is enabled"""

5 changes: 2 additions & 3 deletions python/paddle/distributed/transpiler/distribute_transpiler.py
@@ -781,7 +781,7 @@ def transpile(
index += 1
else:
AssertionError(
"Can not insert the send op by original " "variable name :",
"Can not insert the send op by original variable name :",
splited_grad_varname,
)

@@ -2370,8 +2370,7 @@ def _insert_split_op(self, program, orig_var, index, splited_vars):
)
else:
AssertionError(
"Variable type should be in set "
"[DENSE_TENSOR, SELECTED_ROWS]"
"Variable type should be in set [DENSE_TENSOR, SELECTED_ROWS]"
)

def _get_optimizer_input_shape(
5 changes: 3 additions & 2 deletions python/paddle/distributed/utils/process_utils.py
@@ -34,8 +34,9 @@ def _process_raw_cpu_info(i):
processed_cpu_info = []
cpu_ranges = i.split(',')
for cpu_range in cpu_ranges:
start, end = int(cpu_range.split("-")[0]), int(
cpu_range.split("-")[1]
start, end = (
int(cpu_range.split("-")[0]),
int(cpu_range.split("-")[1]),
)
processed_cpu_info.extend(list(range(start, end + 1)))
return processed_cpu_info
2 changes: 1 addition & 1 deletion python/paddle/distribution/lkj_cholesky.py
@@ -111,7 +111,7 @@ def tril_matrix_to_vec(mat: Tensor, diag: int = 0) -> Tensor:
out_shape = mat.shape[:-2]
n = mat.shape[-1]
if diag < -n or diag >= n:
raise ValueError(f"diag ({diag}) provided is outside [{-n}, {n-1}].")
raise ValueError(f"diag ({diag}) provided is outside [{-n}, {n - 1}].")

rows, cols = paddle.meshgrid(paddle.arange(n), paddle.arange(n))
tril_mask = diag + rows >= cols