diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md index 25f88c875..93b4dd8c9 100644 --- a/plugins/extractors/bigquery/README.md +++ b/plugins/extractors/bigquery/README.md @@ -8,6 +8,12 @@ source: config: project_id: google-project-id table_pattern: gofood.fact_ + exclude: + datasets: + - dataset_a + - dataset_b + tables: + - dataset_c.table_a max_page_size: 100 - profile_column: true + include_column_profile: true service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________ diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index d2d25ada9..0f09d8a5f 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -5,6 +5,7 @@ import ( _ "embed" // used to print the embedded assets "encoding/base64" "encoding/json" + "fmt" "html/template" "strings" "sync" @@ -36,6 +37,7 @@ type Config struct { ServiceAccountJSON string `mapstructure:"service_account_json"` MaxPageSize int `mapstructure:"max_page_size"` TablePattern string `mapstructure:"table_pattern"` + Exclude Exclude `mapstructure:"exclude"` IncludeColumnProfile bool `mapstructure:"include_column_profile"` MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"` IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"` @@ -43,6 +45,13 @@ type Config struct { UsageProjectIDs []string `mapstructure:"usage_project_ids"` } +type Exclude struct { + // list of datasetIDs + Datasets []string `mapstructure:"datasets"` + // list of tableNames in format - datasetID.tableID + Tables []string `mapstructure:"tables"` +} + const ( maxPageSizeDefault = 100 ) @@ -50,6 +59,12 @@ const ( var sampleConfig = ` project_id: google-project-id table_pattern: gofood.fact_ +exclude: + datasets: + - dataset_a + - dataset_b + tables: + - dataset_c.table_a max_page_size: 100 include_column_profile: true # Only one of service_account_base64 / service_account_json is needed.
// isExcluded reports whether id occurs in excluded.
// Entries are compared with ==, so the match is exact and case-sensitive;
// a nil or empty list never matches.
func isExcluded(id string, excluded []string) bool {
	for _, candidate := range excluded {
		if candidate == id {
			return true
		}
	}

	return false
}

// IsExcludedDataset reports whether datasetID is listed in excludedDatasets.
// It returns false when the list is nil or empty.
func IsExcludedDataset(datasetID string, excludedDatasets []string) bool {
	return isExcluded(datasetID, excludedDatasets)
}

// IsExcludedTable reports whether the table identified by datasetID and tableID
// is listed in excludedTables. Each entry is compared against the string
// "datasetID.tableID"; it returns false when the list is nil or empty.
func IsExcludedTable(datasetID, tableID string, excludedTables []string) bool {
	// Build the lookup key in the same "datasetID.tableID" form the entries use.
	tableName := fmt.Sprintf("%s.%s", datasetID, tableID)

	return isExcluded(tableName, excludedTables)
}
--git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go index ef458f7b2..e2748f46e 100644 --- a/plugins/extractors/bigquery/bigquery_test.go +++ b/plugins/extractors/bigquery/bigquery_test.go @@ -53,3 +53,108 @@ func TestInit(t *testing.T) { assert.ErrorContains(t, err, "failed to decode base64 service account") }) } + +func TestIsExcludedTable(t *testing.T) { + type args struct { + datasetID string + tableID string + excludedTables []string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "should return false when excluded table list is nil", + args: args{ + datasetID: "dataset_a", + tableID: "table_b", + excludedTables: nil, + }, + want: false, + }, + { + name: "should return false when excluded table list is empty", + args: args{ + datasetID: "dataset_a", + tableID: "table_b", + excludedTables: []string{}, + }, + want: false, + }, + { + name: "should return false if table is not in excluded list", + args: args{ + datasetID: "dataset_a", + tableID: "table_b", + excludedTables: []string{"ds1.table1", "playground.test_weekly"}, + }, + want: false, + }, + { + name: "should return true if table is in excluded list", + args: args{ + datasetID: "dataset_a", + tableID: "table_b", + excludedTables: []string{"ds1.table1", "playground.test_weekly", "dataset_a.table_b"}, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, bigquery.IsExcludedTable(tt.args.datasetID, tt.args.tableID, tt.args.excludedTables), "IsExcludedTable(%v, %v, %v)", tt.args.datasetID, tt.args.tableID, tt.args.excludedTables) + }) + } +} + +func TestIsExcludedDataset(t *testing.T) { + type args struct { + datasetID string + excludedDatasets []string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "should return false is list is empty", + args: args{ + datasetID: "dataset_a", + excludedDatasets: []string{}, + }, + want: false, + 
}, + { + name: "should return false is list is nil", + args: args{ + datasetID: "dataset_a", + excludedDatasets: nil, + }, + want: false, + }, + { + name: "should return false is dataset is not in excluded list", + args: args{ + datasetID: "dataset_a", + excludedDatasets: []string{"dataset_b", "dataset_c"}, + }, + want: false, + }, + { + name: "should return true is dataset is in excluded list", + args: args{ + datasetID: "dataset_a", + excludedDatasets: []string{"dataset_a", "dataset_b", "dataset_c"}, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, bigquery.IsExcludedDataset(tt.args.datasetID, tt.args.excludedDatasets), "IsExcludedDataset(%v, %v)", tt.args.datasetID, tt.args.excludedDatasets) + }) + } +}