Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/multimodal/convert_ckpt_to_nemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ def convert(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def convert(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ def convert(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
1 change: 0 additions & 1 deletion examples/nlp/language_modeling/megatron_ckpt_to_nemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ def convert(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
1 change: 0 additions & 1 deletion examples/nlp/language_modeling/megatron_gpt_drop_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ def main(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
1 change: 0 additions & 1 deletion examples/vision/convert_ckpt_to_nemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def convert(local_rank, rank, world_size, args):
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
)

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):
pipeline_model_parallel_size=cfg.get('pipeline_model_parallel_size', 1),
pipeline_model_parallel_comm_backend=cfg.get('pipeline_model_parallel_comm_backend', None),
virtual_pipeline_model_parallel_size=vp_size,
pipeline_model_parallel_split_rank=cfg.get('pipeline_model_parallel_split_rank', 0),
pipeline_model_parallel_split_rank=cfg.get('pipeline_model_parallel_split_rank', None),
use_tp_pp_dp_mapping=cfg.get('use_tp_pp_dp_mapping', False),
num_distributed_optimizer_instances=self.cfg.optim.get('num_distributed_optimizer_instances', 1),
context_parallel_size=cfg.get('context_parallel_size', 1),
Expand Down Expand Up @@ -1170,17 +1170,13 @@ def _get_total_params_across_model_parallel_groups_enc_dec(self, model):
num_parameters_on_device -= num_word_embedding_parameters

# Subtract decoder position embedding params that are shared with encoder.
if (
parallel_state.is_pipeline_stage_at_split()
and self.cfg.encoder.get("position_embedding_type", "learned_absolute") == "learned_absolute"
):
if self.cfg.encoder.get("position_embedding_type", "learned_absolute") == "learned_absolute":
num_position_embedding_parameters = sum([p.nelement() for p in model.position_embeddings_weight()])
num_parameters_on_device -= num_position_embedding_parameters

# Check and remove RPE embeddings from the encoder that are replicated.
if (
parallel_state.get_pipeline_model_parallel_world_size() > 1
and parallel_state.is_pipeline_stage_before_split()
and not parallel_state.is_pipeline_first_stage()
and self.cfg.encoder.get("position_embedding_type", "learned_absolute") == "relative"
):
Expand All @@ -1191,8 +1187,6 @@ def _get_total_params_across_model_parallel_groups_enc_dec(self, model):
# Check and remove RPE embeddings from the decoder that are replicated.
if (
parallel_state.get_pipeline_model_parallel_world_size() > 1
and parallel_state.is_pipeline_stage_after_split()
and not parallel_state.is_pipeline_stage_at_split()
and self.cfg.encoder.get("position_embedding_type", "learned_absolute") == "relative"
):
# substract the RPE params on intermediate pipeline stages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
model_provider_func=self.model_provider_func,
wrap_with_ddp=False,
on_cpu=True,
model_type=ModelType.encoder_and_decoder,
model_type=ModelType.encoder_or_decoder,
)[0]
else:
self.enc_dec_model = build_model(
model_provider_func=self.model_provider_func,
wrap_with_ddp=False,
model_type=ModelType.encoder_and_decoder,
model_type=ModelType.encoder_or_decoder,
)[0]

# We don't need to call it explicitly? Since it is a pytorch lightning hook function
Expand All @@ -154,7 +154,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
True if (not self.megatron_amp_O2) and (self.autocast_dtype in [torch.float16, torch.bfloat16]) else False
)

self.enc_dec_model.model_type = ModelType.encoder_and_decoder
self.enc_dec_model.model_type = ModelType.encoder_or_decoder

def setup_optimizer_param_groups(self):
"""ModelPT override. Optimizer will get self._optimizer_param_groups"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class MegatronT5PromptLearningModel(MegatronBasePromptLearningModel):

def __init__(self, cfg: DictConfig, trainer: Trainer):
super().__init__(cfg, trainer)
self.model_type = ModelType.encoder_and_decoder
self.model_type = ModelType.encoder_or_decoder

def first_stage_of_pipeline(self):
if self.frozen_model.enc_dec_model.pre_process and parallel_state.get_pipeline_model_parallel_rank() == 0:
Expand Down
23 changes: 0 additions & 23 deletions nemo/collections/nlp/modules/common/megatron/build_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,6 @@ def build_model(
pre_process=parallel_state.is_pipeline_first_stage(),
post_process=parallel_state.is_pipeline_last_stage(),
)
elif model_type == ModelType.encoder_and_decoder:
pre_process = parallel_state.is_pipeline_first_stage()
post_process = parallel_state.is_pipeline_last_stage()
# `add_encoder` & `add_decoder` logic.
add_encoder, add_decoder = True, True
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
split_rank = parallel_state.get_pipeline_model_parallel_split_rank()
if split_rank is None:
raise RuntimeError("Split rank needs to be specified for model with both encoder and decoder.")
rank = parallel_state.get_pipeline_model_parallel_rank()
world_size = parallel_state.get_pipeline_model_parallel_world_size()
pre_process = rank == 0 or rank == split_rank
post_process = rank == (split_rank - 1) or rank == (world_size - 1)
add_encoder = parallel_state.is_pipeline_stage_before_split()
add_decoder = parallel_state.is_pipeline_stage_after_split()
model = model_provider_func(
*args,
**kwargs,
pre_process=pre_process,
post_process=post_process,
add_encoder=add_encoder,
add_decoder=add_decoder,
)
else:
raise ValueError(f"Unrecognized ModelType '{model_type}'")

Expand Down
19 changes: 17 additions & 2 deletions nemo/collections/nlp/modules/common/megatron/megatron_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
set_expert_model_parallel_rank,
set_expert_model_parallel_world_size,
set_pipeline_model_parallel_rank,
set_pipeline_model_parallel_split_rank,
set_pipeline_model_parallel_world_size,
set_tensor_model_parallel_rank,
set_tensor_model_parallel_world_size,
Expand Down Expand Up @@ -101,6 +100,15 @@ def initialize_model_parallel_for_nemo(
"""
Initialize the model parallel groups for NeMo.
"""
assert (
pipeline_model_parallel_split_rank is None or pipeline_model_parallel_split_rank == 0
), "pipeline_model_parallel_split_rank is deprecated."
assert encoder_pipeline_model_parallel_size == 0 and (
encoder_tensor_model_parallel_size == 0 or encoder_tensor_model_parallel_size == tensor_model_parallel_size
), (
"encoder_pipeline_model_parallel_size is temporarily "
"unavailable. We are working on a refactoring to add it back."
)

# updating NeMo globals
app_state = AppState()
Expand Down Expand Up @@ -151,7 +159,6 @@ def initialize_model_parallel_for_nemo(
set_pipeline_model_parallel_world_size(
app_state.pipeline_model_parallel_size + app_state.encoder_pipeline_model_parallel_size
)
set_pipeline_model_parallel_split_rank(app_state.pipeline_model_parallel_split_rank)
set_pipeline_model_parallel_rank(app_state.pipeline_model_parallel_rank)

tensor_parallel.random.initialize_rng_tracker(use_te_rng_tracker=use_te_rng_tracker)
Expand Down Expand Up @@ -282,6 +289,14 @@ def fake_initialize_model_parallel(
ranks 8 to 15 belong to the second box.
"""

assert encoder_pipeline_model_parallel_size_ == 0 and (
encoder_tensor_model_parallel_size_ == 0 or encoder_tensor_model_parallel_size_ == tensor_model_parallel_size_
), (
"encoder_pipeline_model_parallel_size is temporarily "
"unavailable. We are working on a refactoring to add it back."
)
assert pipeline_model_parallel_split_rank_ is None, "pipeline_model_parallel_split_rank is deprecated."

# Get world size and rank. Ensure some consistencies.
tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ def __init__(
init_method=encoder_init,
scaled_init_method=encoder_scaled_init,
pre_process=pre_process,
post_process=False
if megatron_lm_compatible
else post_process, # megatron lm model has no final layer_norm
post_process=(
False if megatron_lm_compatible else post_process
), # megatron lm model has no final layer_norm
init_method_std=init_method_std,
hidden_dropout=hidden_dropout,
attention_dropout=attention_dropout,
Expand All @@ -196,7 +196,7 @@ def __init__(
normalization=normalization,
transformer_block_type=transformer_block_type,
headscale=headscale,
parent_model_type=ModelType.encoder_and_decoder,
parent_model_type=ModelType.encoder_or_decoder,
layer_type=enc_layer_types,
chunk_size=chunk_size,
layer_number_offset=0,
Expand Down Expand Up @@ -260,7 +260,7 @@ def __init__(
normalization=normalization,
transformer_block_type=transformer_block_type,
headscale=headscale,
parent_model_type=ModelType.encoder_and_decoder,
parent_model_type=ModelType.encoder_or_decoder,
layer_type=pre_decoder_layer_types,
chunk_size=chunk_size,
layer_number_offset=0,
Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(
normalization=normalization,
headscale=headscale,
transformer_block_type=transformer_block_type,
parent_model_type=ModelType.encoder_and_decoder,
parent_model_type=ModelType.encoder_or_decoder,
layer_type=post_decoder_layer_types,
chunk_size=chunk_size,
layer_number_offset=pre_decoder_num_layers + 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=C0301,C0115,C0116
import torch
from omegaconf import DictConfig

Expand Down Expand Up @@ -248,7 +249,7 @@ def __init__(
normalization=encoder_cfg.get('normalization', 'layernorm'),
transformer_block_type=encoder_cfg.get('transformer_block_type', 'pre_ln'),
headscale=encoder_cfg.get('headscale', False),
parent_model_type=ModelType.encoder_and_decoder,
parent_model_type=ModelType.encoder_or_decoder,
num_self_attention_per_cross_attention=encoder_cfg.get('num_self_attention_per_cross_attention', 1),
megatron_legacy=encoder_cfg.get('megatron_legacy', False),
normalize_attention_scores=encoder_cfg.get('normalize_attention_scores', True),
Expand Down Expand Up @@ -386,7 +387,7 @@ def __init__(
normalization=decoder_cfg.get('normalization', 'layernorm'),
transformer_block_type=decoder_cfg.get('transformer_block_type', 'pre_ln'),
headscale=decoder_cfg.get('headscale', False),
parent_model_type=ModelType.encoder_and_decoder,
parent_model_type=ModelType.encoder_or_decoder,
megatron_legacy=decoder_cfg.get('megatron_legacy', False),
normalize_attention_scores=decoder_cfg.get('normalize_attention_scores', True),
num_moe_experts=decoder_cfg.get('num_moe_experts', 1),
Expand Down Expand Up @@ -455,7 +456,7 @@ def _validate_perceiver_config(self, cfg):
cfg.get("position_embedding_type", "learned_absolute") == "relative"
and cfg.get("arch", "transformer") == "perceiver"
):
raise ValueError(f"Perceivers with relative position embeddings are not supported")
raise ValueError("Perceivers with relative position embeddings are not supported")

def _validate_config(self):
encoder_kv_channels = self._validate_kv_channels(self.encoder_cfg)
Expand Down
4 changes: 2 additions & 2 deletions nemo/collections/nlp/modules/common/megatron/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ def build_layer(layer_number):
else:
# Each stage gets a contiguous set of layers.
if (
self.model_type == ModelType.encoder_and_decoder
self.model_type == ModelType.encoder_or_decoder
and parallel_state.get_pipeline_model_parallel_world_size() > 1
):
pipeline_rank = parallel_state.get_pipeline_model_parallel_rank()
Expand Down Expand Up @@ -1307,7 +1307,7 @@ def _get_layer(self, layer_number):
def get_num_layers(self, num_layers):
"""Compute the number of transformer layers resident on the current rank."""
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
if self.model_type == ModelType.encoder_and_decoder:
if self.model_type == ModelType.encoder_or_decoder:
assert parallel_state.get_pipeline_model_parallel_split_rank() is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this assertion still be valid after we remove pipeline_model_parallel_split_rank from the settings?

num_ranks_in_encoder = parallel_state.get_pipeline_model_parallel_split_rank()
num_ranks_in_decoder = parallel_state.get_pipeline_model_parallel_world_size() - num_ranks_in_encoder
Expand Down
4 changes: 3 additions & 1 deletion nemo/collections/nlp/parts/nlp_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,15 @@ def init_model_parallel(
if app_state.model_parallel_size is not None:
# destroy groups in case they have already been created
# this happens with multiple calls to trainer.test for example
assert (
app_state.pipeline_model_parallel_split_rank is None
), "pipeline_model_parallel_split_rank is deprecated."
parallel_state.destroy_model_parallel()
if torch.distributed.is_initialized():
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
virtual_pipeline_model_parallel_size=app_state.virtual_pipeline_model_parallel_size,
pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
pipeline_model_parallel_comm_backend=app_state.pipeline_model_parallel_comm_backend,
context_parallel_size=app_state.context_parallel_size,
nccl_communicator_config_path=nccl_communicator_config_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class MegatronT5SpeechLMModel(MegatronBaseSpeechLM):

def __init__(self, cfg: DictConfig, trainer: Trainer):
super().__init__(cfg, trainer)
self.model_type = ModelType.encoder_and_decoder
self.model_type = ModelType.encoder_or_decoder
speech_codebook_size = cfg.data.get('speech_codebook_size', 1024)
num_speech_codebooks = cfg.data.get('num_speech_codebooks', 8)
speech_offset = cfg.data.get('speech_offset', 30000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,7 @@ def build_layer(layer_number):
) + (parallel_state.get_pipeline_model_parallel_rank() * self.num_layers)
else:
# Each stage gets a contiguous set of layers.
if (
self.model_type == ModelType.encoder_and_decoder
and parallel_state.get_pipeline_model_parallel_world_size() > 1
):
if parallel_state.get_pipeline_model_parallel_world_size() > 1:
pipeline_rank = parallel_state.get_pipeline_model_parallel_rank()
if layer_type == LayerType.encoder:
offset = pipeline_rank * self.num_layers
Expand Down
2 changes: 1 addition & 1 deletion nemo/collections/vlm/mllama/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def __init__(
if self.add_encoder:
self.vision_model = vision_model_config.configure_model()

self.model_type = ModelType.encoder_and_decoder
self.model_type = ModelType.encoder_or_decoder
self.xattn_needed = True

self.patch_size = 14
Expand Down
Loading
Loading