
Conversation

@wujingyue (Collaborator) commented Nov 18, 2025

Fixes #5307

@wujingyue (Collaborator, Author) commented:

!test

github-actions bot commented Nov 18, 2025

Review updated until commit 6e7e138

Description

  • Inline Linear+Allreduce operations into the same loop for better performance

  • Add row-parallel linear forward test with stream parallelization

  • Improve allocation placement and error handling in host IR lowering

  • Fix type annotations and error message formatting

Changes walkthrough

Relevant files

Enhancement (4 files)
  • lowering.cpp: Major refactor of loop nesting and communication handling (+65/-28)
  • evaluator.cpp: Change stream_index evaluation to use auto type (+1/-1)
  • fusion_executor_cache.cpp: Replace NVF_ERROR with NVF_ERROR_EQ for index type check (+4/-3)
  • communicator.h: Change auto return types to explicit types (+3/-3)

Tests (1 file)
  • test_overlap.py: Add row-parallel linear forward test case (+100/-0)

Bug fix (1 file)
  • host_ir.cpp: Fix error message formatting in Wait constructor (+1/-1)

Formatting (1 file)
  • fusion_kernel_runtime.cpp: Reorder includes for CUDAGuard (+2/-2)

Cleanup (1 file)
  • test_multidevice_stream_parallel_type.cpp: Remove unused includes and using declarations (+0/-3)

Configuration changes (1 file)
  • CMakeLists.txt: Reorder multidevice test files in build list (+2/-2)

PR Reviewer Guide

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review
Loop Scope Management

The PR introduces complex loop scope management with innermost_scope, innermost.loop, and innermost.parent_scope. Reviewers should verify that the scope hierarchy is correctly maintained and that allocations are placed correctly (inside vs. outside loops) based on the sharding configuration.

Scope& innermost_scope = loop_nest.innermostScope();
LoopInfo innermost;
if (!loop_nest.empty()) {
  innermost = loop_nest.innermost();
}
Communication Fusion Logic

The Communication case now uses a replacement_map to handle tensor sharding and creates allocations with conditional logic based on sharded domains. Reviewers should validate that the Linear+AllReduce fusion works correctly and that the replacement logic properly handles both input and output tensors.

// TODO: `replacement_map` should be associated with the scope os
// ShardByStream across segments in the same for-loop can be reused.
std::unordered_map<Val*, Val*> replacement_map;
for (Expr* c : convertSingleOpToCommunication(e, device_id)) {
  NVF_ERROR(
      c->isA<Communication>(),
      "Exprs in a Communication group should be Communication: ",
      c);
  auto* communication = c->as<Communication>();
  TensorView* in = communication->in();
  TensorView* out = communication->out();
  if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
          nullptr &&
      getShardedIterDomain(
          in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
    auto [i, inserted] = replacement_map.try_emplace(
        in, hir::shardByStream(in, innermost.loop->index()));
    if (inserted) {
      innermost_scope.push_back(i->second->definition());
    }
  }

  // Allocate the recv buffers of communications
  auto* allocate =
      IrBuilder::create<kir::Allocate>(out, MemoryType::Global);
  if (getShardedIterDomain(
          out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
      getShardedIterDomain(
          out, ParallelType::Stream, DomainType::kAllocation) ==
          nullptr) {
    innermost.parent_scope->insert(
        innermost.parent_insertion_point, allocate);
    auto [i, inserted] = replacement_map.try_emplace(
        out, hir::shardByStream(out, innermost.loop->index()));
    NVF_ERROR(inserted);
    innermost_scope.push_back(i->second->definition());
  } else {
    innermost_scope.push_back(allocate);
  }

  Expr* new_c = cloneWithNewOperands(c, replacement_map);
  innermost_scope.push_back(new_c);

  auto* wait = IrBuilder::create<hir::Wait>(new_c);
  innermost_scope.push_back(wait);
}
Performance Validation

The test validates kernel count per iteration (2 for single GPU, 3 for multi-GPU) but doesn't provide actual performance metrics. Consider adding timing comparisons to ensure the fusion actually provides the expected performance benefits mentioned in the issue.

with torch.profiler.profile(
    activities=[torch.profiler.ProfilerActivity.CUDA]
) as prof:
    (out,) = fd.execute([inp, weight], _enable_options=["host_ir_lowering"])

kernel_events = [
    event
    for event in prof.events()
    if event.device_type == torch.profiler.DeviceType.CUDA
]

# When multiple GPUs, expect three kernels per iteration: linear, memcpy,
# allreduce.  The memcpy is from
# https://github.com/NVIDIA/Fuser/blob/cce887595dc86b099506b70f88d653880fde5116/csrc/multidevice/communication.cpp#L493.
# When single GPU, expect two kernels per iteration: linear, memcpy.
num_kernels_per_iteration = 2 if d == 1 else 3
assert len(kernel_events) == s * num_kernels_per_iteration
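To make the timing-comparison suggestion concrete, below is a minimal sketch using torch.cuda.Event. The helper name ms_per_execute is hypothetical, and whether dropping "host_ir_lowering" from _enable_options yields a non-overlapped baseline is an assumption about this test setup rather than something the PR states.

import torch

def ms_per_execute(fd, inputs, options, iters=10):
    # Warm-up run so compilation/autotuning is excluded from the measurement.
    fd.execute(inputs, _enable_options=options)
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    for _ in range(iters):
        fd.execute(inputs, _enable_options=options)
    end.record()
    torch.cuda.synchronize()
    return start.elapsed_time(end) / iters  # milliseconds per execution

# Hypothetical comparison; the baseline options are an assumption:
# overlapped_ms = ms_per_execute(fd, [inp, weight], ["host_ir_lowering"])
# baseline_ms = ms_per_execute(fd, [inp, weight], [])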

Test failures

  • (Medium, 2) Profiler kernel count mismatch in tests.python.multidevice.test_overlap

    Test: tests.python.multidevice.test_overlap.test_row_parallel_linear_forward
    Environments: A100, A100 (dist.)

@wujingyue (Collaborator, Author) commented:

!test

@wujingyue wujingyue changed the base branch from main to wjy/out November 18, 2025 21:55
@wujingyue wujingyue marked this pull request as ready for review November 19, 2025 02:18
greptile-apps bot (Contributor) commented Nov 19, 2025

Greptile Overview

Greptile Summary

This PR implements the ability to inline linear operations and allreduce communications into the same loop for stream-parallelized matmul operations. The main change refactors csrc/host_ir/lowering.cpp to handle stream-parallelized tensors by sharding them by stream index and placing allocations outside the loop when needed.

Key Changes:

  • Modified lowerSegment to extract innermost_scope and innermost at the start, enabling both Communication and ExprEval cases to handle stream-parallelized operations consistently
  • Added logic to shard input/output tensors by stream when they have stream-parallelized loop domains but not allocation domains
  • Changed return value of cloneWithNewOperands from 0 to e when no replacements are needed (line 140)
  • Added comprehensive test test_row_parallel_linear_forward demonstrating the feature with validation

Critical Issues:

  • Null pointer dereferences in the Communication case (lines 188-215) when loop_nest is empty but tensors are stream-parallelized. The code accesses innermost.loop->index(), innermost.parent_scope, and innermost.parent_insertion_point without checking whether the loop nest is empty. The ExprEval case has proper guards (line 251), but the Communication case is missing them.

Minor Issues:

  • Typo in comment on line 177: "os" should be "so"

Confidence Score: 1/5

  • This PR has critical null pointer dereference bugs that will cause crashes in production
  • Score reflects critical bugs in csrc/host_ir/lowering.cpp lines 188-215, where the Communication case accesses innermost.loop->index(), innermost.parent_scope, and innermost.parent_insertion_point without checking whether loop_nest is empty. These are the same issues flagged in previous comments but remain unfixed. The ExprEval case properly guards against an empty loop_nest (line 251), but the Communication case lacks these safeguards. While the test passes, it likely doesn't exercise the edge case where loop_nest is empty with stream-parallelized Communications.
  • csrc/host_ir/lowering.cpp requires immediate attention - add null checks before accessing innermost.loop, innermost.parent_scope, and innermost.parent_insertion_point in the Communication case (lines 188-215)

Important Files Changed

File Analysis

  • csrc/host_ir/lowering.cpp (score 1/5): Implements inlining of linear+allreduce into the same loop, but has critical null pointer dereference bugs in the Communication case when loop_nest is empty
  • csrc/host_ir/evaluator.cpp (score 5/5): Changed stream_index type from int64_t to auto for type inference; no functional changes
  • tests/python/multidevice/test_overlap.py (score 5/5): Added a comprehensive test for the row-parallel linear forward pass with stream parallelization and allreduce

Sequence Diagram

sequenceDiagram
    participant LF as lowerSegmentedFusionToHostIr
    participant LN as LoopNest
    participant LS as lowerSegment
    participant IC as IrCloner
    
    LF->>LN: Create LoopNest(topLevel)
    loop For each SegmentedGroup
        LF->>LF: computeInlinePosition()
        LF->>LN: closeLoop() until inline_position
        LF->>LN: openLoop() for stream IDs
        LF->>LS: lowerSegment(group, loop_nest)
        
        alt SchedulerType::Communication
            LS->>IC: clone(group.exprs().front())
            LS->>LS: convertSingleOpToCommunication()
            loop For each Communication
                LS->>LS: Check if input is stream-parallelized
                alt Input has Stream loop domain
                    LS->>LS: shardByStream(in, innermost.loop->index())
                    Note right of LS: ⚠️ Potential null pointer if loop_nest empty
                    LS->>LN: push_back(ShardByStream)
                end
                LS->>LS: Allocate output buffer
                alt Output has Stream loop domain
                    LS->>LN: insert allocate before loop
                    LS->>LS: shardByStream(out, innermost.loop->index())
                    Note right of LS: ⚠️ Potential null pointer if loop_nest empty
                    LS->>LN: push_back(ShardByStream)
                else
                    LS->>LN: push_back(allocate)
                end
                LS->>IC: cloneWithNewOperands(communication)
                LS->>LN: push_back(communication)
                LS->>LN: push_back(Wait)
            end
        else SchedulerType::ExprEval
            alt loop_nest.empty()
                LS->>LN: push_back(exprs) to top level
            else
                loop For each expression
                    LS->>LS: Check inputs for stream parallelization
                    LS->>LS: shardByStream if needed
                    LS->>LS: Check outputs for stream parallelization
                    LS->>LN: insert allocate before loop if needed
                    LS->>IC: cloneWithNewOperands(expr)
                    LS->>LN: push_back(expr)
                end
            end
        else Default (LaunchKernel)
            LS->>LN: Allocate outputs
            LS->>LN: push_back(LaunchKernel)
        end
    end
    LF->>LF: InsertDeallocations()

greptile-apps bot (Contributor) left a comment

8 files reviewed, 2 comments

Comment on lines +187 to +195
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
Contributor:

logic: null pointer dereference if loop_nest is empty. innermost.loop is null when loop_nest.empty() is true (lines 161-164), but this code calls innermost.loop->index() on line 192 without checking whether innermost.loop is null

Suggested change
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
NVF_ERROR(
!loop_nest.empty(),
"Stream-parallelized input requires a non-empty loop nest");
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));

Comment on lines +201 to +211
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
Contributor:

logic: null pointer dereference if loop_nest is empty. Multiple accesses to innermost.loop and innermost.parent_scope without null checks

Suggested change
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
NVF_ERROR(
!loop_nest.empty(),
"Stream-parallelized output requires a non-empty loop nest");
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);

inp = (multidevice_direct_test.shard_tensor(inp_ref, -1, mesh),)
weight = (multidevice_direct_test.shard_tensor(weight_ref, -1, mesh),)
(out,) = fd.execute([inp, weight], _enable_options=["host_ir_lowering"])
torch.testing.assert_close(out.cpu(), out_ref)
Collaborator:

Is there a way to verify inlining actually happened?

Collaborator Author:

How about torch.profiler to count how many kernels are launched?

Collaborator:

Try using PythonProfiler (the Python interface of FusionProfiler). It should give us the names of the schedulers. It also records the stream id if needed. See https://github.com/NVIDIA/Fuser/pull/5563/files for an example.

Collaborator Author:

Done

Comment on lines +187 to +195
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
Collaborator:

This analysis is used at multiple locations. It can be moved to a util function.

Collaborator Author:

Yes. Not for this PR though. I'll try to fix a bug around 187 and then think about refactoring.

Collaborator Author:

Notes for myself: one consideration was that a communication segment always writes to a pre-allocated output and an expr-eval segment may or may not. But I'll think about how to DRY.
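For reference, a minimal sketch of what such a shared helper could look like, assuming the declarations already used in this diff (TensorView, getShardedIterDomain, ParallelType, DomainType); the name needsShardByStream is hypothetical and not part of this PR:

// True when `tv` is stream-parallelized in its loop domain but not yet
// sharded in its allocation domain, i.e. the condition under which the
// lowering inserts a ShardByStream.
bool needsShardByStream(TensorView* tv) {
  return getShardedIterDomain(tv, ParallelType::Stream, DomainType::kLoop) !=
             nullptr &&
      getShardedIterDomain(
          tv, ParallelType::Stream, DomainType::kAllocation) == nullptr;
}

Both the Communication and ExprEval cases could call this check, with the Communication case keeping its own output-allocation handling since it always writes to a pre-allocated buffer.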

Collaborator:

I'll try to fix a bug around 187

What is the bug around 187?

Collaborator Author:

#5562

There's one more on top of that which I'll try to fix in the same PR.

greptile-apps bot (Contributor) left a comment

8 files reviewed, no comments

@wujingyue wujingyue requested a review from Priya2698 November 20, 2025 20:19
greptile-apps bot (Contributor) left a comment

9 files reviewed, no comments

@wujingyue (Collaborator, Author) commented:

!test

greptile-apps bot (Contributor) left a comment

9 files reviewed, 3 comments

Comment on lines +188 to 197
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
}
Contributor:

logic: null pointer dereference if loop_nest is empty on line 193. When getShardedIterDomain returns non-null but loop_nest.empty() is true, innermost.loop is null (set at lines 161-164), causing innermost.loop->index() to crash. The ExprEval case has an early return for an empty loop_nest (line 251), but the Communication case is missing this safeguard.

Suggested change
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
}
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
NVF_ERROR(!loop_nest.empty(), "Stream-parallelized input requires a loop nest");
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
}

Comment on lines +202 to +215
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
} else {
innermost_scope.push_back(allocate);
}
Contributor:

logic: null pointer dereference if loop_nest is empty. Multiple accesses to innermost.loop->index() (line 210), innermost.parent_scope (line 207), and innermost.parent_insertion_point (line 208) without null checks. Same issue as lines 188-197.

Suggested change
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
} else {
innermost_scope.push_back(allocate);
}
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
NVF_ERROR(!loop_nest.empty(), "Stream-parallelized output requires a loop nest");
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
} else {
innermost_scope.push_back(allocate);
}

Expr* e = ir_cloner.clone(group.exprs().front());

for (auto* c : convertSingleOpToCommunication(e, device_id)) {
// TODO: `replacement_map` should be associated with the scope os
Contributor:

syntax: typo: 'os' should be 'so'

Suggested change
// TODO: `replacement_map` should be associated with the scope os
// TODO: `replacement_map` should be associated with the scope so

@Priya2698 (Collaborator) left a comment

LGTM. Please also incorporate the greptile suggestions on spelling and error checks.


# Collect CUDA kernels after a warmup run to exclude autotuning.
# nvfuser_direct.PythonProfiler failed with host IR lowering. The main
# reason is that HostIrContainer doesn't keep segments while SegmentProfiler
Collaborator:

The torch.profiler use looks good to me.

Is it as simple as adding

FusionProfiler::createSegments(kernel_runtime->executors().size());
in the host IR code?

I'll try to take a look at this to extend FusionProfiler for Host IR.

@wujingyue (Collaborator, Author) commented Nov 21, 2025:

I did try a couple of superficial things and figured it's more complicated than that. I stashed my changes unfortunately.

Furthermore, when we switch to HostIrJit generating an LLVM IR program that has no nvFuser dependency, what extra value does PythonProfiler provide in addition to torch.profiler?
