6 changes: 6 additions & 0 deletions plugins/extractors/bigquery/README.md
@@ -8,6 +8,12 @@ source:
  config:
    project_id: google-project-id
    table_pattern: gofood.fact_
    exclude:
      datasets:
        - dataset_a
        - dataset_b
      tables:
        - dataset_c.table_a
    max_page_size: 100
    profile_column: true
    service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________
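
The new exclude block takes dataset IDs and fully qualified table names in the datasetID.tableID form. As implemented in bigquery.go below, matching is an exact string comparison, with no pattern or glob support. A minimal standalone sketch of that behaviour using the sample values above (illustrative only, not part of this diff):

package main

import "fmt"

// local copies mirroring IsExcludedDataset / IsExcludedTable added in bigquery.go
func isExcludedDataset(datasetID string, excluded []string) bool {
	for _, d := range excluded {
		if datasetID == d {
			return true
		}
	}
	return false
}

func isExcludedTable(datasetID, tableID string, excluded []string) bool {
	name := fmt.Sprintf("%s.%s", datasetID, tableID)
	for _, t := range excluded {
		if name == t {
			return true
		}
	}
	return false
}

func main() {
	datasets := []string{"dataset_a", "dataset_b"}
	tables := []string{"dataset_c.table_a"}

	fmt.Println(isExcludedDataset("dataset_a", datasets))        // true: the whole dataset is skipped
	fmt.Println(isExcludedTable("dataset_c", "table_a", tables)) // true: this single table is skipped
	fmt.Println(isExcludedTable("dataset_c", "table_b", tables)) // false: matching is exact, no globbing
}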
48 changes: 47 additions & 1 deletion plugins/extractors/bigquery/bigquery.go
@@ -5,6 +5,7 @@ import (
_ "embed" // used to print the embedded assets
"encoding/base64"
"encoding/json"
"fmt"
"html/template"
"strings"
"sync"
@@ -36,20 +37,34 @@ type Config struct {
ServiceAccountJSON string `mapstructure:"service_account_json"`
MaxPageSize int `mapstructure:"max_page_size"`
TablePattern string `mapstructure:"table_pattern"`
Exclude Exclude `mapstructure:"exclude"`
IncludeColumnProfile bool `mapstructure:"include_column_profile"`
MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"`
IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"`
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
}

type Exclude struct {
// Datasets is a list of dataset IDs to exclude
Datasets []string `mapstructure:"datasets"`
// Tables is a list of tables to exclude, in the datasetID.tableID format
Tables []string `mapstructure:"tables"`
}

const (
maxPageSizeDefault = 100
)

var sampleConfig = `
project_id: google-project-id
table_pattern: gofood.fact_
exclude:
  datasets:
    - dataset_a
    - dataset_b
  tables:
    - dataset_c.table_a
max_page_size: 100
include_column_profile: true
# Only one of service_account_base64 / service_account_json is needed.
@@ -142,6 +157,10 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
if err != nil {
return errors.Wrap(err, "failed to fetch dataset")
}
if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.logger.Debug(fmt.Sprintf("excluding dataset from bigquery extract: %v", ds.DatasetID))
continue
}
e.extractTable(ctx, ds, emit)
}

@@ -180,6 +199,11 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
continue
}

if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
e.logger.Debug(fmt.Sprintf("excluding table from bigquery extract: %v.%v", ds.DatasetID, table.TableID))
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
@@ -311,7 +335,8 @@ func (e *Extractor) buildPreview(ctx context.Context, t *bigquery.Table) (fields
tempRows := []interface{}{}
totalRows := 0
ri := t.Read(ctx)
ri.PageInfo().MaxSize = e.getMaxPageSize()
// fetch only the number of rows needed for the preview
ri.PageInfo().MaxSize = e.config.MaxPreviewRows
for totalRows < e.config.MaxPreviewRows {
var row []bigquery.Value
err = ri.Next(&row)
@@ -446,6 +471,27 @@ func (e *Extractor) getColumnMode(col *bigquery.FieldSchema) string {
}
}

func IsExcludedDataset(datasetID string, excludedDatasets []string) bool {
for _, d := range excludedDatasets {
if datasetID == d {
return true
}
}

return false
}

func IsExcludedTable(datasetID, tableID string, excludedTables []string) bool {
tableName := fmt.Sprintf("%s.%s", datasetID, tableID)
for _, t := range excludedTables {
if tableName == t {
return true
}
}

return false
}

// getMaxPageSize returns max_page_size if configured in recipe, otherwise returns default value
func (e *Extractor) getMaxPageSize() int {
if e.config.MaxPageSize > 0 {
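
For context on how these keys reach the extractor: the struct tags on Config and Exclude suggest the recipe config is decoded with github.com/mitchellh/mapstructure, though the actual wiring goes through the project's plugin config handling, so treat this as an assumption. A minimal, self-contained sketch of that mapping (not part of this diff):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Mirrors the new fields on Config and Exclude above; other fields omitted.
type Exclude struct {
	Datasets []string `mapstructure:"datasets"`
	Tables   []string `mapstructure:"tables"`
}

type Config struct {
	ProjectID string  `mapstructure:"project_id"`
	Exclude   Exclude `mapstructure:"exclude"`
}

func main() {
	// Values a recipe's exclude block would carry after YAML parsing.
	recipe := map[string]interface{}{
		"project_id": "google-project-id",
		"exclude": map[string]interface{}{
			"datasets": []string{"dataset_a", "dataset_b"},
			"tables":   []string{"dataset_c.table_a"},
		},
	}

	var cfg Config
	if err := mapstructure.Decode(recipe, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
	// {ProjectID:google-project-id Exclude:{Datasets:[dataset_a dataset_b] Tables:[dataset_c.table_a]}}
}

With the sample recipe values, cfg.Exclude.Datasets holds dataset_a and dataset_b, and cfg.Exclude.Tables holds dataset_c.table_a, which is exactly what IsExcludedDataset and IsExcludedTable above consume.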
105 changes: 105 additions & 0 deletions plugins/extractors/bigquery/bigquery_test.go
@@ -53,3 +53,108 @@ func TestInit(t *testing.T) {
assert.ErrorContains(t, err, "failed to decode base64 service account")
})
}

func TestIsExcludedTable(t *testing.T) {
type args struct {
datasetID string
tableID string
excludedTables []string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "should return false when excluded table list is nil",
args: args{
datasetID: "dataset_a",
tableID: "table_b",
excludedTables: nil,
},
want: false,
},
{
name: "should return false when excluded table list is empty",
args: args{
datasetID: "dataset_a",
tableID: "table_b",
excludedTables: []string{},
},
want: false,
},
{
name: "should return false if table is not in excluded list",
args: args{
datasetID: "dataset_a",
tableID: "table_b",
excludedTables: []string{"ds1.table1", "playground.test_weekly"},
},
want: false,
},
{
name: "should return true if table is in excluded list",
args: args{
datasetID: "dataset_a",
tableID: "table_b",
excludedTables: []string{"ds1.table1", "playground.test_weekly", "dataset_a.table_b"},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, bigquery.IsExcludedTable(tt.args.datasetID, tt.args.tableID, tt.args.excludedTables), "IsExcludedTable(%v, %v, %v)", tt.args.datasetID, tt.args.tableID, tt.args.excludedTables)
})
}
}

func TestIsExcludedDataset(t *testing.T) {
type args struct {
datasetID string
excludedDatasets []string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "should return false is list is empty",
args: args{
datasetID: "dataset_a",
excludedDatasets: []string{},
},
want: false,
},
{
name: "should return false is list is nil",
args: args{
datasetID: "dataset_a",
excludedDatasets: nil,
},
want: false,
},
{
name: "should return false is dataset is not in excluded list",
args: args{
datasetID: "dataset_a",
excludedDatasets: []string{"dataset_b", "dataset_c"},
},
want: false,
},
{
name: "should return true is dataset is in excluded list",
args: args{
datasetID: "dataset_a",
excludedDatasets: []string{"dataset_a", "dataset_b", "dataset_c"},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, bigquery.IsExcludedDataset(tt.args.datasetID, tt.args.excludedDatasets), "IsExcludedDataset(%v, %v)", tt.args.datasetID, tt.args.excludedDatasets)
})
}
}
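
These are plain table-driven unit tests that exercise only the exported exclusion helpers, with no BigQuery client involved; assuming the repository's standard layout, go test ./plugins/extractors/bigquery/... should run them.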