Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 55 additions & 12 deletions pkg/sync/expand/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (g *EntitlementGraph) GetFirstCycle(ctx context.Context) []int {
if g.HasNoCycles {
return nil
}
comps := g.ComputeCyclicComponents(ctx)
comps, _ := g.ComputeCyclicComponents(ctx)
if len(comps) == 0 {
return nil
}
Expand All @@ -26,7 +26,8 @@ func (g *EntitlementGraph) HasCycles(ctx context.Context) bool {
if g.HasNoCycles {
return false
}
return len(g.ComputeCyclicComponents(ctx)) > 0
comps, _ := g.ComputeCyclicComponents(ctx)
return len(comps) > 0
}

func (g *EntitlementGraph) cycleDetectionHelper(
Expand All @@ -36,35 +37,77 @@ func (g *EntitlementGraph) cycleDetectionHelper(
if len(reach) == 0 {
return nil, false
}
adj := g.toAdjacency(reach)
groups := scc.CondenseFWBWGroupsFromAdj(context.Background(), adj, scc.DefaultOptions())
fg := filteredGraph{g: g, include: func(id int) bool { _, ok := reach[id]; return ok }}
groups, _ := scc.CondenseFWBW(context.Background(), fg, scc.DefaultOptions())
for _, comp := range groups {
if len(comp) > 1 || (len(comp) == 1 && adj[comp[0]][comp[0]] != 0) {
if len(comp) > 1 || (len(comp) == 1 && g.hasSelfLoop(comp[0])) {
return comp, true
}
}
return nil, false
}

// FixCycles collapses every cyclic component currently present in the graph.
func (g *EntitlementGraph) FixCycles(ctx context.Context) error {
	// The SCC metrics are not needed here; FixCyclesFromComponents only
	// requires the cyclic components themselves.
	comps, _ := g.ComputeCyclicComponents(ctx)
	return g.FixCyclesFromComponents(ctx, comps)
}

// ComputeCyclicComponents runs SCC once and returns only cyclic components.
// A component is cyclic if len>1 or a singleton with a self-loop.
func (g *EntitlementGraph) ComputeCyclicComponents(ctx context.Context) [][]int {
func (g *EntitlementGraph) ComputeCyclicComponents(ctx context.Context) ([][]int, *scc.Metrics) {
if g.HasNoCycles {
return nil
return nil, nil
}
adj := g.toAdjacency(nil)
groups := scc.CondenseFWBWGroupsFromAdj(ctx, adj, scc.DefaultOptions())
groups, metrics := scc.CondenseFWBW(ctx, g, scc.DefaultOptions())
cyclic := make([][]int, 0)
for _, comp := range groups {
if len(comp) > 1 || (len(comp) == 1 && adj[comp[0]][comp[0]] != 0) {
if len(comp) > 1 || (len(comp) == 1 && g.hasSelfLoop(comp[0])) {
cyclic = append(cyclic, comp)
}
}
return cyclic
return cyclic, metrics
}

// hasSelfLoop reports whether a node has a self-edge.
func (g *EntitlementGraph) hasSelfLoop(id int) bool {
	// Indexing a missing (nil) inner map is safe in Go and yields the zero
	// value, so an absent source row simply reports false.
	_, ok := g.SourcesToDestinations[id][id]
	return ok
}

// filteredGraph restricts EntitlementGraph iteration to nodes for which include(id) is true.
type filteredGraph struct {
	g       *EntitlementGraph // underlying graph being filtered
	include func(int) bool    // node predicate; nil means "include everything"
}

// ForEachNode invokes fn for every included node of the underlying graph,
// stopping early as soon as fn returns false.
func (fg filteredGraph) ForEachNode(fn func(id int) bool) {
	for id := range fg.g.Nodes {
		if fg.include == nil || fg.include(id) {
			if !fn(id) {
				return
			}
		}
	}
}

// ForEachEdgeFrom invokes fn for each included destination of src, stopping
// early as soon as fn returns false. A src excluded by the filter yields no
// edges at all.
func (fg filteredGraph) ForEachEdgeFrom(src int, fn func(dst int) bool) {
	if fg.include != nil && !fg.include(src) {
		return
	}
	dsts, ok := fg.g.SourcesToDestinations[src]
	if !ok {
		return
	}
	for dst := range dsts {
		if fg.include != nil && !fg.include(dst) {
			continue
		}
		if !fn(dst) {
			return
		}
	}
}

// removeNode obliterates a node and all incoming/outgoing edges.
Expand Down
37 changes: 16 additions & 21 deletions pkg/sync/expand/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
"github.com/conductorone/baton-sdk/pkg/sync/expand/scc"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -313,36 +314,30 @@ func (g *EntitlementGraph) DeleteEdge(ctx context.Context, srcEntitlementID stri
// toAdjacency builds an adjacency map for SCC. If nodesSubset is non-nil, only
// include those nodes (and edges between them). Always include all nodes in the
// subset as keys, even if they have zero outgoing edges.
func (g *EntitlementGraph) toAdjacency(nodesSubset map[int]struct{}) map[int]map[int]int {
adj := make(map[int]map[int]int, len(g.Nodes))
include := func(id int) bool {
if nodesSubset == nil {
return true
}
_, ok := nodesSubset[id]
return ok
}
// toAdjacency removed: use SCC via scc.Source on EntitlementGraph

// Ensure keys for all included nodes.
var _ scc.Source = (*EntitlementGraph)(nil)

// ForEachNode implements scc.Source iteration over nodes (including isolated nodes).
// It does not import scc; matching the method names/signatures is sufficient.
func (g *EntitlementGraph) ForEachNode(fn func(id int) bool) {
for id := range g.Nodes {
if include(id) {
adj[id] = make(map[int]int)
if !fn(id) {
return
}
}
}

// Add edges where both endpoints are included.
for src, dsts := range g.SourcesToDestinations {
if !include(src) {
continue
}
row := adj[src]
// ForEachEdgeFrom implements scc.Source iteration of outgoing edges for src.
// It enumerates unique destination node IDs.
func (g *EntitlementGraph) ForEachEdgeFrom(src int, fn func(dst int) bool) {
if dsts, ok := g.SourcesToDestinations[src]; ok {
for dst := range dsts {
if include(dst) {
row[dst] = 1
if !fn(dst) {
return
}
}
}
return adj
}

// reachableFrom computes the set of node IDs reachable from start over
Expand Down
117 changes: 117 additions & 0 deletions pkg/sync/expand/scc/bitset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package scc

import (
"math/bits"
"sync/atomic"
)

// bitset is a packed, atomically updatable bitset.
//
// Concurrency notes:
//   - Only testAndSetAtomic and clearAtomic are safe concurrently.
//   - All other methods must not race with writers.
//   - Slice storage aligns on 64-bit boundaries for atomic ops.
//
// All index-taking methods treat out-of-range indices defensively instead of
// panicking: reads report false, writes are no-ops, and testAndSetAtomic
// reports "already set" so callers do not enqueue work for invalid indices.
type bitset struct{ w []uint64 }

// newBitset returns a bitset able to hold n bits, rounded up to a whole
// 64-bit word. n <= 0 yields an empty bitset.
func newBitset(n int) *bitset {
	if n <= 0 {
		return &bitset{}
	}
	return &bitset{w: make([]uint64, (n+63)>>6)}
}

// test reports whether bit i is set. Out-of-range i reports false.
// Not safe to race with writers.
func (b *bitset) test(i int) bool {
	if i < 0 {
		return false
	}
	w := i >> 6
	if w >= len(b.w) {
		return false
	}
	return (b.w[w] & (uint64(1) << (uint(i) & 63))) != 0
}

// set sets bit i. Out-of-range i is a no-op. Not safe for concurrent use.
func (b *bitset) set(i int) {
	if i < 0 {
		return
	}
	w := i >> 6
	if w >= len(b.w) {
		return
	}
	b.w[w] |= uint64(1) << (uint(i) & 63)
}

// testAndSetAtomic atomically sets bit i and reports whether it was already
// set. Out-of-range i is treated as already-set so callers do not enqueue
// invalid work. Safe for concurrent use.
func (b *bitset) testAndSetAtomic(i int) bool {
	if i < 0 {
		return false
	}
	w := i >> 6
	if w >= len(b.w) {
		return true
	}
	mask := uint64(1) << (uint(i) & 63)
	addr := &b.w[w]
	for {
		old := atomic.LoadUint64(addr)
		if old&mask != 0 {
			return true
		}
		if atomic.CompareAndSwapUint64(addr, old, old|mask) {
			return false
		}
	}
}

// clearAtomic atomically clears bit i. Out-of-range i is a no-op.
// Safe for concurrent use.
func (b *bitset) clearAtomic(i int) {
	if i < 0 {
		return
	}
	w := i >> 6
	if w >= len(b.w) {
		return
	}
	mask := ^(uint64(1) << (uint(i) & 63))
	addr := &b.w[w]
	for {
		old := atomic.LoadUint64(addr)
		if atomic.CompareAndSwapUint64(addr, old, old&mask) {
			return
		}
	}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Atomic ops: add index guards; unify mask types.

Same out-of-bounds risk here. Suggest cheap precondition guards; returning early is safer than panicking in production paths. Also use uint64(1) consistently for clarity.

 func (b *bitset) testAndSetAtomic(i int) bool {
-	if i < 0 {
+	if i < 0 {
 		return false
 	}
 	w := i >> 6
-	mask := uint64(1) << (uint(i) & 63)
+	if w >= len(b.w) {
+		// Treat OOB as already-set to avoid enqueueing invalid work.
+		return true
+	}
+	mask := uint64(1) << (uint(i) & 63)
 	addr := &b.w[w]
 	for {
 		old := atomic.LoadUint64(addr)
 		if old&mask != 0 {
 			return true
 		}
 		if atomic.CompareAndSwapUint64(addr, old, old|mask) {
 			return false
 		}
 	}
 }
 
 func (b *bitset) clearAtomic(i int) {
-	if i < 0 {
+	if i < 0 {
 		return
 	}
 	w := i >> 6
-	mask := ^(uint64(1) << (uint(i) & 63))
+	if w >= len(b.w) {
+		return
+	}
+	mask := ^(uint64(1) << (uint(i) & 63))
 	addr := &b.w[w]
 	for {
 		old := atomic.LoadUint64(addr)
 		if atomic.CompareAndSwapUint64(addr, old, old&mask) {
 			return
 		}
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (b *bitset) testAndSetAtomic(i int) bool {
if i < 0 {
return false
}
w := i >> 6
mask := uint64(1) << (uint(i) & 63)
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if old&mask != 0 {
return true
}
if atomic.CompareAndSwapUint64(addr, old, old|mask) {
return false
}
}
}
func (b *bitset) clearAtomic(i int) {
if i < 0 {
return
}
w := i >> 6
mask := ^(uint64(1) << (uint(i) & 63))
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if atomic.CompareAndSwapUint64(addr, old, old&mask) {
return
}
}
}
func (b *bitset) testAndSetAtomic(i int) bool {
if i < 0 {
return false
}
w := i >> 6
if w >= len(b.w) {
// Treat OOB as already-set to avoid enqueueing invalid work.
return true
}
mask := uint64(1) << (uint(i) & 63)
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if old&mask != 0 {
return true
}
if atomic.CompareAndSwapUint64(addr, old, old|mask) {
return false
}
}
}
func (b *bitset) clearAtomic(i int) {
if i < 0 {
return
}
w := i >> 6
if w >= len(b.w) {
return
}
mask := ^(uint64(1) << (uint(i) & 63))
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if atomic.CompareAndSwapUint64(addr, old, old&mask) {
return
}
}
}
🤖 Prompt for AI Agents
In pkg/sync/expand/scc/bitset.go around lines 39 to 70, both testAndSetAtomic
and clearAtomic lack bounds checks on the computed word index and mix literal 1
with uint64; add a cheap precondition guard at the top of each function to
return immediately if i < 0 or w >= len(b.w) (where w := i >> 6), and construct
masks with uint64(1) (e.g., mask := uint64(1) << (uint(i) & 63) and mask :=
^(uint64(1) << (uint(i) & 63))) so mask is a uint64 in both functions; keep the
existing atomic loop logic unchanged otherwise.


// clone returns an independent deep copy of the bitset.
func (b *bitset) clone() *bitset {
	words := make([]uint64, len(b.w))
	copy(words, b.w)
	return &bitset{w: words}
}

// and intersects b with x in place and returns b.
// NOTE(review): assumes len(x.w) >= len(b.w) — confirm callers always pass
// same-capacity bitsets.
func (b *bitset) and(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] &= x.w[i]
	}
	return b
}

// or unions x into b in place and returns b.
// NOTE(review): assumes len(x.w) >= len(b.w) — confirm callers always pass
// same-capacity bitsets.
func (b *bitset) or(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] |= x.w[i]
	}
	return b
}

// andNot clears from b every bit that is set in x, in place, and returns b.
// NOTE(review): assumes len(x.w) >= len(b.w) — confirm callers always pass
// same-capacity bitsets.
func (b *bitset) andNot(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] &^= x.w[i]
	}
	return b
}

// isEmpty reports whether no bit is set.
func (b *bitset) isEmpty() bool {
	for i := range b.w {
		if b.w[i] != 0 {
			return false
		}
	}
	return true
}

func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
Comment on lines +117 to +124
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use uint64 mask in forEachSet.

Same mixed-type style concern. The code does compile as written — the untyped constant 1 is converted to uint64 by the assignment context — but an explicit uint64(1) mask is clearer and consistent with the rest of the file.

-			w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
+			w &^= uint64(1) << uint(tz) //nolint:gosec // trailing zeros is non-negative
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= uint64(1) << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
}
}
🤖 Prompt for AI Agents
In pkg/sync/expand/scc/bitset.go around lines 117 to 124, the bit-clear
expression relies on an untyped constant mask; it compiles (the constant 1 is
converted to uint64 by the assignment context), but for clarity and
consistency construct the mask explicitly as uint64 (e.g. w &^= uint64(1) <<
uint(tz)) so both operands are visibly uint64, keeping the uint cast on tz.

}
}
Loading
Loading