Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions internal/extractor/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func extractBlocksAndTransactions(gRPCClient *client.GRPCClient, start, stop uin
} else {
slog.Info("Extracting blocks and transactions", "height", start)
}

var bar *progressbar.ProgressBar
if displayProgress {
bar = progressbar.NewOptions64(
Expand Down Expand Up @@ -101,15 +102,15 @@ func processBlocks(gRPCClient *client.GRPCClient, start, stop uint64, outputHand

err := processSingleBlockWithRetry(clientWithCtx, blockHeight, outputHandler, maxRetries)
if err != nil {
if !errors.Is(err, context.Canceled) {
slog.Error("Block processing error",
"height", blockHeight,
"error", err,
"errorType", fmt.Sprintf("%T", err))
return err
if errors.Is(err, context.Canceled) {
return fmt.Errorf("failed to process block %d: %w", blockHeight, err)
}
slog.Error("Failed to process block", "height", blockHeight, "error", err, "retries", maxRetries)
return fmt.Errorf("failed to process block %d: %w", blockHeight, err)

slog.Error("Block processing error",
"height", blockHeight,
"error", err,
"errorType", fmt.Sprintf("%T", err))
return err
}

if bar != nil {
Expand Down
30 changes: 20 additions & 10 deletions internal/extractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,46 @@ func Extract(gRPCClient *client.GRPCClient, outputHandler output.OutputHandler,
return nil
}

// setBlockRange sets correct the block range based on the configuration.
// If the start block is not set, it will be set to the latest block in the database.
// If the stop block is not set, it will be set to the latest block in the gRPC server.
// If the start block is greater than the stop block, an error will be returned.
// setBlockRange sets the block range based on the configuration.
// If the start block is not set, it will be set to the latest block in the database + 1.
// If the database is empty, it queries the node for the earliest available block.
// If the stop block is not set, it will be set to the latest block on the node.
// Returns an error if the start block is greater than the stop block.
func setBlockRange(gRPCClient *client.GRPCClient, outputHandler output.OutputHandler, cfg *config.ExtractConfig) error {
if cfg.ReIndex {
slog.Info("Reindexing entire database...")
// TODO: Get the earliest block from the gRPC server
// See https://github.com/manifest-network/yaci/issues/28
cfg.BlockStart = 1
earliestLocalBlock, err := outputHandler.GetEarliestBlock(gRPCClient.Ctx)
if err != nil {
return fmt.Errorf("failed to get the earliest local block: %w", err)
}
if earliestLocalBlock != nil {
cfg.BlockStart = earliestLocalBlock.ID
} else {
// Fresh DB with reindex - probe for earliest available
earliestAvailable, err := utils.GetEarliestBlockHeight(gRPCClient, cfg.MaxRetries)
if err != nil {
return fmt.Errorf("failed to determine earliest available block: %w", err)
}
cfg.BlockStart = earliestAvailable
}
cfg.BlockStop = 0
}

if cfg.BlockStart == 0 {
// TODO: Get the earliest block from the gRPC server
// See https://github.com/manifest-network/yaci/issues/28
cfg.BlockStart = 1
latestLocalBlock, err := outputHandler.GetLatestBlock(gRPCClient.Ctx)
if err != nil {
return fmt.Errorf("failed to get the latest block: %w", err)
}
if latestLocalBlock != nil {
// Resume from existing DB - no probe needed
cfg.BlockStart = latestLocalBlock.ID + 1
} else {
// Fresh DB - probe to find earliest available block on node
earliestAvailable, err := utils.GetEarliestBlockHeight(gRPCClient, cfg.MaxRetries)
if err != nil {
return fmt.Errorf("failed to determine earliest available block: %w", err)
}
cfg.BlockStart = earliestAvailable
}
}

Expand Down
47 changes: 47 additions & 0 deletions internal/utils/block.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package utils

import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/manifest-network/yaci/internal/client"
"github.com/pkg/errors"
)

const statusMethod = "cosmos.base.node.v1beta1.Service.Status"
const getBlockByHeightMethod = "cosmos.base.tendermint.v1beta1.Service.GetBlockByHeight"

// GetLatestBlockHeightWithRetry retrieves the latest block height from the gRPC server with retry logic.
func GetLatestBlockHeightWithRetry(gRPCClient *client.GRPCClient, maxRetries uint) (uint64, error) {
Expand All @@ -25,3 +29,46 @@ func GetLatestBlockHeightWithRetry(gRPCClient *client.GRPCClient, maxRetries uin
},
)
}

// GetEarliestBlockHeight determines the earliest available block on a node.
// It probes block 1 to check if the node is an archive node or pruned.
// For archive nodes, returns 1. For pruned nodes, parses the error message
// to extract the lowest available height.
func GetEarliestBlockHeight(gRPCClient *client.GRPCClient, maxRetries uint) (uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this function a little more robust like suggested in the first review?

I saw cases where the lowest height parsed from the error didn't work because the follow-up query hit a different node that didn't have that height, i.e., the other node's lowest available height was higher than the one previously reported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I didn't consider the case of load balancers using nodes with varying heights.. is this common?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I didn't consider the case of load balancers using nodes with varying heights.. is this common?

I encountered this issue multiple times while building this project, primarily with Osmosis and the Hub. I'm not sure if it's common, but I believe it's common enough to address. I'm surprised you didn't encounter this issue during your tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I typically use my own nodes for dev/testing (mainly because of issues like this with public ones).
I will have something for this early tomorrow.

This comment was marked as outdated.

inputParams := []byte(`{"height":"1"}`)

// Fast path: single attempt to check if block 1 exists
_, err := GetGRPCResponse(gRPCClient, getBlockByHeightMethod, 1, inputParams)
if err == nil {
return 1, nil // Archive node with full history
}

// Check if error reveals the pruning boundary
if lowestHeight := parseLowestHeightFromError(err.Error()); lowestHeight > 0 {
return lowestHeight, nil
}

// Error was neither "block exists" nor "pruned" - retry in case of transient failure
_, err = GetGRPCResponse(gRPCClient, getBlockByHeightMethod, maxRetries, inputParams)
if err == nil {
return 1, nil
}

return 0, fmt.Errorf("failed to determine earliest block height: %w", err)
}

// lowestHeightPattern matches the pruning-boundary hint in a lower-cased node
// error message and captures the reported height. It is compiled once at
// package initialization rather than on every call.
var lowestHeightPattern = regexp.MustCompile(`lowest height is (\d+)`)

// parseLowestHeightFromError extracts the lowest available height from a pruned node error.
// CosmosSDK nodes return errors like "height 1 is not available, lowest height is 28566001".
// The message is lower-cased before matching, so the match is case-insensitive.
// It returns 0 when the message carries no such hint or when the captured
// digits do not fit in a uint64.
func parseLowestHeightFromError(errMsg string) uint64 {
	matches := lowestHeightPattern.FindStringSubmatch(strings.ToLower(errMsg))
	if len(matches) < 2 {
		return 0
	}

	height, err := strconv.ParseUint(matches[1], 10, 64)
	if err != nil {
		// Only reachable on overflow (the pattern guarantees digits); treat it
		// as "no usable height" so callers fall back to their error path.
		return 0
	}

	return height
}
41 changes: 41 additions & 0 deletions internal/utils/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package utils

import "testing"

// TestParseLowestHeightFromError runs parseLowestHeightFromError against a
// table of error messages: two that carry a "lowest height is N" hint (bare
// and gRPC-wrapped) and two that do not (an unrelated error and the empty
// string), for which 0 is expected.
func TestParseLowestHeightFromError(t *testing.T) {
	type scenario struct {
		label    string // subtest name
		message  string // error text handed to the parser
		expected uint64 // height the parser should report
	}

	scenarios := []scenario{
		{"standard pruned node error", "height 1 is not available, lowest height is 28566001", 28566001},
		{"wrapped error", "rpc error: code = Unknown desc = height 1 is not available, lowest height is 12345", 12345},
		{"unrelated error", "connection refused", 0},
		{"empty string", "", 0},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.label, func(t *testing.T) {
			if actual := parseLowestHeightFromError(sc.message); actual != sc.expected {
				t.Errorf("parseLowestHeightFromError() = %d, want %d", actual, sc.expected)
			}
		})
	}
}
Loading