Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/extractors/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ source:
- dataset_c.table_a
max_page_size: 100
profile_column: true
build_view_lineage: true
# Only one of service_account_base64 / service_account_json is needed.
# If both are present, service_account_base64 takes precedence
service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________
Expand Down
121 changes: 73 additions & 48 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"fmt"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/logging/logadmin"
"github.com/pkg/errors"
"github.com/raystack/meteor/plugins"
"github.com/raystack/salt/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
auditpb "google.golang.org/genproto/googleapis/cloud/audit"
Expand All @@ -23,24 +28,38 @@ type Config struct {
UsageProjectIDs []string
}

const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s" AND %s`
const (
advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s" AND %s`

metricTableDurn = "meteor.bq.client.table.duration"
)

type AuditLog struct {
logger log.Logger
client *logadmin.Client
config Config

histogram metric.Int64Histogram
}

func New(logger log.Logger) *AuditLog {
h, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigquery").
Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

return &AuditLog{
logger: logger,

histogram: h,
}
}

func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) error {
for _, opt := range opts {
opt(l)
}
Expand All @@ -50,71 +69,78 @@ func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
}

if l.client == nil {
var err error
l.client, err = l.createClient(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create logadmin client")
return
return fmt.Errorf("create logadmin client: %w", err)
}
}
return

return nil
}

func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, err error) {
func (l *AuditLog) createClient(ctx context.Context) (*logadmin.Client, error) {
if l.config.ServiceAccountJSON == "" {
l.logger.Info("credentials are not specified, creating logadmin using default credentials...")
client, err = logadmin.NewClient(ctx, l.config.ProjectID)
return
return logadmin.NewClient(ctx, l.config.ProjectID)
}

client, err = logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
if err != nil {
err = errors.New("client is nil, failed initiating client")
}
return
return logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
}

func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) {
func (l *AuditLog) Collect(ctx context.Context, tbl *bigquery.Table) (stats *TableStats, err error) {
defer func(start time.Time) {
attrs := []attribute.KeyValue{
attribute.String("bq.operation", "table.audit_logs"),
attribute.String("bq.project_id", tbl.ProjectID),
attribute.String("bq.dataset_id", tbl.DatasetID),
}
if err != nil {
attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
}

l.histogram.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
)
}(time.Now())

if l.client == nil {
err = errors.New("auditlog client is nil")
return
return nil, errors.New("auditlog client is nil")
}

tableStats = NewTableStats()

filter := l.buildFilter(tableID)
filter := l.buildFilter(tbl.TableID)
it := l.client.Entries(ctx,
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))

l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
l.logger.Info("getting logs with the filter", "filter", filter)

stats = NewTableStats()
for {
entry, errF := it.Next()
if errF == iterator.Done {
entry, err := it.Next()
if errors.Is(err, iterator.Done) {
break
}
if errF != nil {
err = errors.Wrap(errF, "error iterating logEntries")
break
if err != nil {
return nil, fmt.Errorf("error iterating logEntries: %w", err)
}

logData, errF := parsePayload(entry.Payload)
if errF != nil {
l.logger.Warn("error parsing LogEntry payload", "err", errF)
logData, err := parsePayload(entry.Payload)
if err != nil {
l.logger.Warn("error parsing LogEntry payload", "err", err)
continue
}

if errF := tableStats.Populate(logData); errF != nil {
if errF := stats.Populate(logData); errF != nil {
l.logger.Warn("error populating logdata", "err", errF)
continue
}
}
return
return stats, nil
}

func (l *AuditLog) buildFilter(tableID string) string {

timeNow := time.Now().UTC()
dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
timeFrom := timeNow.Add(-1 * dayDuration)
Expand All @@ -125,23 +151,23 @@ func (l *AuditLog) buildFilter(tableID string) string {
return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID)
}

func parsePayload(payload interface{}) (ld *LogData, err error) {

ad := &loggingpb.AuditData{}
func parsePayload(payload interface{}) (*LogData, error) {
pl, ok := payload.(*auditpb.AuditLog)
if !ok {
err = errors.New("cannot parse payload to AuditLog")
return
return nil, errors.New("parse payload to AuditLog")
}

if errPB := getAuditData(pl, ad); errPB != nil {
err = errors.Wrap(errPB, "failed to get audit data from metadata")
return
var ad loggingpb.AuditData
if err := getAuditData(pl, &ad); err != nil {
return nil, fmt.Errorf("get audit data from metadata: %w", err)
}

ld = &LogData{ad}
err = ld.validateAuditData()
return
ld := &LogData{&ad}
if err := ld.validateAuditData(); err != nil {
return nil, err
}

return ld, nil
}

func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
Expand All @@ -160,24 +186,23 @@ func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
//nolint:staticcheck
if err := pl.GetServiceData().UnmarshalTo(ad); err != nil {
return errors.Wrap(err, "failed to marshal service data to audit data")
return fmt.Errorf("marshal service data to audit data: %w", err)
}
return nil
}

func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {

if pl.GetMetadata() == nil {
return errors.New("metadata field is nil")
}

mdJSON, err := pl.GetMetadata().MarshalJSON()
if err != nil {
return errors.Wrap(err, "cannot marshal payload metadata")
return fmt.Errorf("marshal payload metadata: %w", err)
}

if err := protojson.Unmarshal(mdJSON, ad); err != nil {
return errors.Wrap(err, "cannot parse service data to Audit")
return fmt.Errorf("parse service data to Audit: %w", err)
}

return nil
Expand Down
Loading