From b04d5f0f2fd9838c161d7e955762ceabca96f322 Mon Sep 17 00:00:00 2001
From: Chief-Rishab
Date: Sun, 24 Sep 2023 21:47:33 +0530
Subject: [PATCH 1/2] feat: instrument bigquery extractor with OpenTelemetry

---
 .../extractors/bigquery/auditlog/auditlog.go | 121 +++++++++-------
 plugins/extractors/bigquery/bigquery.go      | 131 ++++++++++++++++--
 plugins/util.go                              |  11 ++
 3 files changed, 206 insertions(+), 57 deletions(-)

diff --git a/plugins/extractors/bigquery/auditlog/auditlog.go b/plugins/extractors/bigquery/auditlog/auditlog.go
index b2e81a300..1de5228a5 100644
--- a/plugins/extractors/bigquery/auditlog/auditlog.go
+++ b/plugins/extractors/bigquery/auditlog/auditlog.go
@@ -5,9 +5,14 @@ import (
 	"fmt"
 	"time"
 
+	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/logging/logadmin"
 	"github.com/pkg/errors"
+	"github.com/raystack/meteor/plugins"
 	"github.com/raystack/salt/log"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 	auditpb "google.golang.org/genproto/googleapis/cloud/audit"
@@ -23,24 +28,38 @@ type Config struct {
 	UsageProjectIDs    []string
 }
 
-const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
-	`resource.type="bigquery_resource" AND NOT ` +
-	`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
-	`timestamp >= "%s" AND timestamp < "%s" AND %s`
+const (
+	advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
+		`resource.type="bigquery_resource" AND NOT ` +
+		`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
+		`timestamp >= "%s" AND timestamp < "%s" AND %s`
+
+	metricTableDurn = "meteor.bq.client.table.duration"
+)
 
 type AuditLog struct {
 	logger log.Logger
 	client *logadmin.Client
 	config Config
+
+	histogram metric.Int64Histogram
 }
 
 func New(logger log.Logger) *AuditLog {
+	h, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigquery").
+		Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
 	return &AuditLog{
 		logger: logger,
+
+		histogram: h,
 	}
 }
 
-func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
+func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) error {
 	for _, opt := range opts {
 		opt(l)
 	}
@@ -50,38 +69,46 @@
 	}
 
 	if l.client == nil {
+		var err error
 		l.client, err = l.createClient(ctx)
 		if err != nil {
-			err = errors.Wrap(err, "failed to create logadmin client")
-			return
+			return fmt.Errorf("create logadmin client: %w", err)
 		}
 	}
-	return
+
+	return nil
 }
 
-func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, err error) {
+func (l *AuditLog) createClient(ctx context.Context) (*logadmin.Client, error) {
 	if l.config.ServiceAccountJSON == "" {
 		l.logger.Info("credentials are not specified, creating logadmin using default credentials...")
-		client, err = logadmin.NewClient(ctx, l.config.ProjectID)
-		return
+		return logadmin.NewClient(ctx, l.config.ProjectID)
 	}
 
-	client, err = logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
-	if err != nil {
-		err = errors.New("client is nil, failed initiating client")
-	}
-	return
+	return logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
 }
 
-func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) {
+func (l *AuditLog) Collect(ctx context.Context, tbl *bigquery.Table) (stats *TableStats, err error) {
+	defer func(start time.Time) {
+		attrs := []attribute.KeyValue{
+			attribute.String("bq.operation", "table.audit_logs"),
+			attribute.String("bq.project_id", tbl.ProjectID),
+			attribute.String("bq.dataset_id", tbl.DatasetID),
+		}
+		if err != nil {
+			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
+		}
+
+		l.histogram.Record(
+			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
+		)
+	}(time.Now())
+
 	if l.client == nil {
-		err = errors.New("auditlog client is nil")
-		return
+		return nil, errors.New("auditlog client is nil")
 	}
 
-	tableStats = NewTableStats()
-
-	filter := l.buildFilter(tableID)
+	filter := l.buildFilter(tbl.TableID)
 	it := l.client.Entries(ctx,
 		logadmin.ProjectIDs(l.config.UsageProjectIDs),
 		logadmin.Filter(filter))
@@ -89,32 +116,31 @@
 	l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
 	l.logger.Info("getting logs with the filter", "filter", filter)
 
+	stats = NewTableStats()
 	for {
-		entry, errF := it.Next()
-		if errF == iterator.Done {
+		entry, err := it.Next()
+		if errors.Is(err, iterator.Done) {
 			break
 		}
-		if errF != nil {
-			err = errors.Wrap(errF, "error iterating logEntries")
-			break
+		if err != nil {
+			return nil, fmt.Errorf("error iterating logEntries: %w", err)
 		}
 
-		logData, errF := parsePayload(entry.Payload)
-		if errF != nil {
-			l.logger.Warn("error parsing LogEntry payload", "err", errF)
+		logData, err := parsePayload(entry.Payload)
+		if err != nil {
+			l.logger.Warn("error parsing LogEntry payload", "err", err)
 			continue
 		}
 
-		if errF := tableStats.Populate(logData); errF != nil {
+		if errF := stats.Populate(logData); errF != nil {
 			l.logger.Warn("error populating logdata", "err", errF)
 			continue
 		}
 	}
 
-	return
+	return stats, nil
 }
 
 func (l *AuditLog) buildFilter(tableID string) string {
-
 	timeNow := time.Now().UTC()
 	dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
 	timeFrom := timeNow.Add(-1 * dayDuration)
@@ -125,23 +151,23 @@ func (l *AuditLog) buildFilter(tableID string) string {
 	return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID)
 }
 
-func parsePayload(payload interface{}) (ld *LogData, err error) {
-
-	ad := &loggingpb.AuditData{}
+func parsePayload(payload interface{}) (*LogData, error) {
 	pl, ok := payload.(*auditpb.AuditLog)
 	if !ok {
-		err = errors.New("cannot parse payload to AuditLog")
-		return
+		return nil, errors.New("parse payload to AuditLog")
 	}
 
-	if errPB := getAuditData(pl, ad); errPB != nil {
-		err = errors.Wrap(errPB, "failed to get audit data from metadata")
-		return
+	var ad loggingpb.AuditData
+	if err := getAuditData(pl, &ad); err != nil {
+		return nil, fmt.Errorf("get audit data from metadata: %w", err)
 	}
 
-	ld = &LogData{ad}
-	err = ld.validateAuditData()
-	return
+	ld := &LogData{&ad}
+	if err := ld.validateAuditData(); err != nil {
+		return nil, err
+	}
+
+	return ld, nil
 }
 
 func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
@@ -160,24 +186,23 @@ func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
 func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
 	//nolint:staticcheck
 	if err := pl.GetServiceData().UnmarshalTo(ad); err != nil {
-		return errors.Wrap(err, "failed to marshal service data to audit data")
+		return fmt.Errorf("marshal service data to audit data: %w", err)
 	}
 
 	return nil
 }
 
 func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
-
 	if pl.GetMetadata() == nil {
 		return errors.New("metadata field is nil")
 	}
 
 	mdJSON, err := pl.GetMetadata().MarshalJSON()
 	if err != nil {
-		return errors.Wrap(err, "cannot marshal payload metadata")
+		return fmt.Errorf("marshal payload metadata: %w", err)
 	}
 
 	if err := protojson.Unmarshal(mdJSON, ad); err != nil {
-		return errors.Wrap(err, "cannot parse service data to Audit")
+		return fmt.Errorf("parse service data to Audit: %w", err)
 	}
 
 	return nil
diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go
index cdb3a9bef..e125ed767 100755
--- a/plugins/extractors/bigquery/bigquery.go
+++ b/plugins/extractors/bigquery/bigquery.go
@@ -10,6 +10,7 @@ import (
 	"math/rand"
 	"strings"
 	"sync"
+	"time"
 
 	"cloud.google.com/go/bigquery"
 	datacatalog "cloud.google.com/go/datacatalog/apiv1"
@@ -22,6 +23,9 @@ import (
 	"github.com/raystack/meteor/registry"
 	"github.com/raystack/meteor/utils"
 	"github.com/raystack/salt/log"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -60,6 +64,12 @@ type Exclude struct {
 
 const (
 	maxPageSizeDefault = 100
+
+	metricDatasetsDurn       = "meteor.bq.client.datasets.duration"
+	metricTablesDurn         = "meteor.bq.client.tables.duration"
+	metricTableDurn          = "meteor.bq.client.table.duration"
+	metricExcludedDatasetCtr = "meteor.bq.dataset.excluded"
+	metricExcludedTableCtr   = "meteor.bq.table.excluded"
 )
 
 var sampleConfig = `
@@ -108,6 +118,12 @@ type Extractor struct {
 	policyTagClient *datacatalog.PolicyTagManagerClient
 	newClient       NewClientFunc
 	randFn          randFn
+
+	datasetsDurn       metric.Int64Histogram
+	tablesDurn         metric.Int64Histogram
+	tableDurn          metric.Int64Histogram
+	excludedDatasetCtr metric.Int64Counter
+	excludedTableCtr   metric.Int64Counter
 }
 
 type randFn func(rndSeed int64) func(int64) int64
@@ -115,6 +131,23 @@ type randFn func(rndSeed int64) func(int64) int64
 type NewClientFunc func(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error)
 
 func New(logger log.Logger, newClient NewClientFunc, randFn randFn) *Extractor {
+	meter := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigquery")
+
+	datasetsDurn, err := meter.Int64Histogram(metricDatasetsDurn, metric.WithUnit("ms"))
+	handleOtelErr(err)
+
+	tablesDurn, err := meter.Int64Histogram(metricTablesDurn, metric.WithUnit("ms"))
+	handleOtelErr(err)
+
+	tableDurn, err := meter.Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
+	handleOtelErr(err)
+
+	excludedDatasetCtr, err := meter.Int64Counter(metricExcludedDatasetCtr)
+	handleOtelErr(err)
+
+	excludedTableCtr, err := meter.Int64Counter(metricExcludedTableCtr)
+	handleOtelErr(err)
+
 	galc := auditlog.New(logger)
 
 	e := &Extractor{
@@ -122,6 +155,12 @@ func New(logger log.Logger, newClient NewClientFunc, randFn randFn) *Extractor {
 		galClient: galc,
 		newClient: newClient,
 		randFn:    randFn,
+
+		datasetsDurn:       datasetsDurn,
+		tablesDurn:         tablesDurn,
+		tableDurn:          tableDurn,
+		excludedDatasetCtr: excludedDatasetCtr,
+		excludedTableCtr:   excludedTableCtr,
 	}
 	e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
 	e.ScopeNotRequired = true
@@ -171,21 +210,23 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 	// Fetch and iterate over datasets
 	pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
 	for {
-		var datasets []*bigquery.Dataset
-		nextToken, err := pager.NextPage(&datasets)
+		datasets, hasNext, err := e.fetchDatasetsNextPage(ctx, pager)
 		if err != nil {
-			return fmt.Errorf("fetch dataset: %w", err)
+			return err
 		}
 
 		for _, ds := range datasets {
 			if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
+				e.excludedDatasetCtr.Add(
+					ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
+				)
 				e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
 				continue
 			}
 			e.extractTable(ctx, ds, emit)
 		}
 
-		if nextToken == "" {
+		if !hasNext {
 			break
 		}
 	}
@@ -193,6 +234,25 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 	return nil
 }
 
+func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
+	defer func(start time.Time) {
+		attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
+		if err != nil {
+			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
+		}
+		e.datasetsDurn.Record(
+			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
+		)
+	}(time.Now())
+
+	nextToken, err := pager.NextPage(&datasets)
+	if err != nil {
+		return nil, false, fmt.Errorf("fetch dataset: %w", err)
+	}
+
+	return datasets, nextToken != "", nil
+}
+
 // CreateClient creates a bigquery client
 func CreateClient(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error) {
 	if config.ServiceAccountBase64 == "" && config.ServiceAccountJSON == "" {
@@ -227,8 +287,7 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
 	pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
 	for {
-		var tables []*bigquery.Table
-		nextToken, err := pager.NextPage(&tables)
+		tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
 		if err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				break
@@ -240,6 +299,10 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
 
 		for _, table := range tables {
 			if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
+				e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
+					attribute.String("bq.project_id", e.config.ProjectID),
+					attribute.String("bq.dataset_id", ds.DatasetID),
+				))
 				e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
 				continue
 			}
@@ -247,7 +310,7 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
 			tableFQN := table.FullyQualifiedName()
 
 			e.logger.Debug("extracting table", "table", tableFQN)
-			tmd, err := table.Metadata(ctx)
+			tmd, err := e.fetchTableMetadata(ctx, table)
 			if err != nil {
 				e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
 				continue
@@ -262,19 +325,44 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
 			emit(models.NewRecord(asset))
 		}
 
-		if nextToken == "" {
+		if !hasNext {
 			break
 		}
 	}
 }
 
+func (e *Extractor) fetchTablesNextPage(
+	ctx context.Context, datasetID string, pager *iterator.Pager,
+) (tables []*bigquery.Table, hasNext bool, err error) {
+	defer func(start time.Time) {
+		attrs := []attribute.KeyValue{
+			attribute.String("bq.project_id", e.config.ProjectID),
+			attribute.String("bq.dataset_id", datasetID),
+		}
+		if err != nil {
+			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
+		}
+
+		e.tablesDurn.Record(
+			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
+		)
+	}(time.Now())
+
+	nextToken, err := pager.NextPage(&tables)
+	if err != nil {
+		return nil, false, err
+	}
+
+	return tables, nextToken != "", nil
+}
+
 // Build the bigquery table metadata
 func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) (*v1beta2.Asset, error) {
 	var tableStats *auditlog.TableStats
 	if e.config.IsCollectTableUsage {
 		// Fetch and extract logs first to build a map
 		var errL error
-		tableStats, errL = e.galClient.Collect(ctx, t.TableID)
+		tableStats, errL = e.galClient.Collect(ctx, t)
 		if errL != nil {
 			e.logger.Warn("error populating table stats usage", "error", errL)
 		}
@@ -637,6 +725,25 @@ func (e *Extractor) getMaxPageSize() int {
 	return maxPageSizeDefault
 }
 
+func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table) (md *bigquery.TableMetadata, err error) {
+	defer func(start time.Time) {
+		attrs := []attribute.KeyValue{
+			attribute.String("bq.operation", "table.metadata"),
+			attribute.String("bq.project_id", tbl.ProjectID),
+			attribute.String("bq.dataset_id", tbl.DatasetID),
+		}
+		if err != nil {
+			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
+		}
+
+		e.tableDurn.Record(
+			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
+		)
+	}(time.Now())
+
+	return tbl.Metadata(ctx)
+}
+
 // Register the extractor to catalog
 func init() {
 	if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
@@ -661,3 +768,9 @@ func pickFirstNonZero(ints ...int) int {
 	}
 	return 0
 }
+
+func handleOtelErr(err error) {
+	if err != nil {
+		otel.Handle(err)
+	}
+}
diff --git a/plugins/util.go b/plugins/util.go
index 73756d590..ceff3e12e 100644
--- a/plugins/util.go
+++ b/plugins/util.go
@@ -11,6 +11,7 @@ import (
 	"strings"
 
 	"github.com/go-playground/validator/v10"
+	"github.com/googleapis/gax-go/v2/apierror"
 	"github.com/mcuadros/go-defaults"
 	"github.com/mitchellh/mapstructure"
 	"github.com/raystack/meteor/models"
@@ -124,6 +125,16 @@ func DrainBody(resp *http.Response) {
 	_ = resp.Body.Close()
 }
 
+func BQErrReason(err error) string {
+	reason := "UNKNOWN"
+	var apiErr *apierror.APIError
+	if errors.As(err, &apiErr) {
+		reason = apiErr.Reason()
+	}
+
+	return reason
+}
+
 func parseBQTableFQN(fqn string) (projectID, datasetID, tableID string, err error) {
 	// fqn is the ID of the table in projectID:datasetID.tableID format.
 	if !strings.ContainsRune(fqn, ':') || strings.IndexRune(fqn, '.') < strings.IndexRune(fqn, ':') {

From c16307c4bbde2cdd5709b3a7fcf722890b96f45b Mon Sep 17 00:00:00 2001
From: Chief-Rishab
Date: Sun, 24 Sep 2023 22:03:46 +0530
Subject: [PATCH 2/2] feat: add upstream lineage dependencies for
 view/materialized view in bigquery extractor

---
 plugins/extractors/bigquery/README.md        |   1 +
 plugins/extractors/bigquery/bigquery.go      |  43 +-
 .../extractors/bigquery/upstream/parser.go   | 106 +++++
 .../bigquery/upstream/parser_test.go         | 427 ++++++++++++++++++
 .../extractors/bigquery/upstream/resource.go |  26 ++
 5 files changed, 601 insertions(+), 2 deletions(-)
 create mode 100644 plugins/extractors/bigquery/upstream/parser.go
 create mode 100644 plugins/extractors/bigquery/upstream/parser_test.go
 create mode 100644 plugins/extractors/bigquery/upstream/resource.go

diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md
index 12c931d2f..64c88aa8b 100644
--- a/plugins/extractors/bigquery/README.md
+++ b/plugins/extractors/bigquery/README.md
@@ -16,6 +16,7 @@ source:
       - dataset_c.table_a
     max_page_size: 100
     profile_column: true
+    build_view_lineage: true
     # Only one of service_account_base64 / service_account_json is needed.
     # If both are present, service_account_base64 takes precedence
     service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________
diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go
index e125ed767..fc4448758 100755
--- a/plugins/extractors/bigquery/bigquery.go
+++ b/plugins/extractors/bigquery/bigquery.go
@@ -20,6 +20,7 @@ import (
 	v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
 	"github.com/raystack/meteor/plugins"
 	"github.com/raystack/meteor/plugins/extractors/bigquery/auditlog"
+	"github.com/raystack/meteor/plugins/extractors/bigquery/upstream"
 	"github.com/raystack/meteor/registry"
 	"github.com/raystack/meteor/utils"
 	"github.com/raystack/salt/log"
@@ -53,6 +54,7 @@ type Config struct {
 	IsCollectTableUsage bool     `json:"collect_table_usage" yaml:"collect_table_usage" mapstructure:"collect_table_usage" default:"false"`
 	UsagePeriodInDay    int64    `json:"usage_period_in_day" yaml:"usage_period_in_day" mapstructure:"usage_period_in_day" default:"7"`
 	UsageProjectIDs     []string `json:"usage_project_ids" yaml:"usage_project_ids" mapstructure:"usage_project_ids"`
+	BuildViewLineage    bool     `json:"build_view_lineage" yaml:"build_view_lineage" mapstructure:"build_view_lineage" default:"false"`
 }
 
 type Exclude struct {
@@ -83,6 +85,7 @@ exclude:
   - dataset_c.table_a
 max_page_size: 100
 include_column_profile: true
+build_view_lineage: true
 # Only one of service_account_base64 / service_account_json is needed.
 # If both are present, service_account_base64 takes precedence
 service_account_base64: ____base64_encoded_service_account____
@@ -432,7 +435,7 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
 		e.logger.Warn("error creating Any struct", "error", err)
 	}
 
-	return &v1beta2.Asset{
+	asset := &v1beta2.Asset{
 		Urn:     tableURN,
 		Name:    t.TableID,
 		Type:    "table",
@@ -440,7 +443,17 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
 		Service: "bigquery",
 		Data:    table,
 		Labels:  md.Labels,
-	}, nil
+	}
+
+	if e.config.BuildViewLineage && (md.Type == bigquery.ViewTable || md.Type == bigquery.MaterializedView) {
+		query := getViewQuery(md)
+		upstreamResources := getUpstreamResources(query)
+		asset.Lineage = &v1beta2.Lineage{
+			Upstreams: upstreamResources,
+		}
+	}
+
+	return asset, nil
 }
 
 // Extract table schema
@@ -744,6 +757,32 @@ func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table)
 	return tbl.Metadata(ctx)
 }
 
+func getViewQuery(md *bigquery.TableMetadata) string {
+	switch md.Type {
+	case bigquery.ViewTable:
+		return md.ViewQuery
+	case bigquery.MaterializedView:
+		return md.MaterializedView.Query
+	}
+	return ""
+}
+
+func getUpstreamResources(query string) []*v1beta2.Resource {
+	upstreamDependencies := upstream.ParseTopLevelUpstreamsFromQuery(query)
+	uniqueUpstreamDependencies := upstream.UniqueFilterResources(upstreamDependencies)
+	var upstreams []*v1beta2.Resource
+	for _, dependency := range uniqueUpstreamDependencies {
+		urn := plugins.BigQueryURN(dependency.Project, dependency.Dataset, dependency.Name)
+		upstreams = append(upstreams, &v1beta2.Resource{
+			Urn:     urn,
+			Name:    dependency.Name,
+			Type:    "table",
+			Service: "bigquery",
+		})
+	}
+	return upstreams
+}
+
 // Register the extractor to catalog
 func init() {
 	if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
diff --git a/plugins/extractors/bigquery/upstream/parser.go b/plugins/extractors/bigquery/upstream/parser.go
new file mode 100644
index 000000000..fda115fdf
--- /dev/null
+++ b/plugins/extractors/bigquery/upstream/parser.go
@@ -0,0 +1,106 @@
+package upstream
+
+import (
+	"regexp"
+	"strings"
+)
+
+type QueryParser func(query string) []Resource
+
+var (
+	topLevelUpstreamsPattern = regexp.MustCompile("" +
+		"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" +
+		"|" +
+		"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
+		"|" +
+		"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
+		"|" +
+		"(?i)(?:VIEW)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
+		"|" +
+		"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")
+
+	singleLineCommentsPattern = regexp.MustCompile(`(--.*)`)
+	multiLineCommentsPattern  = regexp.MustCompile(`(((/\*)+?[\w\W]*?(\*/)+))`)
+	specialCommentPattern     = regexp.MustCompile(`(/\*\s*(@[a-zA-Z0-9_-]+)\s*\*/)`)
+)
+
+func ParseTopLevelUpstreamsFromQuery(query string) []Resource {
+	cleanedQuery := cleanQueryFromComment(query)
+
+	resourcesFound := make(map[Resource]bool)
+	pseudoResources := make(map[Resource]bool)
+
+	matches := topLevelUpstreamsPattern.FindAllStringSubmatch(cleanedQuery, -1)
+
+	for _, match := range matches {
+		var projectIdx, datasetIdx, nameIdx, ignoreUpstreamIdx int
+		tokens := strings.Fields(match[0])
+		clause := strings.ToLower(tokens[0])
+
+		switch clause {
+		case "from":
+			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 1, 2, 3, 4
+		case "join":
+			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
+		case "with":
+			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
+		case "view":
+			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
+		default:
+			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
+		}
+
+		project := match[projectIdx]
+		dataset := match[datasetIdx]
+		name := match[nameIdx]
+
+		if project == "" || dataset == "" || name == "" {
+			continue
+		}
+
+		if strings.TrimSpace(match[ignoreUpstreamIdx]) == "@ignoreupstream" {
+			continue
+		}
+
+		if clause == "view" {
+			continue
+		}
+
+		resource := Resource{
+			Project: project,
+			Dataset: dataset,
+			Name:    name,
+		}
+
+		if clause == "with" {
+			pseudoResources[resource] = true
+		} else {
+			resourcesFound[resource] = true
+		}
+	}
+
+	var output []Resource
+
+	for resource := range resourcesFound {
+		if pseudoResources[resource] {
+			continue
+		}
+		output = append(output, resource)
+	}
+
+	return output
+}
+
+func cleanQueryFromComment(query string) string {
+	cleanedQuery := singleLineCommentsPattern.ReplaceAllString(query, "")
+
+	matches := multiLineCommentsPattern.FindAllString(query, -1)
+	for _, match := range matches {
+		if specialCommentPattern.MatchString(match) {
+			continue
+		}
+		cleanedQuery = strings.ReplaceAll(cleanedQuery, match, "")
+	}
+
+	return cleanedQuery
+}
diff --git a/plugins/extractors/bigquery/upstream/parser_test.go b/plugins/extractors/bigquery/upstream/parser_test.go
new file mode 100644
index 000000000..f388d3e92
--- /dev/null
+++ b/plugins/extractors/bigquery/upstream/parser_test.go
@@ -0,0 +1,427 @@
+package upstream_test
+
+import (
+	"testing"
+
+	"github.com/raystack/meteor/plugins/extractors/bigquery/upstream"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
+	t.Run("parse test", func(t *testing.T) {
+		type _set map[upstream.Resource]bool
+		newSetFn := func(resources ...upstream.Resource) _set {
+			set := make(_set)
+
+			for _, r := range resources {
+				set[r] = true
+			}
+
+			return set
+		}
+
+		testCases := []struct {
+			Name            string
+			InputQuery      string
+			ExpectedSources []upstream.Resource
+		}{
+			{
+				Name:       "simple query",
+				InputQuery: "select * from data-engineering.testing.table1",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+				},
+			},
+			{
+				Name:       "simple query with hyphenated table name",
+				InputQuery: "select * from data-engineering.testing.table_name-1",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table_name-1",
+					},
+				},
+			},
+			{
+				Name:       "simple query with quotes",
+				InputQuery: "select * from `data-engineering.testing.table1`",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+				},
+			},
+			{
+				Name:            "simple query without project name",
+				InputQuery:      "select * from testing.table1",
+				ExpectedSources: []upstream.Resource{},
+			},
+			{
+				Name:       "simple query with simple join",
+				InputQuery: "select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table2",
+					},
+				},
+			},
+			{
+				Name:       "simple query with outer join",
+				InputQuery: "select * from data-engineering.testing.table1 outer join data-engineering.testing.table2 on some_field",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table2",
+					},
+				},
+			},
+			{
+				Name:       "subquery",
+				InputQuery: "select * from (select order_id from data-engineering.testing.orders)",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "orders",
+					},
+				},
+			},
+			{
+				Name:       "`with` clause + simple query",
+				InputQuery: "with `information.foo.bar` as (select * from `data-engineering.testing.data`) select * from `information.foo.bar`",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "data",
+					},
+				},
+			},
+			{
+				Name:       "`with` clause with missing project name",
+				InputQuery: "with `foo.bar` as (select * from `data-engineering.testing.data`) select * from `foo.bar`",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "data",
+					},
+				},
+			},
+			{
+				Name:       "project name with dashes",
+				InputQuery: "select * from `foo-bar.baz.data`",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "foo-bar",
+						Dataset: "baz",
+						Name:    "data",
+					},
+				},
+			},
+			{
+				Name:       "dataset and project name with dashes",
+				InputQuery: "select * from `foo-bar.bar-baz.data`",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "foo-bar",
+						Dataset: "bar-baz",
+						Name:    "data",
+					},
+				},
+			},
+			{
+				Name:       "`with` clause + join",
+				InputQuery: "with dedup_source as (select * from `project.fire.fly`) select * from dedup_source join `project.maximum.overdrive` on dedup_source.left = `project.maximum.overdrive`.right",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "project",
+						Dataset: "fire",
+						Name:    "fly",
+					},
+					{
+						Project: "project",
+						Dataset: "maximum",
+						Name:    "overdrive",
+					},
+					{
+						Project: "project",
+						Dataset: "maximum",
+						Name:    "overdrive",
+					},
+				},
+			},
+			{
+				Name:       "double `with` + pseudoreference",
+				InputQuery: "with s1 as (select * from internal.pseudo.ref), with internal.pseudo.ref as (select * from `project.another.name`) select * from s1",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "project",
+						Dataset: "another",
+						Name:    "name",
+					},
+				},
+			},
+			{
+				Name:            "simple query that ignores from upstream",
+				InputQuery:      "select * from /* @ignoreupstream */ data-engineering.testing.table1",
+				ExpectedSources: []upstream.Resource{},
+			},
+			{
+				Name:       "simple query that ignores from upstream with quotes",
+				InputQuery: "select * from /* @ignoreupstream */ `data-engineering.testing.table1`",
+				ExpectedSources: []upstream.Resource{},
+			},
+			{
+				Name:       "simple query with simple join that ignores from upstream",
+				InputQuery: "select * from /* @ignoreupstream */ data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table2",
+					},
+				},
+			},
+			{
+				Name:       "simple query with simple join that has comments but does not ignore upstream",
+				InputQuery: "select * from /* */ data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table2",
+					},
+				},
+			},
+			{
+				Name:       "simple query with simple join that ignores upstream of join",
+				InputQuery: "select * from data-engineering.testing.table1 join /* @ignoreupstream */ data-engineering.testing.table2 on some_field",
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table1",
+					},
+				},
+			},
+			{
+				Name: "simple query with an ignoreupstream for an alias should still consider it as dependency",
+				InputQuery: `
+				WITH my_temp_table AS (
+					SELECT id, name FROM data-engineering.testing.an_upstream_table
+				)
+				SELECT id FROM /* @ignoreupstream */ my_temp_table
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "an_upstream_table",
+					},
+				},
+			},
+			{
+				Name: "simple query should have alias in the actual name rather than with alias",
+				InputQuery: `
+				WITH my_temp_table AS (
+					SELECT id, name FROM /* @ignoreupstream */ data-engineering.testing.an_upstream_table
+				)
+				SELECT id FROM my_temp_table
+				`,
+				ExpectedSources: []upstream.Resource{},
+			},
+			{
+				Name:            "simple query with simple join that ignores upstream of join",
+				InputQuery:      "WITH my_temp_table AS ( SELECT id, name FROM /* @ignoreupstream */ data-engineering.testing.an_upstream_table ) SELECT id FROM /* @ignoreupstream */ my_temp_table",
+				ExpectedSources: []upstream.Resource{},
+			},
+			{
+				Name: "simple query with another query inside comment",
+				InputQuery: `
+				select * from data-engineering.testing.tableABC
+				-- select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableABC",
+					},
+				},
+			},
+			{
+				Name: "query with another query inside comment and a join that uses helper",
+				InputQuery: `
+				select * from data-engineering.testing.tableABC
+				/* select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field */
+				join /* @ignoreupstream */ data-engineering.testing.table2 on some_field
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableABC",
+					},
+				},
+			},
+			{
+				Name: "ignore `create view` in ddl query",
+				InputQuery: `
+				create view data-engineering.testing.tableABC
+				select *
+				from
+					data-engineering.testing.tableDEF,
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableDEF",
+					},
+				},
+			},
+			{
+				Name: "one or more sources are stated together under from clauses",
+				InputQuery: `
+				select *
+				from
+					pseudo_table1,
+					` + "`data-engineering.testing.tableABC`," + `
+					pseudo_table2 as pt2
+					` + "`data-engineering.testing.tableDEF`," + ` as backup_table,
+					/* @ignoreupstream */ data-engineering.testing.tableGHI as ignored_table,
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableABC",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableDEF",
+					},
+				},
+			},
+			{
+				Name: "one or more sources are from wild-card query",
+				InputQuery: `
+				select *
+				from data-engineering.testing.tableA*
+
+				select *
+				from ` + "`data-engineering.testing.tableB*`" + `
+
+				select *
+				from
+					/*@ignoreupstream*/ data-engineering.testing.tableC*
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableA*",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "tableB*",
+					},
+				},
+			},
+			{
+				Name: "ignore characters after -- comment",
+				InputQuery: `
+				-- sources
+				-- data-engineering.testing.table_a
+				--
+				-- related
+				-- ` + "`data-engineering.testing.table_b`" + `
+				-- from data-engineering.testing.table_c
+
+				select *
+				from data-engineering.testing.table_a
+				join /* @ignoreupstream */ data-engineering.testing.table_d
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table_a",
+					},
+				},
+			},
+			{
+				Name: "ignore characters within multi-line comment /* (separate line) */",
+				InputQuery: `
+				/*
+				this the following relates to this table:
+
+				with ` + "`data-engineering.testing.tabel_b`" + `
+				from data-engineering.testing.tabel_c
+				*/
+
+
+				select *
+				from
+					data-engineering.testing.table_a
+				join
+					data-engineering.testing.table_d
+				join
+					/* @ignoreupstream */ data-engineering.testing.table_e
+				`,
+				ExpectedSources: []upstream.Resource{
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table_a",
+					},
+					{
+						Project: "data-engineering",
+						Dataset: "testing",
+						Name:    "table_d",
+					},
+				},
+			},
+		}
+
+		for _, test := range testCases {
+			t.Run(test.Name, func(t *testing.T) {
+				actualSources := upstream.ParseTopLevelUpstreamsFromQuery(test.InputQuery)
+
+				actualSet := newSetFn(actualSources...)
+				expectedSet := newSetFn(test.ExpectedSources...)
+
+				assert.Equal(t, expectedSet, actualSet)
+			})
+		}
+	})
+}
diff --git a/plugins/extractors/bigquery/upstream/resource.go b/plugins/extractors/bigquery/upstream/resource.go
new file mode 100644
index 000000000..9f8ce7340
--- /dev/null
+++ b/plugins/extractors/bigquery/upstream/resource.go
@@ -0,0 +1,26 @@
+package upstream
+
+type Resource struct {
+	Project string
+	Dataset string
+	Name    string
+}
+
+func (r Resource) URN() string {
+	return r.Project + "." + r.Dataset + "." + r.Name
+}
+
+func UniqueFilterResources(input []Resource) []Resource {
+	ref := make(map[string]Resource)
+	for _, i := range input {
+		key := i.URN()
+		ref[key] = i
+	}
+
+	var output []Resource
+	for _, r := range ref {
+		output = append(output, r)
+	}
+
+	return output
+}