Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions pkg/sync/expand/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func (g *EntitlementGraph) cycleDetectionHelper(
if len(reach) == 0 {
return nil, false
}
adj := g.toAdjacency(reach)
groups := scc.CondenseFWBWGroupsFromAdj(context.Background(), adj, scc.DefaultOptions())
fg := filteredGraph{g: g, include: func(id int) bool { _, ok := reach[id]; return ok }}
groups := scc.CondenseFWBW(context.Background(), fg, scc.DefaultOptions())
for _, comp := range groups {
if len(comp) > 1 || (len(comp) == 1 && adj[comp[0]][comp[0]] != 0) {
if len(comp) > 1 || (len(comp) == 1 && g.hasSelfLoop(comp[0])) {
return comp, true
}
}
Expand All @@ -56,17 +56,58 @@ func (g *EntitlementGraph) ComputeCyclicComponents(ctx context.Context) [][]int
if g.HasNoCycles {
return nil
}
adj := g.toAdjacency(nil)
groups := scc.CondenseFWBWGroupsFromAdj(ctx, adj, scc.DefaultOptions())
groups := scc.CondenseFWBW(ctx, g, scc.DefaultOptions())
cyclic := make([][]int, 0)
for _, comp := range groups {
if len(comp) > 1 || (len(comp) == 1 && adj[comp[0]][comp[0]] != 0) {
if len(comp) > 1 || (len(comp) == 1 && g.hasSelfLoop(comp[0])) {
cyclic = append(cyclic, comp)
}
}
return cyclic
}

// hasSelfLoop reports whether node id has an edge pointing back to itself.
// A self-loop makes a single-node SCC cyclic.
func (g *EntitlementGraph) hasSelfLoop(id int) bool {
	row, found := g.SourcesToDestinations[id]
	if !found {
		return false
	}
	_, selfEdge := row[id]
	return selfEdge
}

// filteredGraph restricts EntitlementGraph iteration to nodes for which include(id) is true.
// It adapts an *EntitlementGraph to the node/edge iteration shape expected by the
// SCC condenser while limiting traversal to a subset of node IDs (e.g. a
// reachability set). A nil include predicate admits every node.
type filteredGraph struct {
	// g is the underlying graph being filtered.
	g *EntitlementGraph
	// include reports whether a node ID participates in iteration; nil means "all".
	include func(int) bool
}

// ForEachNode visits every node admitted by the include predicate, stopping
// early as soon as fn returns false.
func (fg filteredGraph) ForEachNode(fn func(id int) bool) {
	for id := range fg.g.Nodes {
		if fg.include == nil || fg.include(id) {
			if !fn(id) {
				return
			}
		}
	}
}

// ForEachEdgeFrom visits each outgoing edge of src whose destination is
// admitted by the include predicate, stopping early when fn returns false.
// Nothing is visited if src itself is excluded.
func (fg filteredGraph) ForEachEdgeFrom(src int, fn func(dst int) bool) {
	if fg.include != nil && !fg.include(src) {
		return
	}
	dsts, found := fg.g.SourcesToDestinations[src]
	if !found {
		return
	}
	for dst := range dsts {
		if fg.include == nil || fg.include(dst) {
			if !fn(dst) {
				return
			}
		}
	}
}

// removeNode obliterates a node and all incoming/outgoing edges.
func (g *EntitlementGraph) removeNode(nodeID int) {
// Delete from reverse mapping.
Expand Down
37 changes: 16 additions & 21 deletions pkg/sync/expand/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
"github.com/conductorone/baton-sdk/pkg/sync/expand/scc"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -313,36 +314,30 @@ func (g *EntitlementGraph) DeleteEdge(ctx context.Context, srcEntitlementID stri
// toAdjacency builds an adjacency map for SCC. If nodesSubset is non-nil, only
// include those nodes (and edges between them). Always include all nodes in the
// subset as keys, even if they have zero outgoing edges.
func (g *EntitlementGraph) toAdjacency(nodesSubset map[int]struct{}) map[int]map[int]int {
adj := make(map[int]map[int]int, len(g.Nodes))
include := func(id int) bool {
if nodesSubset == nil {
return true
}
_, ok := nodesSubset[id]
return ok
}
// toAdjacency removed: use SCC via scc.Source on EntitlementGraph

// Ensure keys for all included nodes.
var _ scc.Source = (*EntitlementGraph)(nil)

// ForEachNode implements scc.Source iteration over nodes (including isolated nodes).
// It does not import scc; matching the method names/signatures is sufficient.
func (g *EntitlementGraph) ForEachNode(fn func(id int) bool) {
for id := range g.Nodes {
if include(id) {
adj[id] = make(map[int]int)
if !fn(id) {
return
}
}
}

// Add edges where both endpoints are included.
for src, dsts := range g.SourcesToDestinations {
if !include(src) {
continue
}
row := adj[src]
// ForEachEdgeFrom implements scc.Source iteration of outgoing edges for src.
// It enumerates unique destination node IDs.
func (g *EntitlementGraph) ForEachEdgeFrom(src int, fn func(dst int) bool) {
if dsts, ok := g.SourcesToDestinations[src]; ok {
for dst := range dsts {
if include(dst) {
row[dst] = 1
if !fn(dst) {
return
}
}
}
return adj
}

// reachableFrom computes the set of node IDs reachable from start over
Expand Down
117 changes: 117 additions & 0 deletions pkg/sync/expand/scc/bitset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package scc

import (
"math/bits"
"sync/atomic"
)

// bitset is a packed, atomically updatable bitset.
//
// Concurrency notes:
//   - Only testAndSetAtomic and clearAtomic are safe to call concurrently.
//   - All other methods must not race with writers.
//   - The []uint64 backing store is naturally 64-bit aligned, as required
//     by sync/atomic on 32-bit platforms.
//
// All accessors treat out-of-range indices (negative, or beyond the
// capacity given to newBitset) as a no-op / "unset" rather than panicking,
// so callers cannot crash production paths with an invalid node ID.
type bitset struct{ w []uint64 }

// newBitset returns a bitset able to hold bits in [0, n). For n <= 0 it
// returns an empty bitset whose accessors treat every index as out of range.
func newBitset(n int) *bitset {
	if n <= 0 {
		return &bitset{}
	}
	return &bitset{w: make([]uint64, (n+63)>>6)}
}

// test reports whether bit i is set. Out-of-range indices report false.
func (b *bitset) test(i int) bool {
	if i < 0 {
		return false
	}
	w := i >> 6
	if w >= len(b.w) {
		return false
	}
	return (b.w[w] & (uint64(1) << (uint(i) & 63))) != 0
}

// set sets bit i. Out-of-range indices are ignored. Not safe for
// concurrent use; use testAndSetAtomic when racing with other writers.
func (b *bitset) set(i int) {
	if i < 0 {
		return
	}
	w := i >> 6
	if w >= len(b.w) {
		return
	}
	b.w[w] |= uint64(1) << (uint(i) & 63)
}

// testAndSetAtomic atomically sets bit i and reports whether it was already
// set. Out-of-range indices report true ("already set") so callers do not
// enqueue work for invalid indices. Safe for concurrent use.
func (b *bitset) testAndSetAtomic(i int) bool {
	if i < 0 {
		return false
	}
	w := i >> 6
	if w >= len(b.w) {
		// Treat OOB as already-set to avoid enqueueing invalid work.
		return true
	}
	mask := uint64(1) << (uint(i) & 63)
	addr := &b.w[w]
	for {
		old := atomic.LoadUint64(addr)
		if old&mask != 0 {
			return true
		}
		if atomic.CompareAndSwapUint64(addr, old, old|mask) {
			return false
		}
	}
}

// clearAtomic atomically clears bit i; idempotent. Out-of-range indices
// are ignored. Safe for concurrent use.
func (b *bitset) clearAtomic(i int) {
	if i < 0 {
		return
	}
	w := i >> 6
	if w >= len(b.w) {
		return
	}
	mask := ^(uint64(1) << (uint(i) & 63))
	addr := &b.w[w]
	for {
		old := atomic.LoadUint64(addr)
		if atomic.CompareAndSwapUint64(addr, old, old&mask) {
			return
		}
	}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Atomic ops: add index guards; unify mask types.

Same out-of-bounds risk here. Suggest cheap precondition guards; returning early is safer than panicking in production paths. Also use uint64(1) consistently for clarity.

 func (b *bitset) testAndSetAtomic(i int) bool {
-	if i < 0 {
+	if i < 0 {
 		return false
 	}
 	w := i >> 6
-	mask := uint64(1) << (uint(i) & 63)
+	if w >= len(b.w) {
+		// Treat OOB as already-set to avoid enqueueing invalid work.
+		return true
+	}
+	mask := uint64(1) << (uint(i) & 63)
 	addr := &b.w[w]
 	for {
 		old := atomic.LoadUint64(addr)
 		if old&mask != 0 {
 			return true
 		}
 		if atomic.CompareAndSwapUint64(addr, old, old|mask) {
 			return false
 		}
 	}
 }
 
 func (b *bitset) clearAtomic(i int) {
-	if i < 0 {
+	if i < 0 {
 		return
 	}
 	w := i >> 6
-	mask := ^(uint64(1) << (uint(i) & 63))
+	if w >= len(b.w) {
+		return
+	}
+	mask := ^(uint64(1) << (uint(i) & 63))
 	addr := &b.w[w]
 	for {
 		old := atomic.LoadUint64(addr)
 		if atomic.CompareAndSwapUint64(addr, old, old&mask) {
 			return
 		}
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (b *bitset) testAndSetAtomic(i int) bool {
if i < 0 {
return false
}
w := i >> 6
mask := uint64(1) << (uint(i) & 63)
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if old&mask != 0 {
return true
}
if atomic.CompareAndSwapUint64(addr, old, old|mask) {
return false
}
}
}
func (b *bitset) clearAtomic(i int) {
if i < 0 {
return
}
w := i >> 6
mask := ^(uint64(1) << (uint(i) & 63))
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if atomic.CompareAndSwapUint64(addr, old, old&mask) {
return
}
}
}
func (b *bitset) testAndSetAtomic(i int) bool {
if i < 0 {
return false
}
w := i >> 6
if w >= len(b.w) {
// Treat OOB as already-set to avoid enqueueing invalid work.
return true
}
mask := uint64(1) << (uint(i) & 63)
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if old&mask != 0 {
return true
}
if atomic.CompareAndSwapUint64(addr, old, old|mask) {
return false
}
}
}
func (b *bitset) clearAtomic(i int) {
if i < 0 {
return
}
w := i >> 6
if w >= len(b.w) {
return
}
mask := ^(uint64(1) << (uint(i) & 63))
addr := &b.w[w]
for {
old := atomic.LoadUint64(addr)
if atomic.CompareAndSwapUint64(addr, old, old&mask) {
return
}
}
}
🤖 Prompt for AI Agents
In pkg/sync/expand/scc/bitset.go around lines 39 to 70, both testAndSetAtomic
and clearAtomic lack bounds checks on the computed word index and mix literal 1
with uint64; add a cheap precondition guard at the top of each function to
return immediately if i < 0 or w >= len(b.w) (where w := i >> 6), and construct
masks with uint64(1) (e.g., mask := uint64(1) << (uint(i) & 63) and mask :=
^(uint64(1) << (uint(i) & 63))) so mask is a uint64 in both functions; keep the
existing atomic loop logic unchanged otherwise.


// clone returns an independent deep copy of b.
func (b *bitset) clone() *bitset {
	words := make([]uint64, len(b.w))
	copy(words, b.w)
	return &bitset{w: words}
}

// and intersects b with x in place and returns b for chaining.
// x must have at least as many words as b (true for same-capacity bitsets).
func (b *bitset) and(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] &= x.w[i]
	}
	return b
}

// or unions x into b in place and returns b for chaining.
// x must have at least as many words as b (true for same-capacity bitsets).
func (b *bitset) or(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] |= x.w[i]
	}
	return b
}

// andNot clears from b every bit set in x, in place, and returns b for
// chaining. x must have at least as many words as b.
func (b *bitset) andNot(x *bitset) *bitset {
	for i := 0; i < len(b.w); i++ {
		b.w[i] &^= x.w[i]
	}
	return b
}

// isEmpty reports whether no bit is set.
func (b *bitset) isEmpty() bool {
	for i := range b.w {
		if b.w[i] != 0 {
			return false
		}
	}
	return true
}

func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
Comment on lines +117 to +124
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use a uint64 mask in forEachSet.

Same mixed-type concern as above — but note this one does compile: because `w` is a `uint64`, the Go spec's shift rules implicitly type the untyped constant `1` as `uint64` in `w &^= 1 << uint(tz)`. Writing `uint64(1)` explicitly is a clarity/consistency improvement rather than a compile fix.

-			w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
+			w &^= uint64(1) << uint(tz) //nolint:gosec // trailing zeros is non-negative
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= 1 << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
func (b *bitset) forEachSet(fn func(i int)) {
for wi, w := range b.w {
for w != 0 {
tz := bits.TrailingZeros64(w)
i := (wi << 6) + tz
fn(i)
w &^= uint64(1) << uint(tz) //nolint:gosec // trailing zeros is non-negative
}
}
}
🤖 Prompt for AI Agents
In pkg/sync/expand/scc/bitset.go around lines 117 to 124, the bit-clear
expression mixes integer types and won't compile; replace the mask expression w
&^= 1 << uint(tz) with a uint64 mask (e.g. w &^= uint64(1) << uint(tz)) so both
operands are uint64 and the shift uses a uint cast for tz.

}
}
124 changes: 124 additions & 0 deletions pkg/sync/expand/scc/bitset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package scc

import (
"sync"
"testing"
)

// TestBitsetBasicSetTest verifies set/test round-trips across word boundaries.
func TestBitsetBasicSetTest(t *testing.T) {
	b := newBitset(130) // spans 3 words
	if !b.isEmpty() {
		t.Fatalf("new bitset should be empty")
	}

	// Exercise bits on both sides of each 64-bit word boundary.
	for _, i := range []int{0, 1, 63, 64, 65, 129} {
		b.set(i)
		if !b.test(i) {
			t.Fatalf("expected bit %d to be set", i)
		}
	}
	if b.isEmpty() {
		t.Fatalf("bitset should not be empty after sets")
	}
}

// TestBitsetCloneAndOps verifies clone plus the AND/OR/ANDNOT bulk operations.
func TestBitsetCloneAndOps(t *testing.T) {
	b1 := newBitset(128)
	b2 := newBitset(128)
	for _, i := range []int{0, 2, 64, 127} {
		b1.set(i)
	}
	for _, i := range []int{2, 3, 64, 100} {
		b2.set(i)
	}

	mustHave := func(op string, b *bitset, want []int) {
		for _, i := range want {
			if !b.test(i) {
				t.Fatalf("%s missing expected bit %d", op, i)
			}
		}
	}
	mustLack := func(op string, b *bitset, absent []int) {
		for _, i := range absent {
			if b.test(i) {
				t.Fatalf("%s has unexpected bit %d", op, i)
			}
		}
	}

	intersection := b1.clone().and(b2)
	mustHave("AND", intersection, []int{2, 64})
	mustLack("AND", intersection, []int{0, 3, 100, 127})

	union := b1.clone().or(b2)
	mustHave("OR", union, []int{0, 2, 3, 64, 100, 127})

	diff := b1.clone().andNot(intersection)
	mustHave("ANDNOT", diff, []int{0, 127})
	mustLack("ANDNOT", diff, []int{2, 64})
}

// TestBitsetForEachSetOrder verifies forEachSet visits bits in ascending order.
func TestBitsetForEachSetOrder(t *testing.T) {
	b := newBitset(70)
	for _, i := range []int{69, 0, 1, 63, 64} { // deliberately unsorted
		b.set(i)
	}

	var seen []int
	b.forEachSet(func(i int) { seen = append(seen, i) })

	expected := []int{0, 1, 63, 64, 69}
	if len(seen) != len(expected) {
		t.Fatalf("unexpected count: got %d, want %d", len(seen), len(expected))
	}
	for i, want := range expected {
		if seen[i] != want {
			t.Fatalf("order mismatch at %d: got %d, want %d", i, seen[i], want)
		}
	}
}

// TestBitsetAtomicOps races many goroutines on testAndSetAtomic (exactly one
// must win) and checks clearAtomic is effective and idempotent.
func TestBitsetAtomicOps(t *testing.T) {
	b := newBitset(128)
	const n = 1000
	const idx = 73 // arbitrary

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		wins int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			if !b.testAndSetAtomic(idx) {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	if wins != 1 {
		t.Fatalf("expected exactly one set, got %d", wins)
	}
	if !b.test(idx) {
		t.Fatalf("bit should be set after atomic operations")
	}

	// clearAtomic should clear once and be idempotent.
	b.clearAtomic(idx)
	if b.test(idx) {
		t.Fatalf("bit should be cleared")
	}
	b.clearAtomic(idx)
	if b.test(idx) {
		t.Fatalf("bit should remain cleared after second clear")
	}
}
Loading
Loading