Skip to content
141 changes: 96 additions & 45 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ func New(
highWaterMark: -1,
seenAccounts: map[string]struct{}{},
inactiveQueue: []*InactiveEntry{},
inactiveQueueMutex: new(utils.PriorityMutex),
backlogSize: defaultBacklogSize,
lastIndexChecked: -1,
queueMap: map[string]*utils.BST{},
processQueue: make(chan *blockRequest, processQueueBacklog),
}

for _, opt := range options {
Expand All @@ -57,6 +58,10 @@ func New(
// Create change queue
r.changeQueue = make(chan *parser.BalanceChange, r.backlogSize)

// Create queueMap
desiredShardCount := shardBuffer * (r.ActiveConcurrency + r.InactiveConcurrency)
r.queueMap = utils.NewShardedMap(desiredShardCount)

return r
}

Expand Down Expand Up @@ -98,7 +103,7 @@ func (r *Reconciler) wrappedInactiveEnqueue(
accountCurrency *types.AccountCurrency,
liveBlock *types.BlockIdentifier,
) {
if err := r.inactiveAccountQueue(true, accountCurrency, liveBlock); err != nil {
if err := r.inactiveAccountQueue(true, accountCurrency, liveBlock, false); err != nil {
log.Printf(
"%s: unable to queue account %s",
err.Error(),
Expand All @@ -110,17 +115,18 @@ func (r *Reconciler) wrappedInactiveEnqueue(
// addToQueueMap adds a *types.AccountCurrency
// to the prune map at the provided index.
func (r *Reconciler) addToQueueMap(
acctCurrency *types.AccountCurrency,
m map[string]interface{},
key string,
index int64,
) {
key := types.Hash(acctCurrency)
if _, ok := r.queueMap[key]; !ok {
r.queueMap[key] = &utils.BST{}
if _, ok := m[key]; !ok {
m[key] = &utils.BST{}
}

existing := r.queueMap[key].Get(index)
bst := m[key].(*utils.BST)
existing := bst.Get(index)
if existing == nil {
r.queueMap[key].Set(index, 1)
bst.Set(index, 1)
} else {
existing.Value++
}
Expand All @@ -132,6 +138,39 @@ func (r *Reconciler) QueueChanges(
ctx context.Context,
block *types.BlockIdentifier,
balanceChanges []*parser.BalanceChange,
) error {
// If the processQueue fills up, we block
// until some items are dequeued.
select {
case r.processQueue <- &blockRequest{
Block: block,
Changes: balanceChanges,
}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// queueWorker processes items from the processQueue.
func (r *Reconciler) queueWorker(ctx context.Context) error {
for {
select {
case req := <-r.processQueue:
if err := r.queueChanges(ctx, req.Block, req.Changes); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}

// queueChanges processes a block for reconciliation.
func (r *Reconciler) queueChanges(
ctx context.Context,
block *types.BlockIdentifier,
balanceChanges []*parser.BalanceChange,
) error {
// Ensure all interestingAccounts are checked
for _, account := range r.interestingAccounts {
Expand Down Expand Up @@ -183,16 +222,20 @@ func (r *Reconciler) QueueChanges(
Account: change.Account,
Currency: change.Currency,
}
err := r.inactiveAccountQueue(false, acctCurrency, block)

r.inactiveQueueMutex.Lock(true)
err := r.inactiveAccountQueue(false, acctCurrency, block, true)
r.inactiveQueueMutex.Unlock()
if err != nil {
return err
}

// Add change to queueMap before enqueuing to ensure
// there is no possible race.
r.queueMapMutex.Lock()
r.addToQueueMap(acctCurrency, change.Block.Index)
r.queueMapMutex.Unlock()
key := types.Hash(acctCurrency)
m := r.queueMap.Lock(key, true)
r.addToQueueMap(m, key, change.Block.Index)
r.queueMap.Unlock(key)

// Add change to active queue
r.wrappedActiveEnqueue(ctx, change)
Expand Down Expand Up @@ -499,8 +542,12 @@ func (r *Reconciler) inactiveAccountQueue(
inactive bool,
accountCurrency *types.AccountCurrency,
liveBlock *types.BlockIdentifier,
hasLock bool,
) error {
r.inactiveQueueMutex.Lock()
if !hasLock {
r.inactiveQueueMutex.Lock(false)
defer r.inactiveQueueMutex.Unlock()
}

// Only enqueue the first time we see an account on an active reconciliation.
shouldEnqueueInactive := false
Expand Down Expand Up @@ -594,33 +641,34 @@ func (r *Reconciler) updateQueueMap(
prune bool,
) error {
key := types.Hash(acctCurrency)
m := r.queueMap.Lock(key, false)
bst := m[key].(*utils.BST)

r.queueMapMutex.Lock()
existing := r.queueMap[key].Get(index)
existing := bst.Get(index)
existing.Value--
if existing.Value > 0 {
r.queueMapMutex.Unlock()
r.queueMap.Unlock(key)
return nil
}

// Cleanup indexes when we don't need them anymore
r.queueMap[key].Delete(index)
bst.Delete(index)

// Don't prune if there are items for this AccountCurrency
// less than this change.
if !r.queueMap[key].Empty() &&
index >= r.queueMap[key].Min().Key {
r.queueMapMutex.Unlock()
if !bst.Empty() &&
index >= bst.Min().Key {
r.queueMap.Unlock(key)
return nil
}

// Cleanup keys when we don't need them anymore
if r.queueMap[key].Empty() {
delete(r.queueMap, key)
if bst.Empty() {
delete(m, key)
}

// Unlock before pruning as this could take some time
r.queueMapMutex.Unlock()
r.queueMap.Unlock(key)

// Attempt to prune historical balances that will not be used
// anymore.
Expand Down Expand Up @@ -755,37 +803,34 @@ func (r *Reconciler) shouldAttemptInactiveReconciliation(
func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit
ctx context.Context,
) error {
for {
if ctx.Err() != nil {
return ctx.Err()
for ctx.Err() == nil {
r.inactiveQueueMutex.Lock(false)
queueLen := len(r.inactiveQueue)
if queueLen == 0 {
r.inactiveQueueMutex.Unlock()
r.debugLog(
"no accounts ready for inactive reconciliation (0 accounts in queue)",
)
time.Sleep(inactiveReconciliationSleep)
continue
}

nextAcct := r.inactiveQueue[0]
key := types.Hash(nextAcct.Entry)

// Lock BST while determining if we should attempt reconciliation
// to ensure we don't allow any accounts to be pruned at retrieved
// head index. Although this appears to be a long time to hold
// this mutex, this lookup takes less than a millisecond.
r.queueMapMutex.Lock()
m := r.queueMap.Lock(key, false)

shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx)
if !shouldAttempt {
r.queueMapMutex.Unlock()
r.queueMap.Unlock(key)
time.Sleep(inactiveReconciliationSleep)
continue
}

r.inactiveQueueMutex.Lock()
queueLen := len(r.inactiveQueue)
if queueLen == 0 {
r.inactiveQueueMutex.Unlock()
r.queueMapMutex.Unlock()
r.debugLog(
"no accounts ready for inactive reconciliation (0 accounts in queue)",
)
time.Sleep(inactiveReconciliationSleep)
continue
}

nextAcct := r.inactiveQueue[0]
nextValidIndex := int64(-1)
if nextAcct.LastCheck != nil { // block is set to nil when loaded from previous run
nextValidIndex = nextAcct.LastCheck.Index + r.inactiveFrequency
Expand All @@ -797,8 +842,8 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit

// Add nextAcct to queueMap before returning
// queueMapMutex lock.
r.addToQueueMap(nextAcct.Entry, head.Index)
r.queueMapMutex.Unlock()
r.addToQueueMap(m, key, head.Index)
r.queueMap.Unlock(key)

amount, block, err := r.bestLiveBalance(
ctx,
Expand Down Expand Up @@ -872,13 +917,13 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit
// Always re-enqueue accounts after they have been inactively
// reconciled. If we don't re-enqueue, we will never check
// these accounts again.
err = r.inactiveAccountQueue(true, nextAcct.Entry, block)
err = r.inactiveAccountQueue(true, nextAcct.Entry, block, false)
if err != nil {
return err
}
} else {
r.inactiveQueueMutex.Unlock()
r.queueMapMutex.Unlock()
r.queueMap.Unlock(key)
r.debugLog(
"no accounts ready for inactive reconciliation (%d accounts in queue, will reconcile next account at index %d)",
queueLen,
Expand All @@ -887,12 +932,18 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit
time.Sleep(inactiveReconciliationSleep)
}
}

return ctx.Err()
}

// Reconcile starts the active and inactive Reconciler goroutines.
// If any goroutine errors, the function will return an error.
func (r *Reconciler) Reconcile(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return r.queueWorker(ctx)
})

for j := 0; j < r.ActiveConcurrency; j++ {
g.Go(func() error {
return r.reconcileActiveAccounts(ctx)
Expand Down
Loading