diff --git a/agent/agent.go b/agent/agent.go index b05170810..6102d87c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -171,7 +171,7 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) { go func() { defer func() { if r := recover(); r != nil { - run.Error = fmt.Errorf("%s", r) + run.Error = fmt.Errorf("agent run: close stream: panic: %s", r) } stream.Close() }() diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 2c270092f..4a0eeea8d 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -150,10 +150,9 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit tb := ds.Tables(ctx) for { table, err := tb.Next() - if err == iterator.Done { + if errors.Is(err, iterator.Done) || errors.Is(err, context.Canceled) { break - } - if err != nil { + } else if err != nil { e.logger.Error("failed to get table, skipping table", "err", err) continue } @@ -220,7 +219,6 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu "project": t.ProjectID, "type": string(md.Type), "partition_field": partitionField, - "labels": md.Labels, }), }) if err != nil { @@ -234,6 +232,7 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu Description: md.Description, Service: "bigquery", Data: table, + Labels: md.Labels, CreateTime: timestamppb.New(md.CreationTime), UpdateTime: timestamppb.New(md.LastModifiedTime), }, nil