Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/extractors/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ source:
| `project_id` | `string` | `my-project` | BigQuery Project ID | *required* |
| `credentials_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account in JSON string | *optional* |
| `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | *optional* |
| `profile_column` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | *optional* |
| `include_column_profile` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | *optional* |
| `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching. Default to `30`. | *optional* |

### *Notes*

Expand Down
11 changes: 6 additions & 5 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ import (
//go:embed README.md
var summary string

// previewTotalRows is the number of rows to preview
const previewTotalRows = 30

// Config hold the set of configuration for the bigquery extractor
type Config struct {
ProjectID string `mapstructure:"project_id" validate:"required"`
ServiceAccountJSON string `mapstructure:"service_account_json"`
TablePattern string `mapstructure:"table_pattern"`
IncludeColumnProfile bool `mapstructure:"include_column_profile"`
MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"`
}

var sampleConfig = `
Expand Down Expand Up @@ -92,7 +90,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

e.client, err = e.createClient(ctx)
if err != nil {
return
return errors.Wrap(err, "error creating client")
}

return
Expand Down Expand Up @@ -235,11 +233,14 @@ func (e *Extractor) buildColumn(ctx context.Context, field *bigquery.FieldSchema
func (e *Extractor) buildPreview(ctx context.Context, t *bigquery.Table) (fields []interface{}, preview []interface{}, err error) {
fields = []interface{}{} // list of column names
preview = []interface{}{} // rows of column values
if e.config.MaxPreviewRows == 0 {
return
}

rows := []interface{}{}
totalRows := 0
ri := t.Read(ctx)
for totalRows < previewTotalRows {
for totalRows < e.config.MaxPreviewRows {
var row []bigquery.Value
err = ri.Next(&row)
if err == iterator.Done {
Expand Down
10 changes: 10 additions & 0 deletions plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ func TestInit(t *testing.T) {

assert.Equal(t, plugins.InvalidConfigError{}, err)
})
t.Run("should not return invalid config error if config is valid", func(t *testing.T) {
extr := bigquery.New(test.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, map[string]interface{}{
"project_id": "sample-project",
})

assert.NotEqual(t, plugins.InvalidConfigError{}, err)
})
}
4 changes: 2 additions & 2 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func init() {

// BuildConfig builds a config struct from a map
func BuildConfig(configMap map[string]interface{}, c interface{}) (err error) {
defaults.SetDefaults(c)

err = mapstructure.Decode(configMap, c)
if err != nil {
return err
}

defaults.SetDefaults(c)

err = validate.Struct(c)
if err != nil {
return err
Expand Down