diff --git a/plugins/extractors/bigquery/auditlog/auditlog.go b/plugins/extractors/bigquery/auditlog/auditlog.go index 2e8211944..a44914192 100644 --- a/plugins/extractors/bigquery/auditlog/auditlog.go +++ b/plugins/extractors/bigquery/auditlog/auditlog.go @@ -26,7 +26,7 @@ type Config struct { const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` + `resource.type="bigquery_resource" AND NOT ` + `protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` + - `timestamp >= "%s" AND timestamp < "%s"` + `timestamp >= "%s" AND timestamp < "%s" AND %s` type AuditLog struct { logger log.Logger @@ -40,17 +40,22 @@ func New(logger log.Logger) *AuditLog { } } -func (l *AuditLog) Init(ctx context.Context, cfg Config) (err error) { - if len(cfg.UsageProjectIDs) == 0 { - cfg.UsageProjectIDs = []string{cfg.ProjectID} +func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) { + for _, opt := range opts { + opt(l) } - l.config = cfg - l.client, err = l.createClient(ctx) - if err != nil { - err = errors.Wrap(err, "failed to create logadmin client") - return + + if len(l.config.UsageProjectIDs) == 0 { + l.config.UsageProjectIDs = []string{l.config.ProjectID} } + if l.client == nil { + l.client, err = l.createClient(ctx) + if err != nil { + err = errors.Wrap(err, "failed to create logadmin client") + return + } + } return } @@ -68,10 +73,15 @@ func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, e return } -func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err error) { +func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) { + if l.client == nil { + err = errors.New("auditlog client is nil") + return + } + tableStats = NewTableStats() - filter := l.buildFilter() + filter := l.buildFilter(tableID) it := l.client.Entries(ctx, logadmin.ProjectIDs(l.config.UsageProjectIDs), logadmin.Filter(filter)) @@ -91,7 +101,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err logData, errF := parsePayload(entry.Payload) if errF != nil { - l.logger.Warn("error parsing LogEntry payload", "err", errF, "payload", entry.Payload) + l.logger.Warn("error parsing LogEntry payload", "err", errF) continue } @@ -103,7 +113,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err return } -func (l *AuditLog) buildFilter() string { +func (l *AuditLog) buildFilter(tableID string) string { timeNow := time.Now().UTC() dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour @@ -112,7 +122,7 @@ func (l *AuditLog) buildFilter() string { timeNowFormatted := timeNow.Format(time.RFC3339) timeFromFormatted := timeFrom.Format(time.RFC3339) - return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted) + return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID) } func parsePayload(payload interface{}) (ld *LogData, err error) { @@ -125,7 +135,7 @@ func parsePayload(payload interface{}) (ld *LogData, err error) { } if errPB := getAuditData(pl, ad); errPB != nil { - err = errors.Wrap(err, "failed to get audit data from metadata") + err = errors.Wrap(errPB, "failed to get audit data from metadata") return } diff --git a/plugins/extractors/bigquery/auditlog/auditlog_test.go b/plugins/extractors/bigquery/auditlog/auditlog_test.go index 3a34e6b5e..68d4ab36e 100644 --- a/plugins/extractors/bigquery/auditlog/auditlog_test.go +++ b/plugins/extractors/bigquery/auditlog/auditlog_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "cloud.google.com/go/logging/logadmin" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/test/utils" "github.com/stretchr/testify/assert" @@ -15,29 +16,51 @@ import ( ) func TestInit(t *testing.T) { - t.Run("should return error if failed to init client", func(t *testing.T) { + t.Run("should return error if config is wrong to init client", func(t *testing.T) { la := New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := la.Init(ctx, Config{ - ProjectID: "---", - ServiceAccountJSON: "---", - }) + err := la.Init(ctx, + InitWithConfig(Config{ + ProjectID: "---", + ServiceAccountJSON: "---", + }), + ) assert.EqualError(t, err, "failed to create logadmin client: client is nil, failed initiating client") }) - t.Run("should not return error if init client is success", func(t *testing.T) { + t.Run("should not return error invalid config if config is not wrong", func(t *testing.T) { la := New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := la.Init(ctx, Config{}) + err := la.Init(ctx) assert.NotEqual(t, plugins.InvalidConfigError{}, err) }) + + t.Run("should return no error init succeed", func(t *testing.T) { + la := New(utils.Logger) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := la.Init(ctx, InitWithClient(&logadmin.Client{})) + + assert.Nil(t, err) + }) +} + +func TestBuildFilter(t *testing.T) { + var ( + la = &AuditLog{} + tableID = "table-id" + ) + + filterQuery := la.buildFilter(tableID) + + assert.Contains(t, filterQuery, tableID) } -func TestGetAuditData(t *testing.T) { +func TestParsePayload(t *testing.T) { t.Run("should parse with service data if service data exists", func(t *testing.T) { loggingData, err := anypb.New(&loggingpb.AuditData{ JobCompletedEvent: &loggingpb.JobCompletedEvent{ @@ -62,37 +85,48 @@ func TestGetAuditData(t *testing.T) { ServiceData: loggingData, } - auditData := &loggingpb.AuditData{} - err = getAuditData(auditLog, auditData) + ld, err := parsePayload(auditLog) + assert.Nil(t, err) - assert.NotNil(t, auditData) - assert.NotEmpty(t, auditData) + assert.NotNil(t, ld.AuditData) + assert.NotEmpty(t, ld.AuditData) }) t.Run("should parse with metadata if service data not exists and metadata exist", func(t *testing.T) { loggingData, err := structpb.NewStruct(map[string]interface{}{ - "jobCompletedEvent": nil, + "jobCompletedEvent": map[string]interface{}{ + "event_name": "name", + "job": map[string]interface{}{ + "job_statistics": map[string]interface{}{ + "referenced_tables": []interface{}{map[string]interface{}{ + "project_id": "project_id", + }}, + }, + "job_status": map[string]interface{}{ + "state": "DONE", + }, + }, + }, }) - require.Nil(t, err) auditLog := &auditpb.AuditLog{ Metadata: loggingData, } - auditData := &loggingpb.AuditData{} - err = getAuditData(auditLog, auditData) + ld, err := parsePayload(auditLog) + assert.Nil(t, err) - assert.NotNil(t, auditData) - assert.NotEmpty(t, auditData) + assert.NotNil(t, ld.AuditData) + assert.NotEmpty(t, ld.AuditData) }) t.Run("should return error if neither service data nor metadata field exist", func(t *testing.T) { auditLog := &auditpb.AuditLog{} - auditData := &loggingpb.AuditData{} - err := getAuditData(auditLog, auditData) - assert.EqualError(t, err, "metadata field is nil") - assert.Empty(t, auditData) + ld, err := parsePayload(auditLog) + + assert.EqualError(t, err, "failed to get audit data from metadata: metadata field is nil") + assert.Nil(t, ld) }) } diff --git a/plugins/extractors/bigquery/auditlog/options.go b/plugins/extractors/bigquery/auditlog/options.go new file mode 100644 index 000000000..5db3fa7bb --- /dev/null +++ b/plugins/extractors/bigquery/auditlog/options.go @@ -0,0 +1,17 @@ +package auditlog + +import "cloud.google.com/go/logging/logadmin" + +type InitOption func(*AuditLog) + +func InitWithClient(c *logadmin.Client) func(*AuditLog) { + return func(al *AuditLog) { + al.client = c + } +} + +func InitWithConfig(cfg Config) func(*AuditLog) { + return func(al *AuditLog) { + al.config = cfg + } +} diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 87bdc87d0..eab822990 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -61,11 +61,10 @@ usage_period_in_day: 7` // Extractor manages the communication with the bigquery service type Extractor struct { - logger log.Logger - client *bigquery.Client - config Config - galClient *auditlog.AuditLog - tableStats *auditlog.TableStats + logger log.Logger + client *bigquery.Client + config Config + galClient *auditlog.AuditLog } func New(logger log.Logger) *Extractor { @@ -104,13 +103,15 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) } if e.config.IsCollectTableUsage { - errL := e.galClient.Init(ctx, auditlog.Config{ - ProjectID: e.config.ProjectID, - ServiceAccountJSON: e.config.ServiceAccountJSON, - IsCollectTableUsage: e.config.IsCollectTableUsage, - UsagePeriodInDay: e.config.UsagePeriodInDay, - UsageProjectIDs: e.config.UsageProjectIDs, - }) + errL := e.galClient.Init(ctx, + auditlog.InitWithConfig(auditlog.Config{ + ProjectID: e.config.ProjectID, + ServiceAccountJSON: e.config.ServiceAccountJSON, + IsCollectTableUsage: e.config.IsCollectTableUsage, + UsagePeriodInDay: e.config.UsagePeriodInDay, + UsageProjectIDs: e.config.UsageProjectIDs, + }), + ) if errL != nil { e.logger.Error("failed to create google audit log client", "err", errL) } @@ -121,14 +122,6 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) // Extract checks if the table is valid and extracts the table schema func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { - if e.config.IsCollectTableUsage { - // Fetch and extract logs first to build a map - ts, errL := e.galClient.Collect(ctx) - e.tableStats = ts - if errL != nil { - e.logger.Warn("error populating table stats usage", "error", errL) - } - } // Fetch and iterate over datasets it := e.client.Datasets(ctx) @@ -181,10 +174,20 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit // Build the bigquery table metadata func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) *assetsv1beta1.Table { + var tableStats *auditlog.TableStats + if e.config.IsCollectTableUsage { + // Fetch and extract logs first to build a map + var errL error + tableStats, errL = e.galClient.Collect(ctx, t.TableID) + if errL != nil { + e.logger.Warn("error populating table stats usage", "error", errL) + } + } + tableFQN := t.FullyQualifiedName() tableURN := models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID) - tableProfile := e.buildTableProfile(tableURN) + tableProfile := e.buildTableProfile(tableURN, tableStats) var partitionField string if md.TimePartitioning != nil { diff --git a/plugins/extractors/bigquery/profile.go b/plugins/extractors/bigquery/profile.go index 2875e9ef0..d50618498 100644 --- a/plugins/extractors/bigquery/profile.go +++ b/plugins/extractors/bigquery/profile.go @@ -1,18 +1,21 @@ package bigquery -import assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1" +import ( + assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1" + "github.com/odpf/meteor/plugins/extractors/bigquery/auditlog" +) -func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableProfile) { +func (e *Extractor) buildTableProfile(tableURN string, tableStats *auditlog.TableStats) (tp *assetsv1beta1.TableProfile) { var tableUsage int64 var commonJoins []*assetsv1beta1.Join var filterConditions []string - if e.config.IsCollectTableUsage && e.tableStats != nil { + if e.config.IsCollectTableUsage && tableStats != nil { // table usage - tableUsage = e.tableStats.TableUsage[tableURN] + tableUsage = tableStats.TableUsage[tableURN] // common join - if jdMapping, exist := e.tableStats.JoinDetail[tableURN]; exist { + if jdMapping, exist := tableStats.JoinDetail[tableURN]; exist { for joinedTableURN, jd := range jdMapping { var joinConditions []string for jc := range jd.Conditions { @@ -27,7 +30,7 @@ func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableP } // filter conditions - if filterMapping, exist := e.tableStats.FilterConditions[tableURN]; exist { + if filterMapping, exist := tableStats.FilterConditions[tableURN]; exist { for filterExpression := range filterMapping { filterConditions = append(filterConditions, filterExpression) } diff --git a/plugins/extractors/bigquery/profile_test.go b/plugins/extractors/bigquery/profile_test.go index 1e5601edb..10a570899 100644 --- a/plugins/extractors/bigquery/profile_test.go +++ b/plugins/extractors/bigquery/profile_test.go @@ -15,13 +15,15 @@ import ( func TestBuildTableProfile(t *testing.T) { tableURN := models.TableURN("bigquery", "project1", "dataset1", "table1") t.Run("table profile usage related fields are empty if usage collection is disabled", func(t *testing.T) { + + var tableStats *auditlog.TableStats extr := &Extractor{ config: Config{ IsCollectTableUsage: false, }, } - tp := extr.buildTableProfile(tableURN) + tp := extr.buildTableProfile(tableURN, tableStats) assert.Empty(t, tp.UsageCount) assert.Empty(t, tp.Joins) @@ -32,43 +34,43 @@ func TestBuildTableProfile(t *testing.T) { config: Config{ IsCollectTableUsage: true, }, - tableStats: nil, } - tp := extr.buildTableProfile(tableURN) + tp := extr.buildTableProfile(tableURN, nil) assert.Empty(t, tp.UsageCount) assert.Empty(t, tp.Joins) }) t.Run("table profile usage related fields are populated if table stats is not nil and usage collection is enabled", func(t *testing.T) { - extr := &Extractor{ - config: Config{ - IsCollectTableUsage: true, + tableStats := &auditlog.TableStats{ + TableUsage: map[string]int64{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): 5, + models.TableURN("bigquery", "project2", "dataset1", "table1"): 3, + models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, }, - tableStats: &auditlog.TableStats{ - TableUsage: map[string]int64{ - models.TableURN("bigquery", "project1", "dataset1", "table1"): 5, - models.TableURN("bigquery", "project2", "dataset1", "table1"): 3, - models.TableURN("bigquery", "project3", "dataset1", "table1"): 1, - }, - JoinDetail: map[string]map[string]auditlog.JoinDetail{ - models.TableURN("bigquery", "project1", "dataset1", "table1"): { - models.TableURN("bigquery", "project2", "dataset1", "table1"): auditlog.JoinDetail{ - Usage: 1, - }, - models.TableURN("bigquery", "project3", "dataset1", "table1"): auditlog.JoinDetail{ - Usage: 3, - }, - models.TableURN("bigquery", "project4", "dataset1", "table1"): auditlog.JoinDetail{ - Usage: 1, - }, + JoinDetail: map[string]map[string]auditlog.JoinDetail{ + models.TableURN("bigquery", "project1", "dataset1", "table1"): { + models.TableURN("bigquery", "project2", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 1, + }, + models.TableURN("bigquery", "project3", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 3, + }, + models.TableURN("bigquery", "project4", "dataset1", "table1"): auditlog.JoinDetail{ + Usage: 1, }, }, }, } - tp := extr.buildTableProfile(tableURN) + extr := &Extractor{ + config: Config{ + IsCollectTableUsage: true, + }, + } + + tp := extr.buildTableProfile(tableURN, tableStats) assert.EqualValues(t, 5, tp.UsageCount) assert.Contains(t, tp.Joins, &assetsv1beta1.Join{