From 3d0279e07dc4ccf90e6c92d1232a9b49ac6153de Mon Sep 17 00:00:00 2001 From: Stewart Jingga Date: Mon, 12 Sep 2022 16:14:18 +0700 Subject: [PATCH 1/2] fix: bigquery extractor panic --- agent/agent.go | 2 +- plugins/extractors/bigquery/bigquery.go | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index b05170810..46ec361e4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -171,7 +171,7 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) { go func() { defer func() { if r := recover(); r != nil { - run.Error = fmt.Errorf("%s", r) + run.Error = fmt.Errorf("recover run panic: %s", r) } stream.Close() }() diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 2c270092f..4a0eeea8d 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -150,10 +150,9 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit tb := ds.Tables(ctx) for { table, err := tb.Next() - if err == iterator.Done { + if errors.Is(err, iterator.Done) || errors.Is(err, context.Canceled) { break - } - if err != nil { + } else if err != nil { e.logger.Error("failed to get table, skipping table", "err", err) continue } @@ -220,7 +219,6 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu "project": t.ProjectID, "type": string(md.Type), "partition_field": partitionField, - "labels": md.Labels, }), }) if err != nil { @@ -234,6 +232,7 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu Description: md.Description, Service: "bigquery", Data: table, + Labels: md.Labels, CreateTime: timestamppb.New(md.CreationTime), UpdateTime: timestamppb.New(md.LastModifiedTime), }, nil From dd3e07fc588cb94182004d1d5ea8bbac5733dbb5 Mon Sep 17 00:00:00 2001 From: Stewart Jingga Date: Mon, 12 Sep 2022 16:26:01 +0700 Subject: [PATCH 2/2] refactor(agent): use better error message on panic --- agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent.go b/agent/agent.go index 46ec361e4..6102d87c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -171,7 +171,7 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) { go func() { defer func() { if r := recover(); r != nil { - run.Error = fmt.Errorf("recover run panic: %s", r) + run.Error = fmt.Errorf("agent run: close stream: panic: %s", r) } stream.Close() }()