diff --git a/.github/workflows/build-test-publish-wheel.yml b/.github/workflows/build-test-publish-wheel.yml index d109b746c..b3625e525 100644 --- a/.github/workflows/build-test-publish-wheel.yml +++ b/.github/workflows/build-test-publish-wheel.yml @@ -19,31 +19,23 @@ on: branches: - main - 'r**' - + pull_request: + defaults: run: shell: bash -x -e -u -o pipefail {0} jobs: build-test-publish-wheel: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_test_publish_wheel.yml@v0.12.2 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_test_publish_wheel.yml@v0.22.3 with: - image-name: nemo_container - dockerfile: Dockerfile - image-label: nemo-aligner - build-args: | - IMAGE_LABEL=nemo-aligner - MAX_JOBS=32 - ALIGNER_COMMIT=${{ github.event.pull_request.head.sha || github.sha }} - prune-filter-timerange: 24h dry-run: true python-package: nemo_aligner - container-workdir: /opt/NeMo-Aligner - environment: public + python-version: "3.11" secrets: TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }} TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }} + SLACK_WEBHOOK: ${{ secrets.SLACK_RELEASE_ENDPOINT }} SLACK_WEBHOOK_ADMIN: ${{ secrets.SLACK_WEBHOOK_ADMIN }} - SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }} \ No newline at end of file diff --git a/.github/workflows/cherry-pick-release-commit.yml b/.github/workflows/cherry-pick-release-commit.yml index a1d300ca4..9ba66b1db 100644 --- a/.github/workflows/cherry-pick-release-commit.yml +++ b/.github/workflows/cherry-pick-release-commit.yml @@ -4,10 +4,11 @@ on: push: branches: - main + - dev jobs: cherry-pick: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_cherry_pick.yml@v0.6.0 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_cherry_pick.yml@v0.22.7 secrets: PAT: ${{ secrets.PAT }} SLACK_WEBHOOK_ADMIN: ${{ secrets.SLACK_WEBHOOK_ADMIN }} diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 728f3b6bf..a9bf76daf 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -55,45 +55,46 @@ jobs: run: | echo "main=${{ contains(fromJSON(steps.test_to_run.outputs.main), 'all') }}" | tee -a "$GITHUB_OUTPUT" - - name: Get changed files - id: changed-files - if: github.event_name == 'pull_request' - uses: tj-actions/changed-files@v44 - with: - files_yaml: | - doc: - - '**.md' - - docs/** - src: - - '!**.md' - - '!docs/**' + # - name: Get changed files + # id: changed-files + # if: github.event_name == 'pull_request' + # uses: tj-actions/changed-files@v44 + # with: + # files_yaml: | + # doc: + # - '**.md' + # - docs/** + # src: + # - '!**.md' + # - '!docs/**' - name: Evaluate conditions id: evaluate env: - DOCS_ONLY: ${{ steps.changed-files.outputs.doc_any_changed == 'true' && steps.changed-files.outputs.src_any_changed == 'false' }} - CHANGED_DOCS: ${{ steps.changed-files.outputs.doc_all_changed_files }} - CHANGED_SRC: ${{ steps.changed-files.outputs.src_all_changed_files }} + # DOCS_ONLY: ${{ steps.changed-files.outputs.doc_any_changed == 'true' && steps.changed-files.outputs.src_any_changed == 'false' }} + # CHANGED_DOCS: ${{ steps.changed-files.outputs.doc_all_changed_files }} + # CHANGED_SRC: ${{ steps.changed-files.outputs.src_all_changed_files }} IS_PULLREQUEST: ${{ github.event_name == 'pull_request' }} LABEL: ${{ github.event.label.name == 'Run CICD' }} MERGE_GROUP: ${{ github.event_name == 'merge_group' }} run: | # Some output that's helpful for debugging - echo "Docs changed: $CHANGED_DOCS" - echo "Src changed: $CHANGED_SRC" + # echo "Docs changed: $CHANGED_DOCS" + # echo "Src changed: $CHANGED_SRC" - echo "DOCS_ONLY: $DOCS_ONLY" + # echo "DOCS_ONLY: $DOCS_ONLY" echo "LABEL: $LABEL" echo "IS_PULLREQUEST: $IS_PULLREQUEST" # Run CI only (on main or if label is attached) and if it's not only docs - echo run_ci=$([[ ("$LABEL" = "true" || "$IS_PULLREQUEST" = "false" || "$MERGE_GROUP" = "true") && "$DOCS_ONLY" = "false" ]] && echo "true" || echo "false") | tee -a "$GITHUB_OUTPUT" + echo run_ci=$([[ ("$LABEL" = "true" || "$IS_PULLREQUEST" = "false" || "$MERGE_GROUP" = "true") ]] && echo "true" || echo "false") | tee -a "$GITHUB_OUTPUT" build-container: if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} needs: [pre-flight] - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_container.yml@v0.1.0 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_build_container.yml@v0.22.7 with: + build-ref: ${{ github.sha }} image-name: nemo_aligner_container dockerfile: Dockerfile image-label: nemo-aligner @@ -160,4 +161,4 @@ jobs: echo '🤖: CICD Result' >> $GITHUB_STEP_SUMMARY echo "$SUMMARY" >> $GITHUB_STEP_SUMMARY - test "$ALL_SUCCESS" = "true" || test "$CI_SKIP" = "true" \ No newline at end of file + test "$ALL_SUCCESS" = "true" || test "$CI_SKIP" = "true" diff --git a/.github/workflows/release-freeze.yml b/.github/workflows/release-freeze.yml index 10ae3386e..85d7db004 100644 --- a/.github/workflows/release-freeze.yml +++ b/.github/workflows/release-freeze.yml @@ -14,14 +14,20 @@ on: description: Commit SHA to use for cut-off required: false default: main + dry-run: + type: boolean + description: Dry-run of code-freeze + required: false + default: true jobs: code-freeze: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_code_freeze.yml@v0.17.3 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_code_freeze.yml@v0.22.5 with: library-name: NeMo-Aligner python-package: nemo_aligner release-type: ${{ inputs.release-type }} freeze-commit: ${{ inputs.freeze-commit }} + dry-run: ${{ inputs.dry-run }} secrets: - SLACK_RELEASE_ENDPOINT: ${{ secrets.SLACK_RELEASE_ENDPOINT }} + SLACK_WEBHOOK: ${{ secrets.SLACK_RELEASE_ENDPOINT }} SLACK_WEBHOOK_ADMIN: ${{ secrets.SLACK_WEBHOOK_ADMIN }} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 9678d8db5..e474fa72f 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -32,24 +32,17 @@ on: jobs: release: - uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_release_library.yml@v0.18.4 + uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/_release_library.yml@v0.22.6 with: release-ref: ${{ inputs.release-ref }} - image-name: nemo_aligner_container - dockerfile: Dockerfile - image-label: nemo-aligner - build-args: | - MAX_JOBS=32 - ALIGNER_COMMIT=${{ inputs.release-ref }} - prune-filter-timerange: 24h python-package: nemo_aligner - container-workdir: /opt/NeMo-Aligner + python-version: "3.11" library-name: NeMo-Aligner dry-run: ${{ inputs.dry-run }} version-bump-branch: ${{ inputs.version-bump-branch }} secrets: TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }} TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }} - SLACK_RELEASE_ENDPOINT: ${{ secrets.SLACK_RELEASE_ENDPOINT }} - SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }} + SLACK_WEBHOOK_ADMIN: ${{ secrets.SLACK_WEBHOOK_ADMIN }} + SLACK_WEBHOOK: ${{ secrets.SLACK_RELEASE_ENDPOINT }} PAT: ${{ secrets.PAT }} diff --git a/CHANGELOG.md b/CHANGELOG.md index ca8bc3b37..077f4bfbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,12 +38,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ``` - Add code and instructions for replicating Reward Modeling training in HelpSteer2 and HelpSteer2-Preference - Implement REINFORCE algorithm. +- Add support for multiple validation sets when training a Reward Model. +- Add support for `validation_drop_last=False` when training a Reward Model. ### Breaking Changes - Upgrade TRTLLM dependency from v0.10.0 to v0.12.0 and migrate from `GPTSession` cpp runtime to `ModelRunner` python runtime. Please use the latest Dockerfile. - Using latest TransformerEngine versions may require `++model.dist_ckpt_load_strictness=log_all` when loading from a older pre-existing checkpoint to not error out. - NeMo-Aligner now requires Megatron-LM==0.9.0 for the APIs to calculate the microbatch sizes (API introduced `megatron.core.num_microbatches_calculator.reconfigure_num_microbatch_calculator`). - NeMo-Aligner now requires a version of NeMo with this change to how the MoE spec is handled: https://github.com/NVIDIA/NeMo/pull/9035 . +- Validation metrics of the SupervisedTrainer (used by SFT and Reward Model) are now logged in Weights&Biases as / instead of val/. ### Bug Fixes - It is now required, for stability, to add `export NCCL_ALGO=...` to scripts launching PPO training loop. Please see the [RLHF docs](./docs/user-guide/rlhf.rst) for information. diff --git a/README.md b/README.md index a2a500e26..8c942f875 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # NVIDIA NeMo-Aligner +⚠️ **As of 5/15/2025, this repository is no longer actively maintained. We recommend switching to [NeMo RL](https://github.com/NVIDIA/NeMo-RL), a scalable and modular post-training library with seamless Hugging Face integration, Megatron Core optimizations, and uses Ray as the scheduling backbone.** ⚠️ + # Latest News - We released Nemotron-4-340B [Base](https://huggingface.co/nvidia/Nemotron-4-340B-Base), [Instruct](https://huggingface.co/nvidia/Nemotron-4-340B-Instruct), [Reward](https://huggingface.co/nvidia/Nemotron-4-340B-Reward). The Instruct and Reward variants are trained in Nemo-Aligner. Please see the [Helpsteer2](https://arxiv.org/abs/2406.08673) paper for more details on the reward model training. - We are excited to announce the release of accelerated generation support in our RLHF pipeline using [TensorRT-LLM](https://github.com/NVIDIA/TensorRT-LLM). For more information, please refer to our [RLHF documentation](https://docs.nvidia.com/nemo-framework/user-guide/latest/modelalignment/rlhf.html). diff --git a/docs/user-guide/nemoaligner.rsts b/docs/user-guide/nemoaligner.rsts index 1e54ed676..4a3ac6b99 100644 --- a/docs/user-guide/nemoaligner.rsts +++ b/docs/user-guide/nemoaligner.rsts @@ -45,6 +45,7 @@ NeMo-Aligner also provides its own `dockerfile None: load_gbs=True, use_random_sampler=cfg.trainer.rm.train_random_sampler, ) - - val_dataloader = build_dataloader( - cfg=cfg, - dataset=validation_ds, - consumed_samples=0, - mbs=cfg.model.micro_batch_size, - gbs=cfg.model.global_batch_size, - load_gbs=True, - use_random_sampler=cfg.trainer.rm.val_random_sampler, - ) + if isinstance(validation_ds, RewardModelDataset): + drop_last = cfg.model.data.get("validation_drop_last", True) + val_dataloader = build_dataloader( + cfg=cfg, + dataset=validation_ds, + consumed_samples=0, + mbs=cfg.model.micro_batch_size, + gbs=cfg.model.global_batch_size, + load_gbs=True, + use_random_sampler=cfg.trainer.rm.val_random_sampler, + drop_last=drop_last, + pad_samples_to_global_batch_size=not drop_last, + ) + elif isinstance(validation_ds, dict): + drop_last = cfg.model.data.get("validation_drop_last", True) + val_dataloader = { + key: build_dataloader( + cfg=cfg, + dataset=dataset, + consumed_samples=0, + mbs=cfg.model.micro_batch_size, + gbs=cfg.model.global_batch_size, + load_gbs=True, + use_random_sampler=cfg.trainer.rm.val_random_sampler, + drop_last=drop_last, + pad_samples_to_global_batch_size=not drop_last, + ) + for key, dataset in validation_ds.items() + } + else: + raise NotImplementedError("Unsupported validation dataset type") init_using_ptl(trainer, ptl_model, train_dataloader, train_ds) optimizer, scheduler = extract_optimizer_scheduler_from_ptl_model(ptl_model) diff --git a/nemo_aligner/__init__.py b/nemo_aligner/__init__.py index 4df0c0dc8..f3639afdd 100644 --- a/nemo_aligner/__init__.py +++ b/nemo_aligner/__init__.py @@ -13,12 +13,17 @@ # limitations under the License. import os -from nemo.utils import logging os.environ["DISABLE_TORCH_DEVICE_SET"] = "1" -logging.info( - f"Importing NeMo-Aligner sets DISABLE_TORCH_DEVICE_SET=1 to disable device reassignment within TensorRT-LLM" -) + +try: + from nemo.utils import logging + + logging.info( + f"Importing NeMo-Aligner sets DISABLE_TORCH_DEVICE_SET=1 to disable device reassignment within TensorRT-LLM" + ) +except ImportError: + pass from nemo_aligner.package_info import ( diff --git a/nemo_aligner/algorithms/supervised.py b/nemo_aligner/algorithms/supervised.py index 3f2f67c61..8cf0e6455 100644 --- a/nemo_aligner/algorithms/supervised.py +++ b/nemo_aligner/algorithms/supervised.py @@ -52,7 +52,7 @@ def __init__( ): self.model = model self.train_dataloader = train_dataloader - self.val_dataloader = val_dataloader + self.val_dataloaders = val_dataloader if isinstance(val_dataloader, dict) else {"validation": val_dataloader} self.test_dataloader = test_dataloader self.logger = logger self.cfg = cfg @@ -72,8 +72,10 @@ def __init__( self.num_steps_per_epoch = compute_num_steps_per_epoch( self.train_dataloader.batch_sampler, self.cfg.get("limit_train_batches", 1.0) ) - - self.limit_val_batches = compute_limit_batches(len(val_dataloader), self.cfg.limit_val_batches) + self.dict_limit_val_batches = { + key: compute_limit_batches(len(val_dataloader), self.cfg.limit_val_batches) + for key, val_dataloader in self.val_dataloaders.items() + } self.val_check_interval = ( int(self.cfg.val_check_interval * self.num_steps_per_epoch) if isinstance(self.cfg.val_check_interval, float) @@ -98,12 +100,23 @@ def validation_step(self, batch): @torch.no_grad() def run_validation(self): + val_loss, val_metrics = None, None + + for key in self.val_dataloaders: + loss, metrics = self.run_validation_one_dataset(key) + if key == "validation": + val_loss, val_metrics = loss, metrics + + return val_loss, val_metrics + + @torch.no_grad() + def run_validation_one_dataset(self, key: str): loss_means = [] val_metrics = defaultdict(list) val_pbar = tqdm( - zip(range(self.limit_val_batches), self.val_dataloader), - total=self.limit_val_batches, + zip(range(self.dict_limit_val_batches[key]), self.val_dataloaders[key]), + total=self.dict_limit_val_batches[key], leave=True, desc="Validation steps", ) @@ -130,11 +143,21 @@ def run_validation(self): log_val_metrics = {f"val_{k}": v for k, v in metrics.items()} val_pbar.set_postfix(log_val_metrics) - val_metrics = {k: mean(v) for k, v in val_metrics.items()} + if "weights" in val_metrics: + w = val_metrics.pop("weights") + val_loss = sum([value * weight for value, weight in zip(loss_means, w)]) / sum(w) + val_metrics = { + k: sum([value * weight for value, weight in zip(v, w)]) / sum(w) for k, v in val_metrics.items() + } + else: + val_loss = mean(loss_means) + val_metrics = {k: mean(v) for k, v in val_metrics.items()} val_metrics.update(self.inference_metrics_handler.compute()) self.inference_metrics_handler.reset() - return mean(loss_means), val_metrics + self.logger.log_metrics(val_metrics, step=self.step, prefix=f"{key}/") + + return val_loss, val_metrics def train_single_step(self, batch): self.optimizer.zero_grad() @@ -188,7 +211,6 @@ def fit(self): val_loss, val_metrics = self.run_validation() # validation is done on the UPDATED weights # so we use the incremented self.step - self.logger.log_metrics(val_metrics, step=self.step, prefix="val/") logging.info("Initial validation metrics logged.") for _ in epoch_iter: @@ -229,7 +251,7 @@ def fit(self): self.max_steps, self.val_check_interval, self.cfg.save_interval, - self.limit_val_batches, + self.dict_limit_val_batches["validation"], run_time_exceeded=run_time_exceeded, ) @@ -237,7 +259,6 @@ def fit(self): val_loss, val_metrics = self.run_validation() # validation is done on the UPDATED weights # so we use the incremented self.step - self.logger.log_metrics(val_metrics, step=self.step, prefix="val/") val_metrics = {f"val_{k}": v for k, v in val_metrics.items()} metrics.update(val_metrics) diff --git a/nemo_aligner/data/nlp/builders.py b/nemo_aligner/data/nlp/builders.py index 85ecac1e1..c87ee5bb0 100644 --- a/nemo_aligner/data/nlp/builders.py +++ b/nemo_aligner/data/nlp/builders.py @@ -129,7 +129,7 @@ def _build_dataset(current_data_prefix, current_num_samples): logging.info(" Total {} documents is : {} ".format(name, total_num_of_documents)) drop_last = True - if name == "valid": + if name.startswith("validation"): drop_last = cfg.data.get("validation_drop_last", True) dataset = cls( @@ -197,6 +197,12 @@ def build_train_valid_test_datasets( validation_n_chunks = n_chunks test_n_chunks = n_chunks + for key in data_prefix.keys(): + if key not in ["train", "test"] and not key.startswith("validation"): + raise NotImplementedError( + f"Unsupported key '{key}' found in data_prefix. Supported keys are: 'train', 'validation*', and 'test'." + ) + train_ds = build_dataset_generic( cls=cls, cfg=cfg, @@ -210,19 +216,27 @@ def build_train_valid_test_datasets( n_chunks=train_n_chunks, n_examples_per_chunk=train_examples_per_chunk, ) - validation_ds = build_dataset_generic( - cls=cls, - cfg=cfg, - data_prefix=data_prefix["validation"], - data_impl=data_impl, - num_samples=int(train_valid_test_num_samples[1]), - seq_length=seq_length, - seed=seed, - tokenizer=tokenizer, - name="validation", - n_chunks=validation_n_chunks, - n_examples_per_chunk=validation_examples_per_chunk, - ) + validation_keys = [key for key in data_prefix if key.startswith("validation")] + validation_ds = { + key: build_dataset_generic( + cls=cls, + cfg=cfg, + data_prefix=data_prefix[key], + data_impl=data_impl, + num_samples=int(train_valid_test_num_samples[1]), + seq_length=seq_length, + seed=seed, + tokenizer=tokenizer, + name=key, + n_chunks=validation_n_chunks, + n_examples_per_chunk=validation_examples_per_chunk, + ) + for key in validation_keys + } + if len(validation_ds) <= 1: + assert len(validation_ds) == 1, "No validation key found in data_prefix" + validation_ds = next(iter(validation_ds.values())) # Use validation dataset directly instead of a dict + test_ds = build_dataset_generic( cls=cls, cfg=cfg, diff --git a/nemo_aligner/data/nlp/datasets.py b/nemo_aligner/data/nlp/datasets.py index b1b6e2d6c..f6ba43d41 100644 --- a/nemo_aligner/data/nlp/datasets.py +++ b/nemo_aligner/data/nlp/datasets.py @@ -445,9 +445,6 @@ def __getitem__(self, idx): prompt_fmtd = payload["prompt"] chosen_fmtd = payload["prompt"] + payload["chosen_response"] rejected_fmtd = payload["prompt"] + payload["rejected_response"] - logging.warning( - "Pre-formatting chat conversation as string with hardcoded chat tokens will be deprecated." - ) # (@adithyare) this will spam the console for now. else: prompt_fmtd = self.convert(payload["prompt"]) # (@adithyare) read var as "prompt formatted" chosen_fmtd = self.convert(payload["prompt"] + [payload["chosen_response"]]) diff --git a/nemo_aligner/models/nlp/gpt/megatron_gpt_reward_model.py b/nemo_aligner/models/nlp/gpt/megatron_gpt_reward_model.py index 596436c67..0a667cf0e 100644 --- a/nemo_aligner/models/nlp/gpt/megatron_gpt_reward_model.py +++ b/nemo_aligner/models/nlp/gpt/megatron_gpt_reward_model.py @@ -184,49 +184,54 @@ def gather_and_split_rewards(rewards_out): def loss_func(output_tensor): # Loss per micro batch (ub). - loss_for_ub, acc_chosen = self.loss_func(output_tensor) + loss_for_ub = self.loss_func(output_tensor) + # Number of valid pairs in the micro batch. + mask_valid_pairs = batch["loss_mask"].sum(dim=1) > 0 + num_valid_pairs = mask_valid_pairs.sum() + # Compute loss average over valid pairs only. + loss_for_ub = loss_for_ub[mask_valid_pairs].mean() + # Compute accuracy over valid pairs only. + out_chosen, out_rejected = self.split_output_tensor(output_tensor) + comp = out_chosen > out_rejected + num_correct_pairs = comp[mask_valid_pairs].sum() + acc_chosen = num_correct_pairs / max(1, num_valid_pairs) if validation_step and not self.cfg.data.get("validation_drop_last", True): - num_valid_tokens_in_ub = batch["loss_mask"].sum() + if loss_for_ub.isnan(): - assert batch["loss_mask"].count_nonzero() == 0, "Got NaN loss with non-empty input" - loss_sum_for_ub = torch.zeros_like(num_valid_tokens_in_ub) + assert num_valid_pairs == 0, "Got NaN loss with non-empty input" + loss_sum_for_ub = torch.zeros_like(num_valid_pairs, dtype=loss_for_ub.dtype) else: - loss_sum_for_ub = num_valid_tokens_in_ub * loss_for_ub - - loss_sum_and_ub_size_all_gpu = torch.cat( - [ - loss_sum_for_ub.clone().detach().view(1), - torch.tensor([num_valid_tokens_in_ub]).cuda().clone().detach(), - ] - ) - torch.distributed.all_reduce( - loss_sum_and_ub_size_all_gpu, group=parallel_state.get_data_parallel_group() - ) - out_chosen, out_rejected = gather_and_split_rewards(output_tensor) - - return ( - loss_for_ub, - { - "loss_sum_and_ub_size": loss_sum_and_ub_size_all_gpu, - "out_chosen": out_chosen, - "out_rejected": out_rejected, - }, - ) + loss_sum_for_ub = num_valid_pairs * loss_for_ub + + tensor_to_reduce = torch.stack([loss_sum_for_ub, num_valid_pairs, num_correct_pairs,]) + torch.distributed.all_reduce(tensor_to_reduce, group=parallel_state.get_data_parallel_group()) + loss_sum_for_ub, num_valid_pairs, num_correct_pairs = tensor_to_reduce + + if num_valid_pairs > 0: + reduced_loss = loss_sum_for_ub / num_valid_pairs + reduced_acc = num_correct_pairs / num_valid_pairs + else: + reduced_loss = torch.zeros_like(loss_sum_for_ub) + reduced_acc = torch.zeros_like(num_correct_pairs) + else: reduced_loss = average_losses_across_data_parallel_group([loss_for_ub]) reduced_acc = average_losses_across_data_parallel_group([acc_chosen]) - - out_chosen, out_rejected = gather_and_split_rewards(output_tensor) - - return ( - loss_for_ub, - { - "avg": reduced_loss, - "acc": reduced_acc, - "out_chosen": out_chosen, - "out_rejected": out_rejected, - }, - ) + # This assumes `drop_last=True` -- which is normally the case during training. + num_valid_pairs = num_valid_pairs * parallel_state.get_data_parallel_world_size() + + out_chosen, out_rejected = gather_and_split_rewards(output_tensor) + + return ( + loss_for_ub, + { + "num_valid_pairs": num_valid_pairs, + "avg": reduced_loss, + "acc": reduced_acc, + "out_chosen": out_chosen, + "out_rejected": out_rejected, + }, + ) return output_tensor, loss_func @@ -238,10 +243,8 @@ def split_output_tensor(self, output_tensor): def loss_func(self, output_tensor): out_chosen, out_rejected = self.split_output_tensor(output_tensor) - comp = out_chosen > out_rejected - acc_chosen = torch.sum(comp) / comp.shape[0] - loss = -torch.nn.functional.logsigmoid(out_chosen - out_rejected).mean() - return loss, acc_chosen + loss = -torch.nn.functional.logsigmoid(out_chosen - out_rejected) + return loss def get_loss_and_metrics(self, batch, forward_only): data_iter = get_iterator_k_split(batch, get_num_microbatches()) @@ -272,21 +275,21 @@ def get_loss_and_metrics(self, batch, forward_only): rewards_all_mean = rewards_all.mean() rewards_all_std = rewards_all.std() - # average loss across micro batches - loss_tensors_list = [loss_reduced["avg"] for loss_reduced in losses_reduced_per_micro_batch] - loss_tensor = torch.concat(loss_tensors_list) - loss_mean = loss_tensor.mean() - acc_tensors_list = [loss_reduced["acc"] for loss_reduced in losses_reduced_per_micro_batch] - - if len(acc_tensors_list) == 1: - acc_tensor = acc_tensors_list[0] - elif len(acc_tensors_list) > 1: - acc_tensor = torch.concat(acc_tensors_list) - acc_mean = acc_tensor.mean() + num_valid_pairs = torch.stack( + [loss_reduced["num_valid_pairs"] for loss_reduced in losses_reduced_per_micro_batch] + ) + loss_tensor = torch.cat([loss_reduced["avg"].view(1) for loss_reduced in losses_reduced_per_micro_batch]) + acc_tensor = torch.cat([loss_reduced["acc"].view(1) for loss_reduced in losses_reduced_per_micro_batch]) + + weights = num_valid_pairs.sum().float() + loss_mean = (loss_tensor * num_valid_pairs).sum() / weights + acc_mean = (acc_tensor * num_valid_pairs).sum() / weights + else: loss_mean = torch.tensor(0.0, device=torch.cuda.current_device()) acc_mean = torch.tensor(0.0, device=torch.cuda.current_device()) + weights = torch.tensor(0.0, device=torch.cuda.current_device()) rewards_chosen_mean = torch.tensor(0.0, device=torch.cuda.current_device()) rewards_rejected_mean = torch.tensor(0.0, device=torch.cuda.current_device()) @@ -296,6 +299,7 @@ def get_loss_and_metrics(self, batch, forward_only): # we can only log on one rank if it is rank zero so we broadcast from last rank torch.distributed.broadcast(loss_mean, get_last_rank()) torch.distributed.broadcast(acc_mean, get_last_rank()) + torch.distributed.broadcast(weights, get_last_rank()) torch.distributed.broadcast(rewards_chosen_mean, get_last_rank()) torch.distributed.broadcast(rewards_rejected_mean, get_last_rank()) @@ -305,6 +309,7 @@ def get_loss_and_metrics(self, batch, forward_only): metrics = { "loss": loss_mean, "acc": acc_mean, + "weights": weights, "rewards_chosen_mean": rewards_chosen_mean, "rewards_rejected_mean": rewards_rejected_mean, "rewards_all_mean": rewards_all_mean, diff --git a/nemo_aligner/package_info.py b/nemo_aligner/package_info.py index e219e397d..2dd79d2ef 100644 --- a/nemo_aligner/package_info.py +++ b/nemo_aligner/package_info.py @@ -14,7 +14,7 @@ MAJOR = 0 -MINOR = 6 +MINOR = 8 PATCH = 0 PRE_RELEASE = "rc0" DEV = "dev0" diff --git a/setup/requirements.txt b/setup/requirements.txt index 4aa22afa1..80758d8ad 100644 --- a/setup/requirements.txt +++ b/setup/requirements.txt @@ -1,8 +1,8 @@ Jinja2~=3.1.4 jsonlines -megatron_core>=0.8 -nemo_toolkit[nlp] -nvidia-pytriton +# megatron_core>=0.8 +# nemo_toolkit[nlp] +# nvidia-pytriton # pynvml pin is needed for TRTLLM v0.13.0 since 12.0.0 contains a breaking change. -pynvml==11.5.3 -tensorrt-llm==0.13.0 +# pynvml==11.5.3 +# tensorrt-llm==0.13.0