Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s"`
`timestamp >= "%s" AND timestamp < "%s" AND %s`

type AuditLog struct {
logger log.Logger
Expand All @@ -40,17 +40,22 @@ func New(logger log.Logger) *AuditLog {
}
}

func (l *AuditLog) Init(ctx context.Context, cfg Config) (err error) {
if len(cfg.UsageProjectIDs) == 0 {
cfg.UsageProjectIDs = []string{cfg.ProjectID}
func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
for _, opt := range opts {
opt(l)
}
l.config = cfg
l.client, err = l.createClient(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create logadmin client")
return

if len(l.config.UsageProjectIDs) == 0 {
l.config.UsageProjectIDs = []string{l.config.ProjectID}
}

if l.client == nil {
l.client, err = l.createClient(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create logadmin client")
return
}
}
return
}

Expand All @@ -68,10 +73,15 @@ func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, e
return
}

func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err error) {
func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) {
if l.client == nil {
err = errors.New("auditlog client is nil")
return
}

tableStats = NewTableStats()

filter := l.buildFilter()
filter := l.buildFilter(tableID)
it := l.client.Entries(ctx,
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))
Expand All @@ -91,7 +101,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err

logData, errF := parsePayload(entry.Payload)
if errF != nil {
l.logger.Warn("error parsing LogEntry payload", "err", errF, "payload", entry.Payload)
l.logger.Warn("error parsing LogEntry payload", "err", errF)
continue
}

Expand All @@ -103,7 +113,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err
return
}

func (l *AuditLog) buildFilter() string {
func (l *AuditLog) buildFilter(tableID string) string {

timeNow := time.Now().UTC()
dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
Expand All @@ -112,7 +122,7 @@ func (l *AuditLog) buildFilter() string {
timeNowFormatted := timeNow.Format(time.RFC3339)
timeFromFormatted := timeFrom.Format(time.RFC3339)

return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted)
return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID)
}

func parsePayload(payload interface{}) (ld *LogData, err error) {
Expand All @@ -125,7 +135,7 @@ func parsePayload(payload interface{}) (ld *LogData, err error) {
}

if errPB := getAuditData(pl, ad); errPB != nil {
err = errors.Wrap(err, "failed to get audit data from metadata")
err = errors.Wrap(errPB, "failed to get audit data from metadata")
return
}

Expand Down
78 changes: 56 additions & 22 deletions plugins/extractors/bigquery/auditlog/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"cloud.google.com/go/logging/logadmin"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/test/utils"
"github.com/stretchr/testify/assert"
Expand All @@ -15,29 +16,51 @@ import (
)

func TestInit(t *testing.T) {
t.Run("should return error if failed to init client", func(t *testing.T) {
t.Run("should return error if config is wrong to init client", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, Config{
ProjectID: "---",
ServiceAccountJSON: "---",
})
err := la.Init(ctx,
InitWithConfig(Config{
ProjectID: "---",
ServiceAccountJSON: "---",
}),
)

assert.EqualError(t, err, "failed to create logadmin client: client is nil, failed initiating client")
})

t.Run("should not return error if init client is success", func(t *testing.T) {
t.Run("should not return error invalid config if config is not wrong", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, Config{})
err := la.Init(ctx)

assert.NotEqual(t, plugins.InvalidConfigError{}, err)
})

t.Run("should return no error init succeed", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, InitWithClient(&logadmin.Client{}))

assert.Nil(t, err)
})
}

func TestBuildFilter(t *testing.T) {
var (
la = &AuditLog{}
tableID = "table-id"
)

filterQuery := la.buildFilter(tableID)

assert.Contains(t, filterQuery, tableID)
}

func TestGetAuditData(t *testing.T) {
func TestParsePayload(t *testing.T) {
t.Run("should parse with service data if service data exists", func(t *testing.T) {
loggingData, err := anypb.New(&loggingpb.AuditData{
JobCompletedEvent: &loggingpb.JobCompletedEvent{
Expand All @@ -62,37 +85,48 @@ func TestGetAuditData(t *testing.T) {
ServiceData: loggingData,
}

auditData := &loggingpb.AuditData{}
err = getAuditData(auditLog, auditData)
ld, err := parsePayload(auditLog)

assert.Nil(t, err)
assert.NotNil(t, auditData)
assert.NotEmpty(t, auditData)
assert.NotNil(t, ld.AuditData)
assert.NotEmpty(t, ld.AuditData)
})

t.Run("should parse with metadata if service data not exists and metadata exist", func(t *testing.T) {
loggingData, err := structpb.NewStruct(map[string]interface{}{
"jobCompletedEvent": nil,
"jobCompletedEvent": map[string]interface{}{
"event_name": "name",
"job": map[string]interface{}{
"job_statistics": map[string]interface{}{
"referenced_tables": []interface{}{map[string]interface{}{
"project_id": "project_id",
}},
},
"job_status": map[string]interface{}{
"state": "DONE",
},
},
},
})

require.Nil(t, err)

auditLog := &auditpb.AuditLog{
Metadata: loggingData,
}

auditData := &loggingpb.AuditData{}
err = getAuditData(auditLog, auditData)
ld, err := parsePayload(auditLog)

assert.Nil(t, err)
assert.NotNil(t, auditData)
assert.NotEmpty(t, auditData)
assert.NotNil(t, ld.AuditData)
assert.NotEmpty(t, ld.AuditData)
})

t.Run("should return error if neither service data nor metadata field exist", func(t *testing.T) {
auditLog := &auditpb.AuditLog{}

auditData := &loggingpb.AuditData{}
err := getAuditData(auditLog, auditData)
assert.EqualError(t, err, "metadata field is nil")
assert.Empty(t, auditData)
ld, err := parsePayload(auditLog)

assert.EqualError(t, err, "failed to get audit data from metadata: metadata field is nil")
assert.Nil(t, ld)
})
}
17 changes: 17 additions & 0 deletions plugins/extractors/bigquery/auditlog/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package auditlog

import "cloud.google.com/go/logging/logadmin"

type InitOption func(*AuditLog)

func InitWithClient(c *logadmin.Client) func(*AuditLog) {
return func(al *AuditLog) {
al.client = c
}
}

func InitWithConfig(cfg Config) func(*AuditLog) {
return func(al *AuditLog) {
al.config = cfg
}
}
45 changes: 24 additions & 21 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ usage_period_in_day: 7`

// Extractor manages the communication with the bigquery service
type Extractor struct {
logger log.Logger
client *bigquery.Client
config Config
galClient *auditlog.AuditLog
tableStats *auditlog.TableStats
logger log.Logger
client *bigquery.Client
config Config
galClient *auditlog.AuditLog
}

func New(logger log.Logger) *Extractor {
Expand Down Expand Up @@ -104,13 +103,15 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
}

if e.config.IsCollectTableUsage {
errL := e.galClient.Init(ctx, auditlog.Config{
ProjectID: e.config.ProjectID,
ServiceAccountJSON: e.config.ServiceAccountJSON,
IsCollectTableUsage: e.config.IsCollectTableUsage,
UsagePeriodInDay: e.config.UsagePeriodInDay,
UsageProjectIDs: e.config.UsageProjectIDs,
})
errL := e.galClient.Init(ctx,
auditlog.InitWithConfig(auditlog.Config{
ProjectID: e.config.ProjectID,
ServiceAccountJSON: e.config.ServiceAccountJSON,
IsCollectTableUsage: e.config.IsCollectTableUsage,
UsagePeriodInDay: e.config.UsagePeriodInDay,
UsageProjectIDs: e.config.UsageProjectIDs,
}),
)
if errL != nil {
e.logger.Error("failed to create google audit log client", "err", errL)
}
Expand All @@ -121,14 +122,6 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
ts, errL := e.galClient.Collect(ctx)
e.tableStats = ts
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
}

// Fetch and iterate over datasets
it := e.client.Datasets(ctx)
Expand Down Expand Up @@ -181,10 +174,20 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

// Build the bigquery table metadata
func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) *assetsv1beta1.Table {
var tableStats *auditlog.TableStats
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
var errL error
tableStats, errL = e.galClient.Collect(ctx, t.TableID)
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
}

tableFQN := t.FullyQualifiedName()
tableURN := models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID)

tableProfile := e.buildTableProfile(tableURN)
tableProfile := e.buildTableProfile(tableURN, tableStats)

var partitionField string
if md.TimePartitioning != nil {
Expand Down
15 changes: 9 additions & 6 deletions plugins/extractors/bigquery/profile.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package bigquery

import assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
import (
assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
"github.com/odpf/meteor/plugins/extractors/bigquery/auditlog"
)

func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableProfile) {
func (e *Extractor) buildTableProfile(tableURN string, tableStats *auditlog.TableStats) (tp *assetsv1beta1.TableProfile) {
var tableUsage int64
var commonJoins []*assetsv1beta1.Join
var filterConditions []string

if e.config.IsCollectTableUsage && e.tableStats != nil {
if e.config.IsCollectTableUsage && tableStats != nil {
// table usage
tableUsage = e.tableStats.TableUsage[tableURN]
tableUsage = tableStats.TableUsage[tableURN]

// common join
if jdMapping, exist := e.tableStats.JoinDetail[tableURN]; exist {
if jdMapping, exist := tableStats.JoinDetail[tableURN]; exist {
for joinedTableURN, jd := range jdMapping {
var joinConditions []string
for jc := range jd.Conditions {
Expand All @@ -27,7 +30,7 @@ func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableP
}

// filter conditions
if filterMapping, exist := e.tableStats.FilterConditions[tableURN]; exist {
if filterMapping, exist := tableStats.FilterConditions[tableURN]; exist {
for filterExpression := range filterMapping {
filterConditions = append(filterConditions, filterExpression)
}
Expand Down
Loading