Skip to content
7 changes: 7 additions & 0 deletions pkg/sync/expand/cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
// exists. Returns nil if no cycle exists. If there is a single
// node pointing to itself, that will count as a cycle.
func (g *EntitlementGraph) GetFirstCycle() []int {
if g.HasNoCycles {
return nil
}
visited := mapset.NewSet[int]()
for nodeID := range g.Nodes {
cycle, hasCycle := g.cycleDetectionHelper(nodeID, visited, []int{})
Expand Down Expand Up @@ -87,8 +90,12 @@ func (g *EntitlementGraph) removeNode(nodeID int) {
// FixCycles if any cycles of nodes exist, merge all nodes in that cycle into a
// single node and then repeat. Iteration ends when there are no more cycles.
func (g *EntitlementGraph) FixCycles() error {
if g.HasNoCycles {
return nil
}
cycle := g.GetFirstCycle()
if cycle == nil {
g.HasNoCycles = true
return nil
}

Expand Down
31 changes: 19 additions & 12 deletions pkg/sync/expand/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@ import (
"go.uber.org/zap"
)

// JSON tags for actions, edges, and nodes are short to minimize size of serialized data when checkpointing

type EntitlementGraphAction struct {
SourceEntitlementID string `json:"source_entitlement_id"`
DescendantEntitlementID string `json:"descendant_entitlement_id"`
Shallow bool `json:"shallow"`
ResourceTypeIDs []string `json:"resource_types_ids"`
PageToken string `json:"page_token"`
SourceEntitlementID string `json:"sid"`
DescendantEntitlementID string `json:"did"`
Shallow bool `json:"s"`
ResourceTypeIDs []string `json:"rtids"`
PageToken string `json:"pt"`
}

type Edge struct {
EdgeID int `json:"edge_id"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we used these previously as a effective pagination token, does this mean ideally for in-place upgades we throw away "old" pagination state, and start over? should "version" the pagination token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure we've broken this before. In an ideal world we'd version this stuff and write migrations, but I think the moneyball solution would be to end the sync if we error unmarshaling the json. I pushed a change that does this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don;'t think we need a migration, we just need to start the specified sync step over from zero

SourceID int `json:"source_id"`
DestinationID int `json:"destination_id"`
IsExpanded bool `json:"expanded"`
IsShallow bool `json:"shallow"`
ResourceTypeIDs []string `json:"resource_type_ids"`
EdgeID int `json:"id"`
SourceID int `json:"sid"`
DestinationID int `json:"did"`
IsExpanded bool `json:"e"`
IsShallow bool `json:"s"`
ResourceTypeIDs []string `json:"rtids"`
}

// Node represents a list of entitlements. It is the base element of the graph.
type Node struct {
Id int `json:"id"`
EntitlementIDs []string `json:"entitlementIds"` // List of entitlements.
EntitlementIDs []string `json:"eids"` // List of entitlements.
}

// EntitlementGraph - a directed graph representing the relationships between
Expand All @@ -47,6 +49,7 @@ type EntitlementGraph struct {
Loaded bool `json:"loaded"`
Depth int `json:"depth"`
Actions []EntitlementGraphAction `json:"actions"`
HasNoCycles bool `json:"has_no_cycles"`
}

func NewEntitlementGraph(_ context.Context) *EntitlementGraph {
Expand All @@ -56,6 +59,7 @@ func NewEntitlementGraph(_ context.Context) *EntitlementGraph {
EntitlementsToNodes: make(map[string]int),
Nodes: make(map[int]Node),
SourcesToDestinations: make(map[int]map[int]int),
HasNoCycles: false,
}
}

Expand Down Expand Up @@ -155,6 +159,7 @@ func (g *EntitlementGraph) AddEntitlement(entitlement *v2.Entitlement) {
if found != nil {
return
}
g.HasNoCycles = false // Reset this since we're changing the graph.

// Start at 1 in case we don't initialize something and try to get node 0.
g.NextNodeID++
Expand Down Expand Up @@ -265,6 +270,8 @@ func (g *EntitlementGraph) AddEdge(
g.DestinationsToSources[dstNode.Id] = make(map[int]int)
}

g.HasNoCycles = false // Reset this since we're changing the graph.

// Start at 1 in case we don't initialize something and try to get edge 0.
g.NextEdgeID++
edge := Edge{
Expand Down
27 changes: 17 additions & 10 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,29 @@ type Syncer interface {

// syncer orchestrates a connector sync and stores the results using the provided datasource.Writer.
type syncer struct {
c1zManager manager.Manager
c1zPath string
store connectorstore.Writer
connector types.ConnectorClient
state State
runDuration time.Duration
transitionHandler func(s Action)
progressHandler func(p *Progress)
tmpDir string
skipFullSync bool
c1zManager manager.Manager
c1zPath string
store connectorstore.Writer
connector types.ConnectorClient
state State
runDuration time.Duration
transitionHandler func(s Action)
progressHandler func(p *Progress)
tmpDir string
skipFullSync bool
lastCheckPointTime time.Time

skipEGForResourceType map[string]bool
}

const minCheckpointInterval = 10 * time.Second

// Checkpoint marshals the current state and stores it.
func (s *syncer) Checkpoint(ctx context.Context) error {
if time.Since(s.lastCheckPointTime) < minCheckpointInterval {
return nil
}
s.lastCheckPointTime = time.Now()
checkpoint, err := s.state.Marshal()
if err != nil {
return err
Expand Down
181 changes: 111 additions & 70 deletions pkg/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
Expand All @@ -16,88 +17,62 @@ import (
"google.golang.org/grpc"
)

var groupResourceType = &v2.ResourceType{
Id: "group",
DisplayName: "Group",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_GROUP},
}
var userResourceType = &v2.ResourceType{
Id: "user",
DisplayName: "User",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_USER},
Annotations: annotations.New(&v2.SkipEntitlementsAndGrants{}),
}

func BenchmarkExpandCircle(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create a loop of N entitlements
circleSize := 9
circleSize := 7
// with different principal + grants at each layer
usersPerLayer := 100
groupCount := 100

mc := newMockConnector()

groupResourceType := &v2.ResourceType{
Id: "group",
DisplayName: "Group",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_GROUP},
}
userResourceType := &v2.ResourceType{
Id: "user",
DisplayName: "User",
Traits: []v2.ResourceType_Trait{v2.ResourceType_TRAIT_USER},
Annotations: annotations.New(&v2.SkipEntitlementsAndGrants{}),
}
mc.rtDB = append(mc.rtDB, groupResourceType, userResourceType)

for i := 0; i < circleSize; i++ {
resoruceId := "g_" + strconv.Itoa(i)
resoruce, err := rs.NewGroupResource(
resoruceId,
groupResourceType,
resoruceId,
[]rs.GroupTraitOption{},
)
for i := 0; i < groupCount; i++ {
groupId := "group_" + strconv.Itoa(i)
group, _, err := mc.AddGroup(ctx, groupId)
require.NoError(b, err)

mc.resourceDB = append(mc.resourceDB, resoruce)
childGroupId := "child_group_" + strconv.Itoa(i)
childGroup, childEnt, err := mc.AddGroup(ctx, childGroupId)
require.NoError(b, err)

ent := et.NewAssignmentEntitlement(
resoruce,
"member",
et.WithGrantableTo(groupResourceType, userResourceType),
)
ent.Slug = "member"
_ = mc.AddGroupMember(ctx, group, childGroup, childEnt)

mc.entDB = append(mc.entDB, ent)
for j := 0; j < usersPerLayer; j++ {
pid := "u_" + strconv.Itoa(i*usersPerLayer+j)
principal, err := rs.NewUserResource(
pid,
userResourceType,
pid,
[]rs.UserTraitOption{},
rs.WithAnnotation(&v2.SkipEntitlementsAndGrants{}),
)
pid := "user_" + strconv.Itoa(i*usersPerLayer+j)
principal, err := mc.AddUser(ctx, pid)
require.NoError(b, err)
mc.userDB = append(mc.userDB, principal)

grant := gt.NewGrant(
resoruce,
"member",
principal,
)
mc.grantDB = append(mc.grantDB, grant)

// This isn't needed because grant expansion will create this grant
// _ = mc.AddGroupMember(ctx, group, principal)
_ = mc.AddGroupMember(ctx, childGroup, principal)
}
}

// create the circle
for i := 0; i < circleSize; i++ {
currentEnt := mc.entDB[i]
nextEnt := mc.entDB[(i+1)%circleSize] // Wrap around to the start for the last element

grant := gt.NewGrant(
nextEnt.Resource,
"member",
currentEnt.Resource,
gt.WithAnnotation(&v2.GrantExpandable{
EntitlementIds: []string{
currentEnt.Id,
},
}),
)

mc.grantDB = append(mc.grantDB, grant)
for i := 0; i < circleSize; i += 1 {
groupId := "group_" + strconv.Itoa(i)
nextGroupId := "group_" + strconv.Itoa((i+1)%circleSize) // Wrap around to the start for the last element
currentEnt := mc.entDB[groupId][0]
nextEnt := mc.entDB[nextGroupId][0]

_ = mc.AddGroupMember(ctx, currentEnt.Resource, nextEnt.Resource, nextEnt)
}

tempDir, err := os.MkdirTemp("", "baton-benchmark-expand-circle")
Expand All @@ -120,9 +95,9 @@ func newMockConnector() *mockConnector {
mc := &mockConnector{
rtDB: make([]*v2.ResourceType, 0),
resourceDB: make([]*v2.Resource, 0),
entDB: make([]*v2.Entitlement, 0),
entDB: make(map[string][]*v2.Entitlement),
userDB: make([]*v2.Resource, 0),
grantDB: make([]*v2.Grant, 0),
grantDB: make(map[string][]*v2.Grant),
}
return mc
}
Expand All @@ -131,9 +106,9 @@ type mockConnector struct {
metadata *v2.ConnectorMetadata
rtDB []*v2.ResourceType
resourceDB []*v2.Resource
entDB []*v2.Entitlement
entDB map[string][]*v2.Entitlement // resource id to entitlements
userDB []*v2.Resource
grantDB []*v2.Grant
grantDB map[string][]*v2.Grant // resource id to grants
v2.AssetServiceClient
v2.GrantManagerServiceClient
v2.ResourceManagerServiceClient
Expand All @@ -143,23 +118,89 @@ type mockConnector struct {
v2.TicketsServiceClient
}

func (mc *mockConnector) AddGroup(ctx context.Context, groupId string) (*v2.Resource, *v2.Entitlement, error) {
group, err := rs.NewGroupResource(
groupId,
groupResourceType,
groupId,
[]rs.GroupTraitOption{},
)
if err != nil {
return nil, nil, err
}

mc.resourceDB = append(mc.resourceDB, group)

ent := et.NewAssignmentEntitlement(
group,
"member",
et.WithGrantableTo(groupResourceType, userResourceType),
)
ent.Slug = "member"
mc.entDB[groupId] = append(mc.entDB[groupId], ent)

return group, ent, nil
}

func (mc *mockConnector) AddUser(ctx context.Context, userId string) (*v2.Resource, error) {
user, err := rs.NewUserResource(
userId,
userResourceType,
userId,
[]rs.UserTraitOption{},
rs.WithAnnotation(&v2.SkipEntitlementsAndGrants{}),
)
if err != nil {
return nil, err
}

mc.userDB = append(mc.userDB, user)
return user, nil
}

func (mc *mockConnector) AddGroupMember(ctx context.Context, resource *v2.Resource, principal *v2.Resource, expandEnts ...*v2.Entitlement) *v2.Grant {
grantOpts := []gt.GrantOption{}

for _, ent := range expandEnts {
grantOpts = append(grantOpts, gt.WithAnnotation(&v2.GrantExpandable{
EntitlementIds: []string{
ent.Id,
},
}))
}

grant := gt.NewGrant(
resource,
"member",
principal,
grantOpts...,
)

mc.grantDB[resource.Id.Resource] = append(mc.grantDB[resource.Id.Resource], grant)

return grant
}

func (mc *mockConnector) ListResourceTypes(context.Context, *v2.ResourceTypesServiceListResourceTypesRequest, ...grpc.CallOption) (*v2.ResourceTypesServiceListResourceTypesResponse, error) {
return &v2.ResourceTypesServiceListResourceTypesResponse{List: mc.rtDB}, nil
}

func (mc *mockConnector) ListResources(ctx context.Context, in *v2.ResourcesServiceListResourcesRequest, opts ...grpc.CallOption) (*v2.ResourcesServiceListResourcesResponse, error) {
all := make([]*v2.Resource, 0, len(mc.resourceDB)+len(mc.userDB))
all = append(all, mc.resourceDB...)
all = append(all, mc.userDB...)
return &v2.ResourcesServiceListResourcesResponse{List: all}, nil
if in.ResourceTypeId == "group" {
return &v2.ResourcesServiceListResourcesResponse{List: mc.resourceDB}, nil
}
if in.ResourceTypeId == "user" {
return &v2.ResourcesServiceListResourcesResponse{List: mc.userDB}, nil
}
return nil, fmt.Errorf("unknown resource type %s", in.ResourceTypeId)
}

func (mc *mockConnector) ListEntitlements(ctx context.Context, in *v2.EntitlementsServiceListEntitlementsRequest, opts ...grpc.CallOption) (*v2.EntitlementsServiceListEntitlementsResponse, error) {
return &v2.EntitlementsServiceListEntitlementsResponse{List: mc.entDB}, nil
return &v2.EntitlementsServiceListEntitlementsResponse{List: mc.entDB[in.Resource.Id.Resource]}, nil
}

func (mc *mockConnector) ListGrants(ctx context.Context, in *v2.GrantsServiceListGrantsRequest, opts ...grpc.CallOption) (*v2.GrantsServiceListGrantsResponse, error) {
return &v2.GrantsServiceListGrantsResponse{List: mc.grantDB}, nil
return &v2.GrantsServiceListGrantsResponse{List: mc.grantDB[in.Resource.Id.Resource]}, nil
}

func (mc *mockConnector) GetMetadata(ctx context.Context, in *v2.ConnectorServiceGetMetadataRequest, opts ...grpc.CallOption) (*v2.ConnectorServiceGetMetadataResponse, error) {
Expand Down
Loading