diff --git a/jrpc2/client.go b/jrpc2/client.go index f972cca2..2fc918c3 100644 --- a/jrpc2/client.go +++ b/jrpc2/client.go @@ -107,6 +107,10 @@ func (c *Client) NextURL() *URL { return c.urls[next] } +func (c *Client) NumURLs() int { + return len(c.urls) +} + func (c *Client) WithMaxReads(n int) *Client { c.lcache.maxreads = n c.bcache.maxreads = n diff --git a/shovel/task.go b/shovel/task.go index 4bc6c6aa..cc38782a 100644 --- a/shovel/task.go +++ b/shovel/task.go @@ -36,6 +36,7 @@ type Source interface { Latest(context.Context, string, uint64) (uint64, []byte, error) Hash(context.Context, string, uint64) ([]byte, error) NextURL() *jrpc2.URL + NumURLs() int } type Destination interface { @@ -187,6 +188,8 @@ type Task struct { dests []Destination destFactory func(config.Integration) (Destination, error) destConfig config.Integration + + lastURLHost string // set by Converge for retry tracking } func (t *Task) update( @@ -350,6 +353,7 @@ func (task *Task) Converge() error { url = nextURL.String() nrpc = uint64(0) ) + task.lastURLHost = nextURL.Hostname() ctx = wctx.WithSrcHost(ctx, nextURL.Hostname()) ctx = wctx.WithCounter(ctx, &nrpc) @@ -487,7 +491,7 @@ func (t *Task) load( ctx = wctx.WithNumLimit(ctx, m, n) b, err := t.src.Get(ctx, url, &t.filter, m, n) if err != nil { - slog.ErrorContext(ctx, "loading blocks", "error", err) + slog.WarnContext(ctx, "loading blocks", "error", err) return fmt.Errorf("loading blocks: %w", err) } blocksMut.Lock() @@ -706,6 +710,8 @@ func (tm *Manager) Updates() uint64 { } func (tm *Manager) runTask(t *Task) { + numURLs := t.src.NumURLs() + failedURLs := make(map[string]struct{}, numURLs) for { select { case <-tm.restart: @@ -719,9 +725,21 @@ func (tm *Manager) runTask(t *Task) { case errors.Is(err, ErrNothingNew): time.Sleep(t.pollDuration) case err != nil: + failedURLs[t.lastURLHost] = struct{}{} time.Sleep(time.Second) - slog.ErrorContext(t.ctx, "converge-retry", "msg", err) + if len(failedURLs) >= numURLs { + slog.ErrorContext(t.ctx, "converge-retry", + "msg", err, + "failed-urls", len(failedURLs), + ) + } else { + slog.WarnContext(t.ctx, "converge-retry", + "msg", err, + "failed-urls", len(failedURLs), + ) + } default: + failedURLs = make(map[string]struct{}, numURLs) go func() { // try out best to deliver update // but don't stack up work diff --git a/shovel/task_test.go b/shovel/task_test.go index afbd4159..c90e5546 100644 --- a/shovel/task_test.go +++ b/shovel/task_test.go @@ -100,6 +100,10 @@ func (tg *testGeth) NextURL() *jrpc2.URL { return jrpc2.MustURL("") } +func (tg *testGeth) NumURLs() int { + return 1 +} + func (tg *testGeth) factory(config.Source, glf.Filter) Source { return tg }